Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client ACK #34

Merged
merged 11 commits into from
May 14, 2024
38 changes: 19 additions & 19 deletions config/stomp.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,31 @@

return [

'driver' => 'stomp',
'read_queues' => env('STOMP_READ_QUEUES'),
'write_queues' => env('STOMP_WRITE_QUEUES'),
'protocol' => env('STOMP_PROTOCOL', 'tcp'),
'host' => env('STOMP_HOST', '127.0.0.1'),
'port' => env('STOMP_PORT', 61613),
'username' => env('STOMP_USERNAME', 'admin'),
'password' => env('STOMP_PASSWORD', 'admin'),
'driver' => 'stomp',
'read_queues' => env('STOMP_READ_QUEUES'),
'write_queues' => env('STOMP_WRITE_QUEUES'),
'protocol' => env('STOMP_PROTOCOL', 'tcp'),
'host' => env('STOMP_HOST', '127.0.0.1'),
'port' => env('STOMP_PORT', 61613),
'username' => env('STOMP_USERNAME', 'admin'),
'password' => env('STOMP_PASSWORD', 'admin'),

/**
* Set to "horizon" if you wish to use Laravel Horizon.
*/
'worker' => env('STOMP_WORKER', 'default'),
'worker' => env('STOMP_WORKER', 'default'),

/**
* Calculate tries and backoff automatically without the need to specify it
* in the queue work command.
*/
'auto_tries' => env('STOMP_AUTO_TRIES', true),
'auto_backoff' => env('STOMP_AUTO_BACKOFF', true),
'auto_tries' => env('STOMP_AUTO_TRIES', true),
'auto_backoff' => env('STOMP_AUTO_BACKOFF', true),

/** If all messages should fail on timeout. Set to false in order to revert to default (looking in event payload) */
'fail_on_timeout' => env('STOMP_FAIL_ON_TIMEOUT', true),
'fail_on_timeout' => env('STOMP_FAIL_ON_TIMEOUT', true),
/** Maximum time in seconds for job execution. This value must be less than send heartbeat in order to run correctly. */
'timeout' => env('STOMP_TIMEOUT', 10),
'timeout' => env('STOMP_TIMEOUT', 10),

/**
* Incremental multiplier for failed job redelivery.
Expand All @@ -48,32 +48,32 @@
* hash as queue name. In case of multiple services connecting in such
* a way, it becomes unclear which queue is from which service.
*/
'default_queue' => env('STOMP_DEFAULT_QUEUE'),
'default_queue' => env('STOMP_DEFAULT_QUEUE'),

/**
* Use Laravel logger for outputting logs.
*/
'enable_logs' => env('STOMP_LOGS', false) === true,
'enable_logs' => env('STOMP_LOGS', false) === true,

/**
* Should the read queues be prepended. Useful for i.e. Artemis where queue
* name is unique across whole broker instance. This will thus add some
* uniqueness to the queues.
*/
'prepend_queues' => true,
'prepend_queues' => true,

/**
* Heartbeat which will be requested from server at given millisecond period.
*/
'receive_heartbeat' => env('STOMP_RECEIVE_HEARTBEAT', 0),
'receive_heartbeat' => env('STOMP_RECEIVE_HEARTBEAT', 0),

/**
* Heartbeat which we will be sending to server at given millisecond period.
*/
'send_heartbeat' => env('STOMP_SEND_HEARTBEAT', 20000),
'send_heartbeat' => env('STOMP_SEND_HEARTBEAT', 20000),

/**
* Array of supported versions.
*/
'version' => [Version::VERSION_1_2],
'version' => [Version::VERSION_1_2],
];
2 changes: 1 addition & 1 deletion src/Queue/Jobs/StompJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ protected function getSubscriptionName(): string
public function delete()
{
$this->log->info("$this->session [STOMP] Deleting a message from queue: " . print_r([
'queue' => $this->queue,
'queue' => $this->queue,
'message' => $this->frame,
], true));

Expand Down
23 changes: 19 additions & 4 deletions src/Queue/StompQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class StompQueue extends Queue implements QueueInterface
protected static int $circuitBreaker = 0;
protected string $session;

protected $_lastFrame = null;

public function __construct(ClientWrapper $stompClient)
{
$this->readQueues = $this->setReadQueues();
Expand Down Expand Up @@ -208,9 +210,9 @@ protected function writeToMultipleQueues(array $writeQueues, Message $payload):
* @var $payload Message
*/
$this->log->info("$this->session [STOMP] Pushing stomp payload to queue: " . print_r([
'body' => $payload->getBody(),
'body' => $payload->getBody(),
'headers' => $payload->getHeaders(),
'queue' => $writeQueues,
'queue' => $writeQueues,
], true));

$allEventsSent = true;
Expand Down Expand Up @@ -337,6 +339,12 @@ protected function hasEvent($job): bool
*/
public function pop($queue = null)
{
if ($this->_lastFrame) {
// ACK
$this->client->ack($this->_lastFrame);
$this->_lastFrame = null;
}

$frame = $this->read($queue);

if (!($frame instanceof Frame)) {
Expand Down Expand Up @@ -472,6 +480,12 @@ public function disconnect()
}

try {
if ($this->_lastFrame) {
// ACK
$this->client->ack($this->_lastFrame);
$this->_lastFrame = null;
}

$this->log->info("$this->session [STOMP] Disconnecting...");
$this->client->getClient()->disconnect();
} catch (Exception $e) {
Expand All @@ -488,9 +502,10 @@ protected function subscribeToQueues(): void
continue;
}

$this->client->subscribe($queue, null, 'auto', [
$this->client->subscribe($queue, null, 'client', [
ngaspari marked this conversation as resolved.
Show resolved Hide resolved
// New Artemis version can't work without this as it will consume only first message otherwise.
'consumer-window-size' => '-1',
//'consumer-window-size' => '-1',
'consumer-window-size' => '2',
ngaspari marked this conversation as resolved.
Show resolved Hide resolved
]);

$this->subscribedTo[] = $queue;
Expand Down
Loading