From 971f65b94e5944f349f4bd8964ff7a4a03025a37 Mon Sep 17 00:00:00 2001 From: Viacheslav Shcherbyna Date: Tue, 16 Sep 2025 17:43:06 +0300 Subject: [PATCH 1/7] SWR-19885 #comment Implement Async Heartbeat For Vhosts Consumers --- README.md | 10 +- composer.json | 2 +- src/Console/ConsumeVhostsCommand.php | 10 +- .../AbstractVhostsConsumer.php | 124 +++++++++++++++++- src/VhostsConsumers/QueueConsumer.php | 8 +- 5 files changed, 139 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 16db9cd3..4bb8f78c 100644 --- a/README.md +++ b/README.md @@ -9,16 +9,16 @@ RabbitMQ Queue driver for Laravel Only the latest version will get new features. Bug fixes will be provided using the following scheme: -| Package Version | Laravel Version | Bug Fixes Until | | -|-----------------|-----------------|------------------|---------------------------------------------------------------------------------------------| -| 1 | 20 | April 23th, 2025 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | +| Package Version | Laravel Version | Bug Fixes Until | | +|-----------------|-----------------|----------------------|---------------------------------------------------------------------------------------------| +| 1 | 27 | September 16th, 2025 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | ## Installation You can install this package via composer using this command: ``` -composer require salesmessage/php-lib-rabbitmq:^1.20 --ignore-platform-reqs +composer require salesmessage/php-lib-rabbitmq:^1.27 --ignore-platform-reqs ``` The package will automatically register itself. @@ -632,7 +632,7 @@ There are two ways of consuming messages. Example: ```bash -php artisan lib-rabbitmq:consume-vhosts test-group-1 rabbitmq_vhosts --name=mq-vhosts-test-name --sleep=3 --memory=300 --max-jobs=5000 --max-time=600 --timeout=0 +php artisan lib-rabbitmq:consume-vhosts test-group-1 rabbitmq_vhosts --name=mq-vhosts-test-name --sleep=3 --memory=300 --max-jobs=5000 --max-time=600 --timeout=0 --async-mode=1 ``` ## Testing diff --git a/composer.json b/composer.json index e7aacf7e..899884d4 100644 --- a/composer.json +++ b/composer.json @@ -34,7 +34,7 @@ }, "extra": { "branch-alias": { - "dev-master": "1.26-dev" + "dev-master": "1.27-dev" }, "laravel": { "providers": [ diff --git a/src/Console/ConsumeVhostsCommand.php b/src/Console/ConsumeVhostsCommand.php index d73a66c3..1288a9c5 100644 --- a/src/Console/ConsumeVhostsCommand.php +++ b/src/Console/ConsumeVhostsCommand.php @@ -30,6 +30,7 @@ class ConsumeVhostsCommand extends WorkCommand {--timeout=60 : The number of seconds a child process can run} {--tries=1 : Number of times to attempt a job before logging it failed} {--rest=0 : Number of seconds to rest between jobs} + {--async-mode=0 : Async processing for some functionality (now only "heartbeat" is supported)} {--max-priority=} {--consumer-tag} @@ -84,6 +85,7 @@ public function handle(): void $consumer->setPrefetchSize((int) $this->option('prefetch-size')); $consumer->setPrefetchCount((int) ($groupConfigData['prefetch_count'] ?? 1000)); $consumer->setBatchSize((int) ($groupConfigData['batch_size'] ?? 1000)); + $consumer->setAsyncMode((bool) $this->option('async-mode')); if ($this->downForMaintenance() && $this->option('once')) { $consumer->sleep($this->option('sleep')); @@ -95,8 +97,10 @@ public function handle(): void // which jobs are coming through a queue and be informed on its progress. $this->listenForEvents(); - $connection = $this->argument('connection') - ?: $this->laravel['config']['queue.default']; + $queueConfigData = $this->laravel['config']['queue']; + $connectionName = $this->argument('connection') ?: ($queueConfigData['default'] ?? ''); + + $consumer->setConfig((array) ($queueConfigData['connections'][$connectionName] ?? [])); if (Terminal::hasSttyAvailable()) { $this->components->info(sprintf( @@ -107,7 +111,7 @@ public function handle(): void } $this->runWorker( - $connection, + $connectionName, '' ); } diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php index 2365db94..8394177c 100644 --- a/src/VhostsConsumers/AbstractVhostsConsumer.php +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -8,6 +8,7 @@ use Illuminate\Queue\QueueManager; use Illuminate\Queue\WorkerOptions; use Illuminate\Support\Str; +use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Exception\AMQPChannelClosedException; use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exception\AMQPProtocolChannelException; @@ -19,6 +20,7 @@ use Salesmessage\LibRabbitMQ\Dto\ConsumeVhostsFiltersDto; use Salesmessage\LibRabbitMQ\Dto\QueueApiDto; use Salesmessage\LibRabbitMQ\Dto\VhostApiDto; +use Salesmessage\LibRabbitMQ\Exceptions\MutexTimeout; use Salesmessage\LibRabbitMQ\Interfaces\RabbitMQBatchable; use Salesmessage\LibRabbitMQ\Mutex; use Salesmessage\LibRabbitMQ\Queue\Jobs\RabbitMQJob; @@ -29,6 +31,8 @@ abstract class AbstractVhostsConsumer extends Consumer { protected const MAIN_HANDLER_LOCK = 'vhost_handler'; + protected const HEALTHCHECK_HANDLER_LOCK = 'healthcheck_vhost_handler'; + protected ?OutputStyle $output = null; protected ?ConsumeVhostsFiltersDto $filtersDto = null; @@ -59,6 +63,12 @@ abstract class AbstractVhostsConsumer extends Consumer protected bool $hadJobs = false; + protected ?int $stopStatusCode = null; + + protected array $config = []; + + protected bool $asyncMode = false; + /** * @param InternalStorageManager $internalStorageManager * @param LoggerInterface $logger @@ -110,12 +120,30 @@ public function setBatchSize(int $batchSize): self return $this; } + /** + * @param array $config + * @return $this + */ + public function setConfig(array $config): self + { + $this->config = $config; + return $this; + } + + /** + * @param bool $asyncMode + * @return $this + */ + public function setAsyncMode(bool $asyncMode): self + { + $this->asyncMode = $asyncMode; + return $this; + } + public function daemon($connectionName, $queue, WorkerOptions $options) { $this->goAheadOrWait(); - $this->connectionMutex = new Mutex(false); - $this->configConnectionName = (string) $connectionName; $this->workerOptions = $options; @@ -123,6 +151,40 @@ public function daemon($connectionName, $queue, WorkerOptions $options) $this->listenForSignals(); } + if ($this->asyncMode) { + $this->logInfo('daemon.AsyncMode.On'); + + $coroutineContextHandler = function () use ($connectionName, $options) { + $this->logInfo('daemon.AsyncMode.Coroutines.Running'); + + // we can't move it outside since Mutex should be created within coroutine context + $this->connectionMutex = new Mutex(true); + $this->startHeartbeatCheck(); + \go(function () use ($connectionName, $options) { + $this->vhostDaemon($connectionName, $options); + }); + }; + + if (extension_loaded('swoole')) { + $this->logInfo('daemon.AsyncMode.Swoole'); + + \Co\run($coroutineContextHandler); + } elseif (extension_loaded('openswoole')) { + $this->logInfo('daemon.AsyncMode.OpenSwoole'); + + \OpenSwoole\Runtime::enableCoroutine(true, \OpenSwoole\Runtime::HOOK_ALL); + \co::run($coroutineContextHandler); + } else { + throw new \Exception('Async mode is not supported. Check if Swoole extension is installed'); + } + + return; + } + + $this->logInfo('daemon.AsyncMode.Off'); + + $this->connectionMutex = new Mutex(false); + $this->startHeartbeatCheck(); $this->vhostDaemon($connectionName, $options); } @@ -623,6 +685,64 @@ protected function initConnection(): RabbitMQQueue return $connection; } + /** + * @return void + */ + protected function startHeartbeatCheck(): void + { + if (false === $this->asyncMode) { + return; + } + + $heartbeatInterval = (int) ($this->config['options']['heartbeat'] ?? 0); + if (!$heartbeatInterval) { + return; + } + + $heartbeatHandler = function () { + if ($this->shouldQuit || (null !== $this->stopStatusCode)) { + return; + } + + try { + /** @var AMQPStreamConnection $connection */ + $connection = $this->connection?->getConnection(); + if ((null === $connection) + || (false === $connection->isConnected()) + || $connection->isWriting() + || $connection->isBlocked() + ) { + return; + } + + $this->connectionMutex->lock(static::HEALTHCHECK_HANDLER_LOCK, 3); + $connection->checkHeartBeat(); + } catch (MutexTimeout) { + } catch (Throwable $exception) { + $this->logError('startHeartbeatCheck.exception', [ + 'eroor' => $exception->getMessage(), + 'trace' => $e->getTraceAsString(), + ]); + + $this->shouldQuit = true; + } finally { + $this->connectionMutex->unlock(static::HEALTHCHECK_HANDLER_LOCK); + } + }; + + \go(function () use ($heartbeatHandler, $heartbeatInterval) { + $this->logInfo('startHeartbeatCheck.started'); + + while (true) { + sleep($heartbeatInterval); + $heartbeatHandler(); + if ($this->shouldQuit || !is_null($this->stopStatusCode)) { + return; + } + } + }); + } + /** * @return string */ diff --git a/src/VhostsConsumers/QueueConsumer.php b/src/VhostsConsumers/QueueConsumer.php index f1ecaadb..74076ae8 100644 --- a/src/VhostsConsumers/QueueConsumer.php +++ b/src/VhostsConsumers/QueueConsumer.php @@ -83,19 +83,19 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) // Finally, we will check to see if we have exceeded our memory limits or if // the queue should restart based on other indications. If so, we'll stop // this worker and let whatever is "monitoring" it restart the process. - $status = $this->getStopStatus( + $this->stopStatusCode = $this->getStopStatus( $this->workerOptions, $lastRestart, $startTime, $jobsProcessed, $this->hasJob ); - if (! is_null($status)) { + if (! is_null($this->stopStatusCode)) { $this->logInfo('consuming_stop', [ - 'status' => $status, + 'status' => $this->stopStatusCode, ]); - return $this->stop($status, $this->workerOptions); + return $this->stop($this->stopStatusCode, $this->workerOptions); } $this->hasJob = false; From 3589bd1b2c3d16323abdd4ad27f2da5922ca364b Mon Sep 17 00:00:00 2001 From: alexrt23 <53480358+alexrt23@users.noreply.github.com> Date: Thu, 18 Sep 2025 12:37:43 +0300 Subject: [PATCH 2/7] SOC2: Allow Laravel 12 (#12) * chore: allow Laravel 12 * chore: allow Laravel 12 --------- Co-authored-by: Alex Rutski --- composer.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/composer.json b/composer.json index 899884d4..d457561a 100644 --- a/composer.json +++ b/composer.json @@ -11,7 +11,7 @@ "require": { "php": "^8.0", "ext-json": "*", - "illuminate/queue": "^9.0|^10.0|^11.0", + "illuminate/queue": "^9.0|^10.0|^11.0|^12.0", "php-amqplib/php-amqplib": "^v3.6" }, "require-dev": { @@ -20,7 +20,7 @@ "laravel/horizon": "^5.0", "orchestra/testbench": "^7.0|^8.0|^9.0", "laravel/pint": "^1.2", - "laravel/framework": "^10.0|^11.0" + "laravel/framework": "^10.0|^11.0|^12.0" }, "autoload": { "psr-4": { From bc4df30cfe1141975e26d3d60ebdd0df8a7d8433 Mon Sep 17 00:00:00 2001 From: Viacheslav Shcherbyna Date: Tue, 23 Sep 2025 15:43:45 +0300 Subject: [PATCH 3/7] SWR-19884 #comment Change RabbitMQ Queues Policies --- README.md | 4 ++-- composer.json | 2 +- src/VhostsConsumers/AbstractVhostsConsumer.php | 2 ++ src/VhostsConsumers/DirectConsumer.php | 8 +++----- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 4bb8f78c..6415ca98 100644 --- a/README.md +++ b/README.md @@ -11,14 +11,14 @@ Only the latest version will get new features. Bug fixes will be provided using | Package Version | Laravel Version | Bug Fixes Until | | |-----------------|-----------------|----------------------|---------------------------------------------------------------------------------------------| -| 1 | 27 | September 16th, 2025 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | +| 1 | 28 | September 23th, 2025 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | ## Installation You can install this package via composer using this command: ``` -composer require salesmessage/php-lib-rabbitmq:^1.27 --ignore-platform-reqs +composer require salesmessage/php-lib-rabbitmq:^1.28 --ignore-platform-reqs ``` The package will automatically register itself. diff --git a/composer.json b/composer.json index d457561a..c7fadad7 100644 --- a/composer.json +++ b/composer.json @@ -34,7 +34,7 @@ }, "extra": { "branch-alias": { - "dev-master": "1.27-dev" + "dev-master": "1.28-dev" }, "laravel": { "providers": [ diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php index 8394177c..b3f0941b 100644 --- a/src/VhostsConsumers/AbstractVhostsConsumer.php +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -8,6 +8,7 @@ use Illuminate\Queue\QueueManager; use Illuminate\Queue\WorkerOptions; use Illuminate\Support\Str; +use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Exception\AMQPChannelClosedException; use PhpAmqpLib\Exception\AMQPConnectionClosedException; @@ -651,6 +652,7 @@ protected function initConnection(): RabbitMQQueue ); try { + /** @var AMQPChannel $channel */ $channel = $connection->getChannel(true); $this->currentConnectionName = $connection->getConnectionName(); diff --git a/src/VhostsConsumers/DirectConsumer.php b/src/VhostsConsumers/DirectConsumer.php index 205a78cd..ecdae23c 100644 --- a/src/VhostsConsumers/DirectConsumer.php +++ b/src/VhostsConsumers/DirectConsumer.php @@ -41,6 +41,9 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) try { $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); $amqpMessage = $this->channel->basic_get($this->currentQueueName); + if (null !== $amqpMessage) { + $this->channel->basic_reject($amqpMessage->getDeliveryTag(), false); + } $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } catch (AMQPProtocolChannelException|AMQPChannelClosedException $exception) { $amqpMessage = null; @@ -112,11 +115,6 @@ protected function startConsuming(): RabbitMQQueue $this->logInfo('startConsuming.init'); - $arguments = []; - if ($this->maxPriority) { - $arguments['priority'] = ['I', $this->maxPriority]; - } - $this->jobsProcessed = 0; $connection = $this->initConnection(); From 6dc3d085455fefa79d35ca633ce6dc5c17c357b8 Mon Sep 17 00:00:00 2001 From: Viacheslav Shcherbyna Date: Thu, 25 Sep 2025 13:33:51 +0300 Subject: [PATCH 4/7] SWR-20043 #comment Add Logging For RabbitMQ Vhosts Consumers --- README.md | 4 +- composer.json | 2 +- .../AbstractVhostsConsumer.php | 108 ++++++++++++++---- src/VhostsConsumers/DirectConsumer.php | 36 +++++- src/VhostsConsumers/QueueConsumer.php | 55 ++++----- 5 files changed, 144 insertions(+), 61 deletions(-) diff --git a/README.md b/README.md index 6415ca98..c4048569 100644 --- a/README.md +++ b/README.md @@ -11,14 +11,14 @@ Only the latest version will get new features. Bug fixes will be provided using | Package Version | Laravel Version | Bug Fixes Until | | |-----------------|-----------------|----------------------|---------------------------------------------------------------------------------------------| -| 1 | 28 | September 23th, 2025 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | +| 1 | 29 | September 25th, 2025 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | ## Installation You can install this package via composer using this command: ``` -composer require salesmessage/php-lib-rabbitmq:^1.28 --ignore-platform-reqs +composer require salesmessage/php-lib-rabbitmq:^1.29 --ignore-platform-reqs ``` The package will automatically register itself. diff --git a/composer.json b/composer.json index c7fadad7..b74f81ac 100644 --- a/composer.json +++ b/composer.json @@ -34,7 +34,7 @@ }, "extra": { "branch-alias": { - "dev-master": "1.28-dev" + "dev-master": "1.29-dev" }, "laravel": { "providers": [ diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php index b3f0941b..7d99e10e 100644 --- a/src/VhostsConsumers/AbstractVhostsConsumer.php +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -60,6 +60,8 @@ abstract class AbstractVhostsConsumer extends Consumer protected int|float $processingStartedAt = 0; + protected int $totalJobsProcessed = 0; + protected int $jobsProcessed = 0; protected bool $hadJobs = false; @@ -176,6 +178,8 @@ public function daemon($connectionName, $queue, WorkerOptions $options) \OpenSwoole\Runtime::enableCoroutine(true, \OpenSwoole\Runtime::HOOK_ALL); \co::run($coroutineContextHandler); } else { + $this->logError('daemon.AsyncMode.IsNotSupported'); + throw new \Exception('Async mode is not supported. Check if Swoole extension is installed'); } @@ -240,10 +244,11 @@ protected function processAmqpMessage(AMQPMessage $message, RabbitMQQueue $conne } $this->jobsProcessed++; + $this->totalJobsProcessed++; $this->logInfo('processAMQPMessage.message_consumed', [ 'processed_jobs_count' => $this->jobsProcessed, - 'is_support_batching' => $isSupportBatching, + 'is_support_batching' => $isSupportBatching ? 'Y' :'N', ]); } @@ -410,6 +415,10 @@ protected function processSingleJob(RabbitMQJob $job): void */ protected function ackMessage(AMQPMessage $message, bool $multiple = false): void { + $this->logInfo('ackMessage.start', [ + 'multiple' => $multiple, + ]); + try { $message->ack($multiple); } catch (Throwable $exception) { @@ -432,6 +441,8 @@ abstract protected function stopConsuming(): void; */ protected function loadVhosts(): void { + $this->logInfo('loadVhosts.start'); + $group = $this->filtersDto->getGroup(); $lastProcessedAtKey = $this->internalStorageManager->getLastProcessedAtKeyName($group); @@ -478,6 +489,9 @@ protected function switchToNextVhost(): bool } $this->currentQueueName = $nextQueue; + + $this->logInfo('switchToNextVhost.success'); + return true; } @@ -503,6 +517,8 @@ protected function getNextVhost(): ?string */ protected function loadVhostQueues(): void { + $this->logInfo('loadVhostQueues.start'); + $group = $this->filtersDto->getGroup(); $lastProcessedAtKey = $this->internalStorageManager->getLastProcessedAtKeyName($group); @@ -539,6 +555,9 @@ protected function switchToNextQueue(): bool } $this->currentQueueName = $nextQueue; + + $this->logInfo('switchToNextQueue.success'); + return true; } @@ -567,20 +586,26 @@ protected function goAheadOrWait(int $waitSeconds = 1): bool { if (false === $this->goAhead()) { if (!$this->hadJobs) { - $this->output->warning(sprintf('No jobs during iteration. Wait %d seconds...', $waitSeconds)); + $this->logWarning('goAheadOrWait.no_jobs_during_iteration', [ + 'wait_seconds' => $waitSeconds, + ]); + $this->sleep($waitSeconds); } $this->loadVhosts(); $this->hadJobs = false; if (empty($this->vhosts)) { - $this->output->warning(sprintf('No active vhosts. Wait %d seconds...', $waitSeconds)); + $this->logWarning('goAheadOrWait.no_active_vhosts', [ + 'wait_seconds' => $waitSeconds, + ]); + $this->sleep($waitSeconds); return $this->goAheadOrWait($waitSeconds); } - $this->output->info('Starting from the first vhost...'); + $this->logInfo('goAheadOrWait.starting_from_the_first_vhost'); return $this->goAheadOrWait($waitSeconds); } @@ -612,6 +637,8 @@ protected function updateLastProcessedAt() return; } + $this->logInfo('updateLastProcessedAt.start'); + $group = $this->filtersDto->getGroup(); $timestamp = time(); @@ -701,25 +728,42 @@ protected function startHeartbeatCheck(): void return; } + $this->logInfo('startHeartbeatCheck.start', [ + 'heartbeat_interval' => $heartbeatInterval, + ]); + $heartbeatHandler = function () { if ($this->shouldQuit || (null !== $this->stopStatusCode)) { + $this->logWarning('startHeartbeatCheck.quit', [ + 'should_quit' => $this->shouldQuit, + 'stop_status_code' => $this->stopStatusCode, + ]); + return; } try { - /** @var AMQPStreamConnection $connection */ + /** @var AMQPStreamConnection|null $connection */ $connection = $this->connection?->getConnection(); if ((null === $connection) || (false === $connection->isConnected()) || $connection->isWriting() || $connection->isBlocked() ) { + $this->logWarning('startHeartbeatCheck.incorrect_connection', [ + 'has_connection' => (null !== $connection) ? 'Y' : 'N', + 'is_connected' => $connection?->isConnected() ? 'Y' : 'N', + 'is_writing' => $connection->isWriting() ? 'Y' : 'N', + 'is_blocked' => $connection->isBlocked() ? 'Y' : 'N', + ]); + return; } $this->connectionMutex->lock(static::HEALTHCHECK_HANDLER_LOCK, 3); $connection->checkHeartBeat(); } catch (MutexTimeout) { + $this->logWarning('startHeartbeatCheck.mutex_timeout'); } catch (Throwable $exception) { $this->logError('startHeartbeatCheck.exception', [ 'eroor' => $exception->getMessage(), @@ -739,6 +783,11 @@ protected function startHeartbeatCheck(): void sleep($heartbeatInterval); $heartbeatHandler(); if ($this->shouldQuit || !is_null($this->stopStatusCode)) { + $this->logWarning('startHeartbeatCheck.go_quit', [ + 'should_quit' => $this->shouldQuit, + 'stop_status_code' => $this->stopStatusCode, + ]); + return; } } @@ -760,7 +809,17 @@ protected function getTagName(): string */ protected function logInfo(string $message, array $data = []): void { - $this->log($message, $data, false); + $this->log($message, $data, 'info'); + } + + /** + * @param string $message + * @param array $data + * @return void + */ + protected function logWarning(string $message, array $data = []): void + { + $this->log($message, $data, 'warning'); } /** @@ -770,19 +829,23 @@ protected function logInfo(string $message, array $data = []): void */ protected function logError(string $message, array $data = []): void { - $this->log($message, $data, true); + $this->log($message, $data, 'error'); } /** * @param string $message * @param array $data - * @param bool $isError + * @param string $logType * @return void */ - protected function log(string $message, array $data = [], bool $isError = false): void + protected function log(string $message, array $data = [], string $logType = 'info'): void { - $data['vhost_name'] = $this->currentVhostName; - $data['queue_name'] = $this->currentQueueName; + if (null !== $this->currentVhostName) { + $data['vhost_name'] = $this->currentVhostName; + } + if (null !== $this->currentQueueName) { + $data['queue_name'] = $this->currentQueueName; + } $outputMessage = $message; foreach ($data as $key => $value) { @@ -791,15 +854,17 @@ protected function log(string $message, array $data = [], bool $isError = false) } $outputMessage .= '. ' . ucfirst(str_replace('_', ' ', $key)) . ': ' . $value; } - if ($isError) { - $this->output->error($outputMessage); - } else { - $this->output->info($outputMessage); - } + + match ($logType) { + 'error' => $this->output->error($outputMessage), + 'warning' => $this->output->warning($outputMessage), + default => $this->output->info($outputMessage) + }; $processingData = [ 'uuid' => $this->processingUuid, 'started_at' => $this->processingStartedAt, + 'total_processed_jobs_count' => $this->totalJobsProcessed, ]; if ($this->processingStartedAt) { $processingData['executive_time_seconds'] = microtime(true) - $this->processingStartedAt; @@ -809,10 +874,11 @@ protected function log(string $message, array $data = [], bool $isError = false) $logMessage = 'Salesmessage.LibRabbitMQ.VhostsConsumers.'; $logMessage .= class_basename(static::class) . '.'; $logMessage .= $message; - if ($isError) { - $this->logger->error($logMessage, $data); - } else { - $this->logger->info($logMessage, $data); - } + + match ($logType) { + 'error' => $this->logger->error($logMessage, $data), + 'warning' => $this->logger->warning($logMessage, $data), + default => $this->logger->info($logMessage, $data) + }; } } diff --git a/src/VhostsConsumers/DirectConsumer.php b/src/VhostsConsumers/DirectConsumer.php index ecdae23c..af3e7d67 100644 --- a/src/VhostsConsumers/DirectConsumer.php +++ b/src/VhostsConsumers/DirectConsumer.php @@ -18,11 +18,21 @@ class DirectConsumer extends AbstractVhostsConsumer { + /** + * @param $connectionName + * @param WorkerOptions $options + * @return int + * @throws \Salesmessage\LibRabbitMQ\Exceptions\MutexTimeout + * @throws \Throwable + */ protected function vhostDaemon($connectionName, WorkerOptions $options) { + $this->logInfo('daemon.start'); + $lastRestart = $this->getTimestampOfLastQueueRestart(); - [$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0]; + $startTime = hrtime(true) / 1e9; + $this->totalJobsProcessed = 0; $connection = $this->startConsuming(); @@ -31,7 +41,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) // if it is we will just pause this worker for a given amount of time and // make sure we do not need to kill this worker process off completely. if (! $this->daemonShouldRun($this->workerOptions, $this->configConnectionName, $this->currentQueueName)) { - $this->output->info('Consuming pause worker...'); + $this->logInfo('daemon.consuming_pause_worker'); $this->pauseWorker($this->workerOptions, $lastRestart); @@ -75,7 +85,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) } if (null === $amqpMessage) { - $this->output->info('Consuming sleep. No job...'); + $this->logInfo('daemon.consuming_sleep_no_job'); $this->stopConsuming(); @@ -90,7 +100,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->processAmqpMessage($amqpMessage, $connection); if ($this->jobsProcessed >= $this->batchSize) { - $this->output->info('Consuming batch full...'); + $this->logInfo('daemon.consuming_batch_full'); $this->stopConsuming(); @@ -101,6 +111,24 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) continue; } + + // Finally, we will check to see if we have exceeded our memory limits or if + // the queue should restart based on other indications. If so, we'll stop + // this worker and let whatever is "monitoring" it restart the process. + $this->stopStatusCode = $this->getStopStatus( + $this->workerOptions, + $lastRestart, + $startTime, + $this->totalJobsProcessed, + true + ); + if (! is_null($this->stopStatusCode)) { + $this->logWarning('daemon.consuming_stop', [ + 'status_code' => $this->stopStatusCode, + ]); + + return $this->stop($this->stopStatusCode, $this->workerOptions); + } } } diff --git a/src/VhostsConsumers/QueueConsumer.php b/src/VhostsConsumers/QueueConsumer.php index 74076ae8..b9be89ea 100644 --- a/src/VhostsConsumers/QueueConsumer.php +++ b/src/VhostsConsumers/QueueConsumer.php @@ -20,11 +20,21 @@ class QueueConsumer extends AbstractVhostsConsumer { protected bool $hasJob = false; + /** + * @param $connectionName + * @param WorkerOptions $options + * @return int|void + * @throws \Salesmessage\LibRabbitMQ\Exceptions\MutexTimeout + * @throws \Throwable + */ protected function vhostDaemon($connectionName, WorkerOptions $options) { + $this->logInfo('daemon.start'); + $lastRestart = $this->getTimestampOfLastQueueRestart(); - [$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0]; + $startTime = hrtime(true) / 1e9; + $this->totalJobsProcessed = 0; $connection = $this->startConsuming(); @@ -33,7 +43,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) // if it is we will just pause this worker for a given amount of time and // make sure we do not need to kill this worker process off completely. if (! $this->daemonShouldRun($this->workerOptions, $this->configConnectionName, $this->currentQueueName)) { - $this->output->info('Consuming pause worker...'); + $this->logInfo('daemon.consuming_pause_worker'); $this->pauseWorker($this->workerOptions, $lastRestart); @@ -68,7 +78,9 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) // If no job is got off the queue, we will need to sleep the worker. if (false === $this->hasJob) { - $this->output->info('Consuming sleep. No job...'); + $this->logInfo('daemon.consuming_sleep_no_job', [ + 'sleep_seconds' => $this->workerOptions->sleep, + ]); $this->stopConsuming(); @@ -87,12 +99,12 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->workerOptions, $lastRestart, $startTime, - $jobsProcessed, + $this->totalJobsProcessed, $this->hasJob ); if (! is_null($this->stopStatusCode)) { - $this->logInfo('consuming_stop', [ - 'status' => $this->stopStatusCode, + $this->logWarning('daemon.consuming_stop', [ + 'status_code' => $this->stopStatusCode, ]); return $this->stop($this->stopStatusCode, $this->workerOptions); @@ -102,33 +114,6 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) } } - /** - * @param WorkerOptions $options - * @param $lastRestart - * @param $startTime - * @param $jobsProcessed - * @param $hasJob - * @return int|null - */ - protected function getStopStatus( - WorkerOptions $options, - $lastRestart, - $startTime = 0, - $jobsProcessed = 0, - bool $hasJob = false - ): ?int - { - return match (true) { - $this->shouldQuit => static::EXIT_SUCCESS, - $this->memoryExceeded($options->memory) => static::EXIT_MEMORY_LIMIT, - $this->queueShouldRestart($lastRestart) => static::EXIT_SUCCESS, - $options->stopWhenEmpty && !$hasJob => static::EXIT_SUCCESS, - $options->maxTime && hrtime(true) / 1e9 - $startTime >= $options->maxTime => static::EXIT_SUCCESS, - $options->maxJobs && $jobsProcessed >= $options->maxJobs => static::EXIT_SUCCESS, - default => null - }; - } - /** * @return RabbitMQQueue * @throws Exceptions\MutexTimeout @@ -164,6 +149,10 @@ protected function startConsuming(): RabbitMQQueue } if ($this->workerOptions->rest > 0) { + $this->logInfo('startConsuming.rest', [ + 'rest_seconds' => $this->workerOptions->rest, + ]); + $this->sleep($this->workerOptions->rest); } }; From 661bf840211a743c6ed06dda34c8f23e7d9a6799 Mon Sep 17 00:00:00 2001 From: Vladislav Okan Date: Tue, 14 Oct 2025 08:39:22 +0300 Subject: [PATCH 5/7] SWR-20219: Remove auto-reject messages --- composer.json | 4 ++-- src/VhostsConsumers/DirectConsumer.php | 3 --- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/composer.json b/composer.json index b74f81ac..a759a59f 100644 --- a/composer.json +++ b/composer.json @@ -34,7 +34,7 @@ }, "extra": { "branch-alias": { - "dev-master": "1.29-dev" + "dev-master": "1.30-dev" }, "laravel": { "providers": [ @@ -56,4 +56,4 @@ }, "minimum-stability": "dev", "prefer-stable": true -} \ No newline at end of file +} diff --git a/src/VhostsConsumers/DirectConsumer.php b/src/VhostsConsumers/DirectConsumer.php index af3e7d67..72abfba7 100644 --- a/src/VhostsConsumers/DirectConsumer.php +++ b/src/VhostsConsumers/DirectConsumer.php @@ -51,9 +51,6 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) try { $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); $amqpMessage = $this->channel->basic_get($this->currentQueueName); - if (null !== $amqpMessage) { - $this->channel->basic_reject($amqpMessage->getDeliveryTag(), false); - } $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } catch (AMQPProtocolChannelException|AMQPChannelClosedException $exception) { $amqpMessage = null; From ab1a4ebac1322db2899e0bc6e53d687fab925753 Mon Sep 17 00:00:00 2001 From: Alexander Ginko <120381488+ahinkoneklo@users.noreply.github.com> Date: Tue, 4 Nov 2025 10:33:59 +0100 Subject: [PATCH 6/7] SWR-20482 Server: RabbitMQ Improvements: Deduplication (#16) * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication --- README.md | 22 +- composer.json | 6 +- config/rabbitmq.php | 27 ++ src/Consumer.php | 14 +- src/Contracts/RabbitMQConsumable.php | 15 + src/Interfaces/RabbitMQBatchable.php | 11 +- src/LaravelLibRabbitMQServiceProvider.php | 51 ++- src/Queue/RabbitMQQueue.php | 2 + src/Queue/RabbitMQQueueBatchable.php | 12 +- src/Services/Api/RabbitApiClient.php | 4 +- .../Deduplication/AppDeduplicationService.php | 18 ++ .../TransportLevel/DeduplicationService.php | 294 ++++++++++++++++++ .../TransportLevel/DeduplicationStore.php | 12 + .../TransportLevel/NullDeduplicationStore.php | 18 ++ .../RedisDeduplicationStore.php | 51 +++ src/Services/DlqDetector.php | 22 ++ src/Services/InternalStorageManager.php | 5 +- .../AbstractVhostsConsumer.php | 158 +++++++--- src/VhostsConsumers/DirectConsumer.php | 18 +- src/VhostsConsumers/QueueConsumer.php | 31 +- 20 files changed, 685 insertions(+), 106 deletions(-) create mode 100644 src/Contracts/RabbitMQConsumable.php create mode 100644 src/Services/Deduplication/AppDeduplicationService.php create mode 100644 src/Services/Deduplication/TransportLevel/DeduplicationService.php create mode 100644 src/Services/Deduplication/TransportLevel/DeduplicationStore.php create mode 100644 src/Services/Deduplication/TransportLevel/NullDeduplicationStore.php create mode 100644 src/Services/Deduplication/TransportLevel/RedisDeduplicationStore.php create mode 100644 src/Services/DlqDetector.php diff --git a/README.md b/README.md index c4048569..ae38fac9 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Only the latest version will get new features. Bug fixes will be provided using You can install this package via composer using this command: ``` -composer require salesmessage/php-lib-rabbitmq:^1.29 --ignore-platform-reqs +composer require salesmessage/php-lib-rabbitmq:^1.31 --ignore-platform-reqs ``` The package will automatically register itself. @@ -663,7 +663,19 @@ if not all the issues with the following command: composer fix:style ``` -## Contribution - -You can contribute to this package by discovering bugs and opening issues. Please, add to which version of package you -create pull request or issue. (e.g. [5.2] Fatal error on delayed job) +## Local Setup +- Configure all config items in `config/queue.php` section `connections.rabbitmq_vhosts` (see as example [rabbitmq.php](./config/rabbitmq.php)) +- Create `yml` file in the project root with name `rabbit-groups.yml` and content, for example like this (you can replace `vhosts` and `queues` with `vhosts_mask` and `queues_mask`): +```yaml +groups: + test-notes: + vhosts: + - organization_200005 + queues: + - local-myname.notes.200005 + batch_size: 3 + prefetch_count: 3 +``` +- Make sure that vhosts exist in RabbitMQ (if not - create them) +- Run command `php artisan lib-rabbitmq:scan-vhosts` within your project where this library is installed (this command fetches data from RabbitMQ to Redis) +- Run command for consumer `php artisan lib-rabbitmq:consume-vhosts test-notes rabbitmq_vhosts --name=mq-vhost-test-local-notes --memory=300 --timeout=0 --max-jobs=1000 --max-time=600 --async-mode=1` diff --git a/composer.json b/composer.json index a759a59f..54c2c477 100644 --- a/composer.json +++ b/composer.json @@ -34,7 +34,7 @@ }, "extra": { "branch-alias": { - "dev-master": "1.30-dev" + "dev-master": "1.31-dev" }, "laravel": { "providers": [ @@ -43,7 +43,9 @@ } }, "suggest": { - "ext-pcntl": "Required to use all features of the queue consumer." + "ext-pcntl": "Required to use all features of the queue consumer.", + "ext-swoole": "Required to use async mode for healthcheck (alternative is ext-openswoole).", + "ext-openswoole": "Required to use async mode for healthcheck (alternative is ext-swoole)." }, "scripts": { "test": [ diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 8b949d9c..9b457a84 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -25,6 +25,33 @@ 'options' => [ ], + /** + * Provided on 2 levels: transport and application. + */ + 'deduplication' => [ + 'transport' => [ + 'enabled' => env('RABBITMQ_DEDUP_TRANSPORT_ENABLED', false), + 'ttl' => env('RABBITMQ_DEDUP_TRANSPORT_TTL', 7200), + 'lock_ttl' => env('RABBITMQ_DEDUP_TRANSPORT_LOCK_TTL', 60), + /** + * Possible: ack, reject + */ + 'action_on_duplication' => env('RABBITMQ_DEDUP_TRANSPORT_ACTION', 'ack'), + /** + * Possible: ack, reject, requeue + */ + 'action_on_lock' => env('RABBITMQ_DEDUP_TRANSPORT_LOCK_ACTION', 'requeue'), + 'connection' => [ + 'driver' => env('RABBITMQ_DEDUP_TRANSPORT_DRIVER', 'redis'), + 'name' => env('RABBITMQ_DEDUP_TRANSPORT_CONNECTION_NAME', 'persistent'), + 'key_prefix' => env('RABBITMQ_DEDUP_TRANSPORT_KEY_PREFIX', 'mq_dedup'), + ], + ], + 'application' => [ + 'enabled' => env('RABBITMQ_DEDUP_APP_ENABLED', true), + ], + ], + /* * Set to "horizon" if you wish to use Laravel Horizon. */ diff --git a/src/Consumer.php b/src/Consumer.php index 0f69893f..f78dc241 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -9,8 +9,9 @@ use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Exception\AMQPRuntimeException; use PhpAmqpLib\Message\AMQPMessage; -use Throwable; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationService; +use Throwable; class Consumer extends Worker { @@ -122,7 +123,16 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu $jobsProcessed++; - $this->runJob($job, $connectionName, $options); + /** @var DeduplicationService $transportDedupService */ + $transportDedupService = $this->container->make(DeduplicationService::class); + $transportDedupService->decorateWithDeduplication( + function () use ($job, $message, $connectionName, $queue, $options, $transportDedupService) { + $this->runJob($job, $connectionName, $options); + $transportDedupService->markAsProcessed($message, $queue); + }, + $message, + $queue + ); if ($this->supportsAsyncSignals()) { $this->resetTimeoutHandler(); diff --git a/src/Contracts/RabbitMQConsumable.php b/src/Contracts/RabbitMQConsumable.php new file mode 100644 index 00000000..4e45e3ec --- /dev/null +++ b/src/Contracts/RabbitMQConsumable.php @@ -0,0 +1,15 @@ + $batch + * @return list + */ + public static function getNotDuplicatedBatchedJobs(array $batch): array; + /** * Processing jobs array of static class * - * @param array $batch - * @return mixed + * @param list $batch */ public static function collection(array $batch): void; } diff --git a/src/LaravelLibRabbitMQServiceProvider.php b/src/LaravelLibRabbitMQServiceProvider.php index 7048f289..d8451bdb 100644 --- a/src/LaravelLibRabbitMQServiceProvider.php +++ b/src/LaravelLibRabbitMQServiceProvider.php @@ -3,12 +3,6 @@ namespace Salesmessage\LibRabbitMQ; use Illuminate\Contracts\Debug\ExceptionHandler; -use Illuminate\Queue\Connectors\BeanstalkdConnector; -use Illuminate\Queue\Connectors\DatabaseConnector; -use Illuminate\Queue\Connectors\NullConnector; -use Illuminate\Queue\Connectors\RedisConnector; -use Illuminate\Queue\Connectors\SqsConnector; -use Illuminate\Queue\Connectors\SyncConnector; use Illuminate\Queue\QueueManager; use Illuminate\Support\ServiceProvider; use Psr\Log\LoggerInterface; @@ -16,6 +10,10 @@ use Salesmessage\LibRabbitMQ\Console\ConsumeVhostsCommand; use Salesmessage\LibRabbitMQ\Console\ScanVhostsCommand; use Salesmessage\LibRabbitMQ\Queue\Connectors\RabbitMQVhostsConnector; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationService; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationStore; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\NullDeduplicationStore; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\RedisDeduplicationStore; use Salesmessage\LibRabbitMQ\Services\GroupsService; use Salesmessage\LibRabbitMQ\Services\InternalStorageManager; use Salesmessage\LibRabbitMQ\Services\QueueService; @@ -36,6 +34,8 @@ public function register(): void ); if ($this->app->runningInConsole()) { + $this->bindDeduplicationService(); + $this->app->singleton('rabbitmq.consumer', function () { $isDownForMaintenance = function () { return $this->app->isDownForMaintenance(); @@ -68,7 +68,8 @@ public function register(): void $this->app['events'], $this->app[ExceptionHandler::class], $isDownForMaintenance, - null + $this->app->get(DeduplicationService::class), + null, ); }); @@ -84,7 +85,8 @@ public function register(): void $this->app['events'], $this->app[ExceptionHandler::class], $isDownForMaintenance, - null + $this->app->get(DeduplicationService::class), + null, ); }); @@ -92,7 +94,7 @@ public function register(): void $consumerClass = ('direct' === config('queue.connections.rabbitmq_vhosts.consumer_type')) ? VhostsDirectConsumer::class : VhostsQueueConsumer::class; - + return new ConsumeVhostsCommand( $app[GroupsService::class], $app[$consumerClass], @@ -139,4 +141,35 @@ public function boot(): void return new RabbitMQVhostsConnector($this->app['events']); }); } + + /** + * Config params: + * @phpstan-import-type DeduplicationConfig from DeduplicationService + * + * @return void + */ + private function bindDeduplicationService(): void + { + $this->app->bind(DeduplicationStore::class, static function () { + /** @var DeduplicationConfig $config */ + $config = (array) config('queue.connections.rabbitmq_vhosts.deduplication.transport', []); + $enabled = (bool) ($config['enabled'] ?? false); + if (!$enabled) { + return new NullDeduplicationStore(); + } + + $connectionDriver = $config['connection']['driver'] ?? null; + if ($connectionDriver !== 'redis') { + throw new \InvalidArgumentException('For now only Redis connection is supported for deduplication'); + } + $connectionName = $config['connection']['name'] ?? null; + + $prefix = trim($config['connection']['key_prefix'] ?? ''); + if (empty($prefix)) { + throw new \InvalidArgumentException('Key prefix is required'); + } + + return new RedisDeduplicationStore($connectionName, $prefix); + }); + } } diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index f0973d75..00336fe6 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -22,6 +22,7 @@ use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; +use Ramsey\Uuid\Uuid; use RuntimeException; use Throwable; use Salesmessage\LibRabbitMQ\Contracts\RabbitMQQueueContract; @@ -521,6 +522,7 @@ protected function createMessage($payload, int $attempts = 0): array $currentPayload = json_decode($payload, true); if ($correlationId = $currentPayload['id'] ?? null) { $properties['correlation_id'] = $correlationId; + $properties['message_id'] = Uuid::uuid7()->toString(); } if ($this->getConfig()->isPrioritizeDelayed()) { diff --git a/src/Queue/RabbitMQQueueBatchable.php b/src/Queue/RabbitMQQueueBatchable.php index a596a6fc..00399cf5 100644 --- a/src/Queue/RabbitMQQueueBatchable.php +++ b/src/Queue/RabbitMQQueueBatchable.php @@ -3,6 +3,7 @@ namespace Salesmessage\LibRabbitMQ\Queue; use PhpAmqpLib\Connection\AbstractConnection; +use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable; use Salesmessage\LibRabbitMQ\Dto\ConnectionNameDto; use Salesmessage\LibRabbitMQ\Dto\QueueApiDto; use Salesmessage\LibRabbitMQ\Dto\VhostApiDto; @@ -77,7 +78,16 @@ protected function createChannel(): AMQPChannel public function push($job, $data = '', $queue = null) { - $queue = $queue ?: $job->onQueue(); + if (!($job instanceof RabbitMQConsumable)) { + throw new \InvalidArgumentException('Job must implement RabbitMQConsumable'); + } + + if (!$queue) { + if (!method_exists($job, 'onQueue')) { + throw new \InvalidArgumentException('Job must implement onQueue method'); + } + $queue = $job->onQueue(); + } try { $result = parent::push($job, $data, $queue); diff --git a/src/Services/Api/RabbitApiClient.php b/src/Services/Api/RabbitApiClient.php index dfd16f1e..0f8fd3bc 100644 --- a/src/Services/Api/RabbitApiClient.php +++ b/src/Services/Api/RabbitApiClient.php @@ -70,7 +70,7 @@ public function request( $contents = $response->getBody()->getContents(); return (array) ($contents ? json_decode($contents, true) : []); - } catch (Throwable $exception) { + } catch (\Throwable $exception) { $rethrowException = $exception; if ($exception instanceof ClientException) { $rethrowException = new RabbitApiClientException($exception->getMessage()); @@ -109,4 +109,4 @@ private function getPassword(): string { return (string) ($this->connectionConfig['hosts'][0]['password'] ?? ''); } -} \ No newline at end of file +} diff --git a/src/Services/Deduplication/AppDeduplicationService.php b/src/Services/Deduplication/AppDeduplicationService.php new file mode 100644 index 00000000..2187557d --- /dev/null +++ b/src/Services/Deduplication/AppDeduplicationService.php @@ -0,0 +1,18 @@ +getState($message, $queueName); + try { + if ($messageState === DeduplicationService::IN_PROGRESS) { + $action = $this->applyActionOnLock($message); + $this->logger->warning('DeduplicationService.message_already_in_progress', [ + 'action' => $action, + 'message_id' => $message->get_properties()['message_id'] ?? null, + ]); + + return false; + } + + if ($messageState === DeduplicationService::PROCESSED) { + $action = $this->applyActionOnDuplication($message); + $this->logger->warning('DeduplicationService.message_already_processed', [ + 'action' => $action, + 'message_id' => $message->get_properties()['message_id'] ?? null, + ]); + + return false; + } + + $hasPutAsInProgress = $this->markAsInProgress($message, $queueName); + if ($hasPutAsInProgress === false) { + $action = $this->applyActionOnLock($message); + $this->logger->warning('DeduplicationService.message_already_in_progress.skip', [ + 'action' => $action, + 'message_id' => $message->get_properties()['message_id'] ?? null, + ]); + + return false; + } + + $handler(); + } catch (\Throwable $exception) { + if ($messageState === null) { + $this->release($message, $queueName); + } + + $this->logger->error('DeduplicationService.message_processing_exception', [ + 'released_message_id' => $message->get_properties()['message_id'] ?? null, + ]); + + throw $exception; + } + + return true; + } + + /** + * @param AMQPMessage $message + * @return string|null - @enum {self::IN_PROGRESS, self::PROCESSED} + */ + public function getState(AMQPMessage $message, ?string $queueName = null): ?string + { + if (!$this->isEnabled()) { + return null; + } + $messageId = $this->getMessageId($message, $queueName); + if ($messageId === null) { + return null; + } + + return $this->store->get($messageId); + } + + public function markAsProcessed(AMQPMessage $message, ?string $queueName = null): bool + { + $ttl = (int) ($this->getConfig('ttl') ?: self::DEFAULT_TTL); + if ($ttl <= 0 || $ttl > self::MAX_TTL) { + throw new \InvalidArgumentException(sprintf('Invalid TTL seconds. Should be between 1 sec and %d sec', self::MAX_TTL)); + } + + return $this->add($message, self::PROCESSED, $ttl, $queueName); + } + + public function release(AMQPMessage $message, ?string $queueName = null): void + { + if (!$this->isEnabled()) { + return; + } + + $messageId = $this->getMessageId($message, $queueName); + if ($messageId === null) { + return; + } + + $this->store->release($messageId); + } + + protected function markAsInProgress(AMQPMessage $message, ?string $queueName = null): bool + { + $ttl = (int) ($this->getConfig('lock_ttl') ?: self::DEFAULT_LOCK_TTL); + if ($ttl <= 0 || $ttl > self::MAX_LOCK_TTL) { + throw new \InvalidArgumentException(sprintf('Invalid TTL seconds. Should be between 1 and %d', self::MAX_LOCK_TTL)); + } + + return $this->add($message, self::IN_PROGRESS, $ttl, $queueName); + } + + /** + * Returns "true" if the message was not processed previously, and it's successfully been added to the store. + * Returns "false" if the message was already processed and it's a duplicate. + * + * @param AMQPMessage $message + * @param string $value + * @param int $ttl + * @return bool + */ + protected function add(AMQPMessage $message, string $value, int $ttl, ?string $queueName = null): bool + { + if (!$this->isEnabled()) { + return true; + } + + $messageId = $this->getMessageId($message, $queueName); + if ($messageId === null) { + return true; + } + + return $this->store->set($messageId, $value, $ttl, $value === self::PROCESSED); + } + + protected function getMessageId(AMQPMessage $message, ?string $queueName = null): ?string + { + $props = $message->get_properties(); + $messageId = $props['message_id'] ?? null; + if (!is_string($messageId) || empty($messageId)) { + return null; + } + + if (DlqDetector::isDlqMessage($message)) { + $messageId = 'dlq:' . $messageId; + } + + if (is_string($queueName) && $queueName !== '') { + $messageId = $queueName . ':' . $messageId; + } + + return $messageId; + } + + protected function applyActionOnLock(AMQPMessage $message): string + { + $action = $this->getConfig('action_on_lock', self::ACTION_REQUEUE); + if ($action === self::ACTION_REJECT) { + $message->reject(false); + } elseif ($action === self::ACTION_ACK) { + $message->ack(); + } else { + $action = $this->republishLockedMessage($message); + } + + return $action; + } + + protected function applyActionOnDuplication(AMQPMessage $message): string + { + $action = $this->getConfig('action_on_duplication', self::ACTION_ACK); + if ($action === self::ACTION_REJECT) { + $message->reject(false); + } else { + $message->ack(); + } + + return $action; + } + + /** + * Such a situation normally should not happen or can happen very rarely. + * Republish the locked message with a retry-count guard. + * It's necessary to avoid infinite redelivery loop. + * + * @param AMQPMessage $message + * @return string + */ + protected function republishLockedMessage(AMQPMessage $message): string + { + $props = $message->get_properties(); + $headers = []; + if (($props['application_headers'] ?? null) instanceof AMQPTable) { + $headers = $props['application_headers']->getNativeData(); + } + + $attempts = (int) ($headers[self::HEADER_LOCK_REQUEUE_COUNT] ?? 0); + ++$attempts; + + $maxAttempts = ((int) ($this->getConfig('lock_ttl', 30))) / self::WAIT_AFTER_PUBLISH; + if ($attempts > $maxAttempts) { + $this->logger->warning('DeduplicationService.republishLockedMessage.max_attempts_reached', [ + 'message_id' => $props['message_id'] ?? null, + ]); + $message->ack(); + + return self::ACTION_ACK; + } + + $headers[self::HEADER_LOCK_REQUEUE_COUNT] = $attempts; + + $newProps = $props; + $newProps['application_headers'] = new AMQPTable($headers); + + $newMessage = new AMQPMessage($message->getBody(), $newProps); + $channel = $message->getChannel(); + $channel->basic_publish($newMessage, $message->getExchange(), $message->getRoutingKey()); + + $this->logger->warning('DeduplicationService.republishLockedMessage.republish', [ + 'message_id' => $props['message_id'] ?? null, + 'attempts' => $attempts, + ]); + $message->ack(); + // it's necessary to avoid a high redelivery rate + // normally, such a situation is not expected (or expected very rarely) + sleep(self::WAIT_AFTER_PUBLISH); + + return self::ACTION_REQUEUE; + } + + protected function isEnabled(): bool + { + return (bool) $this->getConfig('enabled', false); + } + + protected function getConfig(string $key, mixed $default = null): mixed + { + $value = config("queue.connections.rabbitmq_vhosts.deduplication.transport.$key"); + + return $value !== null ? $value : $default; + } +} diff --git a/src/Services/Deduplication/TransportLevel/DeduplicationStore.php b/src/Services/Deduplication/TransportLevel/DeduplicationStore.php new file mode 100644 index 00000000..014e639f --- /dev/null +++ b/src/Services/Deduplication/TransportLevel/DeduplicationStore.php @@ -0,0 +1,12 @@ +getKey($messageKey); + return $this->connection()->get($key); + } + + public function set(string $messageKey, mixed $value, int $ttlSeconds, bool $withOverride = false): bool + { + if ($ttlSeconds <= 0) { + throw new \InvalidArgumentException('Invalid TTL seconds. Should be greater than 0.'); + } + + $key = $this->getKey($messageKey); + $args = [$key, $value, 'EX', $ttlSeconds]; + if (!$withOverride) { + $args[] = 'NX'; + } + + return (bool) $this->connection()->set(...$args); + } + + public function release(string $messageKey): void + { + $key = $this->getKey($messageKey); + $this->connection()->del($key); + } + + protected function connection(): Connection + { + return $this->connectionName ? Redis::connection($this->connectionName) : Redis::connection(); + } + + protected function getKey(string $messageKey): string + { + return $this->keyPrefix . ':' . $messageKey; + } +} diff --git a/src/Services/DlqDetector.php b/src/Services/DlqDetector.php new file mode 100644 index 00000000..997fb2f7 --- /dev/null +++ b/src/Services/DlqDetector.php @@ -0,0 +1,22 @@ +get_properties()['application_headers'] ?? null; + + if (!($headersTable instanceof AMQPTable)) { + return false; + } + + $headers = $headersTable->getNativeData(); + + return !empty($headers['x-death']) && !empty($headers['x-opt-deaths']); + } +} diff --git a/src/Services/InternalStorageManager.php b/src/Services/InternalStorageManager.php index 9e0cd8a0..2708f859 100644 --- a/src/Services/InternalStorageManager.php +++ b/src/Services/InternalStorageManager.php @@ -4,6 +4,7 @@ use Illuminate\Redis\Connections\PredisConnection; use Illuminate\Support\Facades\Redis; +use Illuminate\Support\Str; use Salesmessage\LibRabbitMQ\Dto\QueueApiDto; use Salesmessage\LibRabbitMQ\Dto\VhostApiDto; @@ -38,7 +39,7 @@ public function getVhosts(string $by = 'name', bool $alpha = true): array 'sort' => 'asc', ]); - return array_map(fn($value): string => str_replace_first( + return array_map(fn($value): string => Str::replaceFirst( $this->getVhostStorageKeyPrefix(), '', $value @@ -61,7 +62,7 @@ public function getVhostQueues(string $vhostName, string $by = 'name', bool $alp 'sort' => 'asc', ]); - return array_map(fn($value): string => str_replace_first( + return array_map(fn($value): string => Str::replaceFirst( $this->getQueueStorageKeyPrefix($vhostName), '', $value diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php index 7d99e10e..ff30cf41 100644 --- a/src/VhostsConsumers/AbstractVhostsConsumer.php +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -12,11 +12,10 @@ use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Exception\AMQPChannelClosedException; use PhpAmqpLib\Exception\AMQPConnectionClosedException; -use PhpAmqpLib\Exception\AMQPProtocolChannelException; -use PhpAmqpLib\Exception\AMQPRuntimeException; use PhpAmqpLib\Message\AMQPMessage; use Psr\Log\LoggerInterface; use Salesmessage\LibRabbitMQ\Consumer; +use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable; use Salesmessage\LibRabbitMQ\Dto\ConnectionNameDto; use Salesmessage\LibRabbitMQ\Dto\ConsumeVhostsFiltersDto; use Salesmessage\LibRabbitMQ\Dto\QueueApiDto; @@ -26,6 +25,8 @@ use Salesmessage\LibRabbitMQ\Mutex; use Salesmessage\LibRabbitMQ\Queue\Jobs\RabbitMQJob; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; +use Salesmessage\LibRabbitMQ\Services\Deduplication\AppDeduplicationService; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationService as TransportDeduplicationService; use Salesmessage\LibRabbitMQ\Services\InternalStorageManager; abstract class AbstractVhostsConsumer extends Consumer @@ -54,6 +55,7 @@ abstract class AbstractVhostsConsumer extends Consumer protected ?WorkerOptions $workerOptions = null; + /** @var array, array> */ protected array $batchMessages = []; protected ?string $processingUuid = null; @@ -72,6 +74,8 @@ abstract class AbstractVhostsConsumer extends Consumer protected bool $asyncMode = false; + protected ?Mutex $connectionMutex = null; + /** * @param InternalStorageManager $internalStorageManager * @param LoggerInterface $logger @@ -79,6 +83,7 @@ abstract class AbstractVhostsConsumer extends Consumer * @param Dispatcher $events * @param ExceptionHandler $exceptions * @param callable $isDownForMaintenance + * @param TransportDeduplicationService $transportDeduplicationService * @param callable|null $resetScope */ public function __construct( @@ -88,7 +93,8 @@ public function __construct( Dispatcher $events, ExceptionHandler $exceptions, callable $isDownForMaintenance, - callable $resetScope = null + protected TransportDeduplicationService $transportDeduplicationService, + callable $resetScope = null, ) { parent::__construct($manager, $events, $exceptions, $isDownForMaintenance, $resetScope); } @@ -223,7 +229,7 @@ protected function getStopStatus( /** * @return RabbitMQQueue - * @throws Exceptions\MutexTimeout + * @throws MutexTimeout */ abstract protected function startConsuming(): RabbitMQQueue; @@ -240,7 +246,7 @@ protected function processAmqpMessage(AMQPMessage $message, RabbitMQQueue $conne $this->addMessageToBatch($message); } else { $job = $this->getJobByMessage($message, $connection); - $this->processSingleJob($job); + $this->processSingleJob($job, $message); } $this->jobsProcessed++; @@ -262,18 +268,23 @@ protected function generateProcessingUuid(): string /** * @param AMQPMessage $message - * @return string + * @return non-empty-string */ protected function getMessageClass(AMQPMessage $message): string { $body = json_decode($message->getBody(), true); - return (string) ($body['data']['commandName'] ?? ''); + $messageClass = (string) ($body['data']['commandName'] ?? ''); + if (empty($messageClass)) { + throw new \RuntimeException('Message class is not defined'); + } + return $messageClass; } /** - * @param RabbitMQJob $job - * @return void + * @param AMQPMessage $message + * @return bool + * @throws \ReflectionException */ protected function isSupportBatching(AMQPMessage $message): bool { @@ -296,8 +307,8 @@ protected function addMessageToBatch(AMQPMessage $message): void /** * @param RabbitMQQueue $connection * @return void - * @throws Exceptions\MutexTimeout - * @throws Throwable + * @throws MutexTimeout + * @throws \Throwable */ protected function processBatch(RabbitMQQueue $connection): void { @@ -307,33 +318,52 @@ protected function processBatch(RabbitMQQueue $connection): void foreach ($this->batchMessages as $batchJobClass => $batchJobMessages) { $isBatchSuccess = false; - $batchSize = count($batchJobMessages); + if ($batchSize > 1) { $batchTimeStarted = microtime(true); + $uniqueMessagesForProcessing = []; $batchData = []; - /** @var AMQPMessage $batchMessage */ foreach ($batchJobMessages as $batchMessage) { - $job = $this->getJobByMessage($batchMessage, $connection); - $batchData[] = $job->getPayloadData(); + $this->transportDeduplicationService->decorateWithDeduplication( + function () use ($batchMessage, $connection, &$uniqueMessagesForProcessing, &$batchData) { + $job = $this->getJobByMessage($batchMessage, $connection); + $uniqueMessagesForProcessing[] = $batchMessage; + $batchData[] = $job->getPayloadData(); + }, + $batchMessage, + $this->currentQueueName + ); } - $this->logInfo('processBatch.start', [ - 'batch_job_class' => $batchJobClass, - 'batch_size' => $batchSize, - ]); - try { - $batchJobClass::collection($batchData); + if (AppDeduplicationService::isEnabled()) { + /** @var RabbitMQBatchable $batchJobClass */ + $batchData = $batchJobClass::getNotDuplicatedBatchedJobs($batchData); + } + + if (!empty($batchData)) { + $this->logInfo('processBatch.start', [ + 'batch_job_class' => $batchJobClass, + 'batch_size' => $batchSize, + ]); + + $batchJobClass::collection($batchData); + + $this->logInfo('processBatch.finish', [ + 'batch_job_class' => $batchJobClass, + 'batch_size' => $batchSize, + 'executive_batch_time_seconds' => microtime(true) - $batchTimeStarted, + ]); + } + $isBatchSuccess = true; + } catch (\Throwable $exception) { + foreach ($uniqueMessagesForProcessing as $batchMessage) { + $this->transportDeduplicationService->release($batchMessage, $this->currentQueueName); + } - $this->logInfo('processBatch.finish', [ - 'batch_job_class' => $batchJobClass, - 'batch_size' => $batchSize, - 'executive_batch_time_seconds' => microtime(true) - $batchTimeStarted, - ]); - } catch (Throwable $exception) { $isBatchSuccess = false; $this->logError('processBatch.exception', [ @@ -345,19 +375,28 @@ protected function processBatch(RabbitMQQueue $connection): void } unset($batchData); + } else { + $uniqueMessagesForProcessing = $batchJobMessages; } $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); - if ($isBatchSuccess) { - $lastBatchMessage = end($batchJobMessages); - $this->ackMessage($lastBatchMessage, true); - } else { - foreach ($batchJobMessages as $batchMessage) { - $job = $this->getJobByMessage($batchMessage, $connection); - $this->processSingleJob($job); + try { + if ($isBatchSuccess && !empty($uniqueMessagesForProcessing)) { + foreach ($uniqueMessagesForProcessing as $batchMessage) { + $this->transportDeduplicationService?->markAsProcessed($batchMessage, $this->currentQueueName); + } + + $lastBatchMessage = end($uniqueMessagesForProcessing); + $this->ackMessage($lastBatchMessage, true); + } else { + foreach ($uniqueMessagesForProcessing as $batchMessage) { + $job = $this->getJobByMessage($batchMessage, $connection); + $this->processSingleJob($job, $batchMessage); + } } + } finally { + $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); } - $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); } $this->updateLastProcessedAt(); @@ -368,26 +407,28 @@ protected function processBatch(RabbitMQQueue $connection): void * @param AMQPMessage $message * @param RabbitMQQueue $connection * @return RabbitMQJob - * @throws Throwable + * @throws \Throwable */ protected function getJobByMessage(AMQPMessage $message, RabbitMQQueue $connection): RabbitMQJob { $jobClass = $connection->getJobClass(); - return new $jobClass( + $job = new $jobClass( $this->container, $connection, $message, $this->currentConnectionName, $this->currentQueueName ); + + if (!is_subclass_of($job->getPayloadClass(), RabbitMQConsumable::class)) { + throw new \RuntimeException(sprintf('Job class %s must implement %s', $job->getPayloadClass(), RabbitMQConsumable::class)); + } + + return $job; } - /** - * @param RabbitMQJob $job - * @return void - */ - protected function processSingleJob(RabbitMQJob $job): void + protected function processSingleJob(RabbitMQJob $job, AMQPMessage $message): void { $timeStarted = microtime(true); $this->logInfo('processSingleJob.start'); @@ -396,7 +437,22 @@ protected function processSingleJob(RabbitMQJob $job): void $this->registerTimeoutHandler($job, $this->workerOptions); } - $this->runJob($job, $this->currentConnectionName, $this->workerOptions); + $this->transportDeduplicationService->decorateWithDeduplication( + function () use ($job, $message) { + if (AppDeduplicationService::isEnabled() && $job->getPayloadData()->isDuplicated()) { + $this->logWarning('processSingleJob.job_is_duplicated'); + $this->ackMessage($message); + + } else { + $this->runJob($job, $this->currentConnectionName, $this->workerOptions); + } + + $this->transportDeduplicationService->markAsProcessed($message, $this->currentQueueName); + }, + $message, + $this->currentQueueName, + ); + $this->updateLastProcessedAt(); if ($this->supportsAsyncSignals()) { @@ -421,7 +477,7 @@ protected function ackMessage(AMQPMessage $message, bool $multiple = false): voi try { $message->ack($multiple); - } catch (Throwable $exception) { + } catch (\Throwable $exception) { $this->logError('ackMessage.exception', [ 'message' => $exception->getMessage(), 'trace' => $exception->getTraceAsString(), @@ -432,7 +488,7 @@ protected function ackMessage(AMQPMessage $message, bool $multiple = false): voi /** * @return void - * @throws Exceptions\MutexTimeout + * @throws MutexTimeout */ abstract protected function stopConsuming(): void; @@ -631,7 +687,7 @@ protected function goAhead(): bool /** * @return void */ - protected function updateLastProcessedAt() + protected function updateLastProcessedAt(): void { if ((null === $this->currentVhostName) || (null === $this->currentQueueName)) { return; @@ -690,7 +746,6 @@ protected function initConnection(): RabbitMQQueue $this->prefetchCount, false ); - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); $this->channel = $channel; $this->connection = $connection; @@ -709,6 +764,8 @@ protected function initConnection(): RabbitMQQueue $this->goAheadOrWait(); return $this->initConnection(); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } return $connection; @@ -725,6 +782,7 @@ protected function startHeartbeatCheck(): void $heartbeatInterval = (int) ($this->config['options']['heartbeat'] ?? 0); if (!$heartbeatInterval) { + $this->logWarning('startHeartbeatCheck.heartbeat_interval_is_not_set'); return; } @@ -764,10 +822,10 @@ protected function startHeartbeatCheck(): void $connection->checkHeartBeat(); } catch (MutexTimeout) { $this->logWarning('startHeartbeatCheck.mutex_timeout'); - } catch (Throwable $exception) { + } catch (\Throwable $exception) { $this->logError('startHeartbeatCheck.exception', [ - 'eroor' => $exception->getMessage(), - 'trace' => $e->getTraceAsString(), + 'error' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), ]); $this->shouldQuit = true; diff --git a/src/VhostsConsumers/DirectConsumer.php b/src/VhostsConsumers/DirectConsumer.php index 72abfba7..a17322a2 100644 --- a/src/VhostsConsumers/DirectConsumer.php +++ b/src/VhostsConsumers/DirectConsumer.php @@ -2,18 +2,10 @@ namespace Salesmessage\LibRabbitMQ\VhostsConsumers; -use Illuminate\Console\OutputStyle; -use Illuminate\Contracts\Debug\ExceptionHandler; -use Illuminate\Contracts\Events\Dispatcher; -use Illuminate\Queue\QueueManager; use Illuminate\Queue\WorkerOptions; -use Illuminate\Support\Str; use PhpAmqpLib\Exception\AMQPChannelClosedException; -use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Exception\AMQPRuntimeException; -use PhpAmqpLib\Message\AMQPMessage; -use Psr\Log\LoggerInterface; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; class DirectConsumer extends AbstractVhostsConsumer @@ -22,7 +14,6 @@ class DirectConsumer extends AbstractVhostsConsumer * @param $connectionName * @param WorkerOptions $options * @return int - * @throws \Salesmessage\LibRabbitMQ\Exceptions\MutexTimeout * @throws \Throwable */ protected function vhostDaemon($connectionName, WorkerOptions $options) @@ -51,7 +42,6 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) try { $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); $amqpMessage = $this->channel->basic_get($this->currentQueueName); - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } catch (AMQPProtocolChannelException|AMQPChannelClosedException $exception) { $amqpMessage = null; @@ -69,7 +59,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->exceptions->report($exception); $this->kill(self::EXIT_SUCCESS, $this->workerOptions); - } catch (Exception|Throwable $exception) { + } catch (\Throwable $exception) { $this->logError('daemon.exception', [ 'message' => $exception->getMessage(), 'trace' => $exception->getTraceAsString(), @@ -79,9 +69,11 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->exceptions->report($exception); $this->stopWorkerIfLostConnection($exception); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } - if (null === $amqpMessage) { + if (!isset($amqpMessage)) { $this->logInfo('daemon.consuming_sleep_no_job'); $this->stopConsuming(); @@ -131,7 +123,6 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) /** * @return RabbitMQQueue - * @throws Exceptions\MutexTimeout */ protected function startConsuming(): RabbitMQQueue { @@ -154,4 +145,3 @@ protected function stopConsuming(): void return; } } - diff --git a/src/VhostsConsumers/QueueConsumer.php b/src/VhostsConsumers/QueueConsumer.php index b9be89ea..700bf8a1 100644 --- a/src/VhostsConsumers/QueueConsumer.php +++ b/src/VhostsConsumers/QueueConsumer.php @@ -2,18 +2,12 @@ namespace Salesmessage\LibRabbitMQ\VhostsConsumers; -use Illuminate\Console\OutputStyle; -use Illuminate\Contracts\Debug\ExceptionHandler; -use Illuminate\Contracts\Events\Dispatcher; -use Illuminate\Queue\QueueManager; use Illuminate\Queue\WorkerOptions; -use Illuminate\Support\Str; use PhpAmqpLib\Exception\AMQPChannelClosedException; -use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Exception\AMQPRuntimeException; use PhpAmqpLib\Message\AMQPMessage; -use Psr\Log\LoggerInterface; +use Salesmessage\LibRabbitMQ\Exceptions\MutexTimeout; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; class QueueConsumer extends AbstractVhostsConsumer @@ -24,7 +18,7 @@ class QueueConsumer extends AbstractVhostsConsumer * @param $connectionName * @param WorkerOptions $options * @return int|void - * @throws \Salesmessage\LibRabbitMQ\Exceptions\MutexTimeout + * @throws MutexTimeout * @throws \Throwable */ protected function vhostDaemon($connectionName, WorkerOptions $options) @@ -54,7 +48,6 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) try { $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); $this->channel->wait(null, true, (int) $this->workerOptions->timeout); - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } catch (AMQPRuntimeException $exception) { $this->logError('daemon.amqp_runtime_exception', [ 'message' => $exception->getMessage(), @@ -64,7 +57,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->exceptions->report($exception); $this->kill(self::EXIT_SUCCESS, $this->workerOptions); - } catch (Exception|Throwable $exception) { + } catch (\Throwable $exception) { $this->logError('daemon.exception', [ 'message' => $exception->getMessage(), 'trace' => $exception->getTraceAsString(), @@ -74,6 +67,8 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->exceptions->report($exception); $this->stopWorkerIfLostConnection($exception); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } // If no job is got off the queue, we will need to sleep the worker. @@ -116,7 +111,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) /** * @return RabbitMQQueue - * @throws Exceptions\MutexTimeout + * @throws MutexTimeout */ protected function startConsuming(): RabbitMQQueue { @@ -180,10 +175,10 @@ protected function startConsuming(): RabbitMQQueue 'trace' => $exception->getTraceAsString(), 'error_class' => get_class($exception), ]); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); - $this->updateLastProcessedAt(); if (false === $isSuccess) { @@ -198,13 +193,15 @@ protected function startConsuming(): RabbitMQQueue /** * @return void - * @throws Exceptions\MutexTimeout + * @throws MutexTimeout */ protected function stopConsuming(): void { $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); - $this->channel->basic_cancel($this->getTagName(), true); - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); + try { + $this->channel->basic_cancel($this->getTagName(), true); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); + } } } - From 820e511f9db0598effb9d7201f0f9aa5e5fda7c6 Mon Sep 17 00:00:00 2001 From: Alexander Ginko <120381488+ahinkoneklo@users.noreply.github.com> Date: Wed, 5 Nov 2025 10:17:33 +0100 Subject: [PATCH 7/7] SWR-20483 Server: RabbitMQ Improvements: Quorum Queues (#17) * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20483 Server: RabbitMQ Improvements: Quorum Queues * SWR-20483 Server: RabbitMQ Improvements: Quorum Queues * SWR-20483 Server: RabbitMQ Improvements: Quorum Queues --- README.md | 2 +- composer.json | 2 +- src/Console/QueueDeclareCommand.php | 25 ++++++--- src/Contracts/RabbitMQConsumable.php | 5 ++ src/Horizon/RabbitMQQueue.php | 9 +++- src/Queue/Jobs/RabbitMQJob.php | 42 ++++++++++++++- src/Queue/Jobs/RabbitMQJobBatchable.php | 48 ----------------- src/Queue/QueueConfig.php | 39 ++++++++++++++ src/Queue/QueueConfigFactory.php | 9 ++++ src/Queue/RabbitMQQueue.php | 71 +++++++++++++++++++------ src/Queue/RabbitMQQueueBatchable.php | 14 ++--- 11 files changed, 184 insertions(+), 82 deletions(-) diff --git a/README.md b/README.md index ae38fac9..0c6c9cb2 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Only the latest version will get new features. Bug fixes will be provided using You can install this package via composer using this command: ``` -composer require salesmessage/php-lib-rabbitmq:^1.31 --ignore-platform-reqs +composer require salesmessage/php-lib-rabbitmq:^1.32 --ignore-platform-reqs ``` The package will automatically register itself. diff --git a/composer.json b/composer.json index 54c2c477..ff7d373d 100644 --- a/composer.json +++ b/composer.json @@ -34,7 +34,7 @@ }, "extra": { "branch-alias": { - "dev-master": "1.31-dev" + "dev-master": "1.32-dev" }, "laravel": { "providers": [ diff --git a/src/Console/QueueDeclareCommand.php b/src/Console/QueueDeclareCommand.php index 396877ac..24e3c5d0 100644 --- a/src/Console/QueueDeclareCommand.php +++ b/src/Console/QueueDeclareCommand.php @@ -11,10 +11,11 @@ class QueueDeclareCommand extends Command protected $signature = 'lib-rabbitmq:queue-declare {name : The name of the queue to declare} {connection=rabbitmq : The name of the queue connection to use} - {--max-priority} + {--max-priority : Set x-max-priority (ignored for quorum)} {--durable=1} {--auto-delete=0} - {--quorum=0}'; + {--quorum=0 : Declare quorum queue (x-queue-type=quorum)} + {--quorum-initial-group-size= : x-quorum-initial-group-size when quorum is enabled}'; protected $description = 'Declare queue'; @@ -36,12 +37,24 @@ public function handle(RabbitMQConnector $connector): void $arguments = []; $maxPriority = (int) $this->option('max-priority'); - if ($maxPriority) { - $arguments['x-max-priority'] = $maxPriority; - } + $isQuorum = (bool) $this->option('quorum'); - if ($this->option('quorum')) { + if ($isQuorum) { $arguments['x-queue-type'] = 'quorum'; + + $initialGroupSize = (int) $this->option('quorum-initial-group-size'); + if ($initialGroupSize > 0) { + $arguments['x-quorum-initial-group-size'] = $initialGroupSize; + } + + if ($maxPriority) { + // quorum queues do not support priority; ignore and warn + $this->warn('Ignoring --max-priority for quorum queue.'); + } + } else { + if ($maxPriority) { + $arguments['x-max-priority'] = $maxPriority; + } } $queue->declareQueue( diff --git a/src/Contracts/RabbitMQConsumable.php b/src/Contracts/RabbitMQConsumable.php index 4e45e3ec..62364e27 100644 --- a/src/Contracts/RabbitMQConsumable.php +++ b/src/Contracts/RabbitMQConsumable.php @@ -7,9 +7,14 @@ */ interface RabbitMQConsumable { + public const MQ_TYPE_CLASSIC = 'classic'; + public const MQ_TYPE_QUORUM = 'quorum'; + /** * Check duplications on the application side. * It's mostly represented as an idempotency checker. */ public function isDuplicated(): bool; + + public function getQueueType(): string; } diff --git a/src/Horizon/RabbitMQQueue.php b/src/Horizon/RabbitMQQueue.php index 31d329d1..c98eb65e 100644 --- a/src/Horizon/RabbitMQQueue.php +++ b/src/Horizon/RabbitMQQueue.php @@ -12,6 +12,7 @@ use PhpAmqpLib\Exception\AMQPProtocolChannelException; use Salesmessage\LibRabbitMQ\Queue\Jobs\RabbitMQJob; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue; +use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable; class RabbitMQQueue extends BaseRabbitMQQueue { @@ -50,6 +51,10 @@ public function pushRaw($payload, $queue = null, array $options = []): int|strin { $payload = (new JobPayload($payload))->prepare($this->lastPushed ?? null)->value; + if (!isset($options['queue_type']) && isset($this->lastPushed) && is_object($this->lastPushed) && $this->lastPushed instanceof RabbitMQConsumable) { + $options['queue_type'] = $this->lastPushed->getQueueType(); + } + return tap(parent::pushRaw($payload, $queue, $options), function () use ($queue, $payload): void { $this->event($this->getQueue($queue), new JobPushed($payload)); }); @@ -64,7 +69,9 @@ public function later($delay, $job, $data = '', $queue = null): mixed { $payload = (new JobPayload($this->createPayload($job, $data)))->prepare($job)->value; - return tap(parent::laterRaw($delay, $payload, $queue), function () use ($payload, $queue): void { + $queueType = ($job instanceof RabbitMQConsumable) ? $job->getQueueType() : null; + + return tap(parent::laterRaw($delay, $payload, $queue, queueType: $queueType), function () use ($payload, $queue): void { $this->event($this->getQueue($queue), new JobPushed($payload)); }); } diff --git a/src/Queue/Jobs/RabbitMQJob.php b/src/Queue/Jobs/RabbitMQJob.php index 0622b8ec..a3c16b80 100644 --- a/src/Queue/Jobs/RabbitMQJob.php +++ b/src/Queue/Jobs/RabbitMQJob.php @@ -4,12 +4,14 @@ use Illuminate\Container\Container; use Illuminate\Contracts\Container\BindingResolutionException; +use Illuminate\Contracts\Encryption\Encrypter; use Illuminate\Contracts\Queue\Job as JobContract; use Illuminate\Queue\Jobs\Job; use Illuminate\Support\Arr; use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; +use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable; use Salesmessage\LibRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; @@ -126,8 +128,13 @@ public function release($delay = 0): void { parent::release(); + $consumableJob = $this->getPayloadData(); + if (!($consumableJob instanceof RabbitMQConsumable)) { + throw new \RuntimeException('Job must be an instance of RabbitMQJobBatchable'); + } + // Always create a new message when this Job is released - $this->rabbitmq->laterRaw($delay, $this->message->getBody(), $this->queue, $this->attempts()); + $this->rabbitmq->laterRaw($delay, $this->message->getBody(), $this->queue, $this->attempts(), $consumableJob->getQueueType()); // Releasing a Job means the message was failed to process. // Because this Job message is always recreated and pushed as new message, this Job message is correctly handled. @@ -135,6 +142,39 @@ public function release($delay = 0): void $this->rabbitmq->ack($this); } + /** + * @return object + * @throws \RuntimeException + */ + public function getPayloadData(): object + { + $payload = $this->payload(); + + $data = $payload['data']; + + if (str_starts_with($data['command'], 'O:')) { + return unserialize($data['command']); + } + + if ($this->container->bound(Encrypter::class)) { + return unserialize($this->container[Encrypter::class]->decrypt($data['command'])); + } + + throw new \RuntimeException('Unable to extract job data.'); + } + + /** + * Returns target class name + * + * @return mixed + */ + public function getPayloadClass(): string + { + $payload = $this->payload(); + + return $payload['data']['commandName']; + } + /** * Get the underlying RabbitMQ connection. */ diff --git a/src/Queue/Jobs/RabbitMQJobBatchable.php b/src/Queue/Jobs/RabbitMQJobBatchable.php index 8015c2ad..278ae917 100644 --- a/src/Queue/Jobs/RabbitMQJobBatchable.php +++ b/src/Queue/Jobs/RabbitMQJobBatchable.php @@ -2,8 +2,6 @@ namespace Salesmessage\LibRabbitMQ\Queue\Jobs; -use Illuminate\Contracts\Encryption\Encrypter; -use Illuminate\Queue\Jobs\JobName; use Salesmessage\LibRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob; /** @@ -11,50 +9,4 @@ */ class RabbitMQJobBatchable extends BaseJob { - /** - * Fire the job. - * - * @return void - */ - public function fire() - { - $payload = $this->payload(); - - [$class, $method] = JobName::parse($payload['job']); - - ($this->instance = $this->resolve($class))->{$method}($this, $payload['data']); - } - - /** - * Returns target class name - * - * @return mixed - */ - public function getPayloadClass(): string - { - $payload = $this->payload(); - - return $payload['data']['commandName']; - } - - /** - * @return object - * @throws \RuntimeException - */ - public function getPayloadData(): object - { - $payload = $this->payload(); - - $data = $payload['data']; - - if (str_starts_with($data['command'], 'O:')) { - return unserialize($data['command']); - } - - if ($this->container->bound(Encrypter::class)) { - return unserialize($this->container[Encrypter::class]->decrypt($data['command'])); - } - - throw new \RuntimeException('Unable to extract job data.'); - } } diff --git a/src/Queue/QueueConfig.php b/src/Queue/QueueConfig.php index 1bf5802f..22d7f2fb 100644 --- a/src/Queue/QueueConfig.php +++ b/src/Queue/QueueConfig.php @@ -30,6 +30,10 @@ class QueueConfig protected bool $quorum = false; + protected ?int $quorumInitialGroupSize = null; + + protected string $quorumQueuePostfix = ''; + protected array $options = []; /** @@ -247,6 +251,41 @@ public function setQuorum($quorum): QueueConfig return $this; } + /** + * When set, used to declare quorum queues with a specific initial group size. + */ + public function getQuorumInitialGroupSize(): ?int + { + return $this->quorumInitialGroupSize; + } + + public function setQuorumInitialGroupSize(?int $size): self + { + if ($size === null) { + $this->quorumInitialGroupSize = null; + return $this; + } + + if ($size <= 0) { + throw new \InvalidArgumentException('Invalid quorum group size'); + } + + $this->quorumInitialGroupSize = $size; + + return $this; + } + + public function getQuorumQueuePostfix(): string + { + return $this->quorumQueuePostfix; + } + + public function setQuorumQueuePostfix(string $postfix): self + { + $this->quorumQueuePostfix = $postfix; + return $this; + } + /** * Holds all unknown queue options provided in the connection config */ diff --git a/src/Queue/QueueConfigFactory.php b/src/Queue/QueueConfigFactory.php index 04f29166..26d0bc2a 100644 --- a/src/Queue/QueueConfigFactory.php +++ b/src/Queue/QueueConfigFactory.php @@ -68,6 +68,15 @@ protected static function getOptionsFromConfig(QueueConfig $queueConfig, array $ $queueConfig->setQuorum($quorum); } + // Feature: Quorum initial group size + if (Arr::has($queueOptions, 'quorum_initial_group_size')) { + $queueConfig->setQuorumInitialGroupSize((int) Arr::pull($queueOptions, 'quorum_initial_group_size')); + } + + if ($quorumPostfix = (string) Arr::pull($queueOptions, 'quorum_queue_postfix')) { + $queueConfig->setQuorumQueuePostfix($quorumPostfix); + } + // All extra options not defined $queueConfig->setOptions($queueOptions); } diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 00336fe6..83ceb317 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -24,6 +24,7 @@ use PhpAmqpLib\Wire\AMQPTable; use Ramsey\Uuid\Uuid; use RuntimeException; +use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable; use Throwable; use Salesmessage\LibRabbitMQ\Contracts\RabbitMQQueueContract; use Salesmessage\LibRabbitMQ\Queue\Jobs\RabbitMQJob; @@ -102,13 +103,21 @@ public function size($queue = null): int */ public function push($job, $data = '', $queue = null) { + if (!($job instanceof RabbitMQConsumable)) { + throw new \RuntimeException('Job must be an instance of RabbitMQConsumable'); + } + + $options = [ + 'queue_type' => $job->getQueueType(), + ]; + return $this->enqueueUsing( $job, $this->createPayload($job, $this->getQueue($queue), $data), $queue, null, - function ($payload, $queue) { - return $this->pushRaw($payload, $queue); + function ($payload, $queue) use ($options) { + return $this->pushRaw($payload, $queue, $options); } ); } @@ -120,9 +129,12 @@ function ($payload, $queue) { */ public function pushRaw($payload, $queue = null, array $options = []): int|string|null { + $queueType = $options['queue_type'] ?? null; + unset($options['queue_type']); + [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); - $this->declareDestination($destination, $exchange, $exchangeType); + $this->declareDestination($destination, $exchange, $exchangeType, $queueType); [$message, $correlationId] = $this->createMessage($payload, $attempts); @@ -138,13 +150,19 @@ public function pushRaw($payload, $queue = null, array $options = []): int|strin */ public function later($delay, $job, $data = '', $queue = null): mixed { + if (!($job instanceof RabbitMQConsumable)) { + throw new \RuntimeException('Job must be an instance of RabbitMQConsumable'); + } + + $queueType = $job->getQueueType(); + return $this->enqueueUsing( $job, $this->createPayload($job, $this->getQueue($queue), $data), $queue, $delay, - function ($payload, $queue, $delay) { - return $this->laterRaw($delay, $payload, $queue); + function ($payload, $queue, $delay) use ($queueType) { + return $this->laterRaw($delay, $payload, $queue, queueType: $queueType); } ); } @@ -152,7 +170,7 @@ function ($payload, $queue, $delay) { /** * @throws AMQPProtocolChannelException */ - public function laterRaw($delay, string $payload, $queue = null, int $attempts = 0): int|string|null + public function laterRaw($delay, string $payload, $queue = null, int $attempts = 0, ?string $queueType = null): int|string|null { $ttl = $this->secondsUntil($delay) * 1000; @@ -161,12 +179,15 @@ public function laterRaw($delay, string $payload, $queue = null, int $attempts = // When no ttl just publish a new message to the exchange or queue if ($ttl <= 0) { + if ($queueType !== null) { + $options['queue_type'] = $queueType; + } return $this->pushRaw($payload, $queue, $options); } // Create a main queue to handle delayed messages [$mainDestination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); - $this->declareDestination($mainDestination, $exchange, $exchangeType); + $this->declareDestination($mainDestination, $exchange, $exchangeType, $queueType); $destination = $this->getQueue($queue).'.delay.'.$ttl; @@ -196,7 +217,13 @@ public function bulk($jobs, $data = '', $queue = null): void protected function publishBatch($jobs, $data = '', $queue = null): void { foreach ($jobs as $job) { - $this->bulkRaw($this->createPayload($job, $queue, $data), $queue, ['job' => $job]); + if (!($job instanceof RabbitMQConsumable)) { + throw new \RuntimeException('Job must be an instance of RabbitMQConsumable'); + } + $this->bulkRaw($this->createPayload($job, $queue, $data), $queue, [ + 'job' => $job, + 'queue_type' => $job->getQueueType(), + ]); } $this->batchPublish(); @@ -207,9 +234,12 @@ protected function publishBatch($jobs, $data = '', $queue = null): void */ public function bulkRaw(string $payload, string $queue = null, array $options = []): int|string|null { + $queueType = $options['queue_type'] ?? null; + unset($options['queue_type']); + [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); - $this->declareDestination($destination, $exchange, $exchangeType); + $this->declareDestination($destination, $exchange, $exchangeType, $queueType); [$message, $correlationId] = $this->createMessage($payload, $attempts); @@ -599,15 +629,16 @@ public function close(): void /** * Get the Queue arguments. */ - protected function getQueueArguments(string $destination): array + protected function getQueueArguments(string $destination, ?string $queueType = null): array { + $isQuorum = $this->getConfig()->isQuorum() || $queueType === RabbitMQConsumable::MQ_TYPE_QUORUM; $arguments = []; // Messages without a priority property are treated as if their priority were 0. // Messages with a priority which is higher than the queue's maximum, are treated as if they were // published with the maximum priority. // Quorum queues does not support priority. - if ($this->getConfig()->isPrioritizeDelayed() && ! $this->getConfig()->isQuorum()) { + if ($this->getConfig()->isPrioritizeDelayed() && ! $isQuorum) { $arguments['x-max-priority'] = $this->getConfig()->getQueueMaxPriority(); } @@ -616,8 +647,14 @@ protected function getQueueArguments(string $destination): array $arguments['x-dead-letter-routing-key'] = $this->getFailedRoutingKey($destination); } - if ($this->getConfig()->isQuorum()) { + if ($isQuorum) { $arguments['x-queue-type'] = 'quorum'; + + // optional: initial group size for quorum queues + $initialGroupSize = $this->getConfig()->getQuorumInitialGroupSize(); + if ($initialGroupSize !== null) { + $arguments['x-quorum-initial-group-size'] = $initialGroupSize; + } } return $arguments; @@ -701,8 +738,12 @@ protected function isQueueDeclared(string $name): bool * * @throws AMQPProtocolChannelException */ - protected function declareDestination(string $destination, string $exchange = null, string $exchangeType = AMQPExchangeType::DIRECT): void - { + protected function declareDestination( + string $destination, + string $exchange = null, + string $exchangeType = AMQPExchangeType::DIRECT, + ?string $queueType = null, + ): void { // When an exchange is provided and no exchange is present in RabbitMQ, create an exchange. if ($exchange && ! $this->isExchangeExists($exchange)) { $this->declareExchange($exchange, $exchangeType); @@ -719,7 +760,7 @@ protected function declareDestination(string $destination, string $exchange = nu } // Create a queue for amq.direct publishing. - $this->declareQueue($destination, true, false, $this->getQueueArguments($destination)); + $this->declareQueue($destination, true, false, $this->getQueueArguments($destination, $queueType)); } /** diff --git a/src/Queue/RabbitMQQueueBatchable.php b/src/Queue/RabbitMQQueueBatchable.php index 00399cf5..d58b89a5 100644 --- a/src/Queue/RabbitMQQueueBatchable.php +++ b/src/Queue/RabbitMQQueueBatchable.php @@ -2,7 +2,6 @@ namespace Salesmessage\LibRabbitMQ\Queue; -use PhpAmqpLib\Connection\AbstractConnection; use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable; use Salesmessage\LibRabbitMQ\Dto\ConnectionNameDto; use Salesmessage\LibRabbitMQ\Dto\QueueApiDto; @@ -42,8 +41,7 @@ protected function publishBasic( $mandatory = false, $immediate = false, $ticket = null - ): void - { + ): void { try { parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket); } catch (AMQPConnectionClosedException|AMQPChannelClosedException) { @@ -89,6 +87,10 @@ public function push($job, $data = '', $queue = null) $queue = $job->onQueue(); } + if ($job->getQueueType() === RabbitMQConsumable::MQ_TYPE_QUORUM) { + $queue .= $this->getConfig()->getQuorumQueuePostfix(); + } + try { $result = parent::push($job, $data, $queue); } catch (AMQPConnectionClosedException $exception) { @@ -111,11 +113,6 @@ public function push($job, $data = '', $queue = null) return $result; } - public function pushRaw($payload, $queue = null, array $options = []): int|string|null - { - return parent::pushRaw($payload, $queue, $options); - } - /** * @return bool * @throws \GuzzleHttp\Exception\GuzzleException @@ -185,4 +182,3 @@ private function isVhostFailedException(AMQPConnectionClosedException $exception return false; } } -