From 15fd1c14153909206f56c2840f5395822392a959 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 29 Apr 2025 14:35:35 +1200 Subject: [PATCH 1/4] Add hook based worker stop callback --- src/Queue/Server.php | 47 +++++++++++++++++--------------------------- 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/src/Queue/Server.php b/src/Queue/Server.php index a465676..4c1ce54 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -51,7 +51,12 @@ class Server /** * Hook that is called when worker starts */ - protected Hook $workerStartHook; + protected ?Hook $workerStartHook = null; + + /** + * Hook that is called when worker stops + */ + protected ?Hook $workerStopHook = null; /** * @var array @@ -287,6 +292,12 @@ function (?Message $message, Throwable $th) { } }); + if (!\is_null($this->workerStartHook)) { + $this->adapter->workerStop(function () { + $this->workerStopHook->getAction()(...$this->getArguments($this->workerStopHook)); + }); + } + $this->adapter->start(); } catch (Throwable $error) { self::setResource('error', fn () => $error); @@ -309,38 +320,16 @@ public function workerStart(): Hook return $hook; } - /** - * Returns Worker starts hook. - * @return Hook - */ - public function getWorkerStart(): Hook - { - return $this->workerStartHook; - } - /** * Is called when a Worker stops. - * @param callable|null $callback - * @return self - * @throws Exception + * @return Hook */ - public function workerStop(?callable $callback = null): self + public function workerStop(): Hook { - try { - $this->adapter->workerStop(function (string $workerId) use ($callback) { - Console::success("[Worker] Worker {$workerId} is ready!"); - if (!is_null($callback)) { - call_user_func($callback); - } - }); - } catch (Throwable $error) { - self::setResource('error', fn () => $error); - foreach ($this->errorHooks as $hook) { - call_user_func_array($hook->getAction(), $this->getArguments($hook)); - } - } - - return $this; + $hook = new Hook(); + $hook->groups(['*']); + $this->workerStopHook = $hook; + return $hook; } /** From 71ce64a70a2d4ec3294d281bde0b6b37021c0f45 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 29 Apr 2025 14:36:44 +1200 Subject: [PATCH 2/4] Fix wrong event name --- src/Queue/Adapter/Swoole.php | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Queue/Adapter/Swoole.php b/src/Queue/Adapter/Swoole.php index fef30da..9e0ccfe 100644 --- a/src/Queue/Adapter/Swoole.php +++ b/src/Queue/Adapter/Swoole.php @@ -2,6 +2,7 @@ namespace Utopia\Queue\Adapter; +use Swoole\Constant; use Swoole\Process\Pool; use Utopia\Queue\Adapter; use Utopia\Queue\Consumer; @@ -33,8 +34,8 @@ public function stop(): self public function workerStart(callable $callback): self { - $this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) { - call_user_func($callback, $workerId); + $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) { + $callback($workerId); }); return $this; @@ -42,8 +43,8 @@ public function workerStart(callable $callback): self public function workerStop(callable $callback): self { - $this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) { - call_user_func($callback, $workerId); + $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) { + $callback($workerId); }); return $this; From 9788e9bb5851d3303314c16403590f2538579d6a Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 29 Apr 2025 14:36:52 +1200 Subject: [PATCH 3/4] Add test --- tests/Queue/servers/Swoole/worker.php | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/Queue/servers/Swoole/worker.php b/tests/Queue/servers/Swoole/worker.php index 3645a1d..02b9ef3 100644 --- a/tests/Queue/servers/Swoole/worker.php +++ b/tests/Queue/servers/Swoole/worker.php @@ -30,4 +30,10 @@ echo "Worker Started" . PHP_EOL; }); +$server + ->workerStop() + ->action(function () { + echo "Worker Stopped" . PHP_EOL; + }); + $server->start(); From 6af664f60474b20d6ef10b3d66349a0bf8e81f68 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 29 Apr 2025 14:46:07 +1200 Subject: [PATCH 4/4] Call error hooks on worker start/stop failure --- src/Queue/Adapter/Swoole.php | 8 ++++++-- src/Queue/Server.php | 23 +++++++++++++++++++++-- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/src/Queue/Adapter/Swoole.php b/src/Queue/Adapter/Swoole.php index 9e0ccfe..6d79d88 100644 --- a/src/Queue/Adapter/Swoole.php +++ b/src/Queue/Adapter/Swoole.php @@ -11,8 +11,12 @@ class Swoole extends Adapter { protected Pool $pool; - public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue') - { + public function __construct( + Consumer $consumer, + int $workerNum, + string $queue, + string $namespace = 'utopia-queue' + ) { parent::__construct($workerNum, $queue, $namespace); $this->consumer = $consumer; diff --git a/src/Queue/Server.php b/src/Queue/Server.php index 4c1ce54..ab96c08 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -218,8 +218,19 @@ public function start(): self $this->adapter->workerStart(function (string $workerId) { Console::success("[Worker] Worker {$workerId} is ready!"); self::setResource('workerId', fn () => $workerId); + if (!is_null($this->workerStartHook)) { - call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); + try { + $this->workerStartHook->getAction()(...$this->getArguments($this->workerStartHook)); + } catch (Throwable $error) { + self::setResource('error', fn () => $error); + + foreach ($this->errorHooks as $hook) { + ($hook->getAction())($this->getArguments($hook)); + } + + return; + } } while (true) { @@ -294,7 +305,15 @@ function (?Message $message, Throwable $th) { if (!\is_null($this->workerStartHook)) { $this->adapter->workerStop(function () { - $this->workerStopHook->getAction()(...$this->getArguments($this->workerStopHook)); + try { + $this->workerStopHook->getAction()(...$this->getArguments($this->workerStopHook)); + } catch (Throwable $error) { + self::setResource('error', fn () => $error); + + foreach ($this->errorHooks as $hook) { + $hook->getAction()($this->getArguments($hook)); + } + } }); }