Skip to content

Commit

Permalink
Added composer support and namespaces.
Browse files Browse the repository at this point in the history
  • Loading branch information
siad007 committed Dec 10, 2017
1 parent 980e582 commit 14a71eb
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 99 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
.idea/*
.idea/*
/vendor/
composer.lock
Empty file added .travis.yml
Empty file.
21 changes: 21 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"name": "mehr-als-nix/parallel",
"description": "This package is a port of PhpDumentor/Parallel",
"type": "library",
"license": "MIT",
"authors": [
{
"name": "siad007",
"email": "siad.ardroumli@gmail.com"
}
],
"minimum-stability": "stable",
"require-dev": {
"phpunit/phpunit": "^6.5"
},
"autoload": {
"psr-4": {
"MehrAlsNix\\Parallel\\": "src"
}
}
}
36 changes: 14 additions & 22 deletions example.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,21 @@
* @link http://docblox-project.org
*/

/** Include the manager as we do not autoload */
require_once 'Manager.php';

/** Include the worker as we do not autoload */
require_once 'Worker.php';

/** Include the worker's pipe as we do not autoload */
require_once 'WorkerPipe.php';
include __DIR__ . '/vendor/autoload.php';

// -----------------------------------------------------------------------------
// method 1: using a fluent interface and the addWorker helper.
// -----------------------------------------------------------------------------

$mgr = new DocBlox_Parallel_Manager();
$mgr = new \MehrAlsNix\Parallel\Manager();
$mgr
->addWorker(new DocBlox_Parallel_Worker(function() { sleep(1); return 'a'; }))
->addWorker(new DocBlox_Parallel_Worker(function() { sleep(1); return 'b'; }))
->addWorker(new DocBlox_Parallel_Worker(function() { sleep(1); return 'c'; }))
->addWorker(new DocBlox_Parallel_Worker(function() { sleep(1); return 'd'; }))
->addWorker(new DocBlox_Parallel_Worker(function() { sleep(1); return 'e'; }))
->addWorker(new \MehrAlsNix\Parallel\Worker(function() { sleep(1); return 'a'; }))
->addWorker(new \MehrAlsNix\Parallel\Worker(function() { sleep(1); return 'b'; }))
->addWorker(new \MehrAlsNix\Parallel\Worker(function() { sleep(1); return 'c'; }))
->addWorker(new \MehrAlsNix\Parallel\Worker(function() { sleep(1); return 'd'; }))
->addWorker(new \MehrAlsNix\Parallel\Worker(function() { sleep(1); return 'e'; }))
->execute();

/** @var DocBlox_Parallel_Worker $worker */
foreach ($mgr as $worker) {
var_dump($worker->getResult());
}
Expand All @@ -43,15 +35,15 @@
// method 2: using the manager as worker array
// -----------------------------------------------------------------------------

$mgr = new DocBlox_Parallel_Manager();
$mgr[] = new DocBlox_Parallel_Worker(function() { sleep(1); return 'f'; });
$mgr[] = new DocBlox_Parallel_Worker(function() { sleep(1); return 'g'; });
$mgr[] = new DocBlox_Parallel_Worker(function() { sleep(1); return 'h'; });
$mgr[] = new DocBlox_Parallel_Worker(function() { sleep(1); return 'i'; });
$mgr[] = new DocBlox_Parallel_Worker(function() { sleep(1); return 'j'; });
$mgr = new \MehrAlsNix\Parallel\Manager();
$mgr[] = new \MehrAlsNix\Parallel\Worker(function() { sleep(1); return 'f'; });
$mgr[] = new \MehrAlsNix\Parallel\Worker(function() { sleep(1); return 'g'; });
$mgr[] = new \MehrAlsNix\Parallel\Worker(function() { sleep(1); return 'h'; });
$mgr[] = new \MehrAlsNix\Parallel\Worker(function() { sleep(1); return 'i'; });
$mgr[] = new \MehrAlsNix\Parallel\Worker(function() { sleep(1); return 'j'; });
$mgr->execute();

/** @var DocBlox_Parallel_Worker $worker */
/** @var Worker $worker */
foreach ($mgr as $worker) {
var_dump($worker->getResult());
}
110 changes: 55 additions & 55 deletions Manager.php → src/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
* @link http://docblox-project.org
*/

namespace MehrAlsNix\Parallel;

/**
* Manager class for Parallel processes.
*
Expand All @@ -24,7 +26,7 @@
* @license http://www.opensource.org/licenses/mit-license.php MIT
* @link http://docblox-project.org
*/
class DocBlox_Parallel_Manager extends ArrayObject
class Manager extends \ArrayObject
{
/** @var int The maximum number of processes to run simultaneously */
protected $process_limit = 2;
Expand All @@ -36,18 +38,18 @@ class DocBlox_Parallel_Manager extends ArrayObject
* Tries to autodetect the optimal number of process by counting the number
* of processors.
*
* @param array $input Input for the array object.
* @param int $flags flags for the array object.
* @param array $input Input for the array object.
* @param int $flags flags for the array object.
* @param string $iterator_class Iterator class for this array object.
* @throws \InvalidArgumentException
*/
public function __construct(
$input = array(), $flags = 0, $iterator_class = "ArrayIterator"
) {
public function __construct($input = array(), $flags = 0, $iterator_class = 'ArrayIterator')
{
parent::__construct($input, $flags, $iterator_class);

if (is_readable('/proc/cpuinfo')) {
$processors = 0;
exec("cat /proc/cpuinfo | grep processor | wc -l", $processors);
exec('cat /proc/cpuinfo | grep processor | wc -l', $processors);
$this->setProcessLimit(reset($processors));
}
}
Expand All @@ -67,33 +69,29 @@ public function __construct(
*
* $mgr = new DocBlox_Parallel_Manager();
* $mgr->setProcessLimit(2)
* ->addWorker(new DocBlox_Parallel_Worker($cb1))
* ->addWorker(new DocBlox_Parallel_Worker($cb2))
* ->addWorker(new Worker($cb1))
* ->addWorker(new Worker($cb2))
* ->execute();
*
* @param int $index The key for this worker.
* @param DocBlox_Parallel_Worker $newval The worker to add onto the queue.
* @param int $index The key for this worker.
* @param Worker $newval The worker to add onto the queue.
*
* @see DocBlox_Parallel_Manager::execute()
*
* @throws RuntimeException if this method is invoked while the
* @throws \RuntimeException if this method is invoked while the
* manager is busy executing tasks.
* @throws InvalidArgumentException if the provided element is not of type
* DocBlox_Parallel_Worker.
* @throws \InvalidArgumentException if the provided element is not of type
* Worker.
*
* @return void
*/
public function offsetSet($index, $newval)
{
if (!$newval instanceof DocBlox_Parallel_Worker) {
throw new InvalidArgumentException(
'Provided element must be of type DocBlox_Parallel_Worker'
);
if (!$newval instanceof Worker) {
throw new \InvalidArgumentException('Provided element must be of type Worker');
}
if ($this->isRunning()) {
throw new RuntimeException(
'Workers may not be added during execution of the manager'
);
throw new \RuntimeException('Workers may not be added during execution of the manager');
}

parent::offsetSet($index, $newval);
Expand All @@ -103,11 +101,11 @@ public function offsetSet($index, $newval)
* Convenience method to make the addition of workers explicit and allow a
* fluent interface.
*
* @param DocBlox_Parallel_Worker $worker The worker to add onto the queue.
* @param Worker $worker The worker to add onto the queue.
*
* @return self
*/
public function addWorker(DocBlox_Parallel_Worker $worker)
public function addWorker(Worker $worker)
{
$this[] = $worker;

Expand All @@ -125,11 +123,12 @@ public function addWorker(DocBlox_Parallel_Worker $worker)
* @see DocBlox_Parallel_Manager::addWorker() for an example
*
* @return self
* @throws \InvalidArgumentException
*/
public function setProcessLimit($process_limit)
{
if ($process_limit < 1) {
throw new InvalidArgumentException(
throw new \InvalidArgumentException(
'Number of simultaneous processes may not be less than 1'
);
}
Expand Down Expand Up @@ -167,13 +166,15 @@ public function isRunning()
* many times as the ProcessLimit dictates at the same time.
*
* @return void
* @throws \InvalidArgumentException
* @throws \RuntimeException
*/
public function execute()
{
/** @var int[] $processes */
$processes = $this->startExecution();

/** @var DocBlox_Parallel_Worker $worker */
/** @var Worker $worker */
foreach ($this as $worker) {

// if requirements are not met, execute workers in series.
Expand Down Expand Up @@ -206,8 +207,7 @@ protected function startExecution()
if (!$this->checkRequirements()) {
trigger_error(
'The PCNTL extension is not available, running workers in series '
. 'instead of parallel',
E_USER_NOTICE
. 'instead of parallel'
);
}

Expand All @@ -230,7 +230,7 @@ protected function stopExecution(array &$processes)
pcntl_waitpid(array_shift($processes), $status);
}

/** @var DocBlox_Parallel_Worker $worker */
/** @var Worker $worker */
foreach ($this as $worker) {
$worker->pipe->push();
}
Expand All @@ -255,40 +255,40 @@ protected function stopExecution(array &$processes)
* If there are more workers than may be ran simultaneously then this method
* will wait until a slot becomes available and then starts the next worker.
*
* @param DocBlox_Parallel_Worker $worker The worker to process.
* @param int[] &$processes The list of running processes.
* @param Worker $worker The worker to process.
* @param int[] &$processes The list of running processes.
*
* @throws RuntimeException if we are unable to fork.
* @throws \InvalidArgumentException
* @throws \RuntimeException if we are unable to fork.
*
* @return void
*/
protected function forkAndRun(
DocBlox_Parallel_Worker $worker, array &$processes
) {
$worker->pipe = new DocBlox_Parallel_WorkerPipe($worker);
protected function forkAndRun(Worker $worker, array &$processes)
{
$worker->pipe = new WorkerPipe($worker);

// fork the process and register the PID
$pid = pcntl_fork();

switch ($pid) {
case -1:
throw new RuntimeException('Unable to establish a fork');
case 0: // Child process
$worker->execute();

$worker->pipe->pull();

// Kill -9 this process to prevent closing of shared file handlers.
// Not doing this causes, for example, MySQL connections to be cleaned.
posix_kill(getmypid(), SIGKILL);
default: // Parent process
// Keep track if the worker children
$processes[] = $pid;

if (count($processes) >= $this->getProcessLimit()) {
pcntl_waitpid(array_shift($processes), $status);
}
break;
case -1:
throw new \RuntimeException('Unable to establish a fork');
case 0: // Child process
$worker->execute();

$worker->pipe->pull();

// Kill -9 this process to prevent closing of shared file handlers.
// Not doing this causes, for example, MySQL connections to be cleaned.
posix_kill(getmypid(), SIGKILL);
default: // Parent process
// Keep track if the worker children
$processes[] = $pid;

if (count($processes) >= $this->getProcessLimit()) {
pcntl_waitpid(array_shift($processes), $status);
}
break;
}
}

Expand All @@ -299,6 +299,6 @@ protected function forkAndRun(
*/
protected function checkRequirements()
{
return (bool)(extension_loaded('pcntl'));
return (bool) extension_loaded('pcntl');
}
}
}
Loading

0 comments on commit 14a71eb

Please sign in to comment.