From ae3f767d3d9f6804c934a94888f65cda5100ae07 Mon Sep 17 00:00:00 2001 From: ngasparic Date: Tue, 4 Jun 2024 16:30:56 +0200 Subject: [PATCH 1/5] option to define queue name Option to define queue name to listen; artisan queue:work --queue= --- README.md | 9 +- config/asseco-stomp.php | 4 + config/stomp.php | 15 ++- ...28_220001_create_stomp_event_log_table.php | 42 ++++++++ src/Queue/Jobs/StompJob.php | 4 +- src/Queue/Stomp/Config.php | 8 ++ src/Queue/StompQueue.php | 100 ++++++++++++++---- src/StompServiceProvider.php | 9 ++ 8 files changed, 166 insertions(+), 25 deletions(-) create mode 100644 migrations/2024_05_28_220001_create_stomp_event_log_table.php diff --git a/README.md b/README.md index 4af16fe..cef739b 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,14 @@ Subscribing with client acknowledgement option (ENV variables): ``` STOMP_CONSUMER_WIN_SIZE=819200 // number of bytes that Broker will send to client before it expects ACK -STOMP_CONSUMER_ACK_MODE=client // mode: client (ACK needs to be sent) | auto (no ACK, and window-size has to be -1 in that case) +STOMP_CONSUMER_ACK_MODE=auto // mode: client (ACK needs to be sent) | auto (no ACK, and window-size has to be -1 in that case) +``` + +Options for Laravel worker: + +``` +STOMP_CONSUMER_ALL_QUEUES = default; // which queue name(s) represent that all queues from Config should be read +STOMP_READ_MESSAGE_DB_LOG = false // write POP-ed events in DB table `stomp_event_logs` ``` You can see all other available ``.env`` variables, their defaults and usage explanation within diff --git a/config/asseco-stomp.php b/config/asseco-stomp.php index 8182ae7..aebecb1 100644 --- a/config/asseco-stomp.php +++ b/config/asseco-stomp.php @@ -4,4 +4,8 @@ return [ 'log_manager' => LogManager::class, + + 'migrations' => [ + 'run' => true, + ] ]; diff --git a/config/stomp.php b/config/stomp.php index 69b2e44..f55f14f 100644 --- a/config/stomp.php +++ b/config/stomp.php @@ -33,7 +33,10 @@ /** If all messages should fail on timeout. Set to false in order to revert to default (looking in event payload) */ 'fail_on_timeout' => env('STOMP_FAIL_ON_TIMEOUT', true), - /** Maximum time in seconds for job execution. This value must be less than send heartbeat in order to run correctly. */ + + /** + * Maximum time in seconds for job execution. This value must be less than send heartbeat in order to run correctly. + */ 'timeout' => env('STOMP_TIMEOUT', 45), /** @@ -56,6 +59,8 @@ */ 'default_queue' => env('STOMP_DEFAULT_QUEUE'), + 'enable_read_events_DB_logs' => env('STOMP_READ_MESSAGE_DB_LOG', false) === true, + /** * Use Laravel logger for outputting logs. */ @@ -89,7 +94,13 @@ /** * Subscribe mode: auto, client. */ - 'consumer_ack_mode' => env('STOMP_CONSUMER_ACK_MODE', 'client'), + 'consumer_ack_mode' => env('STOMP_CONSUMER_ACK_MODE', 'auto'), + + /** + * Queue name(s) that represent that all queues should be read + * If no queue is specified, Laravel puts 'default' - so this should be entered here + */ + 'worker_queue_name_all' => explode(';', env('STOMP_CONSUMER_ALL_QUEUES', 'default;')), /** * Array of supported versions. diff --git a/migrations/2024_05_28_220001_create_stomp_event_log_table.php b/migrations/2024_05_28_220001_create_stomp_event_log_table.php new file mode 100644 index 0000000..9707f99 --- /dev/null +++ b/migrations/2024_05_28_220001_create_stomp_event_log_table.php @@ -0,0 +1,42 @@ +id(); + $table->string('session_id')->nullable(); + $table->string('queue_name')->nullable(); + $table->string('subscription_id')->nullable(); + $table->string('message_id')->nullable(); + + $table->text('payload')->nullable(); + + $table->timestamp('created_at')->useCurrent(); + + }); + + } + + /** + * Reverse the migrations. + * + * @return void + */ + public function down() + { + Schema::dropIfExists('stomp_event_logs'); + } + +}; diff --git a/src/Queue/Jobs/StompJob.php b/src/Queue/Jobs/StompJob.php index 67e5f15..501602c 100644 --- a/src/Queue/Jobs/StompJob.php +++ b/src/Queue/Jobs/StompJob.php @@ -137,7 +137,7 @@ protected function fireLaravelJob(): void { if ($this->laravelJobClassExists()) { [$class, $method] = JobName::parse($this->payload['job']); - ($this->instance = $this->resolve($class))->{$method}($this, $this->payload['data']); + ($this->instance = $this->resolve($class))->{$method}($this, $this->payload['data'] ?? []); } else { $this->log->error("$this->session [STOMP] Laravel job class does not exist!"); } @@ -253,7 +253,7 @@ protected function failed($e) try { if (method_exists($this->instance = $this->resolve($class), 'failed')) { - $this->instance->failed($this->payload['data'], $e, $this->payload['uuid']); + $this->instance->failed($this->payload['data'] ?? [], $e, $this->payload['uuid']); } } catch (\Exception $e) { $this->log->error('Exception in job failing: ' . $e->getMessage()); diff --git a/src/Queue/Stomp/Config.php b/src/Queue/Stomp/Config.php index 6108d32..6165c63 100644 --- a/src/Queue/Stomp/Config.php +++ b/src/Queue/Stomp/Config.php @@ -32,4 +32,12 @@ protected static function appName(): string { return Str::snake(config('app.name', 'localhost')); } + + public static function queueNamesForProcessAllQueues() { + return self::get('worker_queue_name_all'); + } + + public static function shouldReadMessagesBeLoggedToDB() { + return self::get('enable_read_events_DB_logs'); + } } diff --git a/src/Queue/StompQueue.php b/src/Queue/StompQueue.php index e5fd615..7f5d709 100644 --- a/src/Queue/StompQueue.php +++ b/src/Queue/StompQueue.php @@ -18,6 +18,7 @@ use Illuminate\Queue\InvalidPayloadException; use Illuminate\Queue\Queue; use Illuminate\Support\Arr; +use Illuminate\Support\Facades\DB; use Illuminate\Support\Str; use Psr\Log\LoggerInterface; use Stomp\Exception\ConnectionException; @@ -53,8 +54,13 @@ class StompQueue extends Queue implements QueueInterface protected int $circuitBreaker = 0; protected string $session; + /** @var null|Frame */ protected $_lastFrame = null; - protected $_ackMode = 'client'; + + protected string $_ackMode = 'client'; + + protected array $_queueNamesForProcessAllQueues = ['']; + protected bool $_customReadQueusDefined = false; public function __construct(ClientWrapper $stompClient) { @@ -66,25 +72,30 @@ public function __construct(ClientWrapper $stompClient) $this->session = $this->client->getClient()->getSessionId(); $this->_ackMode = strtolower(Config::get('consumer_ack_mode') ?? 'client'); + + // specify which queue names should be considered as "All queues from Config" + // "default" & "" + $this->_queueNamesForProcessAllQueues = Config::queueNamesForProcessAllQueues(); + $this->_readMessagesLogToDb = Config::shouldReadMessagesBeLoggedToDB(); } /** * Append queue name to topic/address to avoid random hashes in broker. * + * @param string|null $queuesString * @return array */ - protected function setReadQueues(): array + protected function setReadQueues( ?string $queuesString = ''): array { - $queues = $this->parseQueues(Config::readQueues()); + $queuesString = $queuesString ?: Config::readQueues(); + $queues = $this->parseQueues($queuesString); foreach ($queues as &$queue) { $default = Config::defaultQueue(); - if (!str_contains($queue, self::AMQ_QUEUE_SEPARATOR)) { $queue .= self::AMQ_QUEUE_SEPARATOR . $default . '_' . substr(Str::uuid(), -5); continue; } - if (Config::get('prepend_queues')) { $topic = Str::before($queue, self::AMQ_QUEUE_SEPARATOR); $queueName = Str::after($queue, self::AMQ_QUEUE_SEPARATOR); @@ -216,9 +227,9 @@ protected function writeToMultipleQueues(array $writeQueues, Message $payload): * @var $payload Message */ $this->log->info("$this->session [STOMP] Pushing stomp payload to queue: " . print_r([ - 'body' => $payload->getBody(), - 'headers' => $payload->getHeaders(), - 'queue' => $writeQueues, + 'body' => $payload->getBody(), + 'headers' => $payload->getHeaders(), + 'queues' => $writeQueues, ], true)); $allEventsSent = true; @@ -253,7 +264,6 @@ protected function write($queue, Message $payload, $tryAgain = true): bool if ($tryAgain) { $this->log->info("$this->session [STOMP] Trying to send again..."); - return $this->write($queue, $payload, false); } @@ -349,6 +359,9 @@ protected function hasEvent($job): bool */ public function pop($queue = null) { + + $this->setReadQueuesForWorker( $queue ); + $this->ackLastFrameIfNecessary(); $frame = $this->read($queue); @@ -366,21 +379,20 @@ public function pop($queue = null) $queueFromFrame = $this->getQueueFromFrame($frame); if (!$queueFromFrame) { - $this->log->error("$this->session [STOMP] Wrong frame received. Expected MESSAGE, got: " . print_r($frame, true)); + $this->log->warning("$this->session [STOMP] Wrong frame received. Expected MESSAGE, got: " . print_r($frame, true)); $this->_lastFrame = null; - return null; } $this->_lastFrame = $frame; + $this->writeMessageToDBIfNeeded( $frame, $queueFromFrame ); + return new StompJob($this->container, $this, $frame, $queueFromFrame); } protected function read($queue) { - // This will read from queue, then push on same session ID if there are events following, then delete event which was read - // If job fails, it will be re-pushed on same session ID but with additional headers for redelivery try { $this->log->info("$this->session [STOMP] POP"); @@ -457,7 +469,11 @@ protected function reconnect(bool $subscribe = true) try { $this->client->getClient()->connect(); + $newSessionId = $this->client->getClient()->getSessionId(); + $this->log->info("$this->session [STOMP] Reconnected successfully."); + $this->log->info("$this->session [STOMP] Switching session to: $newSessionId"); + $this->session = $newSessionId; } catch (Exception $e) { $this->circuitBreaker++; @@ -475,7 +491,7 @@ protected function reconnect(bool $subscribe = true) } // By this point it should be connected, so it is safe to subscribe - if ($this->client->getClient()->isConnected() && $subscribe) { + if ($subscribe && $this->client->getClient()->isConnected()) { $this->log->info("$this->session [STOMP] Connected, subscribing..."); $this->subscribedTo = []; $this->subscribeToQueues(); @@ -497,8 +513,18 @@ public function disconnect() } } + /** + * Subscribe to queues + * @return void + */ protected function subscribeToQueues(): void { + $winSize = Config::get('consumer_window_size') ?: 8192000; + if ($this->_ackMode != self::ACK_MODE_CLIENT) { + // New Artemis version can't work without this as it will consume only first message otherwise. + $winSize = -1; + } + foreach ($this->readQueues as $queue) { $alreadySubscribed = in_array($queue, $this->subscribedTo); @@ -506,14 +532,9 @@ protected function subscribeToQueues(): void continue; } - $winSize = Config::get('consumer_window_size') ?: 8192000; - if ($this->_ackMode != self::ACK_MODE_CLIENT) { - $winSize = -1; - } + $this->log->info("$this->session [STOMP] subscribeToQueue `$queue` with ack-mode: {$this->_ackMode} & window-size: $winSize"); $this->client->subscribe($queue, null, $this->_ackMode, [ - // New Artemis version can't work without this as it will consume only first message otherwise. - //'consumer-window-size' => '-1', // we can define this if we are using ack mode = client 'consumer-window-size' => (string) $winSize, ]); @@ -529,9 +550,48 @@ protected function subscribeToQueues(): void */ public function ackLastFrameIfNecessary() { + if ($this->_ackMode == self::ACK_MODE_CLIENT && $this->_lastFrame) { + $this->log->debug("$this->session [STOMP] ACK-ing last frame. Msg #" . $this->_lastFrame->getMessageId()); $this->client->ack($this->_lastFrame); $this->_lastFrame = null; } } + + /** + * Set read queues for queue worker, if queue parameter is defined + * > php artisan queue:work --queue=eloquent::live30 + * @param $queue + * @return void + */ + protected function setReadQueuesForWorker( $queue ) { + + if ($this->_customReadQueusDefined) { + // already setup + return; + } + + $queue = (string)$queue; + if (!in_array( $queue, $this->_queueNamesForProcessAllQueues)) { + // one or more queue + $this->readQueues = $this->setReadQueues( $queue ); + } + + $this->_customReadQueusDefined = true; + } + + protected function writeMessageToDBIfNeeded(Frame $frame, $queueFromFrame) { + if ($this->_readMessagesLogToDb) { + DB::table('stomp_event_logs')->insert( + [ + 'session_id' => $this->session, + 'queue_name' => $queueFromFrame, + 'subscription_id' => $frame['subscription'], + 'message_id' => $frame->getMessageId(), + 'payload' => print_r($frame, true), + 'created_at' => date('Y-m-d H:i:s.u'), + ] + ); + } + } } diff --git a/src/StompServiceProvider.php b/src/StompServiceProvider.php index 6750e2d..31f48d7 100644 --- a/src/StompServiceProvider.php +++ b/src/StompServiceProvider.php @@ -50,5 +50,14 @@ public function boot() return $logsEnabled ? new $logManager($app) : new NullLogger(); }); + + + if (config('asseco-stomp.migrations.run')) { + $this->loadMigrationsFrom(__DIR__ . '/../migrations'); + } + + $this->publishes([ + __DIR__ . '/../migrations' => database_path('migrations'), + ], 'asseco-stomp'); } } From d7f830b6f5934d596ef313e1c0a03e20da226a36 Mon Sep 17 00:00:00 2001 From: StyleCI Bot Date: Tue, 4 Jun 2024 14:31:14 +0000 Subject: [PATCH 2/5] Apply fixes from StyleCI --- config/asseco-stomp.php | 2 +- config/stomp.php | 2 +- ...28_220001_create_stomp_event_log_table.php | 4 -- src/Queue/Stomp/Config.php | 6 ++- src/Queue/StompQueue.php | 51 ++++++++++--------- src/StompServiceProvider.php | 1 - 6 files changed, 33 insertions(+), 33 deletions(-) diff --git a/config/asseco-stomp.php b/config/asseco-stomp.php index aebecb1..39241ba 100644 --- a/config/asseco-stomp.php +++ b/config/asseco-stomp.php @@ -7,5 +7,5 @@ 'migrations' => [ 'run' => true, - ] + ], ]; diff --git a/config/stomp.php b/config/stomp.php index f55f14f..c95f5e3 100644 --- a/config/stomp.php +++ b/config/stomp.php @@ -98,7 +98,7 @@ /** * Queue name(s) that represent that all queues should be read - * If no queue is specified, Laravel puts 'default' - so this should be entered here + * If no queue is specified, Laravel puts 'default' - so this should be entered here. */ 'worker_queue_name_all' => explode(';', env('STOMP_CONSUMER_ALL_QUEUES', 'default;')), diff --git a/migrations/2024_05_28_220001_create_stomp_event_log_table.php b/migrations/2024_05_28_220001_create_stomp_event_log_table.php index 9707f99..8289e42 100644 --- a/migrations/2024_05_28_220001_create_stomp_event_log_table.php +++ b/migrations/2024_05_28_220001_create_stomp_event_log_table.php @@ -2,7 +2,6 @@ use Illuminate\Database\Migrations\Migration; use Illuminate\Database\Schema\Blueprint; -use Illuminate\Support\Facades\DB; use Illuminate\Support\Facades\Schema; return new class extends Migration @@ -24,9 +23,7 @@ public function up() $table->text('payload')->nullable(); $table->timestamp('created_at')->useCurrent(); - }); - } /** @@ -38,5 +35,4 @@ public function down() { Schema::dropIfExists('stomp_event_logs'); } - }; diff --git a/src/Queue/Stomp/Config.php b/src/Queue/Stomp/Config.php index 6165c63..e980fa6 100644 --- a/src/Queue/Stomp/Config.php +++ b/src/Queue/Stomp/Config.php @@ -33,11 +33,13 @@ protected static function appName(): string return Str::snake(config('app.name', 'localhost')); } - public static function queueNamesForProcessAllQueues() { + public static function queueNamesForProcessAllQueues() + { return self::get('worker_queue_name_all'); } - public static function shouldReadMessagesBeLoggedToDB() { + public static function shouldReadMessagesBeLoggedToDB() + { return self::get('enable_read_events_DB_logs'); } } diff --git a/src/Queue/StompQueue.php b/src/Queue/StompQueue.php index 7f5d709..6e84254 100644 --- a/src/Queue/StompQueue.php +++ b/src/Queue/StompQueue.php @@ -82,10 +82,10 @@ public function __construct(ClientWrapper $stompClient) /** * Append queue name to topic/address to avoid random hashes in broker. * - * @param string|null $queuesString + * @param string|null $queuesString * @return array */ - protected function setReadQueues( ?string $queuesString = ''): array + protected function setReadQueues(?string $queuesString = ''): array { $queuesString = $queuesString ?: Config::readQueues(); $queues = $this->parseQueues($queuesString); @@ -227,9 +227,9 @@ protected function writeToMultipleQueues(array $writeQueues, Message $payload): * @var $payload Message */ $this->log->info("$this->session [STOMP] Pushing stomp payload to queue: " . print_r([ - 'body' => $payload->getBody(), - 'headers' => $payload->getHeaders(), - 'queues' => $writeQueues, + 'body' => $payload->getBody(), + 'headers' => $payload->getHeaders(), + 'queues' => $writeQueues, ], true)); $allEventsSent = true; @@ -264,6 +264,7 @@ protected function write($queue, Message $payload, $tryAgain = true): bool if ($tryAgain) { $this->log->info("$this->session [STOMP] Trying to send again..."); + return $this->write($queue, $payload, false); } @@ -359,8 +360,7 @@ protected function hasEvent($job): bool */ public function pop($queue = null) { - - $this->setReadQueuesForWorker( $queue ); + $this->setReadQueuesForWorker($queue); $this->ackLastFrameIfNecessary(); @@ -381,12 +381,13 @@ public function pop($queue = null) if (!$queueFromFrame) { $this->log->warning("$this->session [STOMP] Wrong frame received. Expected MESSAGE, got: " . print_r($frame, true)); $this->_lastFrame = null; + return null; } $this->_lastFrame = $frame; - $this->writeMessageToDBIfNeeded( $frame, $queueFromFrame ); + $this->writeMessageToDBIfNeeded($frame, $queueFromFrame); return new StompJob($this->container, $this, $frame, $queueFromFrame); } @@ -514,7 +515,8 @@ public function disconnect() } /** - * Subscribe to queues + * Subscribe to queues. + * * @return void */ protected function subscribeToQueues(): void @@ -550,7 +552,6 @@ protected function subscribeToQueues(): void */ public function ackLastFrameIfNecessary() { - if ($this->_ackMode == self::ACK_MODE_CLIENT && $this->_lastFrame) { $this->log->debug("$this->session [STOMP] ACK-ing last frame. Msg #" . $this->_lastFrame->getMessageId()); $this->client->ack($this->_lastFrame); @@ -560,36 +561,38 @@ public function ackLastFrameIfNecessary() /** * Set read queues for queue worker, if queue parameter is defined - * > php artisan queue:work --queue=eloquent::live30 - * @param $queue + * > php artisan queue:work --queue=eloquent::live30. + * + * @param $queue * @return void */ - protected function setReadQueuesForWorker( $queue ) { - + protected function setReadQueuesForWorker($queue) + { if ($this->_customReadQueusDefined) { // already setup return; } - $queue = (string)$queue; - if (!in_array( $queue, $this->_queueNamesForProcessAllQueues)) { + $queue = (string) $queue; + if (!in_array($queue, $this->_queueNamesForProcessAllQueues)) { // one or more queue - $this->readQueues = $this->setReadQueues( $queue ); + $this->readQueues = $this->setReadQueues($queue); } $this->_customReadQueusDefined = true; } - protected function writeMessageToDBIfNeeded(Frame $frame, $queueFromFrame) { + protected function writeMessageToDBIfNeeded(Frame $frame, $queueFromFrame) + { if ($this->_readMessagesLogToDb) { DB::table('stomp_event_logs')->insert( [ - 'session_id' => $this->session, - 'queue_name' => $queueFromFrame, - 'subscription_id' => $frame['subscription'], - 'message_id' => $frame->getMessageId(), - 'payload' => print_r($frame, true), - 'created_at' => date('Y-m-d H:i:s.u'), + 'session_id' => $this->session, + 'queue_name' => $queueFromFrame, + 'subscription_id' => $frame['subscription'], + 'message_id' => $frame->getMessageId(), + 'payload' => print_r($frame, true), + 'created_at' => date('Y-m-d H:i:s.u'), ] ); } diff --git a/src/StompServiceProvider.php b/src/StompServiceProvider.php index 31f48d7..ec77a68 100644 --- a/src/StompServiceProvider.php +++ b/src/StompServiceProvider.php @@ -51,7 +51,6 @@ public function boot() return $logsEnabled ? new $logManager($app) : new NullLogger(); }); - if (config('asseco-stomp.migrations.run')) { $this->loadMigrationsFrom(__DIR__ . '/../migrations'); } From 571975e8a13e0f0c4cb9894143b9edcc133341f9 Mon Sep 17 00:00:00 2001 From: ngasparic Date: Tue, 4 Jun 2024 16:35:11 +0200 Subject: [PATCH 3/5] . --- src/Queue/StompQueue.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Queue/StompQueue.php b/src/Queue/StompQueue.php index 6e84254..22dc724 100644 --- a/src/Queue/StompQueue.php +++ b/src/Queue/StompQueue.php @@ -62,6 +62,8 @@ class StompQueue extends Queue implements QueueInterface protected array $_queueNamesForProcessAllQueues = ['']; protected bool $_customReadQueusDefined = false; + protected bool $_readMessagesLogToDb = false; + public function __construct(ClientWrapper $stompClient) { $this->readQueues = $this->setReadQueues(); From 73aba3548b54cb9f01e6b5459cd91ed5a6877e0f Mon Sep 17 00:00:00 2001 From: ngasparic Date: Wed, 5 Jun 2024 08:31:20 +0200 Subject: [PATCH 4/5] auto --- src/Queue/StompQueue.php | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Queue/StompQueue.php b/src/Queue/StompQueue.php index 22dc724..3fca0d3 100644 --- a/src/Queue/StompQueue.php +++ b/src/Queue/StompQueue.php @@ -33,7 +33,8 @@ class StompQueue extends Queue implements QueueInterface const CORRELATION = 'X-Correlation-ID'; - const ACK_MODE_CLIENT = 'client'; + const ACK_MODE_CLIENT = 'client'; + const ACK_MODE_AUTO = 'auto'; /** * Stomp instance from stomp-php repo. @@ -57,7 +58,7 @@ class StompQueue extends Queue implements QueueInterface /** @var null|Frame */ protected $_lastFrame = null; - protected string $_ackMode = 'client'; + protected string $_ackMode = ''; protected array $_queueNamesForProcessAllQueues = ['']; protected bool $_customReadQueusDefined = false; @@ -73,7 +74,7 @@ public function __construct(ClientWrapper $stompClient) $this->session = $this->client->getClient()->getSessionId(); - $this->_ackMode = strtolower(Config::get('consumer_ack_mode') ?? 'client'); + $this->_ackMode = strtolower(Config::get('consumer_ack_mode') ?? self::ACK_MODE_AUTO); // specify which queue names should be considered as "All queues from Config" // "default" & "" @@ -523,7 +524,7 @@ public function disconnect() */ protected function subscribeToQueues(): void { - $winSize = Config::get('consumer_window_size') ?: 8192000; + $winSize = Config::get('consumer_window_size'); if ($this->_ackMode != self::ACK_MODE_CLIENT) { // New Artemis version can't work without this as it will consume only first message otherwise. $winSize = -1; From 08010913cec020c8bdeaa2eb953f8c91819117f5 Mon Sep 17 00:00:00 2001 From: StyleCI Bot Date: Wed, 5 Jun 2024 06:31:39 +0000 Subject: [PATCH 5/5] Apply fixes from StyleCI --- src/Queue/StompQueue.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Queue/StompQueue.php b/src/Queue/StompQueue.php index 3fca0d3..752b13c 100644 --- a/src/Queue/StompQueue.php +++ b/src/Queue/StompQueue.php @@ -33,8 +33,8 @@ class StompQueue extends Queue implements QueueInterface const CORRELATION = 'X-Correlation-ID'; - const ACK_MODE_CLIENT = 'client'; - const ACK_MODE_AUTO = 'auto'; + const ACK_MODE_CLIENT = 'client'; + const ACK_MODE_AUTO = 'auto'; /** * Stomp instance from stomp-php repo.