diff --git a/README.md b/README.md index 72898252..1d1b6741 100644 --- a/README.md +++ b/README.md @@ -9,16 +9,16 @@ RabbitMQ Queue driver for Laravel Only the latest version will get new features. Bug fixes will be provided using the following scheme: -| Package Version | Laravel Version | Bug Fixes Until | | -|-----------------|-----------------|---------------------|---------------------------------------------------------------------------------------------| -| 1 | 07 | December 26th, 2024 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | +| Package Version | Laravel Version | Bug Fixes Until | | +|-----------------|-----------------|----------------------|---------------------------------------------------------------------------------------------| +| 1 | 29 | September 25th, 2025 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | ## Installation You can install this package via composer using this command: ``` -composer require salesmessage/php-lib-rabbitmq:^1.07 --ignore-platform-reqs +composer require salesmessage/php-lib-rabbitmq:^1.37 --ignore-platform-reqs ``` The package will automatically register itself. @@ -42,6 +42,7 @@ groups: - test-queue-11 queues_mask: test batch_size: 100 + prefetch_count: 100 test-group-2: vhosts: - organization_20 @@ -53,6 +54,7 @@ groups: - test-queue-22 queues_mask: test batch_size: 100 + prefetch_count: 100 test-group-3: vhosts: - organization_30 @@ -63,7 +65,8 @@ groups: - test-queue-3 - test-queue-33 queues_mask: test - batch_size: 100, + batch_size: 100 + prefetch_count: 100 ``` ### Configuration @@ -79,6 +82,7 @@ Add connection to `config/queue.php`: 'rabbitmq_vhosts' => [ 'driver' => 'rabbitmq_vhosts', + 'consumer_type' => env('RABBITMQ_VHOSTS_CONSUMER_TYPE', 'direct'), 'hosts' => [ [ 'host' => env('RABBITMQ_HOST', '127.0.0.1'), @@ -628,7 +632,7 @@ There are two ways of consuming messages. Example: ```bash -php artisan lib-rabbitmq:consume-vhosts test-group-1 rabbitmq_vhosts --name=mq-vhosts-test-name --sleep=3 --memory=300 --max-jobs=5000 --max-time=600 --prefetch-count=100 --timeout=0 +php artisan lib-rabbitmq:consume-vhosts test-group-1 rabbitmq_vhosts --name=mq-vhosts-test-name --sleep=3 --memory=300 --max-jobs=5000 --max-time=600 --timeout=0 --async-mode=1 ``` ## Testing @@ -659,7 +663,19 @@ if not all the issues with the following command: composer fix:style ``` -## Contribution - -You can contribute to this package by discovering bugs and opening issues. Please, add to which version of package you -create pull request or issue. (e.g. [5.2] Fatal error on delayed job) +## Local Setup +- Configure all config items in `config/queue.php` section `connections.rabbitmq_vhosts` (see as example [rabbitmq.php](./config/rabbitmq.php)) +- Create `yml` file in the project root with name `rabbit-groups.yml` and content, for example like this (you can replace `vhosts` and `queues` with `vhosts_mask` and `queues_mask`): +```yaml +groups: + test-notes: + vhosts: + - organization_200005 + queues: + - local-myname.notes.200005 + batch_size: 3 + prefetch_count: 3 +``` +- Make sure that vhosts exist in RabbitMQ (if not - create them) +- Run command `php artisan lib-rabbitmq:scan-vhosts` within your project where this library is installed (this command fetches data from RabbitMQ to Redis) +- Run command for consumer `php artisan lib-rabbitmq:consume-vhosts test-notes rabbitmq_vhosts --name=mq-vhost-test-local-notes --memory=300 --timeout=0 --max-jobs=1000 --max-time=600 --async-mode=1` diff --git a/composer.json b/composer.json index 55307061..710d0054 100644 --- a/composer.json +++ b/composer.json @@ -11,7 +11,7 @@ "require": { "php": "^8.0", "ext-json": "*", - "illuminate/queue": "^9.0|^10.0|^11.0", + "illuminate/queue": "^9.0|^10.0|^11.0|^12.0", "php-amqplib/php-amqplib": "^v3.6" }, "require-dev": { @@ -20,7 +20,7 @@ "laravel/horizon": "^5.0", "orchestra/testbench": "^7.0|^8.0|^9.0", "laravel/pint": "^1.2", - "laravel/framework": "^9.0|^10.0|^11.0" + "laravel/framework": "^10.0|^11.0|^12.0" }, "autoload": { "psr-4": { @@ -34,7 +34,7 @@ }, "extra": { "branch-alias": { - "dev-master": "1.07-dev" + "dev-master": "1.37-dev" }, "laravel": { "providers": [ @@ -43,7 +43,9 @@ } }, "suggest": { - "ext-pcntl": "Required to use all features of the queue consumer." + "ext-pcntl": "Required to use all features of the queue consumer.", + "ext-swoole": "Required to use async mode for healthcheck (alternative is ext-openswoole).", + "ext-openswoole": "Required to use async mode for healthcheck (alternative is ext-swoole)." }, "scripts": { "test": [ diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 2b6afd96..43677de6 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -6,7 +6,6 @@ * You need to set proper values in `.env`. */ return [ - 'driver' => 'rabbitmq_vhosts', 'queue' => env('RABBITMQ_QUEUE', 'default'), 'connection' => 'default', @@ -23,6 +22,45 @@ ], 'options' => [ + 'quorum' => [ + /** + * Max allowed delivery attempts (RabbitMQ x-delivery-count) for quorum queues. + * 0 disables the check. + */ + 'delivery_limit' => env('RABBITMQ_QUORUM_DELIVERY_LIMIT', 2), + /** + * Action when delivery_limit is reached. + * Possible values: 'ack', 'reject' + */ + 'on_limit_action' => env('RABBITMQ_QUORUM_ON_LIMIT_ACTION', 'reject'), + ], + ], + + /** + * Provided on 2 levels: transport and application. + */ + 'deduplication' => [ + 'transport' => [ + 'enabled' => env('RABBITMQ_DEDUP_TRANSPORT_ENABLED', false), + 'ttl' => env('RABBITMQ_DEDUP_TRANSPORT_TTL', 7200), + 'lock_ttl' => env('RABBITMQ_DEDUP_TRANSPORT_LOCK_TTL', 60), + /** + * Possible: ack, reject + */ + 'action_on_duplication' => env('RABBITMQ_DEDUP_TRANSPORT_ACTION', 'ack'), + /** + * Possible: ack, reject, requeue + */ + 'action_on_lock' => env('RABBITMQ_DEDUP_TRANSPORT_LOCK_ACTION', 'requeue'), + 'connection' => [ + 'driver' => env('RABBITMQ_DEDUP_TRANSPORT_DRIVER', 'redis'), + 'name' => env('RABBITMQ_DEDUP_TRANSPORT_CONNECTION_NAME', 'persistent'), + 'key_prefix' => env('RABBITMQ_DEDUP_TRANSPORT_KEY_PREFIX', 'mq_dedup'), + ], + ], + 'application' => [ + 'enabled' => env('RABBITMQ_DEDUP_APP_ENABLED', true), + ], ], /* @@ -30,4 +68,9 @@ */ 'worker' => env('RABBITMQ_WORKER', 'default'), + /* + * Vhost prefix for organization-specific vhosts. + */ + 'vhost_prefix' => env('RABBITMQ_VHOST_PREFIX', 'organization_'), + 'debug' => env('RABBITMQ_VHOST_DEBUG', false), ]; diff --git a/src/Console/ConsumeVhostsCommand.php b/src/Console/ConsumeVhostsCommand.php index ebf7d311..1288a9c5 100644 --- a/src/Console/ConsumeVhostsCommand.php +++ b/src/Console/ConsumeVhostsCommand.php @@ -8,8 +8,8 @@ use Illuminate\Support\Str; use Salesmessage\LibRabbitMQ\Dto\ConsumeVhostsFiltersDto; use Salesmessage\LibRabbitMQ\Services\GroupsService; +use Salesmessage\LibRabbitMQ\VhostsConsumers\AbstractVhostsConsumer; use Symfony\Component\Console\Terminal; -use Salesmessage\LibRabbitMQ\VhostsConsumer; use Throwable; class ConsumeVhostsCommand extends WorkCommand @@ -30,11 +30,11 @@ class ConsumeVhostsCommand extends WorkCommand {--timeout=60 : The number of seconds a child process can run} {--tries=1 : Number of times to attempt a job before logging it failed} {--rest=0 : Number of seconds to rest between jobs} + {--async-mode=0 : Async processing for some functionality (now only "heartbeat" is supported)} {--max-priority=} {--consumer-tag} {--prefetch-size=0} - {--prefetch-count=1000} '; protected $description = 'Consume messages'; @@ -71,7 +71,7 @@ public function handle(): void trim($groupConfigData['queues_mask'] ?? '') ); - /** @var VhostsConsumer $consumer */ + /** @var AbstractVhostsConsumer $consumer */ $consumer = $this->worker; $consumer->setFiltersDto($filtersDto); @@ -83,8 +83,9 @@ public function handle(): void $consumer->setConsumerTag($this->consumerTag()); $consumer->setMaxPriority((int) $this->option('max-priority')); $consumer->setPrefetchSize((int) $this->option('prefetch-size')); - $consumer->setPrefetchCount((int) $this->option('prefetch-count')); - $consumer->setBatchSize((int) ($groupConfigData['batch_size'] ?? 100)); + $consumer->setPrefetchCount((int) ($groupConfigData['prefetch_count'] ?? 1000)); + $consumer->setBatchSize((int) ($groupConfigData['batch_size'] ?? 1000)); + $consumer->setAsyncMode((bool) $this->option('async-mode')); if ($this->downForMaintenance() && $this->option('once')) { $consumer->sleep($this->option('sleep')); @@ -96,8 +97,10 @@ public function handle(): void // which jobs are coming through a queue and be informed on its progress. $this->listenForEvents(); - $connection = $this->argument('connection') - ?: $this->laravel['config']['queue.default']; + $queueConfigData = $this->laravel['config']['queue']; + $connectionName = $this->argument('connection') ?: ($queueConfigData['default'] ?? ''); + + $consumer->setConfig((array) ($queueConfigData['connections'][$connectionName] ?? [])); if (Terminal::hasSttyAvailable()) { $this->components->info(sprintf( @@ -108,7 +111,7 @@ public function handle(): void } $this->runWorker( - $connection, + $connectionName, '' ); } diff --git a/src/Console/QueueDeclareCommand.php b/src/Console/QueueDeclareCommand.php index 396877ac..24e3c5d0 100644 --- a/src/Console/QueueDeclareCommand.php +++ b/src/Console/QueueDeclareCommand.php @@ -11,10 +11,11 @@ class QueueDeclareCommand extends Command protected $signature = 'lib-rabbitmq:queue-declare {name : The name of the queue to declare} {connection=rabbitmq : The name of the queue connection to use} - {--max-priority} + {--max-priority : Set x-max-priority (ignored for quorum)} {--durable=1} {--auto-delete=0} - {--quorum=0}'; + {--quorum=0 : Declare quorum queue (x-queue-type=quorum)} + {--quorum-initial-group-size= : x-quorum-initial-group-size when quorum is enabled}'; protected $description = 'Declare queue'; @@ -36,12 +37,24 @@ public function handle(RabbitMQConnector $connector): void $arguments = []; $maxPriority = (int) $this->option('max-priority'); - if ($maxPriority) { - $arguments['x-max-priority'] = $maxPriority; - } + $isQuorum = (bool) $this->option('quorum'); - if ($this->option('quorum')) { + if ($isQuorum) { $arguments['x-queue-type'] = 'quorum'; + + $initialGroupSize = (int) $this->option('quorum-initial-group-size'); + if ($initialGroupSize > 0) { + $arguments['x-quorum-initial-group-size'] = $initialGroupSize; + } + + if ($maxPriority) { + // quorum queues do not support priority; ignore and warn + $this->warn('Ignoring --max-priority for quorum queue.'); + } + } else { + if ($maxPriority) { + $arguments['x-max-priority'] = $maxPriority; + } } $queue->declareQueue( diff --git a/src/Console/ScanVhostsCommand.php b/src/Console/ScanVhostsCommand.php index dee26b94..49c39fca 100644 --- a/src/Console/ScanVhostsCommand.php +++ b/src/Console/ScanVhostsCommand.php @@ -3,25 +3,25 @@ namespace Salesmessage\LibRabbitMQ\Console; use Illuminate\Console\Command; -use Illuminate\Redis\Connections\PredisConnection; -use Illuminate\Support\Collection; -use Illuminate\Support\Facades\Redis; use Salesmessage\LibRabbitMQ\Dto\QueueApiDto; use Salesmessage\LibRabbitMQ\Dto\VhostApiDto; use Salesmessage\LibRabbitMQ\Services\GroupsService; use Salesmessage\LibRabbitMQ\Services\QueueService; use Salesmessage\LibRabbitMQ\Services\VhostsService; -use Throwable; use Salesmessage\LibRabbitMQ\Services\InternalStorageManager; class ScanVhostsCommand extends Command { protected $signature = 'lib-rabbitmq:scan-vhosts - {--sleep=10 : Number of seconds to sleep}'; + {--sleep=10 : Number of seconds to sleep} + {--max-time=0 : Maximum seconds the command can run before stopping} + {--with-output=true : Show output details during iteration} + {--max-memory=0 : Maximum memory usage in megabytes before stopping}'; protected $description = 'Scan and index vhosts'; - - private array $groups = ['test-group-1', 'test-group-2', 'test-group-3']; + + private array $groups; + private bool $silent = false; /** * @param GroupsService $groupsService @@ -40,42 +40,72 @@ public function __construct( $this->groups = $this->groupsService->getAllGroupsNames(); } - /** - * @return int - */ - public function handle() + public function handle(): void { $sleep = (int) $this->option('sleep'); - - $vhosts = $this->vhostsService->getAllVhosts(); - $oldVhosts = $this->internalStorageManager->getVhosts(); - - if ($vhosts->isNotEmpty()) { - foreach ($vhosts as $vhost) { - $vhostDto = $this->processVhost($vhost); - if (null === $vhostDto) { - continue; - } + $maxTime = max(0, (int) $this->option('max-time')); + $this->silent = !filter_var($this->option('with-output'), FILTER_VALIDATE_BOOLEAN); - $oldVhostIndex = array_search($vhostDto->getName(), $oldVhosts, true); - if (false !== $oldVhostIndex) { - unset($oldVhosts[$oldVhostIndex]); - } + $maxMemoryMb = max(0, (int) $this->option('max-memory')); + $maxMemoryBytes = $maxMemoryMb > 0 ? $maxMemoryMb * 1024 * 1024 : 0; + + $startedAt = microtime(true); + + while (true) { + $iterationStartedAt = microtime(true); + + $this->processVhosts(); + + $iterationDuration = microtime(true) - $iterationStartedAt; + $totalRuntime = microtime(true) - $startedAt; + $memoryUsage = memory_get_usage(true); + $memoryPeakUsage = memory_get_peak_usage(true); + + $this->line(sprintf( + 'Iteration finished in %.2f seconds (total runtime %.2f seconds). Memory usage: %s (peak %s).', + $iterationDuration, + $totalRuntime, + $this->formatBytes($memoryUsage), + $this->formatBytes($memoryPeakUsage) + ), 'warning', forcePrint: $sleep === 0); + + if ($sleep === 0) { + return; } - } else { - $this->warn('Vhosts not found.'); - } - $this->removeOldsVhosts($oldVhosts); + if ($maxTime > 0 && $totalRuntime >= $maxTime) { + $this->line(sprintf('Stopping: reached max runtime of %d seconds.', $maxTime), 'warning', forcePrint: true); + return; + } - if ($sleep > 0) { - $this->line(sprintf('Sleep %d seconds...', $sleep)); + if ($maxMemoryBytes > 0 && $memoryUsage >= $maxMemoryBytes) { + $this->line(sprintf( + 'Stopping: memory usage %s exceeded max threshold of %s.', + $this->formatBytes($memoryUsage), + $this->formatBytes($maxMemoryBytes) + ), 'warning', forcePrint: true); + return; + } + $this->line(sprintf('Sleep %d seconds...', $sleep)); sleep($sleep); - return $this->handle(); + } + } + + private function processVhosts(): void + { + $oldVhostsMap = array_flip($this->internalStorageManager->getVhosts()); + + foreach ($this->vhostsService->getAllVhosts() as $vhost) { + $vhostDto = $this->processVhost($vhost); + if (null === $vhostDto) { + continue; + } + + unset($oldVhostsMap[$vhostDto->getName()]); } - return Command::SUCCESS; + $this->removeOldsVhosts(array_keys($oldVhostsMap)); } /** @@ -92,18 +122,20 @@ private function processVhost(array $vhostApiData): ?VhostApiDto $indexedSuccessfully = $this->internalStorageManager->indexVhost($vhostDto, $this->groups); if (!$indexedSuccessfully) { $this->warn(sprintf( - 'Skip indexation vhost: "%s". Messages ready: %d.', + 'Skip indexation vhost: "%s". Messages ready: %d. Messages unacknowledged: %d.', $vhostDto->getName(), - $vhostDto->getMessagesReady() + $vhostDto->getMessagesReady(), + $vhostDto->getMessagesUnacknowledged() )); return null; } $this->info(sprintf( - 'Successfully indexed vhost: "%s". Messages ready: %d.', + 'Successfully indexed vhost: "%s". Messages ready: %d. Messages unacknowledged: %d.', $vhostDto->getName(), - $vhostDto->getMessagesReady() + $vhostDto->getMessagesReady(), + $vhostDto->getMessagesUnacknowledged() )); $vhostQueues = $this->queueService->getAllVhostQueues($vhostDto); @@ -134,6 +166,11 @@ private function processVhost(array $vhostApiData): ?VhostApiDto return $vhostDto; } + private function formatBytes(int $bytes): string + { + return number_format($bytes / (1024 * 1024), 2) . ' MB'; + } + /** * @param array $oldVhosts * @return void @@ -172,20 +209,22 @@ private function processVhostQueue(array $queueApiData): ?QueueApiDto $indexedSuccessfully = $this->internalStorageManager->indexQueue($queueApiDto, $this->groups); if (!$indexedSuccessfully) { $this->warn(sprintf( - 'Skip indexation queue: "%s". Vhost: %s. Messages ready: %d.', + 'Skip indexation queue: "%s". Vhost: %s. Messages ready: %d. Messages unacknowledged: %d.', $queueApiDto->getName(), $queueApiDto->getVhostName(), - $queueApiDto->getMessagesReady() + $queueApiDto->getMessagesReady(), + $queueApiDto->getMessagesUnacknowledged() )); return null; } $this->info(sprintf( - 'Successfully indexed queue: "%s". Vhost: %s. Messages ready: %d.', + 'Successfully indexed queue: "%s". Vhost: %s. Messages ready: %d. Messages unacknowledged: %d.', $queueApiDto->getName(), $queueApiDto->getVhostName(), - $queueApiDto->getMessagesReady() + $queueApiDto->getMessagesReady(), + $queueApiDto->getMessagesUnacknowledged() )); return $queueApiDto; @@ -217,5 +256,11 @@ private function removeOldVhostQueues(VhostApiDto $vhostDto, array $oldVhostQueu )); } } -} + public function line($string, $style = null, $verbosity = null, $forcePrint = false): void + { + if (!$this->silent || $style === 'error' || $forcePrint) { + parent::line($string, $style, $verbosity); + } + } +} diff --git a/src/Consumer.php b/src/Consumer.php index a5b7b14a..f78dc241 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -9,8 +9,9 @@ use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Exception\AMQPRuntimeException; use PhpAmqpLib\Message\AMQPMessage; -use Throwable; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationService; +use Throwable; class Consumer extends Worker { @@ -32,6 +33,8 @@ class Consumer extends Worker /** @var AMQPChannel */ protected $channel; + protected $connection; + /** @var object|null */ protected $currentJob; @@ -120,7 +123,16 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu $jobsProcessed++; - $this->runJob($job, $connectionName, $options); + /** @var DeduplicationService $transportDedupService */ + $transportDedupService = $this->container->make(DeduplicationService::class); + $transportDedupService->decorateWithDeduplication( + function () use ($job, $message, $connectionName, $queue, $options, $transportDedupService) { + $this->runJob($job, $connectionName, $options); + $transportDedupService->markAsProcessed($message, $queue); + }, + $message, + $queue + ); if ($this->supportsAsyncSignals()) { $this->resetTimeoutHandler(); diff --git a/src/Contracts/RabbitMQConsumable.php b/src/Contracts/RabbitMQConsumable.php new file mode 100644 index 00000000..62364e27 --- /dev/null +++ b/src/Contracts/RabbitMQConsumable.php @@ -0,0 +1,20 @@ +prepare($this->lastPushed ?? null)->value; + if (!isset($options['queue_type']) && isset($this->lastPushed) && is_object($this->lastPushed) && $this->lastPushed instanceof RabbitMQConsumable) { + $options['queue_type'] = $this->lastPushed->getQueueType(); + } + return tap(parent::pushRaw($payload, $queue, $options), function () use ($queue, $payload): void { $this->event($this->getQueue($queue), new JobPushed($payload)); }); @@ -64,7 +69,9 @@ public function later($delay, $job, $data = '', $queue = null): mixed { $payload = (new JobPayload($this->createPayload($job, $data)))->prepare($job)->value; - return tap(parent::laterRaw($delay, $payload, $queue), function () use ($payload, $queue): void { + $queueType = ($job instanceof RabbitMQConsumable) ? $job->getQueueType() : null; + + return tap(parent::laterRaw($delay, $payload, $queue, queueType: $queueType), function () use ($payload, $queue): void { $this->event($this->getQueue($queue), new JobPushed($payload)); }); } diff --git a/src/Interfaces/RabbitMQBatchable.php b/src/Interfaces/RabbitMQBatchable.php index f3abc6c5..1896c128 100644 --- a/src/Interfaces/RabbitMQBatchable.php +++ b/src/Interfaces/RabbitMQBatchable.php @@ -4,11 +4,20 @@ interface RabbitMQBatchable { + public const BATCH_TIMEOUT = 0; + + /** + * Filter out jobs that have already been processed according to the application logic. + * + * @param list $batch + * @return list + */ + public static function getNotDuplicatedBatchedJobs(array $batch): array; + /** * Processing jobs array of static class * - * @param array $batch - * @return mixed + * @param list $batch */ public static function collection(array $batch): void; } diff --git a/src/LaravelLibRabbitMQServiceProvider.php b/src/LaravelLibRabbitMQServiceProvider.php index e7f7f779..f2004b6b 100644 --- a/src/LaravelLibRabbitMQServiceProvider.php +++ b/src/LaravelLibRabbitMQServiceProvider.php @@ -2,23 +2,27 @@ namespace Salesmessage\LibRabbitMQ; +use Illuminate\Cache\RedisStore; +use Illuminate\Contracts\Cache\LockProvider; use Illuminate\Contracts\Debug\ExceptionHandler; -use Illuminate\Queue\Connectors\BeanstalkdConnector; -use Illuminate\Queue\Connectors\DatabaseConnector; -use Illuminate\Queue\Connectors\NullConnector; -use Illuminate\Queue\Connectors\RedisConnector; -use Illuminate\Queue\Connectors\SqsConnector; -use Illuminate\Queue\Connectors\SyncConnector; use Illuminate\Queue\QueueManager; use Illuminate\Support\ServiceProvider; +use Psr\Log\LoggerInterface; use Salesmessage\LibRabbitMQ\Console\ConsumeCommand; use Salesmessage\LibRabbitMQ\Console\ConsumeVhostsCommand; use Salesmessage\LibRabbitMQ\Console\ScanVhostsCommand; use Salesmessage\LibRabbitMQ\Queue\Connectors\RabbitMQVhostsConnector; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationService; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationStore; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\NullDeduplicationStore; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\RedisDeduplicationStore; use Salesmessage\LibRabbitMQ\Services\GroupsService; use Salesmessage\LibRabbitMQ\Services\InternalStorageManager; use Salesmessage\LibRabbitMQ\Services\QueueService; use Salesmessage\LibRabbitMQ\Services\VhostsService; +use Salesmessage\LibRabbitMQ\Services\DeliveryLimitService; +use Salesmessage\LibRabbitMQ\VhostsConsumers\DirectConsumer as VhostsDirectConsumer; +use Salesmessage\LibRabbitMQ\VhostsConsumers\QueueConsumer as VhostsQueueConsumer; class LaravelLibRabbitMQServiceProvider extends ServiceProvider { @@ -33,6 +37,10 @@ public function register(): void ); if ($this->app->runningInConsole()) { + $this->bindDeduplicationService(); + + $this->app->bind(LockProvider::class, RedisStore::class); + $this->app->singleton('rabbitmq.consumer', function () { $isDownForMaintenance = function () { return $this->app->isDownForMaintenance(); @@ -53,25 +61,50 @@ public function register(): void ); }); - $this->app->singleton(VhostsConsumer::class, function () { + $this->app->singleton(VhostsDirectConsumer::class, function () { + $isDownForMaintenance = function () { + return $this->app->isDownForMaintenance(); + }; + + return new VhostsDirectConsumer( + $this->app[InternalStorageManager::class], + $this->app[LoggerInterface::class], + $this->app['queue'], + $this->app['events'], + $this->app[ExceptionHandler::class], + $isDownForMaintenance, + $this->app->get(DeduplicationService::class), + $this->app->get(DeliveryLimitService::class), + null, + ); + }); + + $this->app->singleton(VhostsQueueConsumer::class, function () { $isDownForMaintenance = function () { return $this->app->isDownForMaintenance(); }; - return new VhostsConsumer( + return new VhostsQueueConsumer( $this->app[InternalStorageManager::class], + $this->app[LoggerInterface::class], $this->app['queue'], $this->app['events'], $this->app[ExceptionHandler::class], $isDownForMaintenance, - null + $this->app->get(DeduplicationService::class), + $this->app->get(DeliveryLimitService::class), + null, ); }); $this->app->singleton(ConsumeVhostsCommand::class, static function ($app) { + $consumerClass = ('direct' === config('queue.connections.rabbitmq_vhosts.consumer_type')) + ? VhostsDirectConsumer::class + : VhostsQueueConsumer::class; + return new ConsumeVhostsCommand( $app[GroupsService::class], - $app[VhostsConsumer::class], + $app[$consumerClass], $app['cache.store'] ); }); @@ -115,4 +148,35 @@ public function boot(): void return new RabbitMQVhostsConnector($this->app['events']); }); } + + /** + * Config params: + * @phpstan-import-type DeduplicationConfig from DeduplicationService + * + * @return void + */ + private function bindDeduplicationService(): void + { + $this->app->bind(DeduplicationStore::class, static function () { + /** @var DeduplicationConfig $config */ + $config = (array) config('queue.connections.rabbitmq_vhosts.deduplication.transport', []); + $enabled = (bool) ($config['enabled'] ?? false); + if (!$enabled) { + return new NullDeduplicationStore(); + } + + $connectionDriver = $config['connection']['driver'] ?? null; + if ($connectionDriver !== 'redis') { + throw new \InvalidArgumentException('For now only Redis connection is supported for deduplication'); + } + $connectionName = $config['connection']['name'] ?? null; + + $prefix = trim($config['connection']['key_prefix'] ?? ''); + if (empty($prefix)) { + throw new \InvalidArgumentException('Key prefix is required'); + } + + return new RedisDeduplicationStore($connectionName, $prefix); + }); + } } diff --git a/src/Queue/Jobs/RabbitMQJob.php b/src/Queue/Jobs/RabbitMQJob.php index 0622b8ec..a3c16b80 100644 --- a/src/Queue/Jobs/RabbitMQJob.php +++ b/src/Queue/Jobs/RabbitMQJob.php @@ -4,12 +4,14 @@ use Illuminate\Container\Container; use Illuminate\Contracts\Container\BindingResolutionException; +use Illuminate\Contracts\Encryption\Encrypter; use Illuminate\Contracts\Queue\Job as JobContract; use Illuminate\Queue\Jobs\Job; use Illuminate\Support\Arr; use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; +use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable; use Salesmessage\LibRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; @@ -126,8 +128,13 @@ public function release($delay = 0): void { parent::release(); + $consumableJob = $this->getPayloadData(); + if (!($consumableJob instanceof RabbitMQConsumable)) { + throw new \RuntimeException('Job must be an instance of RabbitMQJobBatchable'); + } + // Always create a new message when this Job is released - $this->rabbitmq->laterRaw($delay, $this->message->getBody(), $this->queue, $this->attempts()); + $this->rabbitmq->laterRaw($delay, $this->message->getBody(), $this->queue, $this->attempts(), $consumableJob->getQueueType()); // Releasing a Job means the message was failed to process. // Because this Job message is always recreated and pushed as new message, this Job message is correctly handled. @@ -135,6 +142,39 @@ public function release($delay = 0): void $this->rabbitmq->ack($this); } + /** + * @return object + * @throws \RuntimeException + */ + public function getPayloadData(): object + { + $payload = $this->payload(); + + $data = $payload['data']; + + if (str_starts_with($data['command'], 'O:')) { + return unserialize($data['command']); + } + + if ($this->container->bound(Encrypter::class)) { + return unserialize($this->container[Encrypter::class]->decrypt($data['command'])); + } + + throw new \RuntimeException('Unable to extract job data.'); + } + + /** + * Returns target class name + * + * @return mixed + */ + public function getPayloadClass(): string + { + $payload = $this->payload(); + + return $payload['data']['commandName']; + } + /** * Get the underlying RabbitMQ connection. */ diff --git a/src/Queue/Jobs/RabbitMQJobBatchable.php b/src/Queue/Jobs/RabbitMQJobBatchable.php index 8015c2ad..278ae917 100644 --- a/src/Queue/Jobs/RabbitMQJobBatchable.php +++ b/src/Queue/Jobs/RabbitMQJobBatchable.php @@ -2,8 +2,6 @@ namespace Salesmessage\LibRabbitMQ\Queue\Jobs; -use Illuminate\Contracts\Encryption\Encrypter; -use Illuminate\Queue\Jobs\JobName; use Salesmessage\LibRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob; /** @@ -11,50 +9,4 @@ */ class RabbitMQJobBatchable extends BaseJob { - /** - * Fire the job. - * - * @return void - */ - public function fire() - { - $payload = $this->payload(); - - [$class, $method] = JobName::parse($payload['job']); - - ($this->instance = $this->resolve($class))->{$method}($this, $payload['data']); - } - - /** - * Returns target class name - * - * @return mixed - */ - public function getPayloadClass(): string - { - $payload = $this->payload(); - - return $payload['data']['commandName']; - } - - /** - * @return object - * @throws \RuntimeException - */ - public function getPayloadData(): object - { - $payload = $this->payload(); - - $data = $payload['data']; - - if (str_starts_with($data['command'], 'O:')) { - return unserialize($data['command']); - } - - if ($this->container->bound(Encrypter::class)) { - return unserialize($this->container[Encrypter::class]->decrypt($data['command'])); - } - - throw new \RuntimeException('Unable to extract job data.'); - } } diff --git a/src/Queue/QueueConfig.php b/src/Queue/QueueConfig.php index 1bf5802f..22d7f2fb 100644 --- a/src/Queue/QueueConfig.php +++ b/src/Queue/QueueConfig.php @@ -30,6 +30,10 @@ class QueueConfig protected bool $quorum = false; + protected ?int $quorumInitialGroupSize = null; + + protected string $quorumQueuePostfix = ''; + protected array $options = []; /** @@ -247,6 +251,41 @@ public function setQuorum($quorum): QueueConfig return $this; } + /** + * When set, used to declare quorum queues with a specific initial group size. + */ + public function getQuorumInitialGroupSize(): ?int + { + return $this->quorumInitialGroupSize; + } + + public function setQuorumInitialGroupSize(?int $size): self + { + if ($size === null) { + $this->quorumInitialGroupSize = null; + return $this; + } + + if ($size <= 0) { + throw new \InvalidArgumentException('Invalid quorum group size'); + } + + $this->quorumInitialGroupSize = $size; + + return $this; + } + + public function getQuorumQueuePostfix(): string + { + return $this->quorumQueuePostfix; + } + + public function setQuorumQueuePostfix(string $postfix): self + { + $this->quorumQueuePostfix = $postfix; + return $this; + } + /** * Holds all unknown queue options provided in the connection config */ diff --git a/src/Queue/QueueConfigFactory.php b/src/Queue/QueueConfigFactory.php index 04f29166..26d0bc2a 100644 --- a/src/Queue/QueueConfigFactory.php +++ b/src/Queue/QueueConfigFactory.php @@ -68,6 +68,15 @@ protected static function getOptionsFromConfig(QueueConfig $queueConfig, array $ $queueConfig->setQuorum($quorum); } + // Feature: Quorum initial group size + if (Arr::has($queueOptions, 'quorum_initial_group_size')) { + $queueConfig->setQuorumInitialGroupSize((int) Arr::pull($queueOptions, 'quorum_initial_group_size')); + } + + if ($quorumPostfix = (string) Arr::pull($queueOptions, 'quorum_queue_postfix')) { + $queueConfig->setQuorumQueuePostfix($quorumPostfix); + } + // All extra options not defined $queueConfig->setOptions($queueOptions); } diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index f0973d75..83ceb317 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -22,7 +22,9 @@ use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; +use Ramsey\Uuid\Uuid; use RuntimeException; +use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable; use Throwable; use Salesmessage\LibRabbitMQ\Contracts\RabbitMQQueueContract; use Salesmessage\LibRabbitMQ\Queue\Jobs\RabbitMQJob; @@ -101,13 +103,21 @@ public function size($queue = null): int */ public function push($job, $data = '', $queue = null) { + if (!($job instanceof RabbitMQConsumable)) { + throw new \RuntimeException('Job must be an instance of RabbitMQConsumable'); + } + + $options = [ + 'queue_type' => $job->getQueueType(), + ]; + return $this->enqueueUsing( $job, $this->createPayload($job, $this->getQueue($queue), $data), $queue, null, - function ($payload, $queue) { - return $this->pushRaw($payload, $queue); + function ($payload, $queue) use ($options) { + return $this->pushRaw($payload, $queue, $options); } ); } @@ -119,9 +129,12 @@ function ($payload, $queue) { */ public function pushRaw($payload, $queue = null, array $options = []): int|string|null { + $queueType = $options['queue_type'] ?? null; + unset($options['queue_type']); + [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); - $this->declareDestination($destination, $exchange, $exchangeType); + $this->declareDestination($destination, $exchange, $exchangeType, $queueType); [$message, $correlationId] = $this->createMessage($payload, $attempts); @@ -137,13 +150,19 @@ public function pushRaw($payload, $queue = null, array $options = []): int|strin */ public function later($delay, $job, $data = '', $queue = null): mixed { + if (!($job instanceof RabbitMQConsumable)) { + throw new \RuntimeException('Job must be an instance of RabbitMQConsumable'); + } + + $queueType = $job->getQueueType(); + return $this->enqueueUsing( $job, $this->createPayload($job, $this->getQueue($queue), $data), $queue, $delay, - function ($payload, $queue, $delay) { - return $this->laterRaw($delay, $payload, $queue); + function ($payload, $queue, $delay) use ($queueType) { + return $this->laterRaw($delay, $payload, $queue, queueType: $queueType); } ); } @@ -151,7 +170,7 @@ function ($payload, $queue, $delay) { /** * @throws AMQPProtocolChannelException */ - public function laterRaw($delay, string $payload, $queue = null, int $attempts = 0): int|string|null + public function laterRaw($delay, string $payload, $queue = null, int $attempts = 0, ?string $queueType = null): int|string|null { $ttl = $this->secondsUntil($delay) * 1000; @@ -160,12 +179,15 @@ public function laterRaw($delay, string $payload, $queue = null, int $attempts = // When no ttl just publish a new message to the exchange or queue if ($ttl <= 0) { + if ($queueType !== null) { + $options['queue_type'] = $queueType; + } return $this->pushRaw($payload, $queue, $options); } // Create a main queue to handle delayed messages [$mainDestination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); - $this->declareDestination($mainDestination, $exchange, $exchangeType); + $this->declareDestination($mainDestination, $exchange, $exchangeType, $queueType); $destination = $this->getQueue($queue).'.delay.'.$ttl; @@ -195,7 +217,13 @@ public function bulk($jobs, $data = '', $queue = null): void protected function publishBatch($jobs, $data = '', $queue = null): void { foreach ($jobs as $job) { - $this->bulkRaw($this->createPayload($job, $queue, $data), $queue, ['job' => $job]); + if (!($job instanceof RabbitMQConsumable)) { + throw new \RuntimeException('Job must be an instance of RabbitMQConsumable'); + } + $this->bulkRaw($this->createPayload($job, $queue, $data), $queue, [ + 'job' => $job, + 'queue_type' => $job->getQueueType(), + ]); } $this->batchPublish(); @@ -206,9 +234,12 @@ protected function publishBatch($jobs, $data = '', $queue = null): void */ public function bulkRaw(string $payload, string $queue = null, array $options = []): int|string|null { + $queueType = $options['queue_type'] ?? null; + unset($options['queue_type']); + [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); - $this->declareDestination($destination, $exchange, $exchangeType); + $this->declareDestination($destination, $exchange, $exchangeType, $queueType); [$message, $correlationId] = $this->createMessage($payload, $attempts); @@ -521,6 +552,7 @@ protected function createMessage($payload, int $attempts = 0): array $currentPayload = json_decode($payload, true); if ($correlationId = $currentPayload['id'] ?? null) { $properties['correlation_id'] = $correlationId; + $properties['message_id'] = Uuid::uuid7()->toString(); } if ($this->getConfig()->isPrioritizeDelayed()) { @@ -597,15 +629,16 @@ public function close(): void /** * Get the Queue arguments. */ - protected function getQueueArguments(string $destination): array + protected function getQueueArguments(string $destination, ?string $queueType = null): array { + $isQuorum = $this->getConfig()->isQuorum() || $queueType === RabbitMQConsumable::MQ_TYPE_QUORUM; $arguments = []; // Messages without a priority property are treated as if their priority were 0. // Messages with a priority which is higher than the queue's maximum, are treated as if they were // published with the maximum priority. // Quorum queues does not support priority. - if ($this->getConfig()->isPrioritizeDelayed() && ! $this->getConfig()->isQuorum()) { + if ($this->getConfig()->isPrioritizeDelayed() && ! $isQuorum) { $arguments['x-max-priority'] = $this->getConfig()->getQueueMaxPriority(); } @@ -614,8 +647,14 @@ protected function getQueueArguments(string $destination): array $arguments['x-dead-letter-routing-key'] = $this->getFailedRoutingKey($destination); } - if ($this->getConfig()->isQuorum()) { + if ($isQuorum) { $arguments['x-queue-type'] = 'quorum'; + + // optional: initial group size for quorum queues + $initialGroupSize = $this->getConfig()->getQuorumInitialGroupSize(); + if ($initialGroupSize !== null) { + $arguments['x-quorum-initial-group-size'] = $initialGroupSize; + } } return $arguments; @@ -699,8 +738,12 @@ protected function isQueueDeclared(string $name): bool * * @throws AMQPProtocolChannelException */ - protected function declareDestination(string $destination, string $exchange = null, string $exchangeType = AMQPExchangeType::DIRECT): void - { + protected function declareDestination( + string $destination, + string $exchange = null, + string $exchangeType = AMQPExchangeType::DIRECT, + ?string $queueType = null, + ): void { // When an exchange is provided and no exchange is present in RabbitMQ, create an exchange. if ($exchange && ! $this->isExchangeExists($exchange)) { $this->declareExchange($exchange, $exchangeType); @@ -717,7 +760,7 @@ protected function declareDestination(string $destination, string $exchange = nu } // Create a queue for amq.direct publishing. - $this->declareQueue($destination, true, false, $this->getQueueArguments($destination)); + $this->declareQueue($destination, true, false, $this->getQueueArguments($destination, $queueType)); } /** diff --git a/src/Queue/RabbitMQQueueBatchable.php b/src/Queue/RabbitMQQueueBatchable.php index 94671d61..0bb37381 100644 --- a/src/Queue/RabbitMQQueueBatchable.php +++ b/src/Queue/RabbitMQQueueBatchable.php @@ -2,7 +2,8 @@ namespace Salesmessage\LibRabbitMQ\Queue; -use PhpAmqpLib\Connection\AbstractConnection; +use Psr\Log\LoggerInterface; +use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable; use Salesmessage\LibRabbitMQ\Dto\ConnectionNameDto; use Salesmessage\LibRabbitMQ\Dto\QueueApiDto; use Salesmessage\LibRabbitMQ\Dto\VhostApiDto; @@ -12,6 +13,7 @@ use PhpAmqpLib\Channel\AMQPChannel; use Salesmessage\LibRabbitMQ\Services\GroupsService; use Salesmessage\LibRabbitMQ\Services\InternalStorageManager; +use Salesmessage\LibRabbitMQ\Services\Lock\LockService; use Salesmessage\LibRabbitMQ\Services\VhostsService; class RabbitMQQueueBatchable extends BaseRabbitMQQueue @@ -22,6 +24,10 @@ class RabbitMQQueueBatchable extends BaseRabbitMQQueue private VhostsService $vhostsService; + private LockService $lockService; + + private LoggerInterface $logger; + /** * @param QueueConfig $config */ @@ -30,6 +36,8 @@ public function __construct(QueueConfig $config) $this->internalStorageManager = app(InternalStorageManager::class); $this->groupsService = app(GroupsService::class); $this->vhostsService = app(VhostsService::class); + $this->lockService = app(LockService::class); + $this->logger = app(LoggerInterface::class); parent::__construct($config); } @@ -41,8 +49,7 @@ protected function publishBasic( $mandatory = false, $immediate = false, $ticket = null - ): void - { + ): void { try { parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket); } catch (AMQPConnectionClosedException|AMQPChannelClosedException) { @@ -65,7 +72,11 @@ protected function createChannel(): AMQPChannel { try { return parent::createChannel(); - } catch (AMQPConnectionClosedException) { + } catch (AMQPConnectionClosedException $exception) { + if ($this->isVhostFailedException($exception) && (false === $this->createNotExistsVhost())) { + throw $exception; + } + $this->reconnect(); return parent::createChannel(); } @@ -73,7 +84,20 @@ protected function createChannel(): AMQPChannel public function push($job, $data = '', $queue = null) { - $queue = $queue ?: $job->onQueue(); + if (!($job instanceof RabbitMQConsumable)) { + throw new \InvalidArgumentException('Job must implement RabbitMQConsumable'); + } + + if (!$queue) { + if (!method_exists($job, 'onQueue')) { + throw new \InvalidArgumentException('Job must implement onQueue method'); + } + $queue = $job->onQueue(); + } + + if ($job->getQueueType() === RabbitMQConsumable::MQ_TYPE_QUORUM) { + $queue .= $this->getConfig()->getQuorumQueuePostfix(); + } try { $result = parent::push($job, $data, $queue); @@ -97,24 +121,40 @@ public function push($job, $data = '', $queue = null) return $result; } - public function pushRaw($payload, $queue = null, array $options = []): int|string|null - { - return parent::pushRaw($payload, $queue, $options); - } - - /** - * @return bool - * @throws \GuzzleHttp\Exception\GuzzleException - * @throws \Salesmessage\LibRabbitMQ\Exceptions\RabbitApiClientException - */ - private function createNotExistsVhost(): bool + private function createNotExistsVhost(int $attempts = 0): bool { $dto = new ConnectionNameDto($this->getConnectionName()); if (null === $dto->getVhostName()) { return false; } - return $this->vhostsService->createVhost($dto->getVhostName(), 'Automatically created vhost'); + $hasCreated = false; + $creationHandler = function () use ($dto, &$hasCreated) { + $hasCreated = $this->vhostsService->createVhost($dto->getVhostName(), 'Automatically created vhost'); + }; + + try { + $lockKey = 'vhost:' . $dto->getVhostName() . ':creation'; + $handlerWasRun = $this->lockService->lock($lockKey, $creationHandler, skipHandlingOnLock: true); + // if handler was not run, it means that another process has possibly created the vhost + // and we need to just re-check if it exists + if (!$handlerWasRun) { + $hasCreated = isset($this->vhostsService->getVhost($dto->getVhostName(), 'name')['name']); + } + } catch (\Throwable $e) { + $this->logger->error('RabbitMQQueueBatchable.createNotExistsVhost.exception', [ + 'vhost_name' => $dto->getVhostName(), + 'error' => $e->getMessage(), + 'trace' => $e->getTraceAsString(), + ]); + } + + if (!$hasCreated && $attempts < 2) { + sleep(1); + return $this->createNotExistsVhost(++$attempts); + } + + return $hasCreated; } /** @@ -148,5 +188,26 @@ private function addQueueToIndex(string $queue): bool return $isQueueActivated && $isVhostActivated; } -} + /** + * @param AMQPConnectionClosedException $exception + * @return bool + */ + private function isVhostFailedException(AMQPConnectionClosedException $exception): bool + { + $dto = new ConnectionNameDto($this->getConnectionName()); + $vhostName = (string) $dto->getVhostName(); + + $notFoundErrorMessage = sprintf('NOT_ALLOWED - vhost %s not found', $vhostName); + if ((403 === $exception->getCode()) && str_contains($exception->getMessage(), $notFoundErrorMessage)) { + return true; + } + + $deletedErrorMessage = sprintf('CONNECTION_FORCED - vhost \'%s\' is deleted', $vhostName); + if (str_contains($exception->getMessage(), $deletedErrorMessage)) { + return true; + } + + return false; + } +} diff --git a/src/Services/Api/RabbitApiClient.php b/src/Services/Api/RabbitApiClient.php index 55e04328..0f8fd3bc 100644 --- a/src/Services/Api/RabbitApiClient.php +++ b/src/Services/Api/RabbitApiClient.php @@ -16,8 +16,8 @@ class RabbitApiClient public function __construct() { $this->client = new HttpClient([ - RequestOptions::TIMEOUT => 60, - RequestOptions::CONNECT_TIMEOUT => 60, + RequestOptions::TIMEOUT => 30, + RequestOptions::CONNECT_TIMEOUT => 30, ]); } @@ -70,7 +70,7 @@ public function request( $contents = $response->getBody()->getContents(); return (array) ($contents ? json_decode($contents, true) : []); - } catch (Throwable $exception) { + } catch (\Throwable $exception) { $rethrowException = $exception; if ($exception instanceof ClientException) { $rethrowException = new RabbitApiClientException($exception->getMessage()); @@ -85,7 +85,7 @@ public function request( */ private function getBaseUrl(): string { - $host = $this->connectionConfig['hosts'][0]['host'] ?? ''; + $host = $this->connectionConfig['hosts'][0]['api_host'] ?? ''; $port = $this->connectionConfig['hosts'][0]['api_port'] ?? ''; $scheme = $this->connectionConfig['secure'] ? 'https://' : 'http://'; @@ -109,4 +109,4 @@ private function getPassword(): string { return (string) ($this->connectionConfig['hosts'][0]['password'] ?? ''); } -} \ No newline at end of file +} diff --git a/src/Services/Deduplication/AppDeduplicationService.php b/src/Services/Deduplication/AppDeduplicationService.php new file mode 100644 index 00000000..2187557d --- /dev/null +++ b/src/Services/Deduplication/AppDeduplicationService.php @@ -0,0 +1,18 @@ +getState($message, $queueName); + try { + if ($messageState === DeduplicationService::IN_PROGRESS) { + $action = $this->applyActionOnLock($message); + $this->logger->warning('DeduplicationService.message_already_in_progress', [ + 'action' => $action, + 'message_id' => $message->get_properties()['message_id'] ?? null, + ]); + + return false; + } + + if ($messageState === DeduplicationService::PROCESSED) { + $action = $this->applyActionOnDuplication($message); + $this->logger->warning('DeduplicationService.message_already_processed', [ + 'action' => $action, + 'message_id' => $message->get_properties()['message_id'] ?? null, + ]); + + return false; + } + + $hasPutAsInProgress = $this->markAsInProgress($message, $queueName); + if ($hasPutAsInProgress === false) { + $action = $this->applyActionOnLock($message); + $this->logger->warning('DeduplicationService.message_already_in_progress.skip', [ + 'action' => $action, + 'message_id' => $message->get_properties()['message_id'] ?? null, + ]); + + return false; + } + + $handler(); + } catch (\Throwable $exception) { + if ($messageState === null) { + $this->release($message, $queueName); + } + + $this->logger->error('DeduplicationService.message_processing_exception', [ + 'released_message_id' => $message->get_properties()['message_id'] ?? null, + ]); + + throw $exception; + } + + return true; + } + + /** + * @param AMQPMessage $message + * @return string|null - @enum {self::IN_PROGRESS, self::PROCESSED} + */ + public function getState(AMQPMessage $message, ?string $queueName = null): ?string + { + if (!$this->isEnabled()) { + return null; + } + $messageId = $this->getMessageId($message, $queueName); + if ($messageId === null) { + return null; + } + + return $this->store->get($messageId); + } + + public function markAsProcessed(AMQPMessage $message, ?string $queueName = null): bool + { + $ttl = (int) ($this->getConfig('ttl') ?: self::DEFAULT_TTL); + if ($ttl <= 0 || $ttl > self::MAX_TTL) { + throw new \InvalidArgumentException(sprintf('Invalid TTL seconds. Should be between 1 sec and %d sec', self::MAX_TTL)); + } + + return $this->add($message, self::PROCESSED, $ttl, $queueName); + } + + public function release(AMQPMessage $message, ?string $queueName = null): void + { + if (!$this->isEnabled()) { + return; + } + + $messageId = $this->getMessageId($message, $queueName); + if ($messageId === null) { + return; + } + + $this->store->release($messageId); + } + + protected function markAsInProgress(AMQPMessage $message, ?string $queueName = null): bool + { + $ttl = (int) ($this->getConfig('lock_ttl') ?: self::DEFAULT_LOCK_TTL); + if ($ttl <= 0 || $ttl > self::MAX_LOCK_TTL) { + throw new \InvalidArgumentException(sprintf('Invalid TTL seconds. Should be between 1 and %d', self::MAX_LOCK_TTL)); + } + + return $this->add($message, self::IN_PROGRESS, $ttl, $queueName); + } + + /** + * Returns "true" if the message was not processed previously, and it's successfully been added to the store. + * Returns "false" if the message was already processed and it's a duplicate. + * + * @param AMQPMessage $message + * @param string $value + * @param int $ttl + * @return bool + */ + protected function add(AMQPMessage $message, string $value, int $ttl, ?string $queueName = null): bool + { + if (!$this->isEnabled()) { + return true; + } + + $messageId = $this->getMessageId($message, $queueName); + if ($messageId === null) { + return true; + } + + return $this->store->set($messageId, $value, $ttl, $value === self::PROCESSED); + } + + protected function getMessageId(AMQPMessage $message, ?string $queueName = null): ?string + { + $props = $message->get_properties(); + $messageId = $props['message_id'] ?? null; + if (!is_string($messageId) || empty($messageId)) { + return null; + } + + if (DlqDetector::isDlqMessage($message)) { + $messageId = 'dlq:' . $messageId; + } + + if (is_string($queueName) && $queueName !== '') { + $messageId = $queueName . ':' . $messageId; + } + + return $messageId; + } + + protected function applyActionOnLock(AMQPMessage $message): string + { + $action = $this->getConfig('action_on_lock', self::ACTION_REQUEUE); + if ($action === self::ACTION_REJECT) { + $message->reject(false); + } elseif ($action === self::ACTION_ACK) { + $message->ack(); + } else { + $action = $this->republishLockedMessage($message); + } + + return $action; + } + + protected function applyActionOnDuplication(AMQPMessage $message): string + { + $action = $this->getConfig('action_on_duplication', self::ACTION_ACK); + if ($action === self::ACTION_REJECT) { + $message->reject(false); + } else { + $message->ack(); + } + + return $action; + } + + /** + * Such a situation normally should not happen or can happen very rarely. + * Republish the locked message with a retry-count guard. + * It's necessary to avoid infinite redelivery loop. + * + * @param AMQPMessage $message + * @return string + */ + protected function republishLockedMessage(AMQPMessage $message): string + { + $props = $message->get_properties(); + $headers = []; + if (($props['application_headers'] ?? null) instanceof AMQPTable) { + $headers = $props['application_headers']->getNativeData(); + } + + $attempts = (int) ($headers[self::HEADER_LOCK_REQUEUE_COUNT] ?? 0); + ++$attempts; + + $maxAttempts = 1 + (((int) ($this->getConfig('lock_ttl', 30))) / self::WAIT_AFTER_PUBLISH); + if ($attempts > $maxAttempts) { + $this->logger->warning('DeduplicationService.republishLockedMessage.max_attempts_reached', [ + 'message_id' => $props['message_id'] ?? null, + ]); + $message->reject(false); + + return self::ACTION_REJECT; + } + + $headers[self::HEADER_LOCK_REQUEUE_COUNT] = $attempts; + // this header will be added during publishing, and we should not use counter from the previous message + unset($headers['x-delivery-count']); + + $newProps = $props; + $newProps['application_headers'] = new AMQPTable($headers); + + $newMessage = new AMQPMessage($message->getBody(), $newProps); + $channel = $message->getChannel(); + $channel->basic_publish($newMessage, $message->getExchange(), $message->getRoutingKey()); + + $this->logger->warning('DeduplicationService.republishLockedMessage.republish', [ + 'message_id' => $props['message_id'] ?? null, + 'attempts' => $attempts, + ]); + $message->ack(); + // it's necessary to avoid a high redelivery rate + // normally, such a situation is not expected (or expected very rarely) + // for example, when we have OOM + sleep(self::WAIT_AFTER_PUBLISH); + + return self::ACTION_REQUEUE; + } + + protected function isEnabled(): bool + { + return (bool) $this->getConfig('enabled', false); + } + + protected function getConfig(string $key, mixed $default = null): mixed + { + $value = config("queue.connections.rabbitmq_vhosts.deduplication.transport.$key"); + + return $value !== null ? $value : $default; + } +} diff --git a/src/Services/Deduplication/TransportLevel/DeduplicationStore.php b/src/Services/Deduplication/TransportLevel/DeduplicationStore.php new file mode 100644 index 00000000..014e639f --- /dev/null +++ b/src/Services/Deduplication/TransportLevel/DeduplicationStore.php @@ -0,0 +1,12 @@ +getKey($messageKey); + return $this->connection()->get($key); + } + + public function set(string $messageKey, mixed $value, int $ttlSeconds, bool $withOverride = false): bool + { + if ($ttlSeconds <= 0) { + throw new \InvalidArgumentException('Invalid TTL seconds. Should be greater than 0.'); + } + + $key = $this->getKey($messageKey); + $args = [$key, $value, 'EX', $ttlSeconds]; + if (!$withOverride) { + $args[] = 'NX'; + } + + return (bool) $this->connection()->set(...$args); + } + + public function release(string $messageKey): void + { + $key = $this->getKey($messageKey); + $this->connection()->del($key); + } + + protected function connection(): Connection + { + return $this->connectionName ? Redis::connection($this->connectionName) : Redis::connection(); + } + + protected function getKey(string $messageKey): string + { + return $this->keyPrefix . ':' . $messageKey; + } +} diff --git a/src/Services/DeliveryLimitService.php b/src/Services/DeliveryLimitService.php new file mode 100644 index 00000000..5e85e436 --- /dev/null +++ b/src/Services/DeliveryLimitService.php @@ -0,0 +1,82 @@ +config->get('queue.connections.rabbitmq_vhosts.options', []); + $limit = (int) ($config['quorum']['delivery_limit'] ?? 0); + if ($limit <= 0) { + return true; + } + + if (!$this->isFromQuorumQueue($message)) { + return true; + } + + $deliveryCount = $this->getMessageDeliveryCount($message); + if ($deliveryCount < $limit) { + return true; + } + + $action = strtolower((string) ($config['quorum']['on_limit_action'] ?? 'reject')); + try { + $this->logger->warning('Salesmessage.LibRabbitMQ.Services.DeliveryLimitService.limitReached', [ + 'message_id' => $message->get_properties()['message_id'] ?? null, + 'action' => $action, + ]); + + if ($action === 'ack') { + $message->ack(); + } else { + $message->reject(false); + } + } catch (\Throwable $exception) { + $this->logger->error('Salesmessage.LibRabbitMQ.Services.DeliveryLimitService.handle.exception', [ + 'message' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), + 'error_class' => get_class($exception), + ]); + } + + return false; + } + + private function getMessageDeliveryCount(AMQPMessage $message): int + { + $properties = $message->get_properties(); + /** @var AMQPTable|null $headers */ + $headers = $properties['application_headers'] ?? null; + if (!$headers instanceof AMQPTable) { + return 0; + } + + return (int) ($headers->getNativeData()['x-delivery-count'] ?? 0); + } + + private function isFromQuorumQueue(AMQPMessage $message): bool + { + $properties = $message->get_properties(); + /** @var AMQPTable|null $headers */ + $headers = $properties['application_headers'] ?? null; + if (!$headers instanceof AMQPTable) { + return false; + } + + $data = $headers->getNativeData(); + + return array_key_exists('x-delivery-count', $data); + } +} diff --git a/src/Services/DlqDetector.php b/src/Services/DlqDetector.php new file mode 100644 index 00000000..997fb2f7 --- /dev/null +++ b/src/Services/DlqDetector.php @@ -0,0 +1,22 @@ +get_properties()['application_headers'] ?? null; + + if (!($headersTable instanceof AMQPTable)) { + return false; + } + + $headers = $headersTable->getNativeData(); + + return !empty($headers['x-death']) && !empty($headers['x-opt-deaths']); + } +} diff --git a/src/Services/InternalStorageManager.php b/src/Services/InternalStorageManager.php index f903de13..2708f859 100644 --- a/src/Services/InternalStorageManager.php +++ b/src/Services/InternalStorageManager.php @@ -4,6 +4,7 @@ use Illuminate\Redis\Connections\PredisConnection; use Illuminate\Support\Facades\Redis; +use Illuminate\Support\Str; use Salesmessage\LibRabbitMQ\Dto\QueueApiDto; use Salesmessage\LibRabbitMQ\Dto\VhostApiDto; @@ -38,7 +39,7 @@ public function getVhosts(string $by = 'name', bool $alpha = true): array 'sort' => 'asc', ]); - return array_map(fn($value): string => str_replace_first( + return array_map(fn($value): string => Str::replaceFirst( $this->getVhostStorageKeyPrefix(), '', $value @@ -61,7 +62,7 @@ public function getVhostQueues(string $vhostName, string $by = 'name', bool $alp 'sort' => 'asc', ]); - return array_map(fn($value): string => str_replace_first( + return array_map(fn($value): string => Str::replaceFirst( $this->getQueueStorageKeyPrefix($vhostName), '', $value @@ -75,7 +76,7 @@ public function getVhostQueues(string $vhostName, string $by = 'name', bool $alp */ public function indexVhost(VhostApiDto $vhostDto, array $groups = []): bool { - if ($vhostDto->getMessagesReady() > 0) { + if (($vhostDto->getMessagesReady() > 0) || ($vhostDto->getMessagesUnacknowledged() > 0)) { return $this->addVhost($vhostDto, $groups); } @@ -185,7 +186,7 @@ private function getVhostStorageKeyPrefix(): string */ public function indexQueue(QueueApiDto $queueDto, array $groups): bool { - if ($queueDto->getMessagesReady() > 0) { + if (($queueDto->getMessagesReady() > 0) || ($queueDto->getMessagesUnacknowledged() > 0)) { return $this->addQueue($queueDto, $groups); } diff --git a/src/Services/Lock/LockService.php b/src/Services/Lock/LockService.php new file mode 100644 index 00000000..3560696f --- /dev/null +++ b/src/Services/Lock/LockService.php @@ -0,0 +1,51 @@ +format('Uu')) / 1000; + $milliseconds = $waitForLockSec * 1000; + $lock = $this->lockProvider->lock($lockKey, $lockSec); + + /** logic was taken from @see \Illuminate\Cache\Lock::block */ + while (!$lock->acquire()) { + $now = ((int) now()->format('Uu')) / 1000; + if (($now + self::SLEEP_MS - $milliseconds) >= $starting) { + throw new LockTimeoutException; + } + Sleep::usleep(self::SLEEP_MS * 1000); + $hadLock = true; + } + + try { + if ($skipHandlingOnLock && $hadLock) { + return false; + } + + $handler(); + } finally { + $lock->release(); + } + + return true; + } +} diff --git a/src/Services/VhostsService.php b/src/Services/VhostsService.php index 30533192..6efbead2 100644 --- a/src/Services/VhostsService.php +++ b/src/Services/VhostsService.php @@ -2,15 +2,12 @@ namespace Salesmessage\LibRabbitMQ\Services; -use Illuminate\Support\Collection; use Psr\Log\LoggerInterface; use Salesmessage\LibRabbitMQ\Services\Api\RabbitApiClient; use Throwable; class VhostsService { - public const VHOST_PREFIX = 'organization_'; - /** * @param RabbitApiClient $rabbitApiClient * @param LoggerInterface $logger @@ -25,52 +22,37 @@ public function __construct( $this->rabbitApiClient->setConnectionConfig($connectionConfig); } - /** - * @param int $page - * @param int $pageSize - * @param Collection|null $vhosts - * @return Collection - */ - public function getAllVhosts( - int $page = 1, - int $pageSize = 500, - ?Collection $vhosts = null, - ): Collection + public function getAllVhosts(int $fromPage = 1): \Generator { - if (null === $vhosts) { - $vhosts = new Collection(); - } - - try { + while (true) { $data = $this->rabbitApiClient->request( 'GET', '/api/vhosts', [ - 'page' => $page, - 'page_size' => $pageSize, + 'page' => $fromPage, + 'page_size' => 500, 'columns' => 'name,messages,messages_ready,messages_unacknowledged', ]); - } catch (Throwable $exception) { - $this->logger->warning('Salesmessage.LibRabbitMQ.Services.VhostsService.getAllVhosts.exception', [ - 'message' => $exception->getMessage(), - 'code' => $exception->getCode(), - 'trace' => $exception->getTraceAsString(), - ]); - - $data = []; - } - - $items = (array) ($data['items'] ?? []); - if (!empty($items)) { - $vhosts->push(...$items); - } - $nextPage = $page + 1; - $lastPage = (int) ($data['page_count'] ?? 1); - if ($lastPage >= $nextPage) { - return $this->getAllVhosts($nextPage, $pageSize, $vhosts); + $items = $data['items'] ?? []; + if (!is_array($items) || !isset($data['page_count'])) { + throw new \LogicException('Unexpected response from RabbitMQ API'); + } + if (empty($items)) { + break; + } + + foreach ($items as $item) { + yield $item; + } + + $nextPage = $fromPage + 1; + $totalPages = (int) $data['page_count']; + if ($nextPage > $totalPages) { + break; + } + + $fromPage = $nextPage; } - - return $vhosts; } /** @@ -90,14 +72,16 @@ public function getVhostForOrganization(int $organizationId): array * @throws \GuzzleHttp\Exception\GuzzleException * @throws \Salesmessage\LibRabbitMQ\Exceptions\RabbitApiClientException */ - public function getVhost(string $vhostName): array - { + public function getVhost( + string $vhostName, + string $columns = 'name,messages,messages_ready,messages_unacknowledged', + ): array { try { $data = $this->rabbitApiClient->request( 'GET', '/api/vhosts/' . $vhostName, [ - 'columns' => 'name,messages,messages_ready,messages_unacknowledged', + 'columns' => $columns, ] ); } catch (Throwable $exception) { @@ -122,7 +106,7 @@ public function createVhostForOrganization(int $organizationId): bool { $vhostName = $this->getVhostName($organizationId); $description = $this->getVhostDescription($organizationId); - + return $this->createVhost($vhostName, $description); } @@ -145,7 +129,6 @@ public function createVhost(string $vhostName, string $description): bool 'default_queue_type' => 'classic', ] ); - $isCreated = true; } catch (Throwable $exception) { $this->logger->warning('Salesmessage.LibRabbitMQ.Services.VhostsService.createVhost.exception', [ 'vhost_name' => $vhostName, @@ -154,14 +137,10 @@ public function createVhost(string $vhostName, string $description): bool 'trace' => $exception->getTraceAsString(), ]); - $isCreated = false; - } - - if ($isCreated) { - $this->setVhostPermissions($vhostName); + return false; } - return $isCreated; + return $this->setVhostPermissions($vhostName); } /** @@ -182,7 +161,6 @@ public function setVhostPermissions(string $vhostName): bool 'read' => '.*', ] ); - $isSuccess = true; } catch (Throwable $exception) { $this->logger->warning('Salesmessage.LibRabbitMQ.Services.VhostsService.setVhostPermissions.exception', [ 'vhost_name' => $vhostName, @@ -190,11 +168,10 @@ public function setVhostPermissions(string $vhostName): bool 'code' => $exception->getCode(), 'trace' => $exception->getTraceAsString(), ]); - - $isSuccess = false; + return false; } - return $isSuccess; + return true; } /** @@ -203,7 +180,9 @@ public function setVhostPermissions(string $vhostName): bool */ public function getVhostName(int $organizationId): string { - return self::VHOST_PREFIX . $organizationId; + $vhostPrefix = config('queue.connections.rabbitmq_vhosts.vhost_prefix', 'organization_'); + + return $vhostPrefix . $organizationId; } /** diff --git a/src/VhostsConsumer.php b/src/VhostsConsumer.php deleted file mode 100644 index 911f35ff..00000000 --- a/src/VhostsConsumer.php +++ /dev/null @@ -1,650 +0,0 @@ -output = $output; - return $this; - } - - /** - * @param ConsumeVhostsFiltersDto $filtersDto - * @return $this - */ - public function setFiltersDto(ConsumeVhostsFiltersDto $filtersDto): self - { - $this->filtersDto = $filtersDto; - return $this; - } - - /** - * @param int $batchSize - * @return $this - */ - public function setBatchSize(int $batchSize): self - { - $this->batchSize = $batchSize; - return $this; - } - - public function daemon($connectionName, $queue, WorkerOptions $options) - { - $this->goAheadOrWait(); - - $this->configConnectionName = (string) $connectionName; - $this->workerOptions = $options; - - if ($this->supportsAsyncSignals()) { - $this->listenForSignals(); - } - - $lastRestart = $this->getTimestampOfLastQueueRestart(); - - [$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0]; - - /** @var RabbitMQQueue $connection */ - $connection = $this->manager->connection( - ConnectionNameDto::getVhostConnectionName($this->currentVhostName, $this->configConnectionName) - ); - $this->currentConnectionName = $connection->getConnectionName(); - - $this->channel = $connection->getChannel(); - $this->connectionMutex = new Mutex(false); - - $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); - $this->channel->basic_qos( - $this->prefetchSize, - $this->prefetchCount, - false - ); - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); - - $this->startConsuming(); - - while ($this->channel->is_consuming()) { - // Before reserving any jobs, we will make sure this queue is not paused and - // if it is we will just pause this worker for a given amount of time and - // make sure we do not need to kill this worker process off completely. - if (! $this->daemonShouldRun($this->workerOptions, $this->configConnectionName, $this->currentQueueName)) { - $this->output->info('Consuming pause worker...'); - - $this->pauseWorker($this->workerOptions, $lastRestart); - - continue; - } - - // If the daemon should run (not in maintenance mode, etc.), then we can wait for a job. - try { - $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); - $this->channel->wait(null, true, (int) $this->workerOptions->timeout); - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); - } catch (AMQPRuntimeException $exception) { - $this->output->error('Consuming AMQP Runtime exception. Error: ' . $exception->getMessage()); - - $this->exceptions->report($exception); - - $this->kill(self::EXIT_ERROR, $this->workerOptions); - } catch (Exception|Throwable $exception) { - $this->output->error('Consuming exception. Error: ' . $exception->getMessage()); - - $this->exceptions->report($exception); - - $this->stopWorkerIfLostConnection($exception); - } - - // If no job is got off the queue, we will need to sleep the worker. - if (false === $this->hasJob) { - $this->output->info('Consuming sleep. No job...'); - - $this->processBatch($connection); - - $this->stopConsuming(); - $this->goAheadOrWait(); - $this->startConsuming(); - - $this->sleep($this->workerOptions->sleep); - } - - // Finally, we will check to see if we have exceeded our memory limits or if - // the queue should restart based on other indications. If so, we'll stop - // this worker and let whatever is "monitoring" it restart the process. - $status = $this->getStopStatus( - $this->workerOptions, - $lastRestart, - $startTime, - $jobsProcessed, - $this->hasJob - ); - if (! is_null($status)) { - $this->output->info(['Consuming stop.', $status]); - - return $this->stop($status, $this->workerOptions); - } - - $this->hasJob = false; - } - } - - /** - * @param WorkerOptions $options - * @param $lastRestart - * @param $startTime - * @param $jobsProcessed - * @param $hasJob - * @return int|null - */ - protected function getStopStatus( - WorkerOptions $options, - $lastRestart, - $startTime = 0, - $jobsProcessed = 0, - bool $hasJob = false - ): ?int - { - return match (true) { - $this->shouldQuit => static::EXIT_SUCCESS, - $this->memoryExceeded($options->memory) => static::EXIT_MEMORY_LIMIT, - $this->queueShouldRestart($lastRestart) => static::EXIT_SUCCESS, - $options->stopWhenEmpty && !$hasJob => static::EXIT_SUCCESS, - $options->maxTime && hrtime(true) / 1e9 - $startTime >= $options->maxTime => static::EXIT_SUCCESS, - $options->maxJobs && $jobsProcessed >= $options->maxJobs => static::EXIT_SUCCESS, - default => null - }; - } - - private function startConsuming() - { - $this->output->info(sprintf( - 'Start consuming. Vhost: "%s". Queue: "%s"', - $this->currentVhostName, - $this->currentQueueName - )); - - $arguments = []; - if ($this->maxPriority) { - $arguments['priority'] = ['I', $this->maxPriority]; - } - - $jobsProcessed = 0; - - /** @var RabbitMQQueue $connection */ - $connection = $this->manager->connection( - ConnectionNameDto::getVhostConnectionName($this->currentVhostName, $this->configConnectionName) - ); - $this->currentConnectionName = $connection->getConnectionName(); - $this->channel = $connection->getChannel(); - - $callback = function (AMQPMessage $message) use ($connection, &$jobsProcessed): void { - $this->hasJob = true; - - if ($this->isSupportBatching($message)) { - $this->addMessageToBatch($message); - } else { - $job = $this->getJobByMessage($message, $connection); - $this->processSingleJob($job); - } - - $jobsProcessed++; - - $this->output->info(sprintf( - 'Consume message. Vhost: "%s". Queue: "%s". Num: %s', - $this->currentVhostName, - $this->currentQueueName, - $jobsProcessed - )); - - if ($jobsProcessed >= $this->batchSize) { - $this->processBatch($connection); - - $this->stopConsuming(); - $this->goAheadOrWait(); - $this->startConsuming(); - } - - if ($this->workerOptions->rest > 0) { - $this->sleep($this->workerOptions->rest); - } - }; - - $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); - $this->channel->basic_consume( - $this->currentQueueName, - $this->consumerTag, - false, - false, - false, - false, - $callback, - null, - $arguments - ); - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); - - $this->updateLastProcessedAt(); - } - - /** - * @param AMQPMessage $message - * @return string - */ - private function getMessageClass(AMQPMessage $message): string - { - $body = json_decode($message->getBody(), true); - - return (string) ($body['data']['commandName'] ?? ''); - } - - /** - * @param RabbitMQJob $job - * @return void - */ - private function isSupportBatching(AMQPMessage $message): bool - { - $class = $this->getMessageClass($message); - - $reflection = new \ReflectionClass($class); - - return $reflection->implementsInterface(RabbitMQBatchable::class); - } - - /** - * @param AMQPMessage $message - * @return void - */ - private function addMessageToBatch(AMQPMessage $message): void - { - $this->batchMessages[$this->getMessageClass($message)][] = $message; - } - - /** - * @param RabbitMQQueue $connection - * @return void - * @throws Exceptions\MutexTimeout - * @throws Throwable - */ - private function processBatch(RabbitMQQueue $connection): void - { - if (empty($this->batchMessages)) { - return; - } - - foreach ($this->batchMessages as $batchJobClass => $batchJobMessages) { - $isBatchSuccess = false; - - $batchSize = count($batchJobMessages); - if ($batchSize > 1) { - $batchData = []; - /** @var AMQPMessage $batchMessage */ - foreach ($batchJobMessages as $batchMessage) { - $job = $this->getJobByMessage($batchMessage, $connection); - $batchData[] = $job->getPayloadData(); - } - - try { - $batchJobClass::collection($batchData); - $isBatchSuccess = true; - - $this->output->comment('Process batch jobs success. Job class: ' . $batchJobClass . 'Size: ' . $batchSize); - } catch (Throwable $exception) { - $isBatchSuccess = false; - - $this->output->error('Process batch jobs error. Job class: ' . $batchJobClass . ' Error: ' . $exception->getMessage()); - } - - unset($batchData); - } - - $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); - if ($isBatchSuccess) { - $lastBatchMessage = end($batchJobMessages); - $this->ackMessage($lastBatchMessage, true); - } else { - foreach ($batchJobMessages as $batchMessage) { - $job = $this->getJobByMessage($batchMessage, $connection); - $this->processSingleJob($job); - } - } - $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); - } - $this->updateLastProcessedAt(); - - $this->batchMessages = []; - } - - /** - * @param AMQPMessage $message - * @param RabbitMQQueue $connection - * @return RabbitMQJob - * @throws Throwable - */ - private function getJobByMessage(AMQPMessage $message, RabbitMQQueue $connection): RabbitMQJob - { - $jobClass = $connection->getJobClass(); - - return new $jobClass( - $this->container, - $connection, - $message, - $this->currentConnectionName, - $this->currentQueueName - ); - } - - /** - * @param RabbitMQJob $job - * @return void - */ - private function processSingleJob(RabbitMQJob $job): void - { - if ($this->supportsAsyncSignals()) { - $this->registerTimeoutHandler($job, $this->workerOptions); - } - - $this->runJob($job, $this->currentConnectionName, $this->workerOptions); - $this->updateLastProcessedAt(); - - $this->output->info('Process single job...'); - - if ($this->supportsAsyncSignals()) { - $this->resetTimeoutHandler(); - } - } - - /** - * @param AMQPMessage $message - * @param bool $multiple - * @return void - */ - private function ackMessage(AMQPMessage $message, bool $multiple = false): void - { - try { - $message->ack($multiple); - } catch (Throwable $exception) { - $this->output->error('Ack message error: ' . $exception->getMessage()); - } - } - - /** - * @return void - * @throws Exceptions\MutexTimeout - */ - private function stopConsuming() - { - $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); - $this->channel->basic_cancel($this->consumerTag, true); - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); - } - - /** - * @return void - */ - private function loadVhosts(): void - { - $group = $this->filtersDto->getGroup(); - $lastProcessedAtKey = $this->internalStorageManager->getLastProcessedAtKeyName($group); - - $vhosts = $this->internalStorageManager->getVhosts($lastProcessedAtKey, false); - - // filter vhosts - $filterVhosts = $this->filtersDto->getVhosts(); - if (!empty($filterVhosts)) { - $vhosts = array_filter($vhosts, fn($value) => in_array($value, $filterVhosts, true)); - } - - // filter vhosts mask - $filterVhostsMask = $this->filtersDto->getVhostsMask(); - if ('' !== $filterVhostsMask) { - $vhosts = array_filter($vhosts, fn($value) => str_contains($value, $filterVhostsMask)); - } - - $this->vhosts = $vhosts; - $this->vhostQueues = []; - - $this->currentVhostName = null; - $this->currentQueueName = null; - } - - /** - * @return bool - */ - private function switchToNextVhost(): bool - { - $nextVhost = $this->getNextVhost(); - if (null === $nextVhost) { - $this->currentVhostName = null; - $this->currentQueueName = null; - return false; - } - - $this->currentVhostName = $nextVhost; - $this->loadVhostQueues(); - - $nextQueue = $this->getNextQueue(); - if (null === $nextQueue) { - $this->currentQueueName = null; - return $this->switchToNextVhost(); - } - - $this->currentQueueName = $nextQueue; - return true; - } - - /** - * @return string|null - */ - private function getNextVhost(): ?string - { - if (null === $this->currentVhostName) { - return !empty($this->vhosts) ? (string) reset($this->vhosts) : null; - } - - $currentIndex = array_search($this->currentVhostName, $this->vhosts, true); - if ((false !== $currentIndex) && isset($this->vhosts[(int) $currentIndex + 1])) { - return (string) $this->vhosts[(int) $currentIndex + 1]; - } - - return null; - } - - /** - * @return void - */ - private function loadVhostQueues(): void - { - $group = $this->filtersDto->getGroup(); - $lastProcessedAtKey = $this->internalStorageManager->getLastProcessedAtKeyName($group); - - $vhostQueues = (null !== $this->currentVhostName) - ? $this->internalStorageManager->getVhostQueues($this->currentVhostName, $lastProcessedAtKey, false) - : []; - - // filter queues - $filterQueues = $this->filtersDto->getQueues(); - if (!empty($vhostQueues) && !empty($filterQueues)) { - $vhostQueues = array_filter($vhostQueues, fn($value) => in_array($value, $filterQueues, true)); - } - - // filter queues mask - $filterQueuesMask = $this->filtersDto->getQueuesMask(); - if ('' !== $filterQueuesMask) { - $vhostQueues = array_filter($vhostQueues, fn($value) => str_contains($value, $filterQueuesMask)); - } - - $this->vhostQueues = $vhostQueues; - - $this->currentQueueName = null; - } - - /** - * @return bool - */ - private function switchToNextQueue(): bool - { - $nextQueue = $this->getNextQueue(); - if (null === $nextQueue) { - $this->currentQueueName = null; - return false; - } - - $this->currentQueueName = $nextQueue; - return true; - } - - /** - * @return string|null - */ - private function getNextQueue(): ?string - { - if (null === $this->currentQueueName) { - return !empty($this->vhostQueues) ? (string) reset($this->vhostQueues) : null; - } - - $currentIndex = array_search($this->currentQueueName, $this->vhostQueues, true); - if ((false !== $currentIndex) && isset($this->vhostQueues[(int) $currentIndex + 1])) { - return (string) $this->vhostQueues[(int) $currentIndex + 1]; - } - - return null; - } - - /** - * @param int $waitSeconds - * @return bool - */ - private function goAheadOrWait(int $waitSeconds = 1): bool - { - if (false === $this->goAhead()) { - $this->loadVhosts(); - if (empty($this->vhosts)) { - $this->output->warning(sprintf('No active vhosts. Wait %d seconds...', $waitSeconds)); - $this->sleep($waitSeconds); - - return $this->goAheadOrWait($waitSeconds); - } - - $this->output->info('Starting from the first vhost...'); - return $this->goAheadOrWait($waitSeconds); - } - - return true; - } - - /** - * @return bool - */ - private function goAhead(): bool - { - if ($this->switchToNextQueue()) { - return true; - } - - if ($this->switchToNextVhost()) { - return true; - } - - return false; - } - - /** - * @return void - */ - private function updateLastProcessedAt() - { - if ((null === $this->currentVhostName) || (null === $this->currentQueueName)) { - return; - } - - $group = $this->filtersDto->getGroup(); - $timestamp = time(); - - $queueDto = new QueueApiDto([ - 'name' => $this->currentQueueName, - 'vhost' => $this->currentVhostName, - ]); - $queueDto - ->setGroupName($group) - ->setLastProcessedAt($timestamp); - $this->internalStorageManager->updateQueueLastProcessedAt($queueDto); - - $vhostDto = new VhostApiDto([ - 'name' => $queueDto->getVhostName(), - ]); - $vhostDto - ->setGroupName($group) - ->setLastProcessedAt($timestamp); - $this->internalStorageManager->updateVhostLastProcessedAt($vhostDto); - } -} - diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php new file mode 100644 index 00000000..f2c4bc19 --- /dev/null +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -0,0 +1,1031 @@ +, array> */ + protected array $batchMessages = []; + + protected ?string $processingUuid = null; + + protected int|float $processingStartedAt = 0; + + protected int $totalJobsProcessed = 0; + + protected int $jobsProcessed = 0; + + protected ?float $startTime = null; + + protected ?int $lastRestart = null; + + protected bool $hadJobs = false; + + protected ?int $stopStatusCode = null; + + protected array $config = []; + + protected bool $asyncMode = false; + + protected ?Mutex $connectionMutex = null; + + /** + * @param InternalStorageManager $internalStorageManager + * @param LoggerInterface $logger + * @param QueueManager $manager + * @param Dispatcher $events + * @param ExceptionHandler $exceptions + * @param callable $isDownForMaintenance + * @param TransportDeduplicationService $transportDeduplicationService + * @param callable|null $resetScope + */ + public function __construct( + protected InternalStorageManager $internalStorageManager, + protected LoggerInterface $logger, + QueueManager $manager, + Dispatcher $events, + ExceptionHandler $exceptions, + callable $isDownForMaintenance, + protected TransportDeduplicationService $transportDeduplicationService, + protected DeliveryLimitService $deliveryLimitService, + callable $resetScope = null, + ) { + parent::__construct($manager, $events, $exceptions, $isDownForMaintenance, $resetScope); + } + + /** + * @param OutputStyle $output + * @return $this + */ + public function setOutput(OutputStyle $output): self + { + $this->output = $output; + return $this; + } + + /** + * @param ConsumeVhostsFiltersDto $filtersDto + * @return $this + */ + public function setFiltersDto(ConsumeVhostsFiltersDto $filtersDto): self + { + $this->filtersDto = $filtersDto; + return $this; + } + + /** + * @param int $batchSize + * @return $this + */ + public function setBatchSize(int $batchSize): self + { + $this->batchSize = $batchSize; + return $this; + } + + /** + * @param array $config + * @return $this + */ + public function setConfig(array $config): self + { + $this->config = $config; + return $this; + } + + /** + * @param bool $asyncMode + * @return $this + */ + public function setAsyncMode(bool $asyncMode): self + { + $this->asyncMode = $asyncMode; + return $this; + } + + public function daemon($connectionName, $queue, WorkerOptions $options) + { + if ($this->supportsAsyncSignals()) { + $this->logInfo('daemon.asyncSignals.supported'); + $this->listenForSignals(); + } else { + $this->logInfo('daemon.asyncSignals.notSupported'); + } + + $this->startTime = hrtime(true) / 1e9; + $this->configConnectionName = (string) $connectionName; + $this->workerOptions = $options; + $this->lastRestart = $this->getTimestampOfLastQueueRestart(); + + $goAhead = $this->goAheadOrWait($options->sleep); + if ($goAhead === false) { + return $this->stopStatusCode; + } + + if ($this->asyncMode) { + $this->logInfo('daemon.AsyncMode.On'); + + $coroutineContextHandler = function () use ($connectionName, $options) { + $this->logInfo('daemon.AsyncMode.Coroutines.Running'); + + // we can't move it outside since Mutex should be created within coroutine context + $this->connectionMutex = new Mutex(true); + $this->startHeartbeatCheck(); + \go(function () use ($connectionName, $options) { + $this->vhostDaemon($connectionName, $options); + }); + }; + + if (extension_loaded('swoole')) { + $this->logInfo('daemon.AsyncMode.Swoole'); + + \Swoole\Runtime::enableCoroutine(true, SWOOLE_HOOK_ALL); + \Co\run($coroutineContextHandler); + } elseif (extension_loaded('openswoole')) { + $this->logInfo('daemon.AsyncMode.OpenSwoole'); + + \OpenSwoole\Runtime::enableCoroutine(true, \OpenSwoole\Runtime::HOOK_ALL); + \co::run($coroutineContextHandler); + } else { + $this->logError('daemon.AsyncMode.IsNotSupported'); + + throw new \Exception('Async mode is not supported. Check if Swoole extension is installed'); + } + + return; + } + + $this->logInfo('daemon.AsyncMode.Off'); + + $this->connectionMutex = new Mutex(false); + $this->startHeartbeatCheck(); + $this->vhostDaemon($connectionName, $options); + } + + abstract protected function vhostDaemon($connectionName, WorkerOptions $options); + + abstract protected function startConsuming(): ?RabbitMQQueue; + + /** + * @param AMQPMessage $message + * @param RabbitMQQueue $connection + * @return void + */ + protected function processAmqpMessage(AMQPMessage $message, RabbitMQQueue $connection): void + { + if (!$this->deliveryLimitService->isAllowed($message)) { + $this->logWarning('processAMQPMessage.delivery_limit_reached'); + return; + } + + $this->hadJobs = true; + $isSupportBatching = $this->isSupportBatching($message); + if ($isSupportBatching) { + $this->addMessageToBatch($message); + } else { + $job = $this->getJobByMessage($message, $connection); + $this->processSingleJob($job, $message); + } + + $this->jobsProcessed++; + $this->totalJobsProcessed++; + + $this->logInfo('processAMQPMessage.message_consumed', [ + 'processed_jobs_count' => $this->jobsProcessed, + 'is_support_batching' => $isSupportBatching ? 'Y' :'N', + ]); + } + + /** + * @return string + */ + protected function generateProcessingUuid(): string + { + return sprintf('%s:%d:%s', $this->filtersDto->getGroup(), time(), Str::random(16)); + } + + /** + * @param AMQPMessage $message + * @return non-empty-string + */ + protected function getMessageClass(AMQPMessage $message): string + { + $body = json_decode($message->getBody(), true); + + $messageClass = (string) ($body['data']['commandName'] ?? ''); + if (empty($messageClass)) { + throw new \RuntimeException('Message class is not defined'); + } + return $messageClass; + } + + /** + * @param AMQPMessage $message + * @return bool + * @throws \ReflectionException + */ + protected function isSupportBatching(AMQPMessage $message): bool + { + $class = $this->getMessageClass($message); + + $reflection = new \ReflectionClass($class); + + return $reflection->implementsInterface(RabbitMQBatchable::class); + } + + /** + * @param AMQPMessage $message + * @return void + */ + protected function addMessageToBatch(AMQPMessage $message): void + { + $this->batchMessages[$this->getMessageClass($message)][] = $message; + } + + /** + * @param RabbitMQQueue $connection + * @return void + * @throws MutexTimeout + * @throws \Throwable + */ + protected function processBatch(RabbitMQQueue $connection): void + { + if (empty($this->batchMessages)) { + $this->logDebug('processBatch.noMessagesInBatch'); + return; + } + + foreach ($this->batchMessages as $batchJobClass => $batchJobMessages) { + $isBatchSuccess = false; + $batchSize = count($batchJobMessages); + + if ($batchSize > 1) { + $batchTimeStarted = microtime(true); + + $uniqueMessagesForProcessing = []; + $batchData = []; + $this->logDebug('processBatch.lockingMessagesForProcessing'); + foreach ($batchJobMessages as $batchMessage) { + $this->transportDeduplicationService->decorateWithDeduplication( + function () use ($batchMessage, $connection, &$uniqueMessagesForProcessing, &$batchData) { + $job = $this->getJobByMessage($batchMessage, $connection); + $uniqueMessagesForProcessing[] = $batchMessage; + $batchData[] = $job->getPayloadData(); + }, + $batchMessage, + $this->currentQueueName + ); + } + $this->logDebug('processBatch.messagesLockedForProcessing'); + + try { + if (AppDeduplicationService::isEnabled()) { + /** @var RabbitMQBatchable $batchJobClass */ + $batchData = $batchJobClass::getNotDuplicatedBatchedJobs($batchData); + } + + if (!empty($batchData)) { + $this->logInfo('processBatch.start', [ + 'batch_job_class' => $batchJobClass, + 'batch_size' => $batchSize, + ]); + + if ($this->supportsAsyncSignals()) { + $this->registerTimeoutHandlerForBatch($batchJobClass, $this->workerOptions); + } + $batchJobClass::collection($batchData); + + $this->logInfo('processBatch.finish', [ + 'batch_job_class' => $batchJobClass, + 'batch_size' => $batchSize, + 'executive_batch_time_seconds' => microtime(true) - $batchTimeStarted, + ]); + } + + $isBatchSuccess = true; + } catch (\Throwable $exception) { + $isBatchSuccess = false; + + $this->logError('processBatch.exception', [ + 'batch_job_class' => $batchJobClass, + 'message' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), + 'error_class' => get_class($exception), + ]); + + foreach ($uniqueMessagesForProcessing as $batchMessage) { + $this->transportDeduplicationService->release($batchMessage, $this->currentQueueName); + } + + $this->logDebug('processBatch.exception.messagesReleased'); + } finally { + if ($this->supportsAsyncSignals()) { + $this->resetTimeoutHandler(); + } + } + + unset($batchData); + } else { + $uniqueMessagesForProcessing = $batchJobMessages; + } + + $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); + try { + if ($isBatchSuccess && !empty($uniqueMessagesForProcessing)) { + $this->logDebug('processBatch.markingMessagesAsProcessed'); + foreach ($uniqueMessagesForProcessing as $batchMessage) { + $this->transportDeduplicationService?->markAsProcessed($batchMessage, $this->currentQueueName); + } + $this->logDebug('processBatch.messagesMarkedAsProcessed'); + + $lastBatchMessage = end($uniqueMessagesForProcessing); + $this->ackMessage($lastBatchMessage, true); + } else { + foreach ($uniqueMessagesForProcessing as $batchMessage) { + $job = $this->getJobByMessage($batchMessage, $connection); + $this->processSingleJob($job, $batchMessage); + } + } + } finally { + $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); + } + } + $this->updateLastProcessedAt(); + + $this->batchMessages = []; + + $this->logDebug('processBatch.finished'); + } + + /** + * @param AMQPMessage $message + * @param RabbitMQQueue $connection + * @return RabbitMQJob + * @throws \Throwable + */ + protected function getJobByMessage(AMQPMessage $message, RabbitMQQueue $connection): RabbitMQJob + { + $jobClass = $connection->getJobClass(); + + $job = new $jobClass( + $this->container, + $connection, + $message, + $this->currentConnectionName, + $this->currentQueueName + ); + + if (!is_subclass_of($job->getPayloadClass(), RabbitMQConsumable::class)) { + throw new \RuntimeException(sprintf('Job class %s must implement %s', $job->getPayloadClass(), RabbitMQConsumable::class)); + } + + return $job; + } + + protected function processSingleJob(RabbitMQJob $job, AMQPMessage $message): void + { + $timeStarted = microtime(true); + $this->logInfo('processSingleJob.start'); + + if ($this->supportsAsyncSignals()) { + $this->registerTimeoutHandler($job, $this->workerOptions); + } + + $this->transportDeduplicationService->decorateWithDeduplication( + function () use ($job, $message) { + if (AppDeduplicationService::isEnabled() && $job->getPayloadData()->isDuplicated()) { + $this->logWarning('processSingleJob.job_is_duplicated'); + $this->ackMessage($message); + + } else { + $this->logDebug('processSingleJob.jobStarted'); + $this->runJob($job, $this->currentConnectionName, $this->workerOptions); + $this->logDebug('processSingleJob.jobFinished'); + } + + $this->transportDeduplicationService->markAsProcessed($message, $this->currentQueueName); + $this->logDebug('processSingleJob.messageMarkedAsProcessed'); + }, + $message, + $this->currentQueueName, + ); + + $this->updateLastProcessedAt(); + + if ($this->supportsAsyncSignals()) { + $this->resetTimeoutHandler(); + } + + $this->logInfo('processSingleJob.finish', [ + 'executive_job_time_seconds' => microtime(true) - $timeStarted, + ]); + } + + /** + * @param AMQPMessage $message + * @param bool $multiple + * @return void + */ + protected function ackMessage(AMQPMessage $message, bool $multiple = false): void + { + $this->logInfo('ackMessage.start', [ + 'multiple' => $multiple, + ]); + + try { + $message->ack($multiple); + $this->logDebug('ackMessage.success'); + } catch (\Throwable $exception) { + $this->logError('ackMessage.exception', [ + 'message' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), + 'error_class' => get_class($exception), + ]); + } + } + + /** + * @return void + * @throws MutexTimeout + */ + abstract protected function stopConsuming(): void; + + /** + * @return void + */ + protected function loadVhosts(): void + { + $this->logInfo('loadVhosts.start'); + + $group = $this->filtersDto->getGroup(); + $lastProcessedAtKey = $this->internalStorageManager->getLastProcessedAtKeyName($group); + + $vhosts = $this->internalStorageManager->getVhosts($lastProcessedAtKey, false); + + // filter vhosts + $filterVhosts = $this->filtersDto->getVhosts(); + if (!empty($filterVhosts)) { + $vhosts = array_filter($vhosts, fn($value) => in_array($value, $filterVhosts, true)); + } + + // filter vhosts mask + $filterVhostsMask = $this->filtersDto->getVhostsMask(); + if ('' !== $filterVhostsMask) { + $vhosts = array_filter($vhosts, fn($value) => str_contains($value, $filterVhostsMask)); + } + + $this->vhosts = $vhosts; + $this->vhostQueues = []; + + $this->currentVhostName = null; + $this->currentQueueName = null; + } + + /** + * @return bool + */ + protected function switchToNextVhost(): bool + { + while (true) { + $nextVhost = $this->getNextVhost(); + if (null === $nextVhost) { + $this->currentVhostName = null; + $this->currentQueueName = null; + return false; + } + + $this->currentVhostName = $nextVhost; + $this->loadVhostQueues(); + + $nextQueue = $this->getNextQueue(); + if (null === $nextQueue) { + $this->currentQueueName = null; + continue; + } + + $this->currentQueueName = $nextQueue; + + $this->logInfo('switchToNextVhost.success'); + + return true; + } + } + + /** + * @return string|null + */ + protected function getNextVhost(): ?string + { + if (null === $this->currentVhostName) { + return !empty($this->vhosts) ? (string) reset($this->vhosts) : null; + } + + $currentIndex = array_search($this->currentVhostName, $this->vhosts, true); + if ((false !== $currentIndex) && isset($this->vhosts[(int) $currentIndex + 1])) { + return (string) $this->vhosts[(int) $currentIndex + 1]; + } + + return null; + } + + /** + * @return void + */ + protected function loadVhostQueues(): void + { + $this->logInfo('loadVhostQueues.start'); + + $group = $this->filtersDto->getGroup(); + $lastProcessedAtKey = $this->internalStorageManager->getLastProcessedAtKeyName($group); + + $vhostQueues = (null !== $this->currentVhostName) + ? $this->internalStorageManager->getVhostQueues($this->currentVhostName, $lastProcessedAtKey, false) + : []; + + // filter queues + $filterQueues = $this->filtersDto->getQueues(); + if (!empty($vhostQueues) && !empty($filterQueues)) { + $vhostQueues = array_filter($vhostQueues, fn($value) => in_array($value, $filterQueues, true)); + } + + // filter queues mask + $filterQueuesMask = $this->filtersDto->getQueuesMask(); + if ('' !== $filterQueuesMask) { + $vhostQueues = array_filter($vhostQueues, fn($value) => str_contains($value, $filterQueuesMask)); + } + + $this->vhostQueues = $vhostQueues; + + $this->currentQueueName = null; + } + + /** + * @return bool + */ + protected function switchToNextQueue(): bool + { + $nextQueue = $this->getNextQueue(); + if (null === $nextQueue) { + $this->currentQueueName = null; + return false; + } + + $this->currentQueueName = $nextQueue; + + $this->logInfo('switchToNextQueue.success'); + + return true; + } + + /** + * @return string|null + */ + protected function getNextQueue(): ?string + { + if (null === $this->currentQueueName) { + return !empty($this->vhostQueues) ? (string) reset($this->vhostQueues) : null; + } + + $currentIndex = array_search($this->currentQueueName, $this->vhostQueues, true); + if ((false !== $currentIndex) && isset($this->vhostQueues[(int) $currentIndex + 1])) { + return (string) $this->vhostQueues[(int) $currentIndex + 1]; + } + + return null; + } + + /** + * @param int $waitSeconds + * @return bool + */ + protected function goAheadOrWait(int $waitSeconds = 1): bool + { + while (true) { + $this->stopStatusCode = $this->stopIfNecessary( + $this->workerOptions, + $this->lastRestart, + $this->startTime, + $this->totalJobsProcessed, + true + ); + if (! is_null($this->stopStatusCode)) { + $this->logWarning('daemon.consuming_stop.from_goAheadOrWait', [ + 'status_code' => $this->stopStatusCode, + ]); + + $this->stop($this->stopStatusCode, $this->workerOptions); + + return false; + } + + if ($this->goAhead()) { + return true; + } + + if (!$this->hadJobs) { + $this->logWarning('goAheadOrWait.no_jobs_during_iteration', [ + 'wait_seconds' => $waitSeconds, + ]); + + $this->sleep($waitSeconds); + } + + $this->loadVhosts(); + $this->hadJobs = false; + if (empty($this->vhosts)) { + $this->logWarning('goAheadOrWait.no_active_vhosts', [ + 'wait_seconds' => $waitSeconds, + ]); + + $this->sleep($waitSeconds); + continue; + } + + $this->logInfo('goAheadOrWait.starting_from_the_first_vhost'); + } + } + + /** + * @return bool + */ + protected function goAhead(): bool + { + if ($this->switchToNextQueue()) { + return true; + } + + if ($this->switchToNextVhost()) { + return true; + } + + return false; + } + + /** + * @return void + */ + protected function updateLastProcessedAt(): void + { + if ((null === $this->currentVhostName) || (null === $this->currentQueueName)) { + return; + } + + $this->logInfo('updateLastProcessedAt.start'); + + $group = $this->filtersDto->getGroup(); + $timestamp = time(); + + $queueDto = new QueueApiDto([ + 'name' => $this->currentQueueName, + 'vhost' => $this->currentVhostName, + ]); + $queueDto + ->setGroupName($group) + ->setLastProcessedAt($timestamp); + $this->internalStorageManager->updateQueueLastProcessedAt($queueDto); + + $vhostDto = new VhostApiDto([ + 'name' => $queueDto->getVhostName(), + ]); + $vhostDto + ->setGroupName($group) + ->setLastProcessedAt($timestamp); + $this->internalStorageManager->updateVhostLastProcessedAt($vhostDto); + } + + protected function initConnection(int $attempts = 0): ?RabbitMQQueue + { + if ($this->channel) { + try { + $this->channel->close(); + } catch (\Exception $e) { + // Ignore close errors + } + $this->channel = null; + } + + $connection = $this->manager->connection( + ConnectionNameDto::getVhostConnectionName($this->currentVhostName, $this->configConnectionName) + ); + + try { + /** @var AMQPChannel $channel */ + $channel = $connection->getChannel(true); + + $this->currentConnectionName = $connection->getConnectionName(); + + $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); + $channel->basic_qos( + $this->prefetchSize, + $this->prefetchCount, + false + ); + + $this->channel = $channel; + $this->connection = $connection; + } catch (AMQPConnectionClosedException | AMQPChannelClosedException $exception) { + $this->logError('initConnection.exception', [ + 'message' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), + ]); + + $vhostDto = new VhostApiDto([ + 'name' => $this->currentVhostName, + ]); + + $this->internalStorageManager->removeVhost($vhostDto); + $this->loadVhosts(); + $goAhead = $this->goAheadOrWait($this->workerOptions?->sleep ?? 1); + if ($goAhead === false) { + return null; + } + + if ($attempts > 10) { + $this->logError('initConnection.too_many_attempts', [ + 'attempts' => $attempts, + 'original_error' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), + ]); + throw new \RuntimeException('Too many connection attempts'); + } + + sleep(1); + + return $this->initConnection(++$attempts); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); + } + + return $connection; + } + + /** + * @return void + */ + protected function startHeartbeatCheck(): void + { + if (false === $this->asyncMode) { + $this->logWarning('startHeartbeatCheck.async_mode_is_disabled'); + return; + } + + $heartbeatInterval = (int) ($this->config['options']['heartbeat'] ?? 0); + if (!$heartbeatInterval) { + $this->logWarning('startHeartbeatCheck.heartbeat_interval_is_not_set'); + return; + } + + $this->logInfo('startHeartbeatCheck.start', [ + 'heartbeat_interval' => $heartbeatInterval, + ]); + + $heartbeatHandler = function () { + $this->logDebug('startHeartbeatCheck.heartbeat_check_started'); + + if ($this->shouldQuit || (null !== $this->stopStatusCode)) { + $this->logWarning('startHeartbeatCheck.quit', [ + 'should_quit' => $this->shouldQuit, + 'stop_status_code' => $this->stopStatusCode, + ]); + + return; + } + + try { + /** @var AMQPStreamConnection|null $connection */ + $connection = $this->connection?->getConnection(); + if ((null === $connection) + || (false === $connection->isConnected()) + || $connection->isWriting() + || $connection->isBlocked() + ) { + $this->logWarning('startHeartbeatCheck.incorrect_connection', [ + 'has_connection' => (null !== $connection) ? 'Y' : 'N', + 'is_connected' => $connection?->isConnected() ? 'Y' : 'N', + 'is_writing' => $connection->isWriting() ? 'Y' : 'N', + 'is_blocked' => $connection->isBlocked() ? 'Y' : 'N', + ]); + + return; + } + + $this->connectionMutex->lock(static::HEALTHCHECK_HANDLER_LOCK, 3); + $connection->checkHeartBeat(); + + $this->logDebug('startHeartbeatCheck.heartbeat_checked'); + } catch (MutexTimeout) { + $this->logWarning('startHeartbeatCheck.mutex_timeout'); + } catch (\Throwable $exception) { + $this->logError('startHeartbeatCheck.exception', [ + 'error' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), + ]); + + $this->shouldQuit = true; + } finally { + $this->connectionMutex->unlock(static::HEALTHCHECK_HANDLER_LOCK); + } + }; + + \go(function () use ($heartbeatHandler, $heartbeatInterval) { + $this->logInfo('startHeartbeatCheck.started'); + + while (true) { + sleep($heartbeatInterval); + $heartbeatHandler(); + if ($this->shouldQuit || !is_null($this->stopStatusCode)) { + $this->logWarning('startHeartbeatCheck.go_quit', [ + 'should_quit' => $this->shouldQuit, + 'stop_status_code' => $this->stopStatusCode, + ]); + + return; + } + } + }); + } + + /** + * @return string + */ + protected function getTagName(): string + { + return $this->consumerTag . '_' . $this->currentVhostName; + } + + /** + * @param string $message + * @param array $data + * @return void + */ + protected function logInfo(string $message, array $data = []): void + { + $this->log($message, $data, 'info'); + } + + /** + * @param string $message + * @param array $data + * @return void + */ + protected function logWarning(string $message, array $data = []): void + { + $this->log($message, $data, 'warning'); + } + + /** + * @param string $message + * @param array $data + * @return void + */ + protected function logError(string $message, array $data = []): void + { + $this->log($message, $data, 'error'); + } + + protected function logDebug(string $message, array $data = []): void + { + $this->log($message, $data, 'debug'); + } + + /** + * @param string $message + * @param array $data + * @param string $logType + * @return void + */ + protected function log(string $message, array $data = [], string $logType = 'info'): void + { + if ($logType === 'debug' && !($this->config['debug'] ?? false)) { + return; + } + + if (null !== $this->currentVhostName) { + $data['vhost_name'] = $this->currentVhostName; + } + if (null !== $this->currentQueueName) { + $data['queue_name'] = $this->currentQueueName; + } + + $outputMessage = $message; + foreach ($data as $key => $value) { + if (in_array($key, ['trace', 'error_class'])) { + continue; + } + $outputMessage .= '. ' . ucfirst(str_replace('_', ' ', $key)) . ': ' . $value; + } + + match ($logType) { + 'error' => $this->output->error($outputMessage), + 'warning' => $this->output->warning($outputMessage), + default => $this->output->info($outputMessage) + }; + + $processingData = [ + 'uuid' => $this->processingUuid, + 'started_at' => $this->processingStartedAt, + 'total_processed_jobs_count' => $this->totalJobsProcessed, + ]; + if ($this->processingStartedAt) { + $processingData['executive_time_seconds'] = microtime(true) - $this->processingStartedAt; + } + $data['processing'] = $processingData; + + $logMessage = 'Salesmessage.LibRabbitMQ.VhostsConsumers.'; + $logMessage .= class_basename(static::class) . '.'; + $logMessage .= $message; + + match ($logType) { + 'error' => $this->logger->error($logMessage, $data), + 'warning' => $this->logger->warning($logMessage, $data), + 'debug' => $this->logger->debug($logMessage, $data), + default => $this->logger->info($logMessage, $data) + }; + } + + /** + * @param class-string $job + * @param WorkerOptions|null $options + * @return void + */ + protected function registerTimeoutHandlerForBatch(string $job, ?WorkerOptions $options): void + { + $timeout = max($job::BATCH_TIMEOUT ?: (int) $options?->timeout, 0); + if (!$timeout) { + return; + } + + pcntl_signal(SIGALRM, function () use ($job, $options, $timeout) { + $this->logError('Timeout reached. Stopping batchable consumer.', [ + 'job' => $job, + 'timeout' => $timeout, + ]); + + $this->kill(static::EXIT_ERROR, $options); + }, true); + + pcntl_alarm($timeout); + } + + public function kill($status = 0, $options = null) + { + $this->logger->error('Stopped job execution.', [ + 'status' => $status, + 'options' => $options, + ]); + + parent::kill($status, $options); + } +} diff --git a/src/VhostsConsumers/DirectConsumer.php b/src/VhostsConsumers/DirectConsumer.php new file mode 100644 index 00000000..fe017bee --- /dev/null +++ b/src/VhostsConsumers/DirectConsumer.php @@ -0,0 +1,156 @@ +logInfo('daemon.start'); + + $this->totalJobsProcessed = 0; + + $connection = $this->startConsuming(); + if ($connection === null) { + return $this->stopStatusCode; + } + + while (true) { + // Before reserving any jobs, we will make sure this queue is not paused and + // if it is we will just pause this worker for a given amount of time and + // make sure we do not need to kill this worker process off completely. + if (! $this->daemonShouldRun($this->workerOptions, $this->configConnectionName, $this->currentQueueName)) { + $this->logInfo('daemon.consuming_pause_worker'); + + $this->pauseWorker($this->workerOptions, $this->lastRestart); + + continue; + } + + try { + $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); + $amqpMessage = $this->channel->basic_get($this->currentQueueName); + } catch (AMQPProtocolChannelException|AMQPChannelClosedException $exception) { + $amqpMessage = null; + + $this->logError('daemon.channel_exception', [ + 'message' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), + 'error_class' => get_class($exception), + ]); + } catch (AMQPRuntimeException $exception) { + $this->logError('daemon.amqp_runtime_exception', [ + 'message' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), + ]); + + $this->exceptions->report($exception); + + $this->kill(self::EXIT_SUCCESS, $this->workerOptions); + } catch (\Throwable $exception) { + $this->logError('daemon.exception', [ + 'message' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), + 'error_class' => get_class($exception), + ]); + + $this->exceptions->report($exception); + + $this->stopWorkerIfLostConnection($exception); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); + } + + if (!isset($amqpMessage)) { + $this->logInfo('daemon.consuming_sleep_no_job'); + + $this->stopConsuming(); + + $this->processBatch($connection); + + $goAhead = $this->goAheadOrWait($this->workerOptions->sleep); + if ($goAhead === false) { + return $this->stopStatusCode; + } + $connection = $this->startConsuming(); + if ($connection === null) { + return $this->stopStatusCode; + } + + continue; + } + + $this->processAmqpMessage($amqpMessage, $connection); + + if ($this->jobsProcessed >= $this->batchSize) { + $this->logInfo('daemon.consuming_batch_full'); + + $this->stopConsuming(); + + $this->processBatch($connection); + + $goAhead = $this->goAheadOrWait($this->workerOptions->sleep); + if ($goAhead === false) { + return $this->stopStatusCode; + } + $connection = $this->startConsuming(); + if ($connection === null) { + return $this->stopStatusCode; + } + + continue; + } + + // Finally, we will check to see if we have exceeded our memory limits or if + // the queue should restart based on other indications. If so, we'll stop + // this worker and let whatever is "monitoring" it restart the process. + $this->stopStatusCode = $this->stopIfNecessary( + $this->workerOptions, + $this->lastRestart, + $this->startTime, + $this->totalJobsProcessed, + true + ); + if (! is_null($this->stopStatusCode)) { + $this->logWarning('daemon.consuming_stop', [ + 'status_code' => $this->stopStatusCode, + ]); + + return $this->stop($this->stopStatusCode, $this->workerOptions); + } + } + } + + protected function startConsuming(): ?RabbitMQQueue + { + $this->processingUuid = $this->generateProcessingUuid(); + $this->processingStartedAt = microtime(true); + + $this->logInfo('startConsuming.init'); + + $this->jobsProcessed = 0; + + $connection = $this->initConnection(); + + $this->updateLastProcessedAt(); + + return $connection; + } + + protected function stopConsuming(): void + { + return; + } +} diff --git a/src/VhostsConsumers/QueueConsumer.php b/src/VhostsConsumers/QueueConsumer.php new file mode 100644 index 00000000..5a425d44 --- /dev/null +++ b/src/VhostsConsumers/QueueConsumer.php @@ -0,0 +1,234 @@ +logInfo('daemon.start'); + + $this->totalJobsProcessed = 0; + + $connection = $this->startConsuming(); + if ($connection === null) { + return $this->stopStatusCode; + } + + while ($this->channel->is_consuming()) { + // Before reserving any jobs, we will make sure this queue is not paused and + // if it is we will just pause this worker for a given amount of time and + // make sure we do not need to kill this worker process off completely. + if (! $this->daemonShouldRun($this->workerOptions, $this->configConnectionName, $this->currentQueueName)) { + $this->logInfo('daemon.consuming_pause_worker'); + + $this->pauseWorker($this->workerOptions, $this->lastRestart); + + continue; + } + + // If the daemon should run (not in maintenance mode, etc.), then we can wait for a job. + try { + $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); + $this->channel->wait(null, true, (int) $this->workerOptions->timeout); + } catch (AMQPRuntimeException $exception) { + $this->logError('daemon.amqp_runtime_exception', [ + 'message' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), + ]); + + $this->exceptions->report($exception); + + $this->kill(self::EXIT_SUCCESS, $this->workerOptions); + } catch (\Throwable $exception) { + $this->logError('daemon.exception', [ + 'message' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), + 'error_class' => get_class($exception), + ]); + + $this->exceptions->report($exception); + + $this->stopWorkerIfLostConnection($exception); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); + } + + // If no job is got off the queue, we will need to sleep the worker. + if (false === $this->hasJob) { + $this->logInfo('daemon.consuming_sleep_no_job', [ + 'sleep_seconds' => $this->workerOptions->sleep, + ]); + + $this->stopConsuming(); + + $this->processBatch($connection); + + $goAhead = $this->goAheadOrWait($this->workerOptions->sleep); + if ($goAhead === false) { + return $this->stopStatusCode; + } + if ($this->startConsuming() === null) { + return $this->stopStatusCode; + } + + $this->sleep($this->workerOptions->sleep); + } + + // Finally, we will check to see if we have exceeded our memory limits or if + // the queue should restart based on other indications. If so, we'll stop + // this worker and let whatever is "monitoring" it restart the process. + $this->stopStatusCode = $this->stopIfNecessary( + $this->workerOptions, + $this->lastRestart, + $this->startTime, + $this->totalJobsProcessed, + $this->hasJob ?: null, + ); + if (! is_null($this->stopStatusCode)) { + $this->logWarning('daemon.consuming_stop', [ + 'status_code' => $this->stopStatusCode, + ]); + + return $this->stop($this->stopStatusCode, $this->workerOptions); + } + + $this->hasJob = false; + } + } + + protected function startConsuming(int $attempts = 0): ?RabbitMQQueue + { + $this->processingUuid = $this->generateProcessingUuid(); + $this->processingStartedAt = microtime(true); + + $this->logInfo('startConsuming.init'); + + $arguments = []; + if ($this->maxPriority) { + $arguments['priority'] = ['I', $this->maxPriority]; + } + + $this->jobsProcessed = 0; + + $connection = $this->initConnection(); + if ($connection === null) { + return null; + } + + $stopConsuming = false; + $callback = function (AMQPMessage $message) use ($connection, &$stopConsuming): void { + $this->hasJob = true; + + $this->processAMQPMessage($message, $connection); + + if ($this->jobsProcessed >= $this->batchSize) { + $this->stopConsuming(); + + $this->processBatch($connection); + + $goAhead = $this->goAheadOrWait($this->workerOptions->sleep); + if ($goAhead === false) { + $stopConsuming = true; + return; + } + if ($this->startConsuming() === null) { + $stopConsuming = true; + return; + } + } + + if ($this->workerOptions->rest > 0) { + $this->logInfo('startConsuming.rest', [ + 'rest_seconds' => $this->workerOptions->rest, + ]); + + $this->sleep($this->workerOptions->rest); + } + }; + + $isSuccess = true; + + $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); + try { + $this->channel->basic_consume( + $this->currentQueueName, + $this->getTagName(), + false, + false, + false, + false, + $callback, + null, + $arguments + ); + } catch (AMQPProtocolChannelException|AMQPChannelClosedException $exception) { + $isSuccess = false; + + $this->logError('startConsuming.exception', [ + 'message' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), + 'error_class' => get_class($exception), + ]); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); + } + + $this->updateLastProcessedAt(); + + if ($stopConsuming) { + return null; + } + + if (false === $isSuccess) { + if ($attempts > 10) { + $this->logError('startConsuming.failed_to_consume_after_attempts', [ + 'attempts' => $attempts, + ]); + return null; + } + + $goAhead = $this->goAheadOrWait($this->workerOptions->sleep); + if ($goAhead === false) { + return null; + } + + $this->stopConsuming(); + + return $this->startConsuming(++$attempts); + } + + return $connection; + } + + /** + * @return void + * @throws MutexTimeout + */ + protected function stopConsuming(): void + { + $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); + try { + $this->channel->basic_cancel($this->getTagName(), true); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); + } + } +}