From 198ac96de330382475b77463d9bf5365d370df5c Mon Sep 17 00:00:00 2001 From: Alexander Ginko Date: Tue, 28 Oct 2025 22:16:36 +0100 Subject: [PATCH 01/14] SWR-20482 Server: RabbitMQ Improvements: Deduplication --- composer.json | 4 +- src/Services/Api/RabbitApiClient.php | 4 +- src/Services/InternalStorageManager.php | 5 +- .../AbstractVhostsConsumer.php | 64 +++++++++++-------- src/VhostsConsumers/DirectConsumer.php | 18 ++---- src/VhostsConsumers/QueueConsumer.php | 31 ++++----- 6 files changed, 63 insertions(+), 63 deletions(-) diff --git a/composer.json b/composer.json index a759a59f..35df990c 100644 --- a/composer.json +++ b/composer.json @@ -43,7 +43,9 @@ } }, "suggest": { - "ext-pcntl": "Required to use all features of the queue consumer." + "ext-pcntl": "Required to use all features of the queue consumer.", + "ext-swoole": "Required to use async mode for healthcheck (alternative is ext-openswoole).", + "ext-openswoole": "Required to use async mode for healthcheck (alternative is ext-swoole)." }, "scripts": { "test": [ diff --git a/src/Services/Api/RabbitApiClient.php b/src/Services/Api/RabbitApiClient.php index dfd16f1e..0f8fd3bc 100644 --- a/src/Services/Api/RabbitApiClient.php +++ b/src/Services/Api/RabbitApiClient.php @@ -70,7 +70,7 @@ public function request( $contents = $response->getBody()->getContents(); return (array) ($contents ? json_decode($contents, true) : []); - } catch (Throwable $exception) { + } catch (\Throwable $exception) { $rethrowException = $exception; if ($exception instanceof ClientException) { $rethrowException = new RabbitApiClientException($exception->getMessage()); @@ -109,4 +109,4 @@ private function getPassword(): string { return (string) ($this->connectionConfig['hosts'][0]['password'] ?? ''); } -} \ No newline at end of file +} diff --git a/src/Services/InternalStorageManager.php b/src/Services/InternalStorageManager.php index 9e0cd8a0..2708f859 100644 --- a/src/Services/InternalStorageManager.php +++ b/src/Services/InternalStorageManager.php @@ -4,6 +4,7 @@ use Illuminate\Redis\Connections\PredisConnection; use Illuminate\Support\Facades\Redis; +use Illuminate\Support\Str; use Salesmessage\LibRabbitMQ\Dto\QueueApiDto; use Salesmessage\LibRabbitMQ\Dto\VhostApiDto; @@ -38,7 +39,7 @@ public function getVhosts(string $by = 'name', bool $alpha = true): array 'sort' => 'asc', ]); - return array_map(fn($value): string => str_replace_first( + return array_map(fn($value): string => Str::replaceFirst( $this->getVhostStorageKeyPrefix(), '', $value @@ -61,7 +62,7 @@ public function getVhostQueues(string $vhostName, string $by = 'name', bool $alp 'sort' => 'asc', ]); - return array_map(fn($value): string => str_replace_first( + return array_map(fn($value): string => Str::replaceFirst( $this->getQueueStorageKeyPrefix($vhostName), '', $value diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php index 7d99e10e..fe5cc241 100644 --- a/src/VhostsConsumers/AbstractVhostsConsumer.php +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -12,8 +12,6 @@ use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Exception\AMQPChannelClosedException; use PhpAmqpLib\Exception\AMQPConnectionClosedException; -use PhpAmqpLib\Exception\AMQPProtocolChannelException; -use PhpAmqpLib\Exception\AMQPRuntimeException; use PhpAmqpLib\Message\AMQPMessage; use Psr\Log\LoggerInterface; use Salesmessage\LibRabbitMQ\Consumer; @@ -54,6 +52,7 @@ abstract class AbstractVhostsConsumer extends Consumer protected ?WorkerOptions $workerOptions = null; + /** @var array, array> */ protected array $batchMessages = []; protected ?string $processingUuid = null; @@ -72,6 +71,8 @@ abstract class AbstractVhostsConsumer extends Consumer protected bool $asyncMode = false; + protected ?Mutex $connectionMutex = null; + /** * @param InternalStorageManager $internalStorageManager * @param LoggerInterface $logger @@ -223,7 +224,7 @@ protected function getStopStatus( /** * @return RabbitMQQueue - * @throws Exceptions\MutexTimeout + * @throws MutexTimeout */ abstract protected function startConsuming(): RabbitMQQueue; @@ -262,18 +263,23 @@ protected function generateProcessingUuid(): string /** * @param AMQPMessage $message - * @return string + * @return non-empty-string */ protected function getMessageClass(AMQPMessage $message): string { $body = json_decode($message->getBody(), true); - return (string) ($body['data']['commandName'] ?? ''); + $messageClass = (string) ($body['data']['commandName'] ?? ''); + if (empty($messageClass)) { + throw new \RuntimeException('Message class is not defined'); + } + return $messageClass; } /** - * @param RabbitMQJob $job - * @return void + * @param AMQPMessage $message + * @return bool + * @throws \ReflectionException */ protected function isSupportBatching(AMQPMessage $message): bool { @@ -296,8 +302,8 @@ protected function addMessageToBatch(AMQPMessage $message): void /** * @param RabbitMQQueue $connection * @return void - * @throws Exceptions\MutexTimeout - * @throws Throwable + * @throws MutexTimeout + * @throws \Throwable */ protected function processBatch(RabbitMQQueue $connection): void { @@ -313,7 +319,6 @@ protected function processBatch(RabbitMQQueue $connection): void $batchTimeStarted = microtime(true); $batchData = []; - /** @var AMQPMessage $batchMessage */ foreach ($batchJobMessages as $batchMessage) { $job = $this->getJobByMessage($batchMessage, $connection); $batchData[] = $job->getPayloadData(); @@ -333,7 +338,7 @@ protected function processBatch(RabbitMQQueue $connection): void 'batch_size' => $batchSize, 'executive_batch_time_seconds' => microtime(true) - $batchTimeStarted, ]); - } catch (Throwable $exception) { + } catch (\Throwable $exception) { $isBatchSuccess = false; $this->logError('processBatch.exception', [ @@ -348,16 +353,19 @@ protected function processBatch(RabbitMQQueue $connection): void } $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); - if ($isBatchSuccess) { - $lastBatchMessage = end($batchJobMessages); - $this->ackMessage($lastBatchMessage, true); - } else { - foreach ($batchJobMessages as $batchMessage) { - $job = $this->getJobByMessage($batchMessage, $connection); - $this->processSingleJob($job); + try { + if ($isBatchSuccess) { + $lastBatchMessage = end($batchJobMessages); + $this->ackMessage($lastBatchMessage, true); + } else { + foreach ($batchJobMessages as $batchMessage) { + $job = $this->getJobByMessage($batchMessage, $connection); + $this->processSingleJob($job); + } } + } finally { + $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); } - $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); } $this->updateLastProcessedAt(); @@ -368,7 +376,7 @@ protected function processBatch(RabbitMQQueue $connection): void * @param AMQPMessage $message * @param RabbitMQQueue $connection * @return RabbitMQJob - * @throws Throwable + * @throws \Throwable */ protected function getJobByMessage(AMQPMessage $message, RabbitMQQueue $connection): RabbitMQJob { @@ -421,7 +429,7 @@ protected function ackMessage(AMQPMessage $message, bool $multiple = false): voi try { $message->ack($multiple); - } catch (Throwable $exception) { + } catch (\Throwable $exception) { $this->logError('ackMessage.exception', [ 'message' => $exception->getMessage(), 'trace' => $exception->getTraceAsString(), @@ -432,7 +440,7 @@ protected function ackMessage(AMQPMessage $message, bool $multiple = false): voi /** * @return void - * @throws Exceptions\MutexTimeout + * @throws MutexTimeout */ abstract protected function stopConsuming(): void; @@ -631,7 +639,7 @@ protected function goAhead(): bool /** * @return void */ - protected function updateLastProcessedAt() + protected function updateLastProcessedAt(): void { if ((null === $this->currentVhostName) || (null === $this->currentQueueName)) { return; @@ -690,7 +698,6 @@ protected function initConnection(): RabbitMQQueue $this->prefetchCount, false ); - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); $this->channel = $channel; $this->connection = $connection; @@ -709,6 +716,8 @@ protected function initConnection(): RabbitMQQueue $this->goAheadOrWait(); return $this->initConnection(); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } return $connection; @@ -725,6 +734,7 @@ protected function startHeartbeatCheck(): void $heartbeatInterval = (int) ($this->config['options']['heartbeat'] ?? 0); if (!$heartbeatInterval) { + $this->logWarning('startHeartbeatCheck.heartbeat_interval_is_not_set'); return; } @@ -764,10 +774,10 @@ protected function startHeartbeatCheck(): void $connection->checkHeartBeat(); } catch (MutexTimeout) { $this->logWarning('startHeartbeatCheck.mutex_timeout'); - } catch (Throwable $exception) { + } catch (\Throwable $exception) { $this->logError('startHeartbeatCheck.exception', [ - 'eroor' => $exception->getMessage(), - 'trace' => $e->getTraceAsString(), + 'error' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), ]); $this->shouldQuit = true; diff --git a/src/VhostsConsumers/DirectConsumer.php b/src/VhostsConsumers/DirectConsumer.php index 72abfba7..a17322a2 100644 --- a/src/VhostsConsumers/DirectConsumer.php +++ b/src/VhostsConsumers/DirectConsumer.php @@ -2,18 +2,10 @@ namespace Salesmessage\LibRabbitMQ\VhostsConsumers; -use Illuminate\Console\OutputStyle; -use Illuminate\Contracts\Debug\ExceptionHandler; -use Illuminate\Contracts\Events\Dispatcher; -use Illuminate\Queue\QueueManager; use Illuminate\Queue\WorkerOptions; -use Illuminate\Support\Str; use PhpAmqpLib\Exception\AMQPChannelClosedException; -use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Exception\AMQPRuntimeException; -use PhpAmqpLib\Message\AMQPMessage; -use Psr\Log\LoggerInterface; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; class DirectConsumer extends AbstractVhostsConsumer @@ -22,7 +14,6 @@ class DirectConsumer extends AbstractVhostsConsumer * @param $connectionName * @param WorkerOptions $options * @return int - * @throws \Salesmessage\LibRabbitMQ\Exceptions\MutexTimeout * @throws \Throwable */ protected function vhostDaemon($connectionName, WorkerOptions $options) @@ -51,7 +42,6 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) try { $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); $amqpMessage = $this->channel->basic_get($this->currentQueueName); - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } catch (AMQPProtocolChannelException|AMQPChannelClosedException $exception) { $amqpMessage = null; @@ -69,7 +59,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->exceptions->report($exception); $this->kill(self::EXIT_SUCCESS, $this->workerOptions); - } catch (Exception|Throwable $exception) { + } catch (\Throwable $exception) { $this->logError('daemon.exception', [ 'message' => $exception->getMessage(), 'trace' => $exception->getTraceAsString(), @@ -79,9 +69,11 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->exceptions->report($exception); $this->stopWorkerIfLostConnection($exception); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } - if (null === $amqpMessage) { + if (!isset($amqpMessage)) { $this->logInfo('daemon.consuming_sleep_no_job'); $this->stopConsuming(); @@ -131,7 +123,6 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) /** * @return RabbitMQQueue - * @throws Exceptions\MutexTimeout */ protected function startConsuming(): RabbitMQQueue { @@ -154,4 +145,3 @@ protected function stopConsuming(): void return; } } - diff --git a/src/VhostsConsumers/QueueConsumer.php b/src/VhostsConsumers/QueueConsumer.php index b9be89ea..700bf8a1 100644 --- a/src/VhostsConsumers/QueueConsumer.php +++ b/src/VhostsConsumers/QueueConsumer.php @@ -2,18 +2,12 @@ namespace Salesmessage\LibRabbitMQ\VhostsConsumers; -use Illuminate\Console\OutputStyle; -use Illuminate\Contracts\Debug\ExceptionHandler; -use Illuminate\Contracts\Events\Dispatcher; -use Illuminate\Queue\QueueManager; use Illuminate\Queue\WorkerOptions; -use Illuminate\Support\Str; use PhpAmqpLib\Exception\AMQPChannelClosedException; -use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Exception\AMQPRuntimeException; use PhpAmqpLib\Message\AMQPMessage; -use Psr\Log\LoggerInterface; +use Salesmessage\LibRabbitMQ\Exceptions\MutexTimeout; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; class QueueConsumer extends AbstractVhostsConsumer @@ -24,7 +18,7 @@ class QueueConsumer extends AbstractVhostsConsumer * @param $connectionName * @param WorkerOptions $options * @return int|void - * @throws \Salesmessage\LibRabbitMQ\Exceptions\MutexTimeout + * @throws MutexTimeout * @throws \Throwable */ protected function vhostDaemon($connectionName, WorkerOptions $options) @@ -54,7 +48,6 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) try { $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); $this->channel->wait(null, true, (int) $this->workerOptions->timeout); - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } catch (AMQPRuntimeException $exception) { $this->logError('daemon.amqp_runtime_exception', [ 'message' => $exception->getMessage(), @@ -64,7 +57,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->exceptions->report($exception); $this->kill(self::EXIT_SUCCESS, $this->workerOptions); - } catch (Exception|Throwable $exception) { + } catch (\Throwable $exception) { $this->logError('daemon.exception', [ 'message' => $exception->getMessage(), 'trace' => $exception->getTraceAsString(), @@ -74,6 +67,8 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->exceptions->report($exception); $this->stopWorkerIfLostConnection($exception); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } // If no job is got off the queue, we will need to sleep the worker. @@ -116,7 +111,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) /** * @return RabbitMQQueue - * @throws Exceptions\MutexTimeout + * @throws MutexTimeout */ protected function startConsuming(): RabbitMQQueue { @@ -180,10 +175,10 @@ protected function startConsuming(): RabbitMQQueue 'trace' => $exception->getTraceAsString(), 'error_class' => get_class($exception), ]); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); - $this->updateLastProcessedAt(); if (false === $isSuccess) { @@ -198,13 +193,15 @@ protected function startConsuming(): RabbitMQQueue /** * @return void - * @throws Exceptions\MutexTimeout + * @throws MutexTimeout */ protected function stopConsuming(): void { $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); - $this->channel->basic_cancel($this->getTagName(), true); - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); + try { + $this->channel->basic_cancel($this->getTagName(), true); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); + } } } - From 171d2548b9580b06a4d5702c5a9741779b103fdc Mon Sep 17 00:00:00 2001 From: Alexander Ginko Date: Wed, 29 Oct 2025 16:31:27 +0100 Subject: [PATCH 02/14] SWR-20482 Server: RabbitMQ Improvements: Deduplication --- composer.json | 2 +- config/rabbitmq.php | 11 ++ src/Consumer.php | 7 ++ src/Interfaces/RabbitMQBatchable.php | 4 + src/LaravelLibRabbitMQServiceProvider.php | 51 +++++++-- src/Queue/RabbitMQQueue.php | 2 + .../Deduplication/DeduplicationService.php | 107 ++++++++++++++++++ .../Deduplication/DeduplicationStore.php | 10 ++ .../Deduplication/NullDeduplicationStore.php | 13 +++ .../Deduplication/RedisDeduplicationStore.php | 41 +++++++ src/Services/DlqDetector.php | 22 ++++ .../AbstractVhostsConsumer.php | 54 ++++++--- 12 files changed, 299 insertions(+), 25 deletions(-) create mode 100644 src/Services/Deduplication/DeduplicationService.php create mode 100644 src/Services/Deduplication/DeduplicationStore.php create mode 100644 src/Services/Deduplication/NullDeduplicationStore.php create mode 100644 src/Services/Deduplication/RedisDeduplicationStore.php create mode 100644 src/Services/DlqDetector.php diff --git a/composer.json b/composer.json index 35df990c..54c2c477 100644 --- a/composer.json +++ b/composer.json @@ -34,7 +34,7 @@ }, "extra": { "branch-alias": { - "dev-master": "1.30-dev" + "dev-master": "1.31-dev" }, "laravel": { "providers": [ diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 8b949d9c..28288d6a 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -25,6 +25,17 @@ 'options' => [ ], + 'deduplication' => [ + 'enabled' => env('RABBITMQ_DEDUPLICATION_ENABLED', false), + 'skip_for_dlq' => env('RABBITMQ_DEDUPLICATION_SKIP_FOR_DLQ', true), + 'ttl' => env('RABBITMQ_DEDUPLICATION_TTL', 7200), + 'connection' => [ + 'driver' => env('RABBITMQ_DEDUPLICATION_DRIVER', 'redis'), + 'name' => env('RABBITMQ_DEDUPLICATION_CONNECTION_NAME', 'persistent'), + 'key_prefix' => env('RABBITMQ_DEDUPLICATION_KEY_PREFIX', 'mq_dedup'), + ], + ], + /* * Set to "horizon" if you wish to use Laravel Horizon. */ diff --git a/src/Consumer.php b/src/Consumer.php index 0f69893f..0f64e85b 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -11,6 +11,7 @@ use PhpAmqpLib\Message\AMQPMessage; use Throwable; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; +use Salesmessage\LibRabbitMQ\Services\Deduplication\DeduplicationService; class Consumer extends Worker { @@ -106,6 +107,12 @@ public function daemon($connectionName, $queue, WorkerOptions $options) false, false, function (AMQPMessage $message) use ($connection, $options, $connectionName, $queue, $jobClass, &$jobsProcessed): void { + /** @var DeduplicationService $dedupService */ + $dedupService = $this->container->make(DeduplicationService::class); + if (!$dedupService->add($message)) { + return; + } + $job = new $jobClass( $this->container, $connection, diff --git a/src/Interfaces/RabbitMQBatchable.php b/src/Interfaces/RabbitMQBatchable.php index f3abc6c5..a5d3796f 100644 --- a/src/Interfaces/RabbitMQBatchable.php +++ b/src/Interfaces/RabbitMQBatchable.php @@ -5,6 +5,10 @@ interface RabbitMQBatchable { /** + * @warning Batch item (job) MUST BE IDEMPOTENT + * since in case of failure, batch will be iterated and re-processed as separate jobs. + * @see \Salesmessage\LibRabbitMQ\VhostsConsumers\AbstractVhostsConsumer::processBatch + * * Processing jobs array of static class * * @param array $batch diff --git a/src/LaravelLibRabbitMQServiceProvider.php b/src/LaravelLibRabbitMQServiceProvider.php index 7048f289..52ee7e8a 100644 --- a/src/LaravelLibRabbitMQServiceProvider.php +++ b/src/LaravelLibRabbitMQServiceProvider.php @@ -3,12 +3,6 @@ namespace Salesmessage\LibRabbitMQ; use Illuminate\Contracts\Debug\ExceptionHandler; -use Illuminate\Queue\Connectors\BeanstalkdConnector; -use Illuminate\Queue\Connectors\DatabaseConnector; -use Illuminate\Queue\Connectors\NullConnector; -use Illuminate\Queue\Connectors\RedisConnector; -use Illuminate\Queue\Connectors\SqsConnector; -use Illuminate\Queue\Connectors\SyncConnector; use Illuminate\Queue\QueueManager; use Illuminate\Support\ServiceProvider; use Psr\Log\LoggerInterface; @@ -16,10 +10,14 @@ use Salesmessage\LibRabbitMQ\Console\ConsumeVhostsCommand; use Salesmessage\LibRabbitMQ\Console\ScanVhostsCommand; use Salesmessage\LibRabbitMQ\Queue\Connectors\RabbitMQVhostsConnector; +use Salesmessage\LibRabbitMQ\Services\Deduplication\DeduplicationService; use Salesmessage\LibRabbitMQ\Services\GroupsService; use Salesmessage\LibRabbitMQ\Services\InternalStorageManager; use Salesmessage\LibRabbitMQ\Services\QueueService; use Salesmessage\LibRabbitMQ\Services\VhostsService; +use Salesmessage\LibRabbitMQ\Services\Deduplication\DeduplicationStore; +use Salesmessage\LibRabbitMQ\Services\Deduplication\NullDeduplicationStore; +use Salesmessage\LibRabbitMQ\Services\Deduplication\RedisDeduplicationStore; use Salesmessage\LibRabbitMQ\VhostsConsumers\DirectConsumer as VhostsDirectConsumer; use Salesmessage\LibRabbitMQ\VhostsConsumers\QueueConsumer as VhostsQueueConsumer; @@ -36,6 +34,8 @@ public function register(): void ); if ($this->app->runningInConsole()) { + $this->bindDeduplicationService(); + $this->app->singleton('rabbitmq.consumer', function () { $isDownForMaintenance = function () { return $this->app->isDownForMaintenance(); @@ -68,7 +68,8 @@ public function register(): void $this->app['events'], $this->app[ExceptionHandler::class], $isDownForMaintenance, - null + null, + $this->app->get(DeduplicationService::class) ); }); @@ -84,7 +85,8 @@ public function register(): void $this->app['events'], $this->app[ExceptionHandler::class], $isDownForMaintenance, - null + null, + $this->app->get(DeduplicationService::class) ); }); @@ -92,7 +94,7 @@ public function register(): void $consumerClass = ('direct' === config('queue.connections.rabbitmq_vhosts.consumer_type')) ? VhostsDirectConsumer::class : VhostsQueueConsumer::class; - + return new ConsumeVhostsCommand( $app[GroupsService::class], $app[$consumerClass], @@ -139,4 +141,35 @@ public function boot(): void return new RabbitMQVhostsConnector($this->app['events']); }); } + + /** + * Config params: + * @phpstan-import-type DeduplicationConfig from DeduplicationService + * + * @return void + */ + private function bindDeduplicationService(): void + { + $this->app->bind(DeduplicationStore::class, static function () { + /** @var DeduplicationConfig $config */ + $config = (array) config('queue.connections.rabbitmq_vhosts.deduplication', []); + $enabled = (bool) ($config['enabled'] ?? false); + if (!$enabled) { + return new NullDeduplicationStore(); + } + + $connectionDriver = $config['connection']['driver'] ?? null; + if ($connectionDriver !== 'redis') { + throw new \InvalidArgumentException('For now only Redis connection is supported for deduplication'); + } + $connectionName = $config['connection']['name'] ?? null; + + $prefix = trim($config['connection']['key_prefix'] ?? ''); + if (empty($prefix)) { + throw new \InvalidArgumentException('Key prefix is required'); + } + + return new RedisDeduplicationStore($connectionName, $prefix); + }); + } } diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index f0973d75..00336fe6 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -22,6 +22,7 @@ use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; +use Ramsey\Uuid\Uuid; use RuntimeException; use Throwable; use Salesmessage\LibRabbitMQ\Contracts\RabbitMQQueueContract; @@ -521,6 +522,7 @@ protected function createMessage($payload, int $attempts = 0): array $currentPayload = json_decode($payload, true); if ($correlationId = $currentPayload['id'] ?? null) { $properties['correlation_id'] = $correlationId; + $properties['message_id'] = Uuid::uuid7()->toString(); } if ($this->getConfig()->isPrioritizeDelayed()) { diff --git a/src/Services/Deduplication/DeduplicationService.php b/src/Services/Deduplication/DeduplicationService.php new file mode 100644 index 00000000..d16ab9fa --- /dev/null +++ b/src/Services/Deduplication/DeduplicationService.php @@ -0,0 +1,107 @@ +isEnabled()) { + return true; + } + + $messageId = $this->extractMessageId($message); + if ($messageId === null) { + return false; + } + + if (DlqDetector::isDlqMessage($message)) { + if ($this->getConfig('skip_for_dlq', false)) { + return true; + } + + $messageId = 'dlq:' . $messageId; + } + + $ttl = (int) ($this->getConfig('ttl') ?: self::DEFAULT_TTL); + + return $this->store->add($messageId, $ttl); + } + + public function release(AMQPMessage $message): void + { + if (!$this->isEnabled()) { + return; + } + $messageId = $this->extractMessageId($message); + if ($messageId === null) { + return; + } + + if (DlqDetector::isDlqMessage($message)) { + if ($this->getConfig('skip_for_dlq', false)) { + return; + } + + $messageId = 'dlq:' . $messageId; + } + + $this->store->release($messageId); + } + + protected function extractMessageId(AMQPMessage $message): ?string + { + $props = $message->get_properties(); + $id = $props['message_id'] ?? null; + if (is_string($id) && !empty($id)) { + return $id; + } + return null; + } + + protected function isEnabled(): bool + { + return (bool) $this->getConfig('enabled', false); + } + + protected function getConfig(string $key, mixed $default = null): mixed + { + $value = config("queue.connections.rabbitmq_vhosts.deduplication.$key"); + + return $value !== null ? $value : $default; + } +} diff --git a/src/Services/Deduplication/DeduplicationStore.php b/src/Services/Deduplication/DeduplicationStore.php new file mode 100644 index 00000000..1e1408da --- /dev/null +++ b/src/Services/Deduplication/DeduplicationStore.php @@ -0,0 +1,10 @@ + 7 * 24 * 60 * 60) { + throw new \InvalidArgumentException('Invalid TTL seconds'); + } + + $key = $this->getKey($messageKey); + + return (bool) $this->connection()->set($key, 1, 'EX', $ttlSeconds, 'NX'); + } + + public function release(string $messageKey): void + { + $key = $this->getKey($messageKey); + $this->connection()->del($key); + } + + protected function connection(): Connection + { + return $this->connectionName ? Redis::connection($this->connectionName) : Redis::connection(); + } + + protected function getKey(string $messageKey): string + { + return $this->keyPrefix . ':' . $messageKey; + } +} diff --git a/src/Services/DlqDetector.php b/src/Services/DlqDetector.php new file mode 100644 index 00000000..997fb2f7 --- /dev/null +++ b/src/Services/DlqDetector.php @@ -0,0 +1,22 @@ +get_properties()['application_headers'] ?? null; + + if (!($headersTable instanceof AMQPTable)) { + return false; + } + + $headers = $headersTable->getNativeData(); + + return !empty($headers['x-death']) && !empty($headers['x-opt-deaths']); + } +} diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php index fe5cc241..54f70bb2 100644 --- a/src/VhostsConsumers/AbstractVhostsConsumer.php +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -25,6 +25,7 @@ use Salesmessage\LibRabbitMQ\Queue\Jobs\RabbitMQJob; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; use Salesmessage\LibRabbitMQ\Services\InternalStorageManager; +use Salesmessage\LibRabbitMQ\Services\Deduplication\DeduplicationService; abstract class AbstractVhostsConsumer extends Consumer { @@ -81,6 +82,7 @@ abstract class AbstractVhostsConsumer extends Consumer * @param ExceptionHandler $exceptions * @param callable $isDownForMaintenance * @param callable|null $resetScope + * @param ?DeduplicationService $deduplicationService */ public function __construct( protected InternalStorageManager $internalStorageManager, @@ -89,7 +91,8 @@ public function __construct( Dispatcher $events, ExceptionHandler $exceptions, callable $isDownForMaintenance, - callable $resetScope = null + callable $resetScope = null, + protected ?DeduplicationService $deduplicationService = null, ) { parent::__construct($manager, $events, $exceptions, $isDownForMaintenance, $resetScope); } @@ -240,6 +243,14 @@ protected function processAmqpMessage(AMQPMessage $message, RabbitMQQueue $conne if ($isSupportBatching) { $this->addMessageToBatch($message); } else { + if (!$this->deduplicationService?->add($message)) { + $this->ackMessage($message); + $this->logWarning('processAMQPMessage.message_already_processed', [ + 'message_id' => $message->get('message_id'), + ]); + return; + } + $job = $this->getJobByMessage($message, $connection); $this->processSingleJob($job); } @@ -314,30 +325,43 @@ protected function processBatch(RabbitMQQueue $connection): void foreach ($this->batchMessages as $batchJobClass => $batchJobMessages) { $isBatchSuccess = false; + $uniqueMessagesForProcessing = []; $batchSize = count($batchJobMessages); if ($batchSize > 1) { $batchTimeStarted = microtime(true); $batchData = []; foreach ($batchJobMessages as $batchMessage) { + if (!$this->deduplicationService?->add($batchMessage)) { + $this->ackMessage($batchMessage); + $this->logWarning('processBatch.message_already_processed', [ + 'message_id' => $batchMessage->get('message_id'), + ]); + continue; + } + $job = $this->getJobByMessage($batchMessage, $connection); $batchData[] = $job->getPayloadData(); + $uniqueMessagesForProcessing[] = $batchMessage; } - $this->logInfo('processBatch.start', [ - 'batch_job_class' => $batchJobClass, - 'batch_size' => $batchSize, - ]); - try { - $batchJobClass::collection($batchData); - $isBatchSuccess = true; + if (!empty($batchData)) { + $this->logInfo('processBatch.start', [ + 'batch_job_class' => $batchJobClass, + 'batch_size' => $batchSize, + ]); + + $batchJobClass::collection($batchData); + + $this->logInfo('processBatch.finish', [ + 'batch_job_class' => $batchJobClass, + 'batch_size' => $batchSize, + 'executive_batch_time_seconds' => microtime(true) - $batchTimeStarted, + ]); + } - $this->logInfo('processBatch.finish', [ - 'batch_job_class' => $batchJobClass, - 'batch_size' => $batchSize, - 'executive_batch_time_seconds' => microtime(true) - $batchTimeStarted, - ]); + $isBatchSuccess = true; } catch (\Throwable $exception) { $isBatchSuccess = false; @@ -355,10 +379,10 @@ protected function processBatch(RabbitMQQueue $connection): void $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); try { if ($isBatchSuccess) { - $lastBatchMessage = end($batchJobMessages); + $lastBatchMessage = end($uniqueMessagesForProcessing); $this->ackMessage($lastBatchMessage, true); } else { - foreach ($batchJobMessages as $batchMessage) { + foreach ($uniqueMessagesForProcessing as $batchMessage) { $job = $this->getJobByMessage($batchMessage, $connection); $this->processSingleJob($job); } From 4fa8c982170d0025eec54abf70a28c01ecb03118 Mon Sep 17 00:00:00 2001 From: Alexander Ginko Date: Wed, 29 Oct 2025 21:21:22 +0100 Subject: [PATCH 03/14] SWR-20482 Server: RabbitMQ Improvements: Deduplication --- README.md | 2 +- src/Consumer.php | 12 +++---- .../AbstractVhostsConsumer.php | 31 +++++++++++++------ 3 files changed, 27 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index c4048569..4d478424 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Only the latest version will get new features. Bug fixes will be provided using You can install this package via composer using this command: ``` -composer require salesmessage/php-lib-rabbitmq:^1.29 --ignore-platform-reqs +composer require salesmessage/php-lib-rabbitmq:^1.31 --ignore-platform-reqs ``` The package will automatically register itself. diff --git a/src/Consumer.php b/src/Consumer.php index 0f64e85b..066cd4e3 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -107,12 +107,6 @@ public function daemon($connectionName, $queue, WorkerOptions $options) false, false, function (AMQPMessage $message) use ($connection, $options, $connectionName, $queue, $jobClass, &$jobsProcessed): void { - /** @var DeduplicationService $dedupService */ - $dedupService = $this->container->make(DeduplicationService::class); - if (!$dedupService->add($message)) { - return; - } - $job = new $jobClass( $this->container, $connection, @@ -129,7 +123,11 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu $jobsProcessed++; - $this->runJob($job, $connectionName, $options); + /** @var DeduplicationService $dedupService */ + $dedupService = $this->container->make(DeduplicationService::class); + if ($dedupService->add($message)) { + $this->runJob($job, $connectionName, $options); + } if ($this->supportsAsyncSignals()) { $this->resetTimeoutHandler(); diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php index 54f70bb2..16f3edf0 100644 --- a/src/VhostsConsumers/AbstractVhostsConsumer.php +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -243,6 +243,8 @@ protected function processAmqpMessage(AMQPMessage $message, RabbitMQQueue $conne if ($isSupportBatching) { $this->addMessageToBatch($message); } else { + $job = $this->getJobByMessage($message, $connection); + if (!$this->deduplicationService?->add($message)) { $this->ackMessage($message); $this->logWarning('processAMQPMessage.message_already_processed', [ @@ -251,7 +253,6 @@ protected function processAmqpMessage(AMQPMessage $message, RabbitMQQueue $conne return; } - $job = $this->getJobByMessage($message, $connection); $this->processSingleJob($job); } @@ -332,17 +333,27 @@ protected function processBatch(RabbitMQQueue $connection): void $batchData = []; foreach ($batchJobMessages as $batchMessage) { - if (!$this->deduplicationService?->add($batchMessage)) { - $this->ackMessage($batchMessage); - $this->logWarning('processBatch.message_already_processed', [ - 'message_id' => $batchMessage->get('message_id'), + try { + if (!$this->deduplicationService?->add($batchMessage)) { + $this->ackMessage($batchMessage); + $this->logWarning('processBatch.message_already_processed', [ + 'message_id' => $batchMessage->get('message_id'), + ]); + continue; + } + + $job = $this->getJobByMessage($batchMessage, $connection); + $uniqueMessagesForProcessing[] = $batchMessage; + $batchData[] = $job->getPayloadData(); + + } catch (\Throwable $exception) { + $this->deduplicationService->release($batchMessage); + $this->logError('processBatch.message_processing_exception', [ + 'released_message_id' => $batchMessage->get('message_id'), ]); - continue; - } - $job = $this->getJobByMessage($batchMessage, $connection); - $batchData[] = $job->getPayloadData(); - $uniqueMessagesForProcessing[] = $batchMessage; + throw $exception; + } } try { From df629bfd9fb006a7e2556c57dd1cd5a88aed77cb Mon Sep 17 00:00:00 2001 From: Alexander Ginko Date: Wed, 29 Oct 2025 22:59:51 +0100 Subject: [PATCH 04/14] SWR-20482 Server: RabbitMQ Improvements: Deduplication --- src/Consumer.php | 8 +- .../Deduplication/DeduplicationService.php | 95 ++++++++++++------- .../Deduplication/DeduplicationStore.php | 4 +- .../Deduplication/NullDeduplicationStore.php | 7 +- .../Deduplication/RedisDeduplicationStore.php | 10 +- .../AbstractVhostsConsumer.php | 80 +++++++++++----- 6 files changed, 141 insertions(+), 63 deletions(-) diff --git a/src/Consumer.php b/src/Consumer.php index 066cd4e3..d05edc3c 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -125,8 +125,14 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu /** @var DeduplicationService $dedupService */ $dedupService = $this->container->make(DeduplicationService::class); - if ($dedupService->add($message)) { + $messageState = $dedupService->getState($message); + if ($messageState === DeduplicationService::IN_PROGRESS) { + $message->reject(true); + + } elseif ($messageState === null) { + $dedupService->markAsInProgress($message); $this->runJob($job, $connectionName, $options); + $dedupService->markAsProcessed($message); } if ($this->supportsAsyncSignals()) { diff --git a/src/Services/Deduplication/DeduplicationService.php b/src/Services/Deduplication/DeduplicationService.php index d16ab9fa..7e53a674 100644 --- a/src/Services/Deduplication/DeduplicationService.php +++ b/src/Services/Deduplication/DeduplicationService.php @@ -10,56 +10,54 @@ * enabled: bool, * skip_for_dlq: bool, * ttl: int, + * lock_ttl: int, * connection: array{ * driver: string, * name: string, * key_prefix: string, * } * } - { - * "deduplication_ttl": TTL in seconds for the message to be considered as processed, - * "processing_lock_ttl": TTL in seconds for the lock to be acquired for the message during processing, + * "ttl": TTL in seconds for the message to be considered as processed, + * "lock_ttl": TTL in seconds for the lock to be acquired for the message during in_progress, * "connection.driver": "Only redis is supported now", * "connection.name": "Connection name from config('database.redis.{connection_name}')", * } */ class DeduplicationService { + public const IN_PROGRESS = 'in_progress'; + public const PROCESSED = 'processed'; + + private const DEFAULT_LOCK_TTL = 60; private const DEFAULT_TTL = 7200; - public function __construct(private DeduplicationStore $store) - { - } + public function __construct(private DeduplicationStore $store) {} /** - * Returns "true" if the message was not processed previously, and it's successfully been added to the store. - * Returns "false" if the message was already processed and it's a duplicate. - * * @param AMQPMessage $message - * - * @return bool + * @return string|null - @enum {self::IN_PROGRESS, self::PROCESSED} */ - public function add(AMQPMessage $message): bool + public function getState(AMQPMessage $message): ?string { if (!$this->isEnabled()) { - return true; + return null; } - - $messageId = $this->extractMessageId($message); + $messageId = $this->getMessageId($message); if ($messageId === null) { - return false; + return null; } - if (DlqDetector::isDlqMessage($message)) { - if ($this->getConfig('skip_for_dlq', false)) { - return true; - } - - $messageId = 'dlq:' . $messageId; - } + return $this->store->get($messageId); + } - $ttl = (int) ($this->getConfig('ttl') ?: self::DEFAULT_TTL); + public function markAsInProgress(AMQPMessage $message): bool + { + return $this->add($message, self::IN_PROGRESS, (int) ($this->getConfig('lock_ttl') ?: self::DEFAULT_LOCK_TTL)); + } - return $this->store->add($messageId, $ttl); + public function markAsProcessed(AMQPMessage $message): bool + { + return $this->add($message, self::PROCESSED, (int) ($this->getConfig('ttl') ?: self::DEFAULT_TTL)); } public function release(AMQPMessage $message): void @@ -67,30 +65,55 @@ public function release(AMQPMessage $message): void if (!$this->isEnabled()) { return; } - $messageId = $this->extractMessageId($message); + + $messageId = $this->getMessageId($message); if ($messageId === null) { return; } - if (DlqDetector::isDlqMessage($message)) { - if ($this->getConfig('skip_for_dlq', false)) { - return; - } + $this->store->release($messageId); + } - $messageId = 'dlq:' . $messageId; + /** + * Returns "true" if the message was not processed previously, and it's successfully been added to the store. + * Returns "false" if the message was already processed and it's a duplicate. + * + * @param AMQPMessage $message + * @param string $value + * @param int $ttl + * @return bool + */ + protected function add(AMQPMessage $message, string $value, int $ttl): bool + { + if (!$this->isEnabled()) { + return true; } - $this->store->release($messageId); + $messageId = $this->getMessageId($message); + if ($messageId === null) { + return true; + } + + return $this->store->add($messageId, $value, $ttl); } - protected function extractMessageId(AMQPMessage $message): ?string + protected function getMessageId(AMQPMessage $message): ?string { $props = $message->get_properties(); - $id = $props['message_id'] ?? null; - if (is_string($id) && !empty($id)) { - return $id; + $messageId = $props['message_id'] ?? null; + if (!is_string($messageId) || empty($messageId)) { + return null; + } + + if (DlqDetector::isDlqMessage($message)) { + if ($this->getConfig('skip_for_dlq', false)) { + return true; + } + + $messageId = 'dlq:' . $messageId; } - return null; + + return $messageId; } protected function isEnabled(): bool diff --git a/src/Services/Deduplication/DeduplicationStore.php b/src/Services/Deduplication/DeduplicationStore.php index 1e1408da..ce2ff996 100644 --- a/src/Services/Deduplication/DeduplicationStore.php +++ b/src/Services/Deduplication/DeduplicationStore.php @@ -4,7 +4,9 @@ interface DeduplicationStore { - public function add(string $messageKey, int $ttlSeconds): bool; + public function get(string $messageKey): mixed; + + public function add(string $messageKey, mixed $value, int $ttlSeconds): bool; public function release(string $messageKey): void; } diff --git a/src/Services/Deduplication/NullDeduplicationStore.php b/src/Services/Deduplication/NullDeduplicationStore.php index 207207d2..35f256f8 100644 --- a/src/Services/Deduplication/NullDeduplicationStore.php +++ b/src/Services/Deduplication/NullDeduplicationStore.php @@ -4,7 +4,12 @@ class NullDeduplicationStore implements DeduplicationStore { - public function add(string $messageKey, int $ttlSeconds): bool + public function get(string $messageKey): mixed + { + return null; + } + + public function add(string $messageKey, mixed $value, int $ttlSeconds): bool { return true; } diff --git a/src/Services/Deduplication/RedisDeduplicationStore.php b/src/Services/Deduplication/RedisDeduplicationStore.php index d7d1f9f0..384b0ecd 100644 --- a/src/Services/Deduplication/RedisDeduplicationStore.php +++ b/src/Services/Deduplication/RedisDeduplicationStore.php @@ -12,7 +12,13 @@ public function __construct( protected string $keyPrefix = 'mq_dedup', ) {} - public function add(string $messageKey, int $ttlSeconds): bool + public function get(string $messageKey): mixed + { + $key = $this->getKey($messageKey); + return $this->connection()->get($key); + } + + public function add(string $messageKey, mixed $value, int $ttlSeconds): bool { if ($ttlSeconds <= 0 || $ttlSeconds > 7 * 24 * 60 * 60) { throw new \InvalidArgumentException('Invalid TTL seconds'); @@ -20,7 +26,7 @@ public function add(string $messageKey, int $ttlSeconds): bool $key = $this->getKey($messageKey); - return (bool) $this->connection()->set($key, 1, 'EX', $ttlSeconds, 'NX'); + return (bool) $this->connection()->set($key, $value, 'EX', $ttlSeconds, 'NX'); } public function release(string $messageKey): void diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php index 16f3edf0..d739aa9c 100644 --- a/src/VhostsConsumers/AbstractVhostsConsumer.php +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -244,16 +244,7 @@ protected function processAmqpMessage(AMQPMessage $message, RabbitMQQueue $conne $this->addMessageToBatch($message); } else { $job = $this->getJobByMessage($message, $connection); - - if (!$this->deduplicationService?->add($message)) { - $this->ackMessage($message); - $this->logWarning('processAMQPMessage.message_already_processed', [ - 'message_id' => $message->get('message_id'), - ]); - return; - } - - $this->processSingleJob($job); + $this->processSingleJob($job, $message); } $this->jobsProcessed++; @@ -333,8 +324,16 @@ protected function processBatch(RabbitMQQueue $connection): void $batchData = []; foreach ($batchJobMessages as $batchMessage) { + $messageState = $this->deduplicationService?->getState($batchMessage); try { - if (!$this->deduplicationService?->add($batchMessage)) { + if ($messageState === DeduplicationService::IN_PROGRESS) { + $batchMessage->reject(true); + $this->logWarning('processBatch.message_already_in_progress.requeue', [ + 'message_id' => $batchMessage->get('message_id'), + ]); + continue; + } + if ($messageState === DeduplicationService::PROCESSED) { $this->ackMessage($batchMessage); $this->logWarning('processBatch.message_already_processed', [ 'message_id' => $batchMessage->get('message_id'), @@ -342,12 +341,22 @@ protected function processBatch(RabbitMQQueue $connection): void continue; } - $job = $this->getJobByMessage($batchMessage, $connection); - $uniqueMessagesForProcessing[] = $batchMessage; - $batchData[] = $job->getPayloadData(); + $hasPutAsInProgress = $this->deduplicationService?->markAsInProgress($batchMessage); + if ($hasPutAsInProgress === false) { + $this->logWarning('processBatch.message_already_in_progress.skip', [ + 'message_id' => $batchMessage->get('message_id'), + ]); + } else { + $job = $this->getJobByMessage($batchMessage, $connection); + $uniqueMessagesForProcessing[] = $batchMessage; + $batchData[] = $job->getPayloadData(); + } } catch (\Throwable $exception) { - $this->deduplicationService->release($batchMessage); + if ($messageState === null) { + $this->deduplicationService?->release($batchMessage); + } + $this->logError('processBatch.message_processing_exception', [ 'released_message_id' => $batchMessage->get('message_id'), ]); @@ -374,6 +383,10 @@ protected function processBatch(RabbitMQQueue $connection): void $isBatchSuccess = true; } catch (\Throwable $exception) { + foreach ($uniqueMessagesForProcessing as $batchMessage) { + $this->deduplicationService?->release($batchMessage); + } + $isBatchSuccess = false; $this->logError('processBatch.exception', [ @@ -390,12 +403,16 @@ protected function processBatch(RabbitMQQueue $connection): void $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); try { if ($isBatchSuccess) { + foreach ($uniqueMessagesForProcessing as $batchMessage) { + $this->deduplicationService?->markAsProcessed($batchMessage); + } + $lastBatchMessage = end($uniqueMessagesForProcessing); $this->ackMessage($lastBatchMessage, true); } else { foreach ($uniqueMessagesForProcessing as $batchMessage) { $job = $this->getJobByMessage($batchMessage, $connection); - $this->processSingleJob($job); + $this->processSingleJob($job, $batchMessage); } } } finally { @@ -426,11 +443,7 @@ protected function getJobByMessage(AMQPMessage $message, RabbitMQQueue $connecti ); } - /** - * @param RabbitMQJob $job - * @return void - */ - protected function processSingleJob(RabbitMQJob $job): void + protected function processSingleJob(RabbitMQJob $job, AMQPMessage $message): void { $timeStarted = microtime(true); $this->logInfo('processSingleJob.start'); @@ -439,7 +452,30 @@ protected function processSingleJob(RabbitMQJob $job): void $this->registerTimeoutHandler($job, $this->workerOptions); } - $this->runJob($job, $this->currentConnectionName, $this->workerOptions); + $messageState = $this->deduplicationService?->getState($message); + if ($messageState === DeduplicationService::IN_PROGRESS) { + $message->reject(true); + $this->logWarning('processSingleJob.message_already_in_progress.requeue', [ + 'message_id' => $message->get('message_id'), + ]); + + } elseif ($messageState === DeduplicationService::PROCESSED) { + $this->logWarning('processSingleJob.message_already_processed', [ + 'message_id' => $message->get('message_id'), + ]); + $this->ackMessage($message); + + } else { + $isPutAsInProgress = $this->deduplicationService?->markAsInProgress($message); + if ($isPutAsInProgress === false) { + $this->logWarning('processSingleJob.message_already_in_progress.skip', [ + 'message_id' => $message->get('message_id'), + ]); + } else { + $this->runJob($job, $this->currentConnectionName, $this->workerOptions); + } + } + $this->updateLastProcessedAt(); if ($this->supportsAsyncSignals()) { From a09163446b49a4b8c52c53efce8472bc821ca1ca Mon Sep 17 00:00:00 2001 From: Alexander Ginko Date: Wed, 29 Oct 2025 23:35:44 +0100 Subject: [PATCH 05/14] SWR-20482 Server: RabbitMQ Improvements: Deduplication --- config/rabbitmq.php | 1 + src/Consumer.php | 15 +++++++++++---- .../Deduplication/DeduplicationService.php | 11 ++++++++--- src/Services/Deduplication/DeduplicationStore.php | 2 +- .../Deduplication/NullDeduplicationStore.php | 2 +- .../Deduplication/RedisDeduplicationStore.php | 8 ++++++-- src/VhostsConsumers/AbstractVhostsConsumer.php | 4 ++++ 7 files changed, 32 insertions(+), 11 deletions(-) diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 28288d6a..2d8b5473 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -29,6 +29,7 @@ 'enabled' => env('RABBITMQ_DEDUPLICATION_ENABLED', false), 'skip_for_dlq' => env('RABBITMQ_DEDUPLICATION_SKIP_FOR_DLQ', true), 'ttl' => env('RABBITMQ_DEDUPLICATION_TTL', 7200), + 'lock_ttl' => env('RABBITMQ_DEDUPLICATION_LOCK_TTL', 180), 'connection' => [ 'driver' => env('RABBITMQ_DEDUPLICATION_DRIVER', 'redis'), 'name' => env('RABBITMQ_DEDUPLICATION_CONNECTION_NAME', 'persistent'), diff --git a/src/Consumer.php b/src/Consumer.php index d05edc3c..113ae831 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -129,10 +129,17 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu if ($messageState === DeduplicationService::IN_PROGRESS) { $message->reject(true); - } elseif ($messageState === null) { - $dedupService->markAsInProgress($message); - $this->runJob($job, $connectionName, $options); - $dedupService->markAsProcessed($message); + } elseif ($messageState === DeduplicationService::PROCESSED) { + $message->ack(); + + } else { + $hasPutAsInProgress = $dedupService->markAsInProgress($message); + if ($hasPutAsInProgress !== false) { + $this->runJob($job, $connectionName, $options); + $dedupService->markAsProcessed($message); + } else { + $message->reject(true); + } } if ($this->supportsAsyncSignals()) { diff --git a/src/Services/Deduplication/DeduplicationService.php b/src/Services/Deduplication/DeduplicationService.php index 7e53a674..04069a4f 100644 --- a/src/Services/Deduplication/DeduplicationService.php +++ b/src/Services/Deduplication/DeduplicationService.php @@ -52,7 +52,12 @@ public function getState(AMQPMessage $message): ?string public function markAsInProgress(AMQPMessage $message): bool { - return $this->add($message, self::IN_PROGRESS, (int) ($this->getConfig('lock_ttl') ?: self::DEFAULT_LOCK_TTL)); + $ttl = (int) ($this->getConfig('lock_ttl') ?: self::DEFAULT_LOCK_TTL); + if ($ttl <= 0 || $ttl > 300) { + throw new \InvalidArgumentException('Invalid TTL seconds. Should be between 1 and 300'); + } + + return $this->add($message, self::IN_PROGRESS, $ttl); } public function markAsProcessed(AMQPMessage $message): bool @@ -94,7 +99,7 @@ protected function add(AMQPMessage $message, string $value, int $ttl): bool return true; } - return $this->store->add($messageId, $value, $ttl); + return $this->store->set($messageId, $value, $ttl, $value === self::PROCESSED); } protected function getMessageId(AMQPMessage $message): ?string @@ -107,7 +112,7 @@ protected function getMessageId(AMQPMessage $message): ?string if (DlqDetector::isDlqMessage($message)) { if ($this->getConfig('skip_for_dlq', false)) { - return true; + return null; } $messageId = 'dlq:' . $messageId; diff --git a/src/Services/Deduplication/DeduplicationStore.php b/src/Services/Deduplication/DeduplicationStore.php index ce2ff996..c7807f3d 100644 --- a/src/Services/Deduplication/DeduplicationStore.php +++ b/src/Services/Deduplication/DeduplicationStore.php @@ -6,7 +6,7 @@ interface DeduplicationStore { public function get(string $messageKey): mixed; - public function add(string $messageKey, mixed $value, int $ttlSeconds): bool; + public function set(string $messageKey, mixed $value, int $ttlSeconds, bool $withOverride = false): bool; public function release(string $messageKey): void; } diff --git a/src/Services/Deduplication/NullDeduplicationStore.php b/src/Services/Deduplication/NullDeduplicationStore.php index 35f256f8..fdf3d7cd 100644 --- a/src/Services/Deduplication/NullDeduplicationStore.php +++ b/src/Services/Deduplication/NullDeduplicationStore.php @@ -9,7 +9,7 @@ public function get(string $messageKey): mixed return null; } - public function add(string $messageKey, mixed $value, int $ttlSeconds): bool + public function set(string $messageKey, mixed $value, int $ttlSeconds, bool $withOverride = false): bool { return true; } diff --git a/src/Services/Deduplication/RedisDeduplicationStore.php b/src/Services/Deduplication/RedisDeduplicationStore.php index 384b0ecd..324b9c66 100644 --- a/src/Services/Deduplication/RedisDeduplicationStore.php +++ b/src/Services/Deduplication/RedisDeduplicationStore.php @@ -18,15 +18,19 @@ public function get(string $messageKey): mixed return $this->connection()->get($key); } - public function add(string $messageKey, mixed $value, int $ttlSeconds): bool + public function set(string $messageKey, mixed $value, int $ttlSeconds, bool $withOverride = false): bool { if ($ttlSeconds <= 0 || $ttlSeconds > 7 * 24 * 60 * 60) { throw new \InvalidArgumentException('Invalid TTL seconds'); } $key = $this->getKey($messageKey); + $args = [$key, $value, 'EX', $ttlSeconds]; + if (!$withOverride) { + $args[] = 'NX'; + } - return (bool) $this->connection()->set($key, $value, 'EX', $ttlSeconds, 'NX'); + return (bool) $this->connection()->set(...$args); } public function release(string $messageKey): void diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php index d739aa9c..32526435 100644 --- a/src/VhostsConsumers/AbstractVhostsConsumer.php +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -343,6 +343,7 @@ protected function processBatch(RabbitMQQueue $connection): void $hasPutAsInProgress = $this->deduplicationService?->markAsInProgress($batchMessage); if ($hasPutAsInProgress === false) { + $batchMessage->reject(true); $this->logWarning('processBatch.message_already_in_progress.skip', [ 'message_id' => $batchMessage->get('message_id'), ]); @@ -468,11 +469,14 @@ protected function processSingleJob(RabbitMQJob $job, AMQPMessage $message): voi } else { $isPutAsInProgress = $this->deduplicationService?->markAsInProgress($message); if ($isPutAsInProgress === false) { + $message->reject(true); $this->logWarning('processSingleJob.message_already_in_progress.skip', [ 'message_id' => $message->get('message_id'), ]); + } else { $this->runJob($job, $this->currentConnectionName, $this->workerOptions); + $this->deduplicationService?->markAsProcessed($message); } } From 913bda689ba695a16904e1ddbbacf06ff416f435 Mon Sep 17 00:00:00 2001 From: Alexander Ginko Date: Thu, 30 Oct 2025 10:06:16 +0100 Subject: [PATCH 06/14] SWR-20482 Server: RabbitMQ Improvements: Deduplication --- config/rabbitmq.php | 2 +- src/Consumer.php | 6 ++--- .../Deduplication/DeduplicationService.php | 26 +++++++++++-------- .../AbstractVhostsConsumer.php | 16 ++++++------ 4 files changed, 27 insertions(+), 23 deletions(-) diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 2d8b5473..8893d31e 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -29,7 +29,7 @@ 'enabled' => env('RABBITMQ_DEDUPLICATION_ENABLED', false), 'skip_for_dlq' => env('RABBITMQ_DEDUPLICATION_SKIP_FOR_DLQ', true), 'ttl' => env('RABBITMQ_DEDUPLICATION_TTL', 7200), - 'lock_ttl' => env('RABBITMQ_DEDUPLICATION_LOCK_TTL', 180), + 'lock_ttl' => env('RABBITMQ_DEDUPLICATION_LOCK_TTL', 60), 'connection' => [ 'driver' => env('RABBITMQ_DEDUPLICATION_DRIVER', 'redis'), 'name' => env('RABBITMQ_DEDUPLICATION_CONNECTION_NAME', 'persistent'), diff --git a/src/Consumer.php b/src/Consumer.php index 113ae831..7527cf57 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -125,7 +125,7 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu /** @var DeduplicationService $dedupService */ $dedupService = $this->container->make(DeduplicationService::class); - $messageState = $dedupService->getState($message); + $messageState = $dedupService->getState($message, $queue); if ($messageState === DeduplicationService::IN_PROGRESS) { $message->reject(true); @@ -133,10 +133,10 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu $message->ack(); } else { - $hasPutAsInProgress = $dedupService->markAsInProgress($message); + $hasPutAsInProgress = $dedupService->markAsInProgress($message, $queue); if ($hasPutAsInProgress !== false) { $this->runJob($job, $connectionName, $options); - $dedupService->markAsProcessed($message); + $dedupService->markAsProcessed($message, $queue); } else { $message->reject(true); } diff --git a/src/Services/Deduplication/DeduplicationService.php b/src/Services/Deduplication/DeduplicationService.php index 04069a4f..a7b678d8 100644 --- a/src/Services/Deduplication/DeduplicationService.php +++ b/src/Services/Deduplication/DeduplicationService.php @@ -37,12 +37,12 @@ public function __construct(private DeduplicationStore $store) {} * @param AMQPMessage $message * @return string|null - @enum {self::IN_PROGRESS, self::PROCESSED} */ - public function getState(AMQPMessage $message): ?string + public function getState(AMQPMessage $message, ?string $queueName = null): ?string { if (!$this->isEnabled()) { return null; } - $messageId = $this->getMessageId($message); + $messageId = $this->getMessageId($message, $queueName); if ($messageId === null) { return null; } @@ -50,28 +50,28 @@ public function getState(AMQPMessage $message): ?string return $this->store->get($messageId); } - public function markAsInProgress(AMQPMessage $message): bool + public function markAsInProgress(AMQPMessage $message, ?string $queueName = null): bool { $ttl = (int) ($this->getConfig('lock_ttl') ?: self::DEFAULT_LOCK_TTL); if ($ttl <= 0 || $ttl > 300) { throw new \InvalidArgumentException('Invalid TTL seconds. Should be between 1 and 300'); } - return $this->add($message, self::IN_PROGRESS, $ttl); + return $this->add($message, self::IN_PROGRESS, $ttl, $queueName); } - public function markAsProcessed(AMQPMessage $message): bool + public function markAsProcessed(AMQPMessage $message, ?string $queueName = null): bool { - return $this->add($message, self::PROCESSED, (int) ($this->getConfig('ttl') ?: self::DEFAULT_TTL)); + return $this->add($message, self::PROCESSED, (int) ($this->getConfig('ttl') ?: self::DEFAULT_TTL), $queueName); } - public function release(AMQPMessage $message): void + public function release(AMQPMessage $message, ?string $queueName = null): void { if (!$this->isEnabled()) { return; } - $messageId = $this->getMessageId($message); + $messageId = $this->getMessageId($message, $queueName); if ($messageId === null) { return; } @@ -88,13 +88,13 @@ public function release(AMQPMessage $message): void * @param int $ttl * @return bool */ - protected function add(AMQPMessage $message, string $value, int $ttl): bool + protected function add(AMQPMessage $message, string $value, int $ttl, ?string $queueName = null): bool { if (!$this->isEnabled()) { return true; } - $messageId = $this->getMessageId($message); + $messageId = $this->getMessageId($message, $queueName); if ($messageId === null) { return true; } @@ -102,7 +102,7 @@ protected function add(AMQPMessage $message, string $value, int $ttl): bool return $this->store->set($messageId, $value, $ttl, $value === self::PROCESSED); } - protected function getMessageId(AMQPMessage $message): ?string + protected function getMessageId(AMQPMessage $message, ?string $queueName = null): ?string { $props = $message->get_properties(); $messageId = $props['message_id'] ?? null; @@ -118,6 +118,10 @@ protected function getMessageId(AMQPMessage $message): ?string $messageId = 'dlq:' . $messageId; } + if (is_string($queueName) && $queueName !== '') { + $messageId = $queueName . ':' . $messageId; + } + return $messageId; } diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php index 32526435..2069985b 100644 --- a/src/VhostsConsumers/AbstractVhostsConsumer.php +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -324,7 +324,7 @@ protected function processBatch(RabbitMQQueue $connection): void $batchData = []; foreach ($batchJobMessages as $batchMessage) { - $messageState = $this->deduplicationService?->getState($batchMessage); + $messageState = $this->deduplicationService?->getState($batchMessage, $this->currentQueueName); try { if ($messageState === DeduplicationService::IN_PROGRESS) { $batchMessage->reject(true); @@ -341,7 +341,7 @@ protected function processBatch(RabbitMQQueue $connection): void continue; } - $hasPutAsInProgress = $this->deduplicationService?->markAsInProgress($batchMessage); + $hasPutAsInProgress = $this->deduplicationService?->markAsInProgress($batchMessage, $this->currentQueueName); if ($hasPutAsInProgress === false) { $batchMessage->reject(true); $this->logWarning('processBatch.message_already_in_progress.skip', [ @@ -355,7 +355,7 @@ protected function processBatch(RabbitMQQueue $connection): void } catch (\Throwable $exception) { if ($messageState === null) { - $this->deduplicationService?->release($batchMessage); + $this->deduplicationService?->release($batchMessage, $this->currentQueueName); } $this->logError('processBatch.message_processing_exception', [ @@ -385,7 +385,7 @@ protected function processBatch(RabbitMQQueue $connection): void $isBatchSuccess = true; } catch (\Throwable $exception) { foreach ($uniqueMessagesForProcessing as $batchMessage) { - $this->deduplicationService?->release($batchMessage); + $this->deduplicationService?->release($batchMessage, $this->currentQueueName); } $isBatchSuccess = false; @@ -405,7 +405,7 @@ protected function processBatch(RabbitMQQueue $connection): void try { if ($isBatchSuccess) { foreach ($uniqueMessagesForProcessing as $batchMessage) { - $this->deduplicationService?->markAsProcessed($batchMessage); + $this->deduplicationService?->markAsProcessed($batchMessage, $this->currentQueueName); } $lastBatchMessage = end($uniqueMessagesForProcessing); @@ -453,7 +453,7 @@ protected function processSingleJob(RabbitMQJob $job, AMQPMessage $message): voi $this->registerTimeoutHandler($job, $this->workerOptions); } - $messageState = $this->deduplicationService?->getState($message); + $messageState = $this->deduplicationService?->getState($message, $this->currentQueueName); if ($messageState === DeduplicationService::IN_PROGRESS) { $message->reject(true); $this->logWarning('processSingleJob.message_already_in_progress.requeue', [ @@ -467,7 +467,7 @@ protected function processSingleJob(RabbitMQJob $job, AMQPMessage $message): voi $this->ackMessage($message); } else { - $isPutAsInProgress = $this->deduplicationService?->markAsInProgress($message); + $isPutAsInProgress = $this->deduplicationService?->markAsInProgress($message, $this->currentQueueName); if ($isPutAsInProgress === false) { $message->reject(true); $this->logWarning('processSingleJob.message_already_in_progress.skip', [ @@ -476,7 +476,7 @@ protected function processSingleJob(RabbitMQJob $job, AMQPMessage $message): voi } else { $this->runJob($job, $this->currentConnectionName, $this->workerOptions); - $this->deduplicationService?->markAsProcessed($message); + $this->deduplicationService?->markAsProcessed($message, $this->currentQueueName); } } From c5dc66e8c6cda52fce7e0181a5a525b837b1074e Mon Sep 17 00:00:00 2001 From: Alexander Ginko Date: Thu, 30 Oct 2025 12:33:48 +0100 Subject: [PATCH 07/14] SWR-20482 Server: RabbitMQ Improvements: Deduplication --- README.md | 19 ++++++++++++++---- .../AbstractVhostsConsumer.php | 20 ++++++++++--------- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 4d478424..b1efa199 100644 --- a/README.md +++ b/README.md @@ -663,7 +663,18 @@ if not all the issues with the following command: composer fix:style ``` -## Contribution - -You can contribute to this package by discovering bugs and opening issues. Please, add to which version of package you -create pull request or issue. (e.g. [5.2] Fatal error on delayed job) +## Local Setup +- Configure all config items in `config/queue.php` section `connections.rabbitmq_vhosts` (see as example [rabbitmq.php](./config/rabbitmq.php)) +- Create `yml` file in the project root with name `rabbit-groups.yml` and content, for example like this (you can replace `vhosts` and `queues` with `vhosts_mask` and `queues_mask`): +```yaml +groups: + test-notes: + vhosts: + - organization_200005 + queues: + - local-myname.notes.200005 + batch_size: 3 + prefetch_count: 3 +``` +- Run command `php artisan lib-rabbitmq:scan-vhosts` within your project where this library is installed (this command fetches data from RabbitMQ to Redis) +- Run command for consumer `php artisan lib-rabbitmq:consume-vhosts test-notes rabbitmq_vhosts --name=mq-vhost-test-local-notes --memory=300 --timeout=0 --max-jobs=1000 --max-time=600 --async-mode=1` diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php index 2069985b..478aefd8 100644 --- a/src/VhostsConsumers/AbstractVhostsConsumer.php +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -316,12 +316,12 @@ protected function processBatch(RabbitMQQueue $connection): void foreach ($this->batchMessages as $batchJobClass => $batchJobMessages) { $isBatchSuccess = false; - - $uniqueMessagesForProcessing = []; $batchSize = count($batchJobMessages); + if ($batchSize > 1) { $batchTimeStarted = microtime(true); + $uniqueMessagesForProcessing = []; $batchData = []; foreach ($batchJobMessages as $batchMessage) { $messageState = $this->deduplicationService?->getState($batchMessage, $this->currentQueueName); @@ -329,14 +329,14 @@ protected function processBatch(RabbitMQQueue $connection): void if ($messageState === DeduplicationService::IN_PROGRESS) { $batchMessage->reject(true); $this->logWarning('processBatch.message_already_in_progress.requeue', [ - 'message_id' => $batchMessage->get('message_id'), + 'message_id' => $batchMessage->get_properties()['message_id'] ?? null, ]); continue; } if ($messageState === DeduplicationService::PROCESSED) { $this->ackMessage($batchMessage); $this->logWarning('processBatch.message_already_processed', [ - 'message_id' => $batchMessage->get('message_id'), + 'message_id' => $batchMessage->get_properties()['message_id'] ?? null, ]); continue; } @@ -345,7 +345,7 @@ protected function processBatch(RabbitMQQueue $connection): void if ($hasPutAsInProgress === false) { $batchMessage->reject(true); $this->logWarning('processBatch.message_already_in_progress.skip', [ - 'message_id' => $batchMessage->get('message_id'), + 'message_id' => $batchMessage->get_properties()['message_id'] ?? null, ]); } else { $job = $this->getJobByMessage($batchMessage, $connection); @@ -359,7 +359,7 @@ protected function processBatch(RabbitMQQueue $connection): void } $this->logError('processBatch.message_processing_exception', [ - 'released_message_id' => $batchMessage->get('message_id'), + 'released_message_id' => $batchMessage->get_properties()['message_id'] ?? null, ]); throw $exception; @@ -399,6 +399,8 @@ protected function processBatch(RabbitMQQueue $connection): void } unset($batchData); + } else { + $uniqueMessagesForProcessing = $batchJobMessages; } $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); @@ -457,12 +459,12 @@ protected function processSingleJob(RabbitMQJob $job, AMQPMessage $message): voi if ($messageState === DeduplicationService::IN_PROGRESS) { $message->reject(true); $this->logWarning('processSingleJob.message_already_in_progress.requeue', [ - 'message_id' => $message->get('message_id'), + 'message_id' => $message->get_properties()['message_id'] ?? null, ]); } elseif ($messageState === DeduplicationService::PROCESSED) { $this->logWarning('processSingleJob.message_already_processed', [ - 'message_id' => $message->get('message_id'), + 'message_id' => $message->get_properties()['message_id'] ?? null, ]); $this->ackMessage($message); @@ -471,7 +473,7 @@ protected function processSingleJob(RabbitMQJob $job, AMQPMessage $message): voi if ($isPutAsInProgress === false) { $message->reject(true); $this->logWarning('processSingleJob.message_already_in_progress.skip', [ - 'message_id' => $message->get('message_id'), + 'message_id' => $message->get_properties()['message_id'] ?? null, ]); } else { From 1ff003b4abdc66d9a7c23205cc085ab165cc2438 Mon Sep 17 00:00:00 2001 From: Alexander Ginko Date: Thu, 30 Oct 2025 12:44:45 +0100 Subject: [PATCH 08/14] SWR-20482 Server: RabbitMQ Improvements: Deduplication --- .../Deduplication/DeduplicationService.php | 14 +++++++++++--- .../Deduplication/RedisDeduplicationStore.php | 4 ++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/Services/Deduplication/DeduplicationService.php b/src/Services/Deduplication/DeduplicationService.php index a7b678d8..13241b70 100644 --- a/src/Services/Deduplication/DeduplicationService.php +++ b/src/Services/Deduplication/DeduplicationService.php @@ -31,6 +31,9 @@ class DeduplicationService private const DEFAULT_LOCK_TTL = 60; private const DEFAULT_TTL = 7200; + private const MAX_LOCK_TTL = 300; + private const MAX_TTL = 7 * 24 * 60 * 60; + public function __construct(private DeduplicationStore $store) {} /** @@ -53,8 +56,8 @@ public function getState(AMQPMessage $message, ?string $queueName = null): ?stri public function markAsInProgress(AMQPMessage $message, ?string $queueName = null): bool { $ttl = (int) ($this->getConfig('lock_ttl') ?: self::DEFAULT_LOCK_TTL); - if ($ttl <= 0 || $ttl > 300) { - throw new \InvalidArgumentException('Invalid TTL seconds. Should be between 1 and 300'); + if ($ttl <= 0 || $ttl > self::MAX_LOCK_TTL) { + throw new \InvalidArgumentException(sprintf('Invalid TTL seconds. Should be between 1 and %d', self::MAX_LOCK_TTL)); } return $this->add($message, self::IN_PROGRESS, $ttl, $queueName); @@ -62,7 +65,12 @@ public function markAsInProgress(AMQPMessage $message, ?string $queueName = null public function markAsProcessed(AMQPMessage $message, ?string $queueName = null): bool { - return $this->add($message, self::PROCESSED, (int) ($this->getConfig('ttl') ?: self::DEFAULT_TTL), $queueName); + $ttl = (int) ($this->getConfig('ttl') ?: self::DEFAULT_TTL); + if ($ttl <= 0 || $ttl > self::MAX_TTL) { + throw new \InvalidArgumentException(sprintf('Invalid TTL seconds. Should be between 1 sec and %d sec', self::MAX_TTL)); + } + + return $this->add($message, self::PROCESSED, $ttl, $queueName); } public function release(AMQPMessage $message, ?string $queueName = null): void diff --git a/src/Services/Deduplication/RedisDeduplicationStore.php b/src/Services/Deduplication/RedisDeduplicationStore.php index 324b9c66..0bf828ae 100644 --- a/src/Services/Deduplication/RedisDeduplicationStore.php +++ b/src/Services/Deduplication/RedisDeduplicationStore.php @@ -20,8 +20,8 @@ public function get(string $messageKey): mixed public function set(string $messageKey, mixed $value, int $ttlSeconds, bool $withOverride = false): bool { - if ($ttlSeconds <= 0 || $ttlSeconds > 7 * 24 * 60 * 60) { - throw new \InvalidArgumentException('Invalid TTL seconds'); + if ($ttlSeconds <= 0) { + throw new \InvalidArgumentException('Invalid TTL seconds. Should be greater than 0.'); } $key = $this->getKey($messageKey); From 5d03ee703cfc9f4e8b852d24e48ce1a4a443277f Mon Sep 17 00:00:00 2001 From: Alexander Ginko Date: Thu, 30 Oct 2025 18:01:24 +0100 Subject: [PATCH 09/14] SWR-20482 Server: RabbitMQ Improvements: Deduplication --- config/rabbitmq.php | 1 - src/Services/Deduplication/DeduplicationService.php | 5 ----- 2 files changed, 6 deletions(-) diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 8893d31e..83d997ac 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -27,7 +27,6 @@ 'deduplication' => [ 'enabled' => env('RABBITMQ_DEDUPLICATION_ENABLED', false), - 'skip_for_dlq' => env('RABBITMQ_DEDUPLICATION_SKIP_FOR_DLQ', true), 'ttl' => env('RABBITMQ_DEDUPLICATION_TTL', 7200), 'lock_ttl' => env('RABBITMQ_DEDUPLICATION_LOCK_TTL', 60), 'connection' => [ diff --git a/src/Services/Deduplication/DeduplicationService.php b/src/Services/Deduplication/DeduplicationService.php index 13241b70..36532d8e 100644 --- a/src/Services/Deduplication/DeduplicationService.php +++ b/src/Services/Deduplication/DeduplicationService.php @@ -8,7 +8,6 @@ /** * @phpstan-type DeduplicationConfig array{ * enabled: bool, - * skip_for_dlq: bool, * ttl: int, * lock_ttl: int, * connection: array{ @@ -119,10 +118,6 @@ protected function getMessageId(AMQPMessage $message, ?string $queueName = null) } if (DlqDetector::isDlqMessage($message)) { - if ($this->getConfig('skip_for_dlq', false)) { - return null; - } - $messageId = 'dlq:' . $messageId; } From e4b77964f48e49606f0535e191c8835a933012dd Mon Sep 17 00:00:00 2001 From: Alexander Ginko Date: Fri, 31 Oct 2025 15:00:11 +0100 Subject: [PATCH 10/14] SWR-20482 Server: RabbitMQ Improvements: Deduplication --- config/rabbitmq.php | 30 +++-- src/Consumer.php | 30 ++--- src/Contracts/RabbitMQConsumable.php | 22 ++++ src/Interfaces/RabbitMQBatchable.php | 13 +- src/LaravelLibRabbitMQServiceProvider.php | 14 +-- .../Deduplication/AppDeduplicationService.php | 18 +++ .../DeduplicationService.php | 116 +++++++++++++++-- .../DeduplicationStore.php | 2 +- .../NullDeduplicationStore.php | 2 +- .../RedisDeduplicationStore.php | 2 +- .../AbstractVhostsConsumer.php | 117 +++++++----------- 11 files changed, 241 insertions(+), 125 deletions(-) create mode 100644 src/Contracts/RabbitMQConsumable.php create mode 100644 src/Services/Deduplication/AppDeduplicationService.php rename src/Services/Deduplication/{ => TransportLevel}/DeduplicationService.php (52%) rename src/Services/Deduplication/{ => TransportLevel}/DeduplicationStore.php (77%) rename src/Services/Deduplication/{ => TransportLevel}/NullDeduplicationStore.php (82%) rename src/Services/Deduplication/{ => TransportLevel}/RedisDeduplicationStore.php (94%) diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 83d997ac..9b457a84 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -25,14 +25,30 @@ 'options' => [ ], + /** + * Provided on 2 levels: transport and application. + */ 'deduplication' => [ - 'enabled' => env('RABBITMQ_DEDUPLICATION_ENABLED', false), - 'ttl' => env('RABBITMQ_DEDUPLICATION_TTL', 7200), - 'lock_ttl' => env('RABBITMQ_DEDUPLICATION_LOCK_TTL', 60), - 'connection' => [ - 'driver' => env('RABBITMQ_DEDUPLICATION_DRIVER', 'redis'), - 'name' => env('RABBITMQ_DEDUPLICATION_CONNECTION_NAME', 'persistent'), - 'key_prefix' => env('RABBITMQ_DEDUPLICATION_KEY_PREFIX', 'mq_dedup'), + 'transport' => [ + 'enabled' => env('RABBITMQ_DEDUP_TRANSPORT_ENABLED', false), + 'ttl' => env('RABBITMQ_DEDUP_TRANSPORT_TTL', 7200), + 'lock_ttl' => env('RABBITMQ_DEDUP_TRANSPORT_LOCK_TTL', 60), + /** + * Possible: ack, reject + */ + 'action_on_duplication' => env('RABBITMQ_DEDUP_TRANSPORT_ACTION', 'ack'), + /** + * Possible: ack, reject, requeue + */ + 'action_on_lock' => env('RABBITMQ_DEDUP_TRANSPORT_LOCK_ACTION', 'requeue'), + 'connection' => [ + 'driver' => env('RABBITMQ_DEDUP_TRANSPORT_DRIVER', 'redis'), + 'name' => env('RABBITMQ_DEDUP_TRANSPORT_CONNECTION_NAME', 'persistent'), + 'key_prefix' => env('RABBITMQ_DEDUP_TRANSPORT_KEY_PREFIX', 'mq_dedup'), + ], + ], + 'application' => [ + 'enabled' => env('RABBITMQ_DEDUP_APP_ENABLED', true), ], ], diff --git a/src/Consumer.php b/src/Consumer.php index 7527cf57..f78dc241 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -9,9 +9,9 @@ use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Exception\AMQPRuntimeException; use PhpAmqpLib\Message\AMQPMessage; -use Throwable; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; -use Salesmessage\LibRabbitMQ\Services\Deduplication\DeduplicationService; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationService; +use Throwable; class Consumer extends Worker { @@ -123,24 +123,16 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu $jobsProcessed++; - /** @var DeduplicationService $dedupService */ - $dedupService = $this->container->make(DeduplicationService::class); - $messageState = $dedupService->getState($message, $queue); - if ($messageState === DeduplicationService::IN_PROGRESS) { - $message->reject(true); - - } elseif ($messageState === DeduplicationService::PROCESSED) { - $message->ack(); - - } else { - $hasPutAsInProgress = $dedupService->markAsInProgress($message, $queue); - if ($hasPutAsInProgress !== false) { + /** @var DeduplicationService $transportDedupService */ + $transportDedupService = $this->container->make(DeduplicationService::class); + $transportDedupService->decorateWithDeduplication( + function () use ($job, $message, $connectionName, $queue, $options, $transportDedupService) { $this->runJob($job, $connectionName, $options); - $dedupService->markAsProcessed($message, $queue); - } else { - $message->reject(true); - } - } + $transportDedupService->markAsProcessed($message, $queue); + }, + $message, + $queue + ); if ($this->supportsAsyncSignals()) { $this->resetTimeoutHandler(); diff --git a/src/Contracts/RabbitMQConsumable.php b/src/Contracts/RabbitMQConsumable.php new file mode 100644 index 00000000..9fce4a9a --- /dev/null +++ b/src/Contracts/RabbitMQConsumable.php @@ -0,0 +1,22 @@ + $batch + * @return list + */ + public static function getNotDuplicatedBatchedJobs(array $batch): array; + + /** * Processing jobs array of static class * - * @param array $batch - * @return mixed + * @param list $batch */ public static function collection(array $batch): void; } diff --git a/src/LaravelLibRabbitMQServiceProvider.php b/src/LaravelLibRabbitMQServiceProvider.php index 52ee7e8a..d8451bdb 100644 --- a/src/LaravelLibRabbitMQServiceProvider.php +++ b/src/LaravelLibRabbitMQServiceProvider.php @@ -10,14 +10,14 @@ use Salesmessage\LibRabbitMQ\Console\ConsumeVhostsCommand; use Salesmessage\LibRabbitMQ\Console\ScanVhostsCommand; use Salesmessage\LibRabbitMQ\Queue\Connectors\RabbitMQVhostsConnector; -use Salesmessage\LibRabbitMQ\Services\Deduplication\DeduplicationService; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationService; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationStore; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\NullDeduplicationStore; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\RedisDeduplicationStore; use Salesmessage\LibRabbitMQ\Services\GroupsService; use Salesmessage\LibRabbitMQ\Services\InternalStorageManager; use Salesmessage\LibRabbitMQ\Services\QueueService; use Salesmessage\LibRabbitMQ\Services\VhostsService; -use Salesmessage\LibRabbitMQ\Services\Deduplication\DeduplicationStore; -use Salesmessage\LibRabbitMQ\Services\Deduplication\NullDeduplicationStore; -use Salesmessage\LibRabbitMQ\Services\Deduplication\RedisDeduplicationStore; use Salesmessage\LibRabbitMQ\VhostsConsumers\DirectConsumer as VhostsDirectConsumer; use Salesmessage\LibRabbitMQ\VhostsConsumers\QueueConsumer as VhostsQueueConsumer; @@ -68,8 +68,8 @@ public function register(): void $this->app['events'], $this->app[ExceptionHandler::class], $isDownForMaintenance, + $this->app->get(DeduplicationService::class), null, - $this->app->get(DeduplicationService::class) ); }); @@ -85,8 +85,8 @@ public function register(): void $this->app['events'], $this->app[ExceptionHandler::class], $isDownForMaintenance, + $this->app->get(DeduplicationService::class), null, - $this->app->get(DeduplicationService::class) ); }); @@ -152,7 +152,7 @@ private function bindDeduplicationService(): void { $this->app->bind(DeduplicationStore::class, static function () { /** @var DeduplicationConfig $config */ - $config = (array) config('queue.connections.rabbitmq_vhosts.deduplication', []); + $config = (array) config('queue.connections.rabbitmq_vhosts.deduplication.transport', []); $enabled = (bool) ($config['enabled'] ?? false); if (!$enabled) { return new NullDeduplicationStore(); diff --git a/src/Services/Deduplication/AppDeduplicationService.php b/src/Services/Deduplication/AppDeduplicationService.php new file mode 100644 index 00000000..2187557d --- /dev/null +++ b/src/Services/Deduplication/AppDeduplicationService.php @@ -0,0 +1,18 @@ +getState($message, $queueName); + try { + if ($messageState === DeduplicationService::IN_PROGRESS) { + $action = $this->getConfig('action_on_lock', self::ACTION_REQUEUE); + if ($action === self::ACTION_REJECT) { + $message->reject(false); + } else { + $message->reject(true); + } + + $this->logger->warning('DeduplicationService.message_already_in_progress', [ + 'action' => $action, + 'message_id' => $message->get_properties()['message_id'] ?? null, + ]); + return false; + } + + if ($messageState === DeduplicationService::PROCESSED) { + $action = $this->getConfig('action_on_duplication', self::ACTION_ACK); + if ($action === self::ACTION_REJECT) { + $message->reject(false); + } elseif ($action === self::ACTION_REQUEUE) { + $message->reject(true); + } else { + $message->ack(); + } + + $this->logger->warning('DeduplicationService.message_already_processed', [ + 'action' => $action, + 'message_id' => $message->get_properties()['message_id'] ?? null, + ]); + return false; + } + + $hasPutAsInProgress = $this->markAsInProgress($message, $queueName); + if ($hasPutAsInProgress === false) { + $action = $this->getConfig('action_on_lock', self::ACTION_REQUEUE); + if ($action === self::ACTION_REJECT) { + $message->reject(false); + } else { + $message->reject(true); + } + + $this->logger->warning('DeduplicationService.message_already_in_progress.skip', [ + 'action' => $action, + 'message_id' => $message->get_properties()['message_id'] ?? null, + ]); + return false; + + } + + $handler(); + } catch (\Throwable $exception) { + if ($messageState === null) { + $this->release($message, $queueName); + } + + $this->logger->error('DeduplicationService.message_processing_exception', [ + 'released_message_id' => $message->get_properties()['message_id'] ?? null, + ]); + + throw $exception; + } + + return true; + } /** * @param AMQPMessage $message @@ -52,16 +142,6 @@ public function getState(AMQPMessage $message, ?string $queueName = null): ?stri return $this->store->get($messageId); } - public function markAsInProgress(AMQPMessage $message, ?string $queueName = null): bool - { - $ttl = (int) ($this->getConfig('lock_ttl') ?: self::DEFAULT_LOCK_TTL); - if ($ttl <= 0 || $ttl > self::MAX_LOCK_TTL) { - throw new \InvalidArgumentException(sprintf('Invalid TTL seconds. Should be between 1 and %d', self::MAX_LOCK_TTL)); - } - - return $this->add($message, self::IN_PROGRESS, $ttl, $queueName); - } - public function markAsProcessed(AMQPMessage $message, ?string $queueName = null): bool { $ttl = (int) ($this->getConfig('ttl') ?: self::DEFAULT_TTL); @@ -86,6 +166,16 @@ public function release(AMQPMessage $message, ?string $queueName = null): void $this->store->release($messageId); } + protected function markAsInProgress(AMQPMessage $message, ?string $queueName = null): bool + { + $ttl = (int) ($this->getConfig('lock_ttl') ?: self::DEFAULT_LOCK_TTL); + if ($ttl <= 0 || $ttl > self::MAX_LOCK_TTL) { + throw new \InvalidArgumentException(sprintf('Invalid TTL seconds. Should be between 1 and %d', self::MAX_LOCK_TTL)); + } + + return $this->add($message, self::IN_PROGRESS, $ttl, $queueName); + } + /** * Returns "true" if the message was not processed previously, and it's successfully been added to the store. * Returns "false" if the message was already processed and it's a duplicate. @@ -135,7 +225,7 @@ protected function isEnabled(): bool protected function getConfig(string $key, mixed $default = null): mixed { - $value = config("queue.connections.rabbitmq_vhosts.deduplication.$key"); + $value = config("queue.connections.rabbitmq_vhosts.deduplication.transport.$key"); return $value !== null ? $value : $default; } diff --git a/src/Services/Deduplication/DeduplicationStore.php b/src/Services/Deduplication/TransportLevel/DeduplicationStore.php similarity index 77% rename from src/Services/Deduplication/DeduplicationStore.php rename to src/Services/Deduplication/TransportLevel/DeduplicationStore.php index c7807f3d..014e639f 100644 --- a/src/Services/Deduplication/DeduplicationStore.php +++ b/src/Services/Deduplication/TransportLevel/DeduplicationStore.php @@ -1,6 +1,6 @@ deduplicationService?->getState($batchMessage, $this->currentQueueName); - try { - if ($messageState === DeduplicationService::IN_PROGRESS) { - $batchMessage->reject(true); - $this->logWarning('processBatch.message_already_in_progress.requeue', [ - 'message_id' => $batchMessage->get_properties()['message_id'] ?? null, - ]); - continue; - } - if ($messageState === DeduplicationService::PROCESSED) { - $this->ackMessage($batchMessage); - $this->logWarning('processBatch.message_already_processed', [ - 'message_id' => $batchMessage->get_properties()['message_id'] ?? null, - ]); - continue; - } - - $hasPutAsInProgress = $this->deduplicationService?->markAsInProgress($batchMessage, $this->currentQueueName); - if ($hasPutAsInProgress === false) { - $batchMessage->reject(true); - $this->logWarning('processBatch.message_already_in_progress.skip', [ - 'message_id' => $batchMessage->get_properties()['message_id'] ?? null, - ]); - } else { + $this->transportDeduplicationService->decorateWithDeduplication( + function () use ($batchMessage, $connection, &$uniqueMessagesForProcessing, &$batchData) { $job = $this->getJobByMessage($batchMessage, $connection); $uniqueMessagesForProcessing[] = $batchMessage; $batchData[] = $job->getPayloadData(); - } - - } catch (\Throwable $exception) { - if ($messageState === null) { - $this->deduplicationService?->release($batchMessage, $this->currentQueueName); - } - - $this->logError('processBatch.message_processing_exception', [ - 'released_message_id' => $batchMessage->get_properties()['message_id'] ?? null, - ]); - - throw $exception; - } + }, + $batchMessage, + $this->currentQueueName + ); } try { + if (AppDeduplicationService::isEnabled()) { + /** @var RabbitMQBatchable $batchJobClass */ + $batchData = $batchJobClass::getNotDuplicatedBatchedJobs($batchData); + } + if (!empty($batchData)) { $this->logInfo('processBatch.start', [ 'batch_job_class' => $batchJobClass, @@ -385,7 +361,7 @@ protected function processBatch(RabbitMQQueue $connection): void $isBatchSuccess = true; } catch (\Throwable $exception) { foreach ($uniqueMessagesForProcessing as $batchMessage) { - $this->deduplicationService?->release($batchMessage, $this->currentQueueName); + $this->transportDeduplicationService->release($batchMessage, $this->currentQueueName); } $isBatchSuccess = false; @@ -407,7 +383,7 @@ protected function processBatch(RabbitMQQueue $connection): void try { if ($isBatchSuccess) { foreach ($uniqueMessagesForProcessing as $batchMessage) { - $this->deduplicationService?->markAsProcessed($batchMessage, $this->currentQueueName); + $this->transportDeduplicationService?->markAsProcessed($batchMessage, $this->currentQueueName); } $lastBatchMessage = end($uniqueMessagesForProcessing); @@ -430,57 +406,56 @@ protected function processBatch(RabbitMQQueue $connection): void /** * @param AMQPMessage $message * @param RabbitMQQueue $connection - * @return RabbitMQJob + * @return RabbitMQConsumable|RabbitMQJob - since in PHP 8.0 intersection is not supported we use union type * @throws \Throwable */ - protected function getJobByMessage(AMQPMessage $message, RabbitMQQueue $connection): RabbitMQJob + protected function getJobByMessage(AMQPMessage $message, RabbitMQQueue $connection): RabbitMQJob|RabbitMQConsumable { $jobClass = $connection->getJobClass(); - return new $jobClass( + $job = new $jobClass( $this->container, $connection, $message, $this->currentConnectionName, $this->currentQueueName ); + + if (!is_subclass_of($jobClass, RabbitMQConsumable::class)) { + throw new \RuntimeException(sprintf('Job class %s must implement %s', $jobClass, RabbitMQConsumable::class)); + } + + return $job; } - protected function processSingleJob(RabbitMQJob $job, AMQPMessage $message): void + protected function processSingleJob(RabbitMQJob|RabbitMQConsumable $job, AMQPMessage $message): void { $timeStarted = microtime(true); - $this->logInfo('processSingleJob.start'); + $this->logInfo('processSingleJob.start', [ + 'job_consumable_id' => $job->getConsumableId(), + ]); if ($this->supportsAsyncSignals()) { $this->registerTimeoutHandler($job, $this->workerOptions); } - $messageState = $this->deduplicationService?->getState($message, $this->currentQueueName); - if ($messageState === DeduplicationService::IN_PROGRESS) { - $message->reject(true); - $this->logWarning('processSingleJob.message_already_in_progress.requeue', [ - 'message_id' => $message->get_properties()['message_id'] ?? null, - ]); - - } elseif ($messageState === DeduplicationService::PROCESSED) { - $this->logWarning('processSingleJob.message_already_processed', [ - 'message_id' => $message->get_properties()['message_id'] ?? null, - ]); - $this->ackMessage($message); + $this->transportDeduplicationService->decorateWithDeduplication( + function () use ($job, $message) { + if (AppDeduplicationService::isEnabled() && $job->isDuplicated()) { + $this->logWarning('processSingleJob.job_is_duplicated', [ + 'job_consumable_id' => $job->getConsumableId(), + ]); + $this->ackMessage($message); - } else { - $isPutAsInProgress = $this->deduplicationService?->markAsInProgress($message, $this->currentQueueName); - if ($isPutAsInProgress === false) { - $message->reject(true); - $this->logWarning('processSingleJob.message_already_in_progress.skip', [ - 'message_id' => $message->get_properties()['message_id'] ?? null, - ]); + } else { + $this->runJob($job, $this->currentConnectionName, $this->workerOptions); + } - } else { - $this->runJob($job, $this->currentConnectionName, $this->workerOptions); - $this->deduplicationService?->markAsProcessed($message, $this->currentQueueName); - } - } + $this->transportDeduplicationService->markAsProcessed($message, $this->currentQueueName); + }, + $message, + $this->currentQueueName, + ); $this->updateLastProcessedAt(); From 3391b2f868a120fcf2270bac7dbfcc55e9964358 Mon Sep 17 00:00:00 2001 From: Alexander Ginko Date: Fri, 31 Oct 2025 15:12:28 +0100 Subject: [PATCH 11/14] SWR-20482 Server: RabbitMQ Improvements: Deduplication --- .../TransportLevel/DeduplicationService.php | 56 +++++++++++-------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/src/Services/Deduplication/TransportLevel/DeduplicationService.php b/src/Services/Deduplication/TransportLevel/DeduplicationService.php index beead408..b4a2335e 100644 --- a/src/Services/Deduplication/TransportLevel/DeduplicationService.php +++ b/src/Services/Deduplication/TransportLevel/DeduplicationService.php @@ -61,52 +61,34 @@ public function decorateWithDeduplication(\Closure $handler, AMQPMessage $messag $messageState = $this->getState($message, $queueName); try { if ($messageState === DeduplicationService::IN_PROGRESS) { - $action = $this->getConfig('action_on_lock', self::ACTION_REQUEUE); - if ($action === self::ACTION_REJECT) { - $message->reject(false); - } else { - $message->reject(true); - } - + $action = $this->applyActionOnLock($message); $this->logger->warning('DeduplicationService.message_already_in_progress', [ 'action' => $action, 'message_id' => $message->get_properties()['message_id'] ?? null, ]); + return false; } if ($messageState === DeduplicationService::PROCESSED) { - $action = $this->getConfig('action_on_duplication', self::ACTION_ACK); - if ($action === self::ACTION_REJECT) { - $message->reject(false); - } elseif ($action === self::ACTION_REQUEUE) { - $message->reject(true); - } else { - $message->ack(); - } - + $action = $this->applyActionOnDuplication($message); $this->logger->warning('DeduplicationService.message_already_processed', [ 'action' => $action, 'message_id' => $message->get_properties()['message_id'] ?? null, ]); + return false; } $hasPutAsInProgress = $this->markAsInProgress($message, $queueName); if ($hasPutAsInProgress === false) { - $action = $this->getConfig('action_on_lock', self::ACTION_REQUEUE); - if ($action === self::ACTION_REJECT) { - $message->reject(false); - } else { - $message->reject(true); - } - + $action = $this->applyActionOnLock($message); $this->logger->warning('DeduplicationService.message_already_in_progress.skip', [ 'action' => $action, 'message_id' => $message->get_properties()['message_id'] ?? null, ]); - return false; + return false; } $handler(); @@ -218,6 +200,32 @@ protected function getMessageId(AMQPMessage $message, ?string $queueName = null) return $messageId; } + protected function applyActionOnLock(AMQPMessage $message): string + { + $action = $this->getConfig('action_on_lock', self::ACTION_REQUEUE); + if ($action === self::ACTION_REJECT) { + $message->reject(false); + } elseif ($action === self::ACTION_ACK) { + $message->ack(); + } else { + $message->reject(true); + } + + return $action; + } + + protected function applyActionOnDuplication(AMQPMessage $message): string + { + $action = $this->getConfig('action_on_duplication', self::ACTION_ACK); + if ($action === self::ACTION_REJECT) { + $message->reject(false); + } else { + $message->ack(); + } + + return $action; + } + protected function isEnabled(): bool { return (bool) $this->getConfig('enabled', false); From af26195c0115a53f047ac3db4577a167d440732a Mon Sep 17 00:00:00 2001 From: Alexander Ginko Date: Fri, 31 Oct 2025 16:11:45 +0100 Subject: [PATCH 12/14] SWR-20482 Server: RabbitMQ Improvements: Deduplication --- README.md | 1 + src/Contracts/RabbitMQConsumable.php | 7 ------- .../AbstractVhostsConsumer.php | 20 ++++++++----------- 3 files changed, 9 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index b1efa199..ae38fac9 100644 --- a/README.md +++ b/README.md @@ -676,5 +676,6 @@ groups: batch_size: 3 prefetch_count: 3 ``` +- Make sure that vhosts exist in RabbitMQ (if not - create them) - Run command `php artisan lib-rabbitmq:scan-vhosts` within your project where this library is installed (this command fetches data from RabbitMQ to Redis) - Run command for consumer `php artisan lib-rabbitmq:consume-vhosts test-notes rabbitmq_vhosts --name=mq-vhost-test-local-notes --memory=300 --timeout=0 --max-jobs=1000 --max-time=600 --async-mode=1` diff --git a/src/Contracts/RabbitMQConsumable.php b/src/Contracts/RabbitMQConsumable.php index 9fce4a9a..4e45e3ec 100644 --- a/src/Contracts/RabbitMQConsumable.php +++ b/src/Contracts/RabbitMQConsumable.php @@ -7,13 +7,6 @@ */ interface RabbitMQConsumable { - /** - * Used to track the job's execution (e.g. for logging). - * - * @return string - */ - public function getConsumableId(): string; - /** * Check duplications on the application side. * It's mostly represented as an idempotency checker. diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php index 3d186156..eeb2e7e1 100644 --- a/src/VhostsConsumers/AbstractVhostsConsumer.php +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -406,10 +406,10 @@ function () use ($batchMessage, $connection, &$uniqueMessagesForProcessing, &$ba /** * @param AMQPMessage $message * @param RabbitMQQueue $connection - * @return RabbitMQConsumable|RabbitMQJob - since in PHP 8.0 intersection is not supported we use union type + * @return RabbitMQJob * @throws \Throwable */ - protected function getJobByMessage(AMQPMessage $message, RabbitMQQueue $connection): RabbitMQJob|RabbitMQConsumable + protected function getJobByMessage(AMQPMessage $message, RabbitMQQueue $connection): RabbitMQJob { $jobClass = $connection->getJobClass(); @@ -421,19 +421,17 @@ protected function getJobByMessage(AMQPMessage $message, RabbitMQQueue $connecti $this->currentQueueName ); - if (!is_subclass_of($jobClass, RabbitMQConsumable::class)) { - throw new \RuntimeException(sprintf('Job class %s must implement %s', $jobClass, RabbitMQConsumable::class)); + if (!is_subclass_of($job->getPayloadClass(), RabbitMQConsumable::class)) { + throw new \RuntimeException(sprintf('Job class %s must implement %s', $job->getPayloadClass(), RabbitMQConsumable::class)); } return $job; } - protected function processSingleJob(RabbitMQJob|RabbitMQConsumable $job, AMQPMessage $message): void + protected function processSingleJob(RabbitMQJob $job, AMQPMessage $message): void { $timeStarted = microtime(true); - $this->logInfo('processSingleJob.start', [ - 'job_consumable_id' => $job->getConsumableId(), - ]); + $this->logInfo('processSingleJob.start'); if ($this->supportsAsyncSignals()) { $this->registerTimeoutHandler($job, $this->workerOptions); @@ -441,10 +439,8 @@ protected function processSingleJob(RabbitMQJob|RabbitMQConsumable $job, AMQPMes $this->transportDeduplicationService->decorateWithDeduplication( function () use ($job, $message) { - if (AppDeduplicationService::isEnabled() && $job->isDuplicated()) { - $this->logWarning('processSingleJob.job_is_duplicated', [ - 'job_consumable_id' => $job->getConsumableId(), - ]); + if (AppDeduplicationService::isEnabled() && $job->getPayloadData()->isDuplicated()) { + $this->logWarning('processSingleJob.job_is_duplicated'); $this->ackMessage($message); } else { From 342cd983fa8d8865dee6585d5c12b9727d8021e9 Mon Sep 17 00:00:00 2001 From: Alexander Ginko Date: Fri, 31 Oct 2025 17:37:26 +0100 Subject: [PATCH 13/14] SWR-20482 Server: RabbitMQ Improvements: Deduplication --- .../TransportLevel/DeduplicationService.php | 58 ++++++++++++++++++- .../AbstractVhostsConsumer.php | 2 +- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/src/Services/Deduplication/TransportLevel/DeduplicationService.php b/src/Services/Deduplication/TransportLevel/DeduplicationService.php index b4a2335e..fed93a73 100644 --- a/src/Services/Deduplication/TransportLevel/DeduplicationService.php +++ b/src/Services/Deduplication/TransportLevel/DeduplicationService.php @@ -3,6 +3,7 @@ namespace Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel; use PhpAmqpLib\Message\AMQPMessage; +use PhpAmqpLib\Wire\AMQPTable; use Psr\Log\LoggerInterface; use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable; use Salesmessage\LibRabbitMQ\Services\DlqDetector; @@ -41,8 +42,11 @@ class DeduplicationService private const DEFAULT_LOCK_TTL = 60; private const DEFAULT_TTL = 7200; - private const MAX_LOCK_TTL = 300; + private const MAX_LOCK_TTL = 180; private const MAX_TTL = 7 * 24 * 60 * 60; + private const WAIT_AFTER_PUBLISH = 1; + + private const HEADER_LOCK_REQUEUE_COUNT = 'x-dup-lock-requeue-count'; public function __construct( private DeduplicationStore $store, @@ -208,7 +212,7 @@ protected function applyActionOnLock(AMQPMessage $message): string } elseif ($action === self::ACTION_ACK) { $message->ack(); } else { - $message->reject(true); + $action = $this->republishLockedMessage($message); } return $action; @@ -226,6 +230,56 @@ protected function applyActionOnDuplication(AMQPMessage $message): string return $action; } + /** + * Such a situation normally should not happen or can happen very rarely. + * Republish the locked message with a retry-count guard. + * It's necessary to avoid infinite redelivery loop. + * + * @param AMQPMessage $message + * @return string + */ + protected function republishLockedMessage(AMQPMessage $message): string + { + $props = $message->get_properties(); + $headers = []; + if (($props['application_headers'] ?? null) instanceof AMQPTable) { + $headers = $props['application_headers']->getNativeData(); + } + + $attempts = (int) ($headers[self::HEADER_LOCK_REQUEUE_COUNT] ?? 0); + ++$attempts; + + $maxAttempts = ((int) ($this->getConfig('lock_ttl', 30))) / self::WAIT_AFTER_PUBLISH; + if ($attempts > $maxAttempts) { + $this->logger->warning('DeduplicationService.republishLockedMessage.max_attempts_reached', [ + 'message_id' => $props['message_id'] ?? null, + ]); + $message->ack(); + + return self::ACTION_ACK; + } + + $headers[self::HEADER_LOCK_REQUEUE_COUNT] = $attempts; + + $newProps = $props; + $newProps['application_headers'] = new AMQPTable($headers); + + $newMessage = new AMQPMessage($message->getBody(), $newProps); + $channel = $message->getChannel(); + $channel->basic_publish($newMessage, $message->getExchange(), $message->getRoutingKey()); + + $this->logger->warning('DeduplicationService.republishLockedMessage.republish', [ + 'message_id' => $props['message_id'] ?? null, + 'attempts' => $attempts, + ]); + $message->ack(); + // it's necessary to avoid a high redelivery rate + // normally, such a situation is not expected (or expected very rarely) + sleep(self::WAIT_AFTER_PUBLISH); + + return self::ACTION_REQUEUE; + } + protected function isEnabled(): bool { return (bool) $this->getConfig('enabled', false); diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php index eeb2e7e1..ff30cf41 100644 --- a/src/VhostsConsumers/AbstractVhostsConsumer.php +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -381,7 +381,7 @@ function () use ($batchMessage, $connection, &$uniqueMessagesForProcessing, &$ba $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); try { - if ($isBatchSuccess) { + if ($isBatchSuccess && !empty($uniqueMessagesForProcessing)) { foreach ($uniqueMessagesForProcessing as $batchMessage) { $this->transportDeduplicationService?->markAsProcessed($batchMessage, $this->currentQueueName); } From a84d0831886184e42c5a891f166add753eef8819 Mon Sep 17 00:00:00 2001 From: Alexander Ginko Date: Mon, 3 Nov 2025 14:02:50 +0100 Subject: [PATCH 14/14] SWR-20482 Server: RabbitMQ Improvements: Deduplication --- src/Queue/RabbitMQQueueBatchable.php | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/Queue/RabbitMQQueueBatchable.php b/src/Queue/RabbitMQQueueBatchable.php index a596a6fc..00399cf5 100644 --- a/src/Queue/RabbitMQQueueBatchable.php +++ b/src/Queue/RabbitMQQueueBatchable.php @@ -3,6 +3,7 @@ namespace Salesmessage\LibRabbitMQ\Queue; use PhpAmqpLib\Connection\AbstractConnection; +use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable; use Salesmessage\LibRabbitMQ\Dto\ConnectionNameDto; use Salesmessage\LibRabbitMQ\Dto\QueueApiDto; use Salesmessage\LibRabbitMQ\Dto\VhostApiDto; @@ -77,7 +78,16 @@ protected function createChannel(): AMQPChannel public function push($job, $data = '', $queue = null) { - $queue = $queue ?: $job->onQueue(); + if (!($job instanceof RabbitMQConsumable)) { + throw new \InvalidArgumentException('Job must implement RabbitMQConsumable'); + } + + if (!$queue) { + if (!method_exists($job, 'onQueue')) { + throw new \InvalidArgumentException('Job must implement onQueue method'); + } + $queue = $job->onQueue(); + } try { $result = parent::push($job, $data, $queue);