From dc51d109d36d928c04a4151d1ad65f8ab530043e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nikola=20Ga=C5=A1pari=C4=87?= Date: Fri, 24 May 2024 16:04:22 +0200 Subject: [PATCH] Hotfix 4 (#38) * hotfix Nr.4 * Apply fixes from StyleCI * . * Apply fixes from StyleCI --------- Co-authored-by: StyleCI Bot --- config/stomp.php | 12 +++++++++--- src/Queue/Jobs/StompJob.php | 18 +++++------------- src/Queue/StompQueue.php | 33 ++++++++++++++++++--------------- src/StompServiceProvider.php | 2 -- 4 files changed, 32 insertions(+), 33 deletions(-) diff --git a/config/stomp.php b/config/stomp.php index c37d231..69b2e44 100644 --- a/config/stomp.php +++ b/config/stomp.php @@ -25,10 +25,16 @@ 'auto_tries' => env('STOMP_AUTO_TRIES', true), 'auto_backoff' => env('STOMP_AUTO_BACKOFF', true), + /* + * Will failed job be re-queued ? + * We experienced issues with pushing Jobs back to the topic/queue, so we're turning this OFF + */ + 'fail_job_requeue' => env('STOMP_FAILED_JOB_REQUEUE', false), + /** If all messages should fail on timeout. Set to false in order to revert to default (looking in event payload) */ 'fail_on_timeout' => env('STOMP_FAIL_ON_TIMEOUT', true), /** Maximum time in seconds for job execution. This value must be less than send heartbeat in order to run correctly. */ - 'timeout' => env('STOMP_TIMEOUT', 10), + 'timeout' => env('STOMP_TIMEOUT', 45), /** * Incremental multiplier for failed job redelivery. @@ -70,7 +76,7 @@ /** * Heartbeat which we will be sending to server at given millisecond period. */ - 'send_heartbeat' => env('STOMP_SEND_HEARTBEAT', 20000), + 'send_heartbeat' => env('STOMP_SEND_HEARTBEAT', 50000), /** * Setting consumer-window-size to a value greater than 0 will allow it to receive messages until @@ -78,7 +84,7 @@ * Once that happens the client will not receive any more messages until it sends the appropriate ACK or NACK * frame for the messages it already has. */ - 'consumer_window_size' => env('STOMP_CONSUMER_WIN_SIZE', 819200), + 'consumer_window_size' => env('STOMP_CONSUMER_WIN_SIZE', 8192000), /** * Subscribe mode: auto, client. diff --git a/src/Queue/Jobs/StompJob.php b/src/Queue/Jobs/StompJob.php index 1d01992..67e5f15 100644 --- a/src/Queue/Jobs/StompJob.php +++ b/src/Queue/Jobs/StompJob.php @@ -11,7 +11,6 @@ use Illuminate\Queue\Jobs\JobName; use Illuminate\Support\Arr; use Illuminate\Support\Facades\Event; -use Illuminate\Support\Facades\Log; use Illuminate\Support\Str; use Psr\Log\LoggerInterface; use Stomp\Transport\Frame; @@ -108,7 +107,6 @@ public function fire() { $this->log->info("$this->session [STOMP] Executing event..."); $this->isNativeLaravelJob() ? $this->fireLaravelJob() : $this->fireExternalJob(); - $this->ackIfNecessary(); } protected function isNativeLaravelJob(): bool @@ -202,9 +200,10 @@ public function release($delay = 0) { parent::release($delay); - $payload = $this->createStompPayload($delay); - - $this->stompQueue->pushRaw($payload, $this->queue, []); + if (Config::get('fail_job_requeue')) { + $payload = $this->createStompPayload($delay); + $this->stompQueue->pushRaw($payload, $this->queue, []); + } } protected function createStompPayload(int $delay): Message @@ -245,8 +244,6 @@ protected function getBackoff(int $attempts): int */ protected function failed($e) { - $this->ackIfNecessary(); - // External events don't have failed method to call. if (!$this->payload || !$this->isNativeLaravelJob()) { return; @@ -259,12 +256,7 @@ protected function failed($e) $this->instance->failed($this->payload['data'], $e, $this->payload['uuid']); } } catch (\Exception $e) { - Log::error('Exception in job failing: ' . $e->getMessage()); + $this->log->error('Exception in job failing: ' . $e->getMessage()); } } - - protected function ackIfNecessary() - { - $this->stompQueue->ackLastFrameIfNecessary(); - } } diff --git a/src/Queue/StompQueue.php b/src/Queue/StompQueue.php index 9782dfe..e5fd615 100644 --- a/src/Queue/StompQueue.php +++ b/src/Queue/StompQueue.php @@ -50,7 +50,7 @@ class StompQueue extends Queue implements QueueInterface protected array $subscribedTo = []; protected LoggerInterface $log; - protected static int $circuitBreaker = 0; + protected int $circuitBreaker = 0; protected string $session; protected $_lastFrame = null; @@ -238,7 +238,7 @@ protected function writeToMultipleQueues(array $writeQueues, Message $payload): return $allEventsSent; } - protected function write($queue, Message $payload): bool + protected function write($queue, Message $payload, $tryAgain = true): bool { // This will write all the events received in a single batch, then send disconnect frame try { @@ -247,13 +247,17 @@ protected function write($queue, Message $payload): bool $this->log->info("$this->session [STOMP] Message sent successfully? " . ($sent ? 't' : 'f')); return $sent; - } catch (Exception) { - $this->log->error("$this->session [STOMP] PUSH failed. Reconnecting..."); + } catch (Exception $e) { + $this->log->error("$this->session [STOMP] PUSH failed. Reconnecting... " . $e->getMessage()); $this->reconnect(false); - $this->log->info("$this->session [STOMP] Trying to send again..."); + if ($tryAgain) { + $this->log->info("$this->session [STOMP] Trying to send again..."); + + return $this->write($queue, $payload, false); + } - return $this->write($queue, $payload); + return false; } } @@ -453,25 +457,25 @@ protected function reconnect(bool $subscribe = true) try { $this->client->getClient()->connect(); - $this->log->info("$this->session [STOMP] Reconnected successfully."); } catch (Exception $e) { - self::$circuitBreaker++; - $cb = self::$circuitBreaker; + $this->circuitBreaker++; - $this->log->error("$this->session [STOMP] Failed reconnecting (tries: $cb), retrying in 2s..." . print_r($e->getMessage(), true)); + $this->log->error("$this->session [STOMP] Failed reconnecting (tries: {$this->circuitBreaker}), + retrying..." . print_r($e->getMessage(), true)); - if (self::$circuitBreaker <= 5) { + if ($this->circuitBreaker <= 5) { + usleep(100); $this->reconnect($subscribe); } - $this->log->error("$this->session [STOMP] Circuit breaker executed after $cb tries, exiting."); + $this->log->error("$this->session [STOMP] Circuit breaker executed after {$this->circuitBreaker} tries, exiting."); return; } // By this point it should be connected, so it is safe to subscribe - if ($subscribe) { + if ($this->client->getClient()->isConnected() && $subscribe) { $this->log->info("$this->session [STOMP] Connected, subscribing..."); $this->subscribedTo = []; $this->subscribeToQueues(); @@ -486,7 +490,6 @@ public function disconnect() try { $this->ackLastFrameIfNecessary(); - $this->log->info("$this->session [STOMP] Disconnecting..."); $this->client->getClient()->disconnect(); } catch (Exception $e) { @@ -503,7 +506,7 @@ protected function subscribeToQueues(): void continue; } - $winSize = Config::get('consumer_window_size') ?? 819200; + $winSize = Config::get('consumer_window_size') ?: 8192000; if ($this->_ackMode != self::ACK_MODE_CLIENT) { $winSize = -1; } diff --git a/src/StompServiceProvider.php b/src/StompServiceProvider.php index 0de5ff9..6750e2d 100644 --- a/src/StompServiceProvider.php +++ b/src/StompServiceProvider.php @@ -21,7 +21,6 @@ class StompServiceProvider extends ServiceProvider public function register() { $this->mergeConfigFrom(__DIR__ . '/../config/asseco-stomp.php', 'asseco-stomp'); - $this->mergeConfigFrom(__DIR__ . '/../config/stomp.php', 'queue.connections.stomp'); } @@ -35,7 +34,6 @@ public function boot() app()->singleton(Config::class); app()->singleton(ConnectionWrapper::class); app()->singleton(ClientWrapper::class); - app()->singleton(StompQueue::class); /** @var QueueManager $queue */