diff --git a/README.md b/README.md index 16db9cd3..74dbd8b5 100644 --- a/README.md +++ b/README.md @@ -9,16 +9,16 @@ RabbitMQ Queue driver for Laravel Only the latest version will get new features. Bug fixes will be provided using the following scheme: -| Package Version | Laravel Version | Bug Fixes Until | | -|-----------------|-----------------|------------------|---------------------------------------------------------------------------------------------| -| 1 | 20 | April 23th, 2025 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | +| Package Version | Laravel Version | Bug Fixes Until | | +|-----------------|-----------------|----------------------|---------------------------------------------------------------------------------------------| +| 1 | 29 | September 25th, 2025 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | ## Installation You can install this package via composer using this command: ``` -composer require salesmessage/php-lib-rabbitmq:^1.20 --ignore-platform-reqs +composer require salesmessage/php-lib-rabbitmq:^1.33 --ignore-platform-reqs ``` The package will automatically register itself. @@ -632,7 +632,7 @@ There are two ways of consuming messages. Example: ```bash -php artisan lib-rabbitmq:consume-vhosts test-group-1 rabbitmq_vhosts --name=mq-vhosts-test-name --sleep=3 --memory=300 --max-jobs=5000 --max-time=600 --timeout=0 +php artisan lib-rabbitmq:consume-vhosts test-group-1 rabbitmq_vhosts --name=mq-vhosts-test-name --sleep=3 --memory=300 --max-jobs=5000 --max-time=600 --timeout=0 --async-mode=1 ``` ## Testing @@ -663,7 +663,19 @@ if not all the issues with the following command: composer fix:style ``` -## Contribution - -You can contribute to this package by discovering bugs and opening issues. Please, add to which version of package you -create pull request or issue. (e.g. [5.2] Fatal error on delayed job) +## Local Setup +- Configure all config items in `config/queue.php` section `connections.rabbitmq_vhosts` (see as example [rabbitmq.php](./config/rabbitmq.php)) +- Create `yml` file in the project root with name `rabbit-groups.yml` and content, for example like this (you can replace `vhosts` and `queues` with `vhosts_mask` and `queues_mask`): +```yaml +groups: + test-notes: + vhosts: + - organization_200005 + queues: + - local-myname.notes.200005 + batch_size: 3 + prefetch_count: 3 +``` +- Make sure that vhosts exist in RabbitMQ (if not - create them) +- Run command `php artisan lib-rabbitmq:scan-vhosts` within your project where this library is installed (this command fetches data from RabbitMQ to Redis) +- Run command for consumer `php artisan lib-rabbitmq:consume-vhosts test-notes rabbitmq_vhosts --name=mq-vhost-test-local-notes --memory=300 --timeout=0 --max-jobs=1000 --max-time=600 --async-mode=1` diff --git a/composer.json b/composer.json index e7aacf7e..151bc3d1 100644 --- a/composer.json +++ b/composer.json @@ -11,7 +11,7 @@ "require": { "php": "^8.0", "ext-json": "*", - "illuminate/queue": "^9.0|^10.0|^11.0", + "illuminate/queue": "^9.0|^10.0|^11.0|^12.0", "php-amqplib/php-amqplib": "^v3.6" }, "require-dev": { @@ -20,7 +20,7 @@ "laravel/horizon": "^5.0", "orchestra/testbench": "^7.0|^8.0|^9.0", "laravel/pint": "^1.2", - "laravel/framework": "^10.0|^11.0" + "laravel/framework": "^10.0|^11.0|^12.0" }, "autoload": { "psr-4": { @@ -34,7 +34,7 @@ }, "extra": { "branch-alias": { - "dev-master": "1.26-dev" + "dev-master": "1.33-dev" }, "laravel": { "providers": [ @@ -43,7 +43,9 @@ } }, "suggest": { - "ext-pcntl": "Required to use all features of the queue consumer." + "ext-pcntl": "Required to use all features of the queue consumer.", + "ext-swoole": "Required to use async mode for healthcheck (alternative is ext-openswoole).", + "ext-openswoole": "Required to use async mode for healthcheck (alternative is ext-swoole)." }, "scripts": { "test": [ @@ -56,4 +58,4 @@ }, "minimum-stability": "dev", "prefer-stable": true -} \ No newline at end of file +} diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 8b949d9c..9b457a84 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -25,6 +25,33 @@ 'options' => [ ], + /** + * Provided on 2 levels: transport and application. + */ + 'deduplication' => [ + 'transport' => [ + 'enabled' => env('RABBITMQ_DEDUP_TRANSPORT_ENABLED', false), + 'ttl' => env('RABBITMQ_DEDUP_TRANSPORT_TTL', 7200), + 'lock_ttl' => env('RABBITMQ_DEDUP_TRANSPORT_LOCK_TTL', 60), + /** + * Possible: ack, reject + */ + 'action_on_duplication' => env('RABBITMQ_DEDUP_TRANSPORT_ACTION', 'ack'), + /** + * Possible: ack, reject, requeue + */ + 'action_on_lock' => env('RABBITMQ_DEDUP_TRANSPORT_LOCK_ACTION', 'requeue'), + 'connection' => [ + 'driver' => env('RABBITMQ_DEDUP_TRANSPORT_DRIVER', 'redis'), + 'name' => env('RABBITMQ_DEDUP_TRANSPORT_CONNECTION_NAME', 'persistent'), + 'key_prefix' => env('RABBITMQ_DEDUP_TRANSPORT_KEY_PREFIX', 'mq_dedup'), + ], + ], + 'application' => [ + 'enabled' => env('RABBITMQ_DEDUP_APP_ENABLED', true), + ], + ], + /* * Set to "horizon" if you wish to use Laravel Horizon. */ diff --git a/src/Console/ConsumeVhostsCommand.php b/src/Console/ConsumeVhostsCommand.php index d73a66c3..1288a9c5 100644 --- a/src/Console/ConsumeVhostsCommand.php +++ b/src/Console/ConsumeVhostsCommand.php @@ -30,6 +30,7 @@ class ConsumeVhostsCommand extends WorkCommand {--timeout=60 : The number of seconds a child process can run} {--tries=1 : Number of times to attempt a job before logging it failed} {--rest=0 : Number of seconds to rest between jobs} + {--async-mode=0 : Async processing for some functionality (now only "heartbeat" is supported)} {--max-priority=} {--consumer-tag} @@ -84,6 +85,7 @@ public function handle(): void $consumer->setPrefetchSize((int) $this->option('prefetch-size')); $consumer->setPrefetchCount((int) ($groupConfigData['prefetch_count'] ?? 1000)); $consumer->setBatchSize((int) ($groupConfigData['batch_size'] ?? 1000)); + $consumer->setAsyncMode((bool) $this->option('async-mode')); if ($this->downForMaintenance() && $this->option('once')) { $consumer->sleep($this->option('sleep')); @@ -95,8 +97,10 @@ public function handle(): void // which jobs are coming through a queue and be informed on its progress. $this->listenForEvents(); - $connection = $this->argument('connection') - ?: $this->laravel['config']['queue.default']; + $queueConfigData = $this->laravel['config']['queue']; + $connectionName = $this->argument('connection') ?: ($queueConfigData['default'] ?? ''); + + $consumer->setConfig((array) ($queueConfigData['connections'][$connectionName] ?? [])); if (Terminal::hasSttyAvailable()) { $this->components->info(sprintf( @@ -107,7 +111,7 @@ public function handle(): void } $this->runWorker( - $connection, + $connectionName, '' ); } diff --git a/src/Console/QueueDeclareCommand.php b/src/Console/QueueDeclareCommand.php index 396877ac..24e3c5d0 100644 --- a/src/Console/QueueDeclareCommand.php +++ b/src/Console/QueueDeclareCommand.php @@ -11,10 +11,11 @@ class QueueDeclareCommand extends Command protected $signature = 'lib-rabbitmq:queue-declare {name : The name of the queue to declare} {connection=rabbitmq : The name of the queue connection to use} - {--max-priority} + {--max-priority : Set x-max-priority (ignored for quorum)} {--durable=1} {--auto-delete=0} - {--quorum=0}'; + {--quorum=0 : Declare quorum queue (x-queue-type=quorum)} + {--quorum-initial-group-size= : x-quorum-initial-group-size when quorum is enabled}'; protected $description = 'Declare queue'; @@ -36,12 +37,24 @@ public function handle(RabbitMQConnector $connector): void $arguments = []; $maxPriority = (int) $this->option('max-priority'); - if ($maxPriority) { - $arguments['x-max-priority'] = $maxPriority; - } + $isQuorum = (bool) $this->option('quorum'); - if ($this->option('quorum')) { + if ($isQuorum) { $arguments['x-queue-type'] = 'quorum'; + + $initialGroupSize = (int) $this->option('quorum-initial-group-size'); + if ($initialGroupSize > 0) { + $arguments['x-quorum-initial-group-size'] = $initialGroupSize; + } + + if ($maxPriority) { + // quorum queues do not support priority; ignore and warn + $this->warn('Ignoring --max-priority for quorum queue.'); + } + } else { + if ($maxPriority) { + $arguments['x-max-priority'] = $maxPriority; + } } $queue->declareQueue( diff --git a/src/Console/ScanVhostsCommand.php b/src/Console/ScanVhostsCommand.php index c17d2fb0..49c39fca 100644 --- a/src/Console/ScanVhostsCommand.php +++ b/src/Console/ScanVhostsCommand.php @@ -3,9 +3,6 @@ namespace Salesmessage\LibRabbitMQ\Console; use Illuminate\Console\Command; -use Illuminate\Redis\Connections\PredisConnection; -use Illuminate\Support\Collection; -use Illuminate\Support\Facades\Redis; use Salesmessage\LibRabbitMQ\Dto\QueueApiDto; use Salesmessage\LibRabbitMQ\Dto\VhostApiDto; use Salesmessage\LibRabbitMQ\Services\GroupsService; @@ -16,11 +13,15 @@ class ScanVhostsCommand extends Command { protected $signature = 'lib-rabbitmq:scan-vhosts - {--sleep=10 : Number of seconds to sleep}'; + {--sleep=10 : Number of seconds to sleep} + {--max-time=0 : Maximum seconds the command can run before stopping} + {--with-output=true : Show output details during iteration} + {--max-memory=0 : Maximum memory usage in megabytes before stopping}'; protected $description = 'Scan and index vhosts'; - - private array $groups = []; + + private array $groups; + private bool $silent = false; /** * @param GroupsService $groupsService @@ -39,42 +40,72 @@ public function __construct( $this->groups = $this->groupsService->getAllGroupsNames(); } - /** - * @return int - */ - public function handle() + public function handle(): void { $sleep = (int) $this->option('sleep'); - - $vhosts = $this->vhostsService->getAllVhosts(); - $oldVhosts = $this->internalStorageManager->getVhosts(); - - if ($vhosts->isNotEmpty()) { - foreach ($vhosts as $vhost) { - $vhostDto = $this->processVhost($vhost); - if (null === $vhostDto) { - continue; - } + $maxTime = max(0, (int) $this->option('max-time')); + $this->silent = !filter_var($this->option('with-output'), FILTER_VALIDATE_BOOLEAN); - $oldVhostIndex = array_search($vhostDto->getName(), $oldVhosts, true); - if (false !== $oldVhostIndex) { - unset($oldVhosts[$oldVhostIndex]); - } + $maxMemoryMb = max(0, (int) $this->option('max-memory')); + $maxMemoryBytes = $maxMemoryMb > 0 ? $maxMemoryMb * 1024 * 1024 : 0; + + $startedAt = microtime(true); + + while (true) { + $iterationStartedAt = microtime(true); + + $this->processVhosts(); + + $iterationDuration = microtime(true) - $iterationStartedAt; + $totalRuntime = microtime(true) - $startedAt; + $memoryUsage = memory_get_usage(true); + $memoryPeakUsage = memory_get_peak_usage(true); + + $this->line(sprintf( + 'Iteration finished in %.2f seconds (total runtime %.2f seconds). Memory usage: %s (peak %s).', + $iterationDuration, + $totalRuntime, + $this->formatBytes($memoryUsage), + $this->formatBytes($memoryPeakUsage) + ), 'warning', forcePrint: $sleep === 0); + + if ($sleep === 0) { + return; } - } else { - $this->warn('Vhosts not found.'); - } - $this->removeOldsVhosts($oldVhosts); + if ($maxTime > 0 && $totalRuntime >= $maxTime) { + $this->line(sprintf('Stopping: reached max runtime of %d seconds.', $maxTime), 'warning', forcePrint: true); + return; + } - if ($sleep > 0) { - $this->line(sprintf('Sleep %d seconds...', $sleep)); + if ($maxMemoryBytes > 0 && $memoryUsage >= $maxMemoryBytes) { + $this->line(sprintf( + 'Stopping: memory usage %s exceeded max threshold of %s.', + $this->formatBytes($memoryUsage), + $this->formatBytes($maxMemoryBytes) + ), 'warning', forcePrint: true); + return; + } + $this->line(sprintf('Sleep %d seconds...', $sleep)); sleep($sleep); - return $this->handle(); + } + } + + private function processVhosts(): void + { + $oldVhostsMap = array_flip($this->internalStorageManager->getVhosts()); + + foreach ($this->vhostsService->getAllVhosts() as $vhost) { + $vhostDto = $this->processVhost($vhost); + if (null === $vhostDto) { + continue; + } + + unset($oldVhostsMap[$vhostDto->getName()]); } - return Command::SUCCESS; + $this->removeOldsVhosts(array_keys($oldVhostsMap)); } /** @@ -135,6 +166,11 @@ private function processVhost(array $vhostApiData): ?VhostApiDto return $vhostDto; } + private function formatBytes(int $bytes): string + { + return number_format($bytes / (1024 * 1024), 2) . ' MB'; + } + /** * @param array $oldVhosts * @return void @@ -220,5 +256,11 @@ private function removeOldVhostQueues(VhostApiDto $vhostDto, array $oldVhostQueu )); } } -} + public function line($string, $style = null, $verbosity = null, $forcePrint = false): void + { + if (!$this->silent || $style === 'error' || $forcePrint) { + parent::line($string, $style, $verbosity); + } + } +} diff --git a/src/Consumer.php b/src/Consumer.php index 0f69893f..f78dc241 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -9,8 +9,9 @@ use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Exception\AMQPRuntimeException; use PhpAmqpLib\Message\AMQPMessage; -use Throwable; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationService; +use Throwable; class Consumer extends Worker { @@ -122,7 +123,16 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu $jobsProcessed++; - $this->runJob($job, $connectionName, $options); + /** @var DeduplicationService $transportDedupService */ + $transportDedupService = $this->container->make(DeduplicationService::class); + $transportDedupService->decorateWithDeduplication( + function () use ($job, $message, $connectionName, $queue, $options, $transportDedupService) { + $this->runJob($job, $connectionName, $options); + $transportDedupService->markAsProcessed($message, $queue); + }, + $message, + $queue + ); if ($this->supportsAsyncSignals()) { $this->resetTimeoutHandler(); diff --git a/src/Contracts/RabbitMQConsumable.php b/src/Contracts/RabbitMQConsumable.php new file mode 100644 index 00000000..62364e27 --- /dev/null +++ b/src/Contracts/RabbitMQConsumable.php @@ -0,0 +1,20 @@ +prepare($this->lastPushed ?? null)->value; + if (!isset($options['queue_type']) && isset($this->lastPushed) && is_object($this->lastPushed) && $this->lastPushed instanceof RabbitMQConsumable) { + $options['queue_type'] = $this->lastPushed->getQueueType(); + } + return tap(parent::pushRaw($payload, $queue, $options), function () use ($queue, $payload): void { $this->event($this->getQueue($queue), new JobPushed($payload)); }); @@ -64,7 +69,9 @@ public function later($delay, $job, $data = '', $queue = null): mixed { $payload = (new JobPayload($this->createPayload($job, $data)))->prepare($job)->value; - return tap(parent::laterRaw($delay, $payload, $queue), function () use ($payload, $queue): void { + $queueType = ($job instanceof RabbitMQConsumable) ? $job->getQueueType() : null; + + return tap(parent::laterRaw($delay, $payload, $queue, queueType: $queueType), function () use ($payload, $queue): void { $this->event($this->getQueue($queue), new JobPushed($payload)); }); } diff --git a/src/Interfaces/RabbitMQBatchable.php b/src/Interfaces/RabbitMQBatchable.php index f3abc6c5..fdffa5ca 100644 --- a/src/Interfaces/RabbitMQBatchable.php +++ b/src/Interfaces/RabbitMQBatchable.php @@ -4,11 +4,18 @@ interface RabbitMQBatchable { + /** + * Filter out jobs that have already been processed according to the application logic. + * + * @param list $batch + * @return list + */ + public static function getNotDuplicatedBatchedJobs(array $batch): array; + /** * Processing jobs array of static class * - * @param array $batch - * @return mixed + * @param list $batch */ public static function collection(array $batch): void; } diff --git a/src/LaravelLibRabbitMQServiceProvider.php b/src/LaravelLibRabbitMQServiceProvider.php index 7048f289..d8451bdb 100644 --- a/src/LaravelLibRabbitMQServiceProvider.php +++ b/src/LaravelLibRabbitMQServiceProvider.php @@ -3,12 +3,6 @@ namespace Salesmessage\LibRabbitMQ; use Illuminate\Contracts\Debug\ExceptionHandler; -use Illuminate\Queue\Connectors\BeanstalkdConnector; -use Illuminate\Queue\Connectors\DatabaseConnector; -use Illuminate\Queue\Connectors\NullConnector; -use Illuminate\Queue\Connectors\RedisConnector; -use Illuminate\Queue\Connectors\SqsConnector; -use Illuminate\Queue\Connectors\SyncConnector; use Illuminate\Queue\QueueManager; use Illuminate\Support\ServiceProvider; use Psr\Log\LoggerInterface; @@ -16,6 +10,10 @@ use Salesmessage\LibRabbitMQ\Console\ConsumeVhostsCommand; use Salesmessage\LibRabbitMQ\Console\ScanVhostsCommand; use Salesmessage\LibRabbitMQ\Queue\Connectors\RabbitMQVhostsConnector; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationService; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationStore; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\NullDeduplicationStore; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\RedisDeduplicationStore; use Salesmessage\LibRabbitMQ\Services\GroupsService; use Salesmessage\LibRabbitMQ\Services\InternalStorageManager; use Salesmessage\LibRabbitMQ\Services\QueueService; @@ -36,6 +34,8 @@ public function register(): void ); if ($this->app->runningInConsole()) { + $this->bindDeduplicationService(); + $this->app->singleton('rabbitmq.consumer', function () { $isDownForMaintenance = function () { return $this->app->isDownForMaintenance(); @@ -68,7 +68,8 @@ public function register(): void $this->app['events'], $this->app[ExceptionHandler::class], $isDownForMaintenance, - null + $this->app->get(DeduplicationService::class), + null, ); }); @@ -84,7 +85,8 @@ public function register(): void $this->app['events'], $this->app[ExceptionHandler::class], $isDownForMaintenance, - null + $this->app->get(DeduplicationService::class), + null, ); }); @@ -92,7 +94,7 @@ public function register(): void $consumerClass = ('direct' === config('queue.connections.rabbitmq_vhosts.consumer_type')) ? VhostsDirectConsumer::class : VhostsQueueConsumer::class; - + return new ConsumeVhostsCommand( $app[GroupsService::class], $app[$consumerClass], @@ -139,4 +141,35 @@ public function boot(): void return new RabbitMQVhostsConnector($this->app['events']); }); } + + /** + * Config params: + * @phpstan-import-type DeduplicationConfig from DeduplicationService + * + * @return void + */ + private function bindDeduplicationService(): void + { + $this->app->bind(DeduplicationStore::class, static function () { + /** @var DeduplicationConfig $config */ + $config = (array) config('queue.connections.rabbitmq_vhosts.deduplication.transport', []); + $enabled = (bool) ($config['enabled'] ?? false); + if (!$enabled) { + return new NullDeduplicationStore(); + } + + $connectionDriver = $config['connection']['driver'] ?? null; + if ($connectionDriver !== 'redis') { + throw new \InvalidArgumentException('For now only Redis connection is supported for deduplication'); + } + $connectionName = $config['connection']['name'] ?? null; + + $prefix = trim($config['connection']['key_prefix'] ?? ''); + if (empty($prefix)) { + throw new \InvalidArgumentException('Key prefix is required'); + } + + return new RedisDeduplicationStore($connectionName, $prefix); + }); + } } diff --git a/src/Queue/Jobs/RabbitMQJob.php b/src/Queue/Jobs/RabbitMQJob.php index 0622b8ec..a3c16b80 100644 --- a/src/Queue/Jobs/RabbitMQJob.php +++ b/src/Queue/Jobs/RabbitMQJob.php @@ -4,12 +4,14 @@ use Illuminate\Container\Container; use Illuminate\Contracts\Container\BindingResolutionException; +use Illuminate\Contracts\Encryption\Encrypter; use Illuminate\Contracts\Queue\Job as JobContract; use Illuminate\Queue\Jobs\Job; use Illuminate\Support\Arr; use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; +use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable; use Salesmessage\LibRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; @@ -126,8 +128,13 @@ public function release($delay = 0): void { parent::release(); + $consumableJob = $this->getPayloadData(); + if (!($consumableJob instanceof RabbitMQConsumable)) { + throw new \RuntimeException('Job must be an instance of RabbitMQJobBatchable'); + } + // Always create a new message when this Job is released - $this->rabbitmq->laterRaw($delay, $this->message->getBody(), $this->queue, $this->attempts()); + $this->rabbitmq->laterRaw($delay, $this->message->getBody(), $this->queue, $this->attempts(), $consumableJob->getQueueType()); // Releasing a Job means the message was failed to process. // Because this Job message is always recreated and pushed as new message, this Job message is correctly handled. @@ -135,6 +142,39 @@ public function release($delay = 0): void $this->rabbitmq->ack($this); } + /** + * @return object + * @throws \RuntimeException + */ + public function getPayloadData(): object + { + $payload = $this->payload(); + + $data = $payload['data']; + + if (str_starts_with($data['command'], 'O:')) { + return unserialize($data['command']); + } + + if ($this->container->bound(Encrypter::class)) { + return unserialize($this->container[Encrypter::class]->decrypt($data['command'])); + } + + throw new \RuntimeException('Unable to extract job data.'); + } + + /** + * Returns target class name + * + * @return mixed + */ + public function getPayloadClass(): string + { + $payload = $this->payload(); + + return $payload['data']['commandName']; + } + /** * Get the underlying RabbitMQ connection. */ diff --git a/src/Queue/Jobs/RabbitMQJobBatchable.php b/src/Queue/Jobs/RabbitMQJobBatchable.php index 8015c2ad..278ae917 100644 --- a/src/Queue/Jobs/RabbitMQJobBatchable.php +++ b/src/Queue/Jobs/RabbitMQJobBatchable.php @@ -2,8 +2,6 @@ namespace Salesmessage\LibRabbitMQ\Queue\Jobs; -use Illuminate\Contracts\Encryption\Encrypter; -use Illuminate\Queue\Jobs\JobName; use Salesmessage\LibRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob; /** @@ -11,50 +9,4 @@ */ class RabbitMQJobBatchable extends BaseJob { - /** - * Fire the job. - * - * @return void - */ - public function fire() - { - $payload = $this->payload(); - - [$class, $method] = JobName::parse($payload['job']); - - ($this->instance = $this->resolve($class))->{$method}($this, $payload['data']); - } - - /** - * Returns target class name - * - * @return mixed - */ - public function getPayloadClass(): string - { - $payload = $this->payload(); - - return $payload['data']['commandName']; - } - - /** - * @return object - * @throws \RuntimeException - */ - public function getPayloadData(): object - { - $payload = $this->payload(); - - $data = $payload['data']; - - if (str_starts_with($data['command'], 'O:')) { - return unserialize($data['command']); - } - - if ($this->container->bound(Encrypter::class)) { - return unserialize($this->container[Encrypter::class]->decrypt($data['command'])); - } - - throw new \RuntimeException('Unable to extract job data.'); - } } diff --git a/src/Queue/QueueConfig.php b/src/Queue/QueueConfig.php index 1bf5802f..22d7f2fb 100644 --- a/src/Queue/QueueConfig.php +++ b/src/Queue/QueueConfig.php @@ -30,6 +30,10 @@ class QueueConfig protected bool $quorum = false; + protected ?int $quorumInitialGroupSize = null; + + protected string $quorumQueuePostfix = ''; + protected array $options = []; /** @@ -247,6 +251,41 @@ public function setQuorum($quorum): QueueConfig return $this; } + /** + * When set, used to declare quorum queues with a specific initial group size. + */ + public function getQuorumInitialGroupSize(): ?int + { + return $this->quorumInitialGroupSize; + } + + public function setQuorumInitialGroupSize(?int $size): self + { + if ($size === null) { + $this->quorumInitialGroupSize = null; + return $this; + } + + if ($size <= 0) { + throw new \InvalidArgumentException('Invalid quorum group size'); + } + + $this->quorumInitialGroupSize = $size; + + return $this; + } + + public function getQuorumQueuePostfix(): string + { + return $this->quorumQueuePostfix; + } + + public function setQuorumQueuePostfix(string $postfix): self + { + $this->quorumQueuePostfix = $postfix; + return $this; + } + /** * Holds all unknown queue options provided in the connection config */ diff --git a/src/Queue/QueueConfigFactory.php b/src/Queue/QueueConfigFactory.php index 04f29166..26d0bc2a 100644 --- a/src/Queue/QueueConfigFactory.php +++ b/src/Queue/QueueConfigFactory.php @@ -68,6 +68,15 @@ protected static function getOptionsFromConfig(QueueConfig $queueConfig, array $ $queueConfig->setQuorum($quorum); } + // Feature: Quorum initial group size + if (Arr::has($queueOptions, 'quorum_initial_group_size')) { + $queueConfig->setQuorumInitialGroupSize((int) Arr::pull($queueOptions, 'quorum_initial_group_size')); + } + + if ($quorumPostfix = (string) Arr::pull($queueOptions, 'quorum_queue_postfix')) { + $queueConfig->setQuorumQueuePostfix($quorumPostfix); + } + // All extra options not defined $queueConfig->setOptions($queueOptions); } diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index f0973d75..83ceb317 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -22,7 +22,9 @@ use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; +use Ramsey\Uuid\Uuid; use RuntimeException; +use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable; use Throwable; use Salesmessage\LibRabbitMQ\Contracts\RabbitMQQueueContract; use Salesmessage\LibRabbitMQ\Queue\Jobs\RabbitMQJob; @@ -101,13 +103,21 @@ public function size($queue = null): int */ public function push($job, $data = '', $queue = null) { + if (!($job instanceof RabbitMQConsumable)) { + throw new \RuntimeException('Job must be an instance of RabbitMQConsumable'); + } + + $options = [ + 'queue_type' => $job->getQueueType(), + ]; + return $this->enqueueUsing( $job, $this->createPayload($job, $this->getQueue($queue), $data), $queue, null, - function ($payload, $queue) { - return $this->pushRaw($payload, $queue); + function ($payload, $queue) use ($options) { + return $this->pushRaw($payload, $queue, $options); } ); } @@ -119,9 +129,12 @@ function ($payload, $queue) { */ public function pushRaw($payload, $queue = null, array $options = []): int|string|null { + $queueType = $options['queue_type'] ?? null; + unset($options['queue_type']); + [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); - $this->declareDestination($destination, $exchange, $exchangeType); + $this->declareDestination($destination, $exchange, $exchangeType, $queueType); [$message, $correlationId] = $this->createMessage($payload, $attempts); @@ -137,13 +150,19 @@ public function pushRaw($payload, $queue = null, array $options = []): int|strin */ public function later($delay, $job, $data = '', $queue = null): mixed { + if (!($job instanceof RabbitMQConsumable)) { + throw new \RuntimeException('Job must be an instance of RabbitMQConsumable'); + } + + $queueType = $job->getQueueType(); + return $this->enqueueUsing( $job, $this->createPayload($job, $this->getQueue($queue), $data), $queue, $delay, - function ($payload, $queue, $delay) { - return $this->laterRaw($delay, $payload, $queue); + function ($payload, $queue, $delay) use ($queueType) { + return $this->laterRaw($delay, $payload, $queue, queueType: $queueType); } ); } @@ -151,7 +170,7 @@ function ($payload, $queue, $delay) { /** * @throws AMQPProtocolChannelException */ - public function laterRaw($delay, string $payload, $queue = null, int $attempts = 0): int|string|null + public function laterRaw($delay, string $payload, $queue = null, int $attempts = 0, ?string $queueType = null): int|string|null { $ttl = $this->secondsUntil($delay) * 1000; @@ -160,12 +179,15 @@ public function laterRaw($delay, string $payload, $queue = null, int $attempts = // When no ttl just publish a new message to the exchange or queue if ($ttl <= 0) { + if ($queueType !== null) { + $options['queue_type'] = $queueType; + } return $this->pushRaw($payload, $queue, $options); } // Create a main queue to handle delayed messages [$mainDestination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); - $this->declareDestination($mainDestination, $exchange, $exchangeType); + $this->declareDestination($mainDestination, $exchange, $exchangeType, $queueType); $destination = $this->getQueue($queue).'.delay.'.$ttl; @@ -195,7 +217,13 @@ public function bulk($jobs, $data = '', $queue = null): void protected function publishBatch($jobs, $data = '', $queue = null): void { foreach ($jobs as $job) { - $this->bulkRaw($this->createPayload($job, $queue, $data), $queue, ['job' => $job]); + if (!($job instanceof RabbitMQConsumable)) { + throw new \RuntimeException('Job must be an instance of RabbitMQConsumable'); + } + $this->bulkRaw($this->createPayload($job, $queue, $data), $queue, [ + 'job' => $job, + 'queue_type' => $job->getQueueType(), + ]); } $this->batchPublish(); @@ -206,9 +234,12 @@ protected function publishBatch($jobs, $data = '', $queue = null): void */ public function bulkRaw(string $payload, string $queue = null, array $options = []): int|string|null { + $queueType = $options['queue_type'] ?? null; + unset($options['queue_type']); + [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); - $this->declareDestination($destination, $exchange, $exchangeType); + $this->declareDestination($destination, $exchange, $exchangeType, $queueType); [$message, $correlationId] = $this->createMessage($payload, $attempts); @@ -521,6 +552,7 @@ protected function createMessage($payload, int $attempts = 0): array $currentPayload = json_decode($payload, true); if ($correlationId = $currentPayload['id'] ?? null) { $properties['correlation_id'] = $correlationId; + $properties['message_id'] = Uuid::uuid7()->toString(); } if ($this->getConfig()->isPrioritizeDelayed()) { @@ -597,15 +629,16 @@ public function close(): void /** * Get the Queue arguments. */ - protected function getQueueArguments(string $destination): array + protected function getQueueArguments(string $destination, ?string $queueType = null): array { + $isQuorum = $this->getConfig()->isQuorum() || $queueType === RabbitMQConsumable::MQ_TYPE_QUORUM; $arguments = []; // Messages without a priority property are treated as if their priority were 0. // Messages with a priority which is higher than the queue's maximum, are treated as if they were // published with the maximum priority. // Quorum queues does not support priority. - if ($this->getConfig()->isPrioritizeDelayed() && ! $this->getConfig()->isQuorum()) { + if ($this->getConfig()->isPrioritizeDelayed() && ! $isQuorum) { $arguments['x-max-priority'] = $this->getConfig()->getQueueMaxPriority(); } @@ -614,8 +647,14 @@ protected function getQueueArguments(string $destination): array $arguments['x-dead-letter-routing-key'] = $this->getFailedRoutingKey($destination); } - if ($this->getConfig()->isQuorum()) { + if ($isQuorum) { $arguments['x-queue-type'] = 'quorum'; + + // optional: initial group size for quorum queues + $initialGroupSize = $this->getConfig()->getQuorumInitialGroupSize(); + if ($initialGroupSize !== null) { + $arguments['x-quorum-initial-group-size'] = $initialGroupSize; + } } return $arguments; @@ -699,8 +738,12 @@ protected function isQueueDeclared(string $name): bool * * @throws AMQPProtocolChannelException */ - protected function declareDestination(string $destination, string $exchange = null, string $exchangeType = AMQPExchangeType::DIRECT): void - { + protected function declareDestination( + string $destination, + string $exchange = null, + string $exchangeType = AMQPExchangeType::DIRECT, + ?string $queueType = null, + ): void { // When an exchange is provided and no exchange is present in RabbitMQ, create an exchange. if ($exchange && ! $this->isExchangeExists($exchange)) { $this->declareExchange($exchange, $exchangeType); @@ -717,7 +760,7 @@ protected function declareDestination(string $destination, string $exchange = nu } // Create a queue for amq.direct publishing. - $this->declareQueue($destination, true, false, $this->getQueueArguments($destination)); + $this->declareQueue($destination, true, false, $this->getQueueArguments($destination, $queueType)); } /** diff --git a/src/Queue/RabbitMQQueueBatchable.php b/src/Queue/RabbitMQQueueBatchable.php index a596a6fc..d58b89a5 100644 --- a/src/Queue/RabbitMQQueueBatchable.php +++ b/src/Queue/RabbitMQQueueBatchable.php @@ -2,7 +2,7 @@ namespace Salesmessage\LibRabbitMQ\Queue; -use PhpAmqpLib\Connection\AbstractConnection; +use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable; use Salesmessage\LibRabbitMQ\Dto\ConnectionNameDto; use Salesmessage\LibRabbitMQ\Dto\QueueApiDto; use Salesmessage\LibRabbitMQ\Dto\VhostApiDto; @@ -41,8 +41,7 @@ protected function publishBasic( $mandatory = false, $immediate = false, $ticket = null - ): void - { + ): void { try { parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket); } catch (AMQPConnectionClosedException|AMQPChannelClosedException) { @@ -77,7 +76,20 @@ protected function createChannel(): AMQPChannel public function push($job, $data = '', $queue = null) { - $queue = $queue ?: $job->onQueue(); + if (!($job instanceof RabbitMQConsumable)) { + throw new \InvalidArgumentException('Job must implement RabbitMQConsumable'); + } + + if (!$queue) { + if (!method_exists($job, 'onQueue')) { + throw new \InvalidArgumentException('Job must implement onQueue method'); + } + $queue = $job->onQueue(); + } + + if ($job->getQueueType() === RabbitMQConsumable::MQ_TYPE_QUORUM) { + $queue .= $this->getConfig()->getQuorumQueuePostfix(); + } try { $result = parent::push($job, $data, $queue); @@ -101,11 +113,6 @@ public function push($job, $data = '', $queue = null) return $result; } - public function pushRaw($payload, $queue = null, array $options = []): int|string|null - { - return parent::pushRaw($payload, $queue, $options); - } - /** * @return bool * @throws \GuzzleHttp\Exception\GuzzleException @@ -175,4 +182,3 @@ private function isVhostFailedException(AMQPConnectionClosedException $exception return false; } } - diff --git a/src/Services/Api/RabbitApiClient.php b/src/Services/Api/RabbitApiClient.php index dfd16f1e..0f8fd3bc 100644 --- a/src/Services/Api/RabbitApiClient.php +++ b/src/Services/Api/RabbitApiClient.php @@ -70,7 +70,7 @@ public function request( $contents = $response->getBody()->getContents(); return (array) ($contents ? json_decode($contents, true) : []); - } catch (Throwable $exception) { + } catch (\Throwable $exception) { $rethrowException = $exception; if ($exception instanceof ClientException) { $rethrowException = new RabbitApiClientException($exception->getMessage()); @@ -109,4 +109,4 @@ private function getPassword(): string { return (string) ($this->connectionConfig['hosts'][0]['password'] ?? ''); } -} \ No newline at end of file +} diff --git a/src/Services/Deduplication/AppDeduplicationService.php b/src/Services/Deduplication/AppDeduplicationService.php new file mode 100644 index 00000000..2187557d --- /dev/null +++ b/src/Services/Deduplication/AppDeduplicationService.php @@ -0,0 +1,18 @@ +getState($message, $queueName); + try { + if ($messageState === DeduplicationService::IN_PROGRESS) { + $action = $this->applyActionOnLock($message); + $this->logger->warning('DeduplicationService.message_already_in_progress', [ + 'action' => $action, + 'message_id' => $message->get_properties()['message_id'] ?? null, + ]); + + return false; + } + + if ($messageState === DeduplicationService::PROCESSED) { + $action = $this->applyActionOnDuplication($message); + $this->logger->warning('DeduplicationService.message_already_processed', [ + 'action' => $action, + 'message_id' => $message->get_properties()['message_id'] ?? null, + ]); + + return false; + } + + $hasPutAsInProgress = $this->markAsInProgress($message, $queueName); + if ($hasPutAsInProgress === false) { + $action = $this->applyActionOnLock($message); + $this->logger->warning('DeduplicationService.message_already_in_progress.skip', [ + 'action' => $action, + 'message_id' => $message->get_properties()['message_id'] ?? null, + ]); + + return false; + } + + $handler(); + } catch (\Throwable $exception) { + if ($messageState === null) { + $this->release($message, $queueName); + } + + $this->logger->error('DeduplicationService.message_processing_exception', [ + 'released_message_id' => $message->get_properties()['message_id'] ?? null, + ]); + + throw $exception; + } + + return true; + } + + /** + * @param AMQPMessage $message + * @return string|null - @enum {self::IN_PROGRESS, self::PROCESSED} + */ + public function getState(AMQPMessage $message, ?string $queueName = null): ?string + { + if (!$this->isEnabled()) { + return null; + } + $messageId = $this->getMessageId($message, $queueName); + if ($messageId === null) { + return null; + } + + return $this->store->get($messageId); + } + + public function markAsProcessed(AMQPMessage $message, ?string $queueName = null): bool + { + $ttl = (int) ($this->getConfig('ttl') ?: self::DEFAULT_TTL); + if ($ttl <= 0 || $ttl > self::MAX_TTL) { + throw new \InvalidArgumentException(sprintf('Invalid TTL seconds. Should be between 1 sec and %d sec', self::MAX_TTL)); + } + + return $this->add($message, self::PROCESSED, $ttl, $queueName); + } + + public function release(AMQPMessage $message, ?string $queueName = null): void + { + if (!$this->isEnabled()) { + return; + } + + $messageId = $this->getMessageId($message, $queueName); + if ($messageId === null) { + return; + } + + $this->store->release($messageId); + } + + protected function markAsInProgress(AMQPMessage $message, ?string $queueName = null): bool + { + $ttl = (int) ($this->getConfig('lock_ttl') ?: self::DEFAULT_LOCK_TTL); + if ($ttl <= 0 || $ttl > self::MAX_LOCK_TTL) { + throw new \InvalidArgumentException(sprintf('Invalid TTL seconds. Should be between 1 and %d', self::MAX_LOCK_TTL)); + } + + return $this->add($message, self::IN_PROGRESS, $ttl, $queueName); + } + + /** + * Returns "true" if the message was not processed previously, and it's successfully been added to the store. + * Returns "false" if the message was already processed and it's a duplicate. + * + * @param AMQPMessage $message + * @param string $value + * @param int $ttl + * @return bool + */ + protected function add(AMQPMessage $message, string $value, int $ttl, ?string $queueName = null): bool + { + if (!$this->isEnabled()) { + return true; + } + + $messageId = $this->getMessageId($message, $queueName); + if ($messageId === null) { + return true; + } + + return $this->store->set($messageId, $value, $ttl, $value === self::PROCESSED); + } + + protected function getMessageId(AMQPMessage $message, ?string $queueName = null): ?string + { + $props = $message->get_properties(); + $messageId = $props['message_id'] ?? null; + if (!is_string($messageId) || empty($messageId)) { + return null; + } + + if (DlqDetector::isDlqMessage($message)) { + $messageId = 'dlq:' . $messageId; + } + + if (is_string($queueName) && $queueName !== '') { + $messageId = $queueName . ':' . $messageId; + } + + return $messageId; + } + + protected function applyActionOnLock(AMQPMessage $message): string + { + $action = $this->getConfig('action_on_lock', self::ACTION_REQUEUE); + if ($action === self::ACTION_REJECT) { + $message->reject(false); + } elseif ($action === self::ACTION_ACK) { + $message->ack(); + } else { + $action = $this->republishLockedMessage($message); + } + + return $action; + } + + protected function applyActionOnDuplication(AMQPMessage $message): string + { + $action = $this->getConfig('action_on_duplication', self::ACTION_ACK); + if ($action === self::ACTION_REJECT) { + $message->reject(false); + } else { + $message->ack(); + } + + return $action; + } + + /** + * Such a situation normally should not happen or can happen very rarely. + * Republish the locked message with a retry-count guard. + * It's necessary to avoid infinite redelivery loop. + * + * @param AMQPMessage $message + * @return string + */ + protected function republishLockedMessage(AMQPMessage $message): string + { + $props = $message->get_properties(); + $headers = []; + if (($props['application_headers'] ?? null) instanceof AMQPTable) { + $headers = $props['application_headers']->getNativeData(); + } + + $attempts = (int) ($headers[self::HEADER_LOCK_REQUEUE_COUNT] ?? 0); + ++$attempts; + + $maxAttempts = ((int) ($this->getConfig('lock_ttl', 30))) / self::WAIT_AFTER_PUBLISH; + if ($attempts > $maxAttempts) { + $this->logger->warning('DeduplicationService.republishLockedMessage.max_attempts_reached', [ + 'message_id' => $props['message_id'] ?? null, + ]); + $message->ack(); + + return self::ACTION_ACK; + } + + $headers[self::HEADER_LOCK_REQUEUE_COUNT] = $attempts; + + $newProps = $props; + $newProps['application_headers'] = new AMQPTable($headers); + + $newMessage = new AMQPMessage($message->getBody(), $newProps); + $channel = $message->getChannel(); + $channel->basic_publish($newMessage, $message->getExchange(), $message->getRoutingKey()); + + $this->logger->warning('DeduplicationService.republishLockedMessage.republish', [ + 'message_id' => $props['message_id'] ?? null, + 'attempts' => $attempts, + ]); + $message->ack(); + // it's necessary to avoid a high redelivery rate + // normally, such a situation is not expected (or expected very rarely) + sleep(self::WAIT_AFTER_PUBLISH); + + return self::ACTION_REQUEUE; + } + + protected function isEnabled(): bool + { + return (bool) $this->getConfig('enabled', false); + } + + protected function getConfig(string $key, mixed $default = null): mixed + { + $value = config("queue.connections.rabbitmq_vhosts.deduplication.transport.$key"); + + return $value !== null ? $value : $default; + } +} diff --git a/src/Services/Deduplication/TransportLevel/DeduplicationStore.php b/src/Services/Deduplication/TransportLevel/DeduplicationStore.php new file mode 100644 index 00000000..014e639f --- /dev/null +++ b/src/Services/Deduplication/TransportLevel/DeduplicationStore.php @@ -0,0 +1,12 @@ +getKey($messageKey); + return $this->connection()->get($key); + } + + public function set(string $messageKey, mixed $value, int $ttlSeconds, bool $withOverride = false): bool + { + if ($ttlSeconds <= 0) { + throw new \InvalidArgumentException('Invalid TTL seconds. Should be greater than 0.'); + } + + $key = $this->getKey($messageKey); + $args = [$key, $value, 'EX', $ttlSeconds]; + if (!$withOverride) { + $args[] = 'NX'; + } + + return (bool) $this->connection()->set(...$args); + } + + public function release(string $messageKey): void + { + $key = $this->getKey($messageKey); + $this->connection()->del($key); + } + + protected function connection(): Connection + { + return $this->connectionName ? Redis::connection($this->connectionName) : Redis::connection(); + } + + protected function getKey(string $messageKey): string + { + return $this->keyPrefix . ':' . $messageKey; + } +} diff --git a/src/Services/DlqDetector.php b/src/Services/DlqDetector.php new file mode 100644 index 00000000..997fb2f7 --- /dev/null +++ b/src/Services/DlqDetector.php @@ -0,0 +1,22 @@ +get_properties()['application_headers'] ?? null; + + if (!($headersTable instanceof AMQPTable)) { + return false; + } + + $headers = $headersTable->getNativeData(); + + return !empty($headers['x-death']) && !empty($headers['x-opt-deaths']); + } +} diff --git a/src/Services/InternalStorageManager.php b/src/Services/InternalStorageManager.php index 9e0cd8a0..2708f859 100644 --- a/src/Services/InternalStorageManager.php +++ b/src/Services/InternalStorageManager.php @@ -4,6 +4,7 @@ use Illuminate\Redis\Connections\PredisConnection; use Illuminate\Support\Facades\Redis; +use Illuminate\Support\Str; use Salesmessage\LibRabbitMQ\Dto\QueueApiDto; use Salesmessage\LibRabbitMQ\Dto\VhostApiDto; @@ -38,7 +39,7 @@ public function getVhosts(string $by = 'name', bool $alpha = true): array 'sort' => 'asc', ]); - return array_map(fn($value): string => str_replace_first( + return array_map(fn($value): string => Str::replaceFirst( $this->getVhostStorageKeyPrefix(), '', $value @@ -61,7 +62,7 @@ public function getVhostQueues(string $vhostName, string $by = 'name', bool $alp 'sort' => 'asc', ]); - return array_map(fn($value): string => str_replace_first( + return array_map(fn($value): string => Str::replaceFirst( $this->getQueueStorageKeyPrefix($vhostName), '', $value diff --git a/src/Services/VhostsService.php b/src/Services/VhostsService.php index 994373e4..aa933f9e 100644 --- a/src/Services/VhostsService.php +++ b/src/Services/VhostsService.php @@ -2,7 +2,6 @@ namespace Salesmessage\LibRabbitMQ\Services; -use Illuminate\Support\Collection; use Psr\Log\LoggerInterface; use Salesmessage\LibRabbitMQ\Services\Api\RabbitApiClient; use Throwable; @@ -23,52 +22,37 @@ public function __construct( $this->rabbitApiClient->setConnectionConfig($connectionConfig); } - /** - * @param int $page - * @param int $pageSize - * @param Collection|null $vhosts - * @return Collection - */ - public function getAllVhosts( - int $page = 1, - int $pageSize = 500, - ?Collection $vhosts = null, - ): Collection + public function getAllVhosts(int $fromPage = 1): \Generator { - if (null === $vhosts) { - $vhosts = new Collection(); - } - - try { + while (true) { $data = $this->rabbitApiClient->request( 'GET', '/api/vhosts', [ - 'page' => $page, - 'page_size' => $pageSize, + 'page' => $fromPage, + 'page_size' => 500, 'columns' => 'name,messages,messages_ready,messages_unacknowledged', ]); - } catch (Throwable $exception) { - $this->logger->warning('Salesmessage.LibRabbitMQ.Services.VhostsService.getAllVhosts.exception', [ - 'message' => $exception->getMessage(), - 'code' => $exception->getCode(), - 'trace' => $exception->getTraceAsString(), - ]); - - $data = []; - } - $items = (array) ($data['items'] ?? []); - if (!empty($items)) { - $vhosts->push(...$items); + $items = $data['items'] ?? []; + if (!is_array($items) || !isset($data['page_count'])) { + throw new \LogicException('Unexpected response from RabbitMQ API'); + } + if (empty($items)) { + break; + } + + foreach ($items as $item) { + yield $item; + } + + $nextPage = $fromPage + 1; + $totalPages = (int) $data['page_count']; + if ($nextPage > $totalPages) { + break; + } + + $fromPage = $nextPage; } - - $nextPage = $page + 1; - $lastPage = (int) ($data['page_count'] ?? 1); - if ($lastPage >= $nextPage) { - return $this->getAllVhosts($nextPage, $pageSize, $vhosts); - } - - return $vhosts; } /** @@ -120,7 +104,7 @@ public function createVhostForOrganization(int $organizationId): bool { $vhostName = $this->getVhostName($organizationId); $description = $this->getVhostDescription($organizationId); - + return $this->createVhost($vhostName, $description); } diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php index 2365db94..ff30cf41 100644 --- a/src/VhostsConsumers/AbstractVhostsConsumer.php +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -8,27 +8,33 @@ use Illuminate\Queue\QueueManager; use Illuminate\Queue\WorkerOptions; use Illuminate\Support\Str; +use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Exception\AMQPChannelClosedException; use PhpAmqpLib\Exception\AMQPConnectionClosedException; -use PhpAmqpLib\Exception\AMQPProtocolChannelException; -use PhpAmqpLib\Exception\AMQPRuntimeException; use PhpAmqpLib\Message\AMQPMessage; use Psr\Log\LoggerInterface; use Salesmessage\LibRabbitMQ\Consumer; +use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable; use Salesmessage\LibRabbitMQ\Dto\ConnectionNameDto; use Salesmessage\LibRabbitMQ\Dto\ConsumeVhostsFiltersDto; use Salesmessage\LibRabbitMQ\Dto\QueueApiDto; use Salesmessage\LibRabbitMQ\Dto\VhostApiDto; +use Salesmessage\LibRabbitMQ\Exceptions\MutexTimeout; use Salesmessage\LibRabbitMQ\Interfaces\RabbitMQBatchable; use Salesmessage\LibRabbitMQ\Mutex; use Salesmessage\LibRabbitMQ\Queue\Jobs\RabbitMQJob; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; +use Salesmessage\LibRabbitMQ\Services\Deduplication\AppDeduplicationService; +use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationService as TransportDeduplicationService; use Salesmessage\LibRabbitMQ\Services\InternalStorageManager; abstract class AbstractVhostsConsumer extends Consumer { protected const MAIN_HANDLER_LOCK = 'vhost_handler'; + protected const HEALTHCHECK_HANDLER_LOCK = 'healthcheck_vhost_handler'; + protected ?OutputStyle $output = null; protected ?ConsumeVhostsFiltersDto $filtersDto = null; @@ -49,16 +55,27 @@ abstract class AbstractVhostsConsumer extends Consumer protected ?WorkerOptions $workerOptions = null; + /** @var array, array> */ protected array $batchMessages = []; protected ?string $processingUuid = null; protected int|float $processingStartedAt = 0; + protected int $totalJobsProcessed = 0; + protected int $jobsProcessed = 0; protected bool $hadJobs = false; + protected ?int $stopStatusCode = null; + + protected array $config = []; + + protected bool $asyncMode = false; + + protected ?Mutex $connectionMutex = null; + /** * @param InternalStorageManager $internalStorageManager * @param LoggerInterface $logger @@ -66,6 +83,7 @@ abstract class AbstractVhostsConsumer extends Consumer * @param Dispatcher $events * @param ExceptionHandler $exceptions * @param callable $isDownForMaintenance + * @param TransportDeduplicationService $transportDeduplicationService * @param callable|null $resetScope */ public function __construct( @@ -75,7 +93,8 @@ public function __construct( Dispatcher $events, ExceptionHandler $exceptions, callable $isDownForMaintenance, - callable $resetScope = null + protected TransportDeduplicationService $transportDeduplicationService, + callable $resetScope = null, ) { parent::__construct($manager, $events, $exceptions, $isDownForMaintenance, $resetScope); } @@ -110,12 +129,30 @@ public function setBatchSize(int $batchSize): self return $this; } + /** + * @param array $config + * @return $this + */ + public function setConfig(array $config): self + { + $this->config = $config; + return $this; + } + + /** + * @param bool $asyncMode + * @return $this + */ + public function setAsyncMode(bool $asyncMode): self + { + $this->asyncMode = $asyncMode; + return $this; + } + public function daemon($connectionName, $queue, WorkerOptions $options) { $this->goAheadOrWait(); - $this->connectionMutex = new Mutex(false); - $this->configConnectionName = (string) $connectionName; $this->workerOptions = $options; @@ -123,6 +160,42 @@ public function daemon($connectionName, $queue, WorkerOptions $options) $this->listenForSignals(); } + if ($this->asyncMode) { + $this->logInfo('daemon.AsyncMode.On'); + + $coroutineContextHandler = function () use ($connectionName, $options) { + $this->logInfo('daemon.AsyncMode.Coroutines.Running'); + + // we can't move it outside since Mutex should be created within coroutine context + $this->connectionMutex = new Mutex(true); + $this->startHeartbeatCheck(); + \go(function () use ($connectionName, $options) { + $this->vhostDaemon($connectionName, $options); + }); + }; + + if (extension_loaded('swoole')) { + $this->logInfo('daemon.AsyncMode.Swoole'); + + \Co\run($coroutineContextHandler); + } elseif (extension_loaded('openswoole')) { + $this->logInfo('daemon.AsyncMode.OpenSwoole'); + + \OpenSwoole\Runtime::enableCoroutine(true, \OpenSwoole\Runtime::HOOK_ALL); + \co::run($coroutineContextHandler); + } else { + $this->logError('daemon.AsyncMode.IsNotSupported'); + + throw new \Exception('Async mode is not supported. Check if Swoole extension is installed'); + } + + return; + } + + $this->logInfo('daemon.AsyncMode.Off'); + + $this->connectionMutex = new Mutex(false); + $this->startHeartbeatCheck(); $this->vhostDaemon($connectionName, $options); } @@ -156,7 +229,7 @@ protected function getStopStatus( /** * @return RabbitMQQueue - * @throws Exceptions\MutexTimeout + * @throws MutexTimeout */ abstract protected function startConsuming(): RabbitMQQueue; @@ -173,14 +246,15 @@ protected function processAmqpMessage(AMQPMessage $message, RabbitMQQueue $conne $this->addMessageToBatch($message); } else { $job = $this->getJobByMessage($message, $connection); - $this->processSingleJob($job); + $this->processSingleJob($job, $message); } $this->jobsProcessed++; + $this->totalJobsProcessed++; $this->logInfo('processAMQPMessage.message_consumed', [ 'processed_jobs_count' => $this->jobsProcessed, - 'is_support_batching' => $isSupportBatching, + 'is_support_batching' => $isSupportBatching ? 'Y' :'N', ]); } @@ -194,18 +268,23 @@ protected function generateProcessingUuid(): string /** * @param AMQPMessage $message - * @return string + * @return non-empty-string */ protected function getMessageClass(AMQPMessage $message): string { $body = json_decode($message->getBody(), true); - return (string) ($body['data']['commandName'] ?? ''); + $messageClass = (string) ($body['data']['commandName'] ?? ''); + if (empty($messageClass)) { + throw new \RuntimeException('Message class is not defined'); + } + return $messageClass; } /** - * @param RabbitMQJob $job - * @return void + * @param AMQPMessage $message + * @return bool + * @throws \ReflectionException */ protected function isSupportBatching(AMQPMessage $message): bool { @@ -228,8 +307,8 @@ protected function addMessageToBatch(AMQPMessage $message): void /** * @param RabbitMQQueue $connection * @return void - * @throws Exceptions\MutexTimeout - * @throws Throwable + * @throws MutexTimeout + * @throws \Throwable */ protected function processBatch(RabbitMQQueue $connection): void { @@ -239,33 +318,52 @@ protected function processBatch(RabbitMQQueue $connection): void foreach ($this->batchMessages as $batchJobClass => $batchJobMessages) { $isBatchSuccess = false; - $batchSize = count($batchJobMessages); + if ($batchSize > 1) { $batchTimeStarted = microtime(true); + $uniqueMessagesForProcessing = []; $batchData = []; - /** @var AMQPMessage $batchMessage */ foreach ($batchJobMessages as $batchMessage) { - $job = $this->getJobByMessage($batchMessage, $connection); - $batchData[] = $job->getPayloadData(); + $this->transportDeduplicationService->decorateWithDeduplication( + function () use ($batchMessage, $connection, &$uniqueMessagesForProcessing, &$batchData) { + $job = $this->getJobByMessage($batchMessage, $connection); + $uniqueMessagesForProcessing[] = $batchMessage; + $batchData[] = $job->getPayloadData(); + }, + $batchMessage, + $this->currentQueueName + ); } - $this->logInfo('processBatch.start', [ - 'batch_job_class' => $batchJobClass, - 'batch_size' => $batchSize, - ]); - try { - $batchJobClass::collection($batchData); + if (AppDeduplicationService::isEnabled()) { + /** @var RabbitMQBatchable $batchJobClass */ + $batchData = $batchJobClass::getNotDuplicatedBatchedJobs($batchData); + } + + if (!empty($batchData)) { + $this->logInfo('processBatch.start', [ + 'batch_job_class' => $batchJobClass, + 'batch_size' => $batchSize, + ]); + + $batchJobClass::collection($batchData); + + $this->logInfo('processBatch.finish', [ + 'batch_job_class' => $batchJobClass, + 'batch_size' => $batchSize, + 'executive_batch_time_seconds' => microtime(true) - $batchTimeStarted, + ]); + } + $isBatchSuccess = true; + } catch (\Throwable $exception) { + foreach ($uniqueMessagesForProcessing as $batchMessage) { + $this->transportDeduplicationService->release($batchMessage, $this->currentQueueName); + } - $this->logInfo('processBatch.finish', [ - 'batch_job_class' => $batchJobClass, - 'batch_size' => $batchSize, - 'executive_batch_time_seconds' => microtime(true) - $batchTimeStarted, - ]); - } catch (Throwable $exception) { $isBatchSuccess = false; $this->logError('processBatch.exception', [ @@ -277,19 +375,28 @@ protected function processBatch(RabbitMQQueue $connection): void } unset($batchData); + } else { + $uniqueMessagesForProcessing = $batchJobMessages; } $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); - if ($isBatchSuccess) { - $lastBatchMessage = end($batchJobMessages); - $this->ackMessage($lastBatchMessage, true); - } else { - foreach ($batchJobMessages as $batchMessage) { - $job = $this->getJobByMessage($batchMessage, $connection); - $this->processSingleJob($job); + try { + if ($isBatchSuccess && !empty($uniqueMessagesForProcessing)) { + foreach ($uniqueMessagesForProcessing as $batchMessage) { + $this->transportDeduplicationService?->markAsProcessed($batchMessage, $this->currentQueueName); + } + + $lastBatchMessage = end($uniqueMessagesForProcessing); + $this->ackMessage($lastBatchMessage, true); + } else { + foreach ($uniqueMessagesForProcessing as $batchMessage) { + $job = $this->getJobByMessage($batchMessage, $connection); + $this->processSingleJob($job, $batchMessage); + } } + } finally { + $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); } - $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); } $this->updateLastProcessedAt(); @@ -300,26 +407,28 @@ protected function processBatch(RabbitMQQueue $connection): void * @param AMQPMessage $message * @param RabbitMQQueue $connection * @return RabbitMQJob - * @throws Throwable + * @throws \Throwable */ protected function getJobByMessage(AMQPMessage $message, RabbitMQQueue $connection): RabbitMQJob { $jobClass = $connection->getJobClass(); - return new $jobClass( + $job = new $jobClass( $this->container, $connection, $message, $this->currentConnectionName, $this->currentQueueName ); + + if (!is_subclass_of($job->getPayloadClass(), RabbitMQConsumable::class)) { + throw new \RuntimeException(sprintf('Job class %s must implement %s', $job->getPayloadClass(), RabbitMQConsumable::class)); + } + + return $job; } - /** - * @param RabbitMQJob $job - * @return void - */ - protected function processSingleJob(RabbitMQJob $job): void + protected function processSingleJob(RabbitMQJob $job, AMQPMessage $message): void { $timeStarted = microtime(true); $this->logInfo('processSingleJob.start'); @@ -328,7 +437,22 @@ protected function processSingleJob(RabbitMQJob $job): void $this->registerTimeoutHandler($job, $this->workerOptions); } - $this->runJob($job, $this->currentConnectionName, $this->workerOptions); + $this->transportDeduplicationService->decorateWithDeduplication( + function () use ($job, $message) { + if (AppDeduplicationService::isEnabled() && $job->getPayloadData()->isDuplicated()) { + $this->logWarning('processSingleJob.job_is_duplicated'); + $this->ackMessage($message); + + } else { + $this->runJob($job, $this->currentConnectionName, $this->workerOptions); + } + + $this->transportDeduplicationService->markAsProcessed($message, $this->currentQueueName); + }, + $message, + $this->currentQueueName, + ); + $this->updateLastProcessedAt(); if ($this->supportsAsyncSignals()) { @@ -347,9 +471,13 @@ protected function processSingleJob(RabbitMQJob $job): void */ protected function ackMessage(AMQPMessage $message, bool $multiple = false): void { + $this->logInfo('ackMessage.start', [ + 'multiple' => $multiple, + ]); + try { $message->ack($multiple); - } catch (Throwable $exception) { + } catch (\Throwable $exception) { $this->logError('ackMessage.exception', [ 'message' => $exception->getMessage(), 'trace' => $exception->getTraceAsString(), @@ -360,7 +488,7 @@ protected function ackMessage(AMQPMessage $message, bool $multiple = false): voi /** * @return void - * @throws Exceptions\MutexTimeout + * @throws MutexTimeout */ abstract protected function stopConsuming(): void; @@ -369,6 +497,8 @@ abstract protected function stopConsuming(): void; */ protected function loadVhosts(): void { + $this->logInfo('loadVhosts.start'); + $group = $this->filtersDto->getGroup(); $lastProcessedAtKey = $this->internalStorageManager->getLastProcessedAtKeyName($group); @@ -415,6 +545,9 @@ protected function switchToNextVhost(): bool } $this->currentQueueName = $nextQueue; + + $this->logInfo('switchToNextVhost.success'); + return true; } @@ -440,6 +573,8 @@ protected function getNextVhost(): ?string */ protected function loadVhostQueues(): void { + $this->logInfo('loadVhostQueues.start'); + $group = $this->filtersDto->getGroup(); $lastProcessedAtKey = $this->internalStorageManager->getLastProcessedAtKeyName($group); @@ -476,6 +611,9 @@ protected function switchToNextQueue(): bool } $this->currentQueueName = $nextQueue; + + $this->logInfo('switchToNextQueue.success'); + return true; } @@ -504,20 +642,26 @@ protected function goAheadOrWait(int $waitSeconds = 1): bool { if (false === $this->goAhead()) { if (!$this->hadJobs) { - $this->output->warning(sprintf('No jobs during iteration. Wait %d seconds...', $waitSeconds)); + $this->logWarning('goAheadOrWait.no_jobs_during_iteration', [ + 'wait_seconds' => $waitSeconds, + ]); + $this->sleep($waitSeconds); } $this->loadVhosts(); $this->hadJobs = false; if (empty($this->vhosts)) { - $this->output->warning(sprintf('No active vhosts. Wait %d seconds...', $waitSeconds)); + $this->logWarning('goAheadOrWait.no_active_vhosts', [ + 'wait_seconds' => $waitSeconds, + ]); + $this->sleep($waitSeconds); return $this->goAheadOrWait($waitSeconds); } - $this->output->info('Starting from the first vhost...'); + $this->logInfo('goAheadOrWait.starting_from_the_first_vhost'); return $this->goAheadOrWait($waitSeconds); } @@ -543,12 +687,14 @@ protected function goAhead(): bool /** * @return void */ - protected function updateLastProcessedAt() + protected function updateLastProcessedAt(): void { if ((null === $this->currentVhostName) || (null === $this->currentQueueName)) { return; } + $this->logInfo('updateLastProcessedAt.start'); + $group = $this->filtersDto->getGroup(); $timestamp = time(); @@ -589,6 +735,7 @@ protected function initConnection(): RabbitMQQueue ); try { + /** @var AMQPChannel $channel */ $channel = $connection->getChannel(true); $this->currentConnectionName = $connection->getConnectionName(); @@ -599,7 +746,6 @@ protected function initConnection(): RabbitMQQueue $this->prefetchCount, false ); - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); $this->channel = $channel; $this->connection = $connection; @@ -618,11 +764,94 @@ protected function initConnection(): RabbitMQQueue $this->goAheadOrWait(); return $this->initConnection(); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } return $connection; } + /** + * @return void + */ + protected function startHeartbeatCheck(): void + { + if (false === $this->asyncMode) { + return; + } + + $heartbeatInterval = (int) ($this->config['options']['heartbeat'] ?? 0); + if (!$heartbeatInterval) { + $this->logWarning('startHeartbeatCheck.heartbeat_interval_is_not_set'); + return; + } + + $this->logInfo('startHeartbeatCheck.start', [ + 'heartbeat_interval' => $heartbeatInterval, + ]); + + $heartbeatHandler = function () { + if ($this->shouldQuit || (null !== $this->stopStatusCode)) { + $this->logWarning('startHeartbeatCheck.quit', [ + 'should_quit' => $this->shouldQuit, + 'stop_status_code' => $this->stopStatusCode, + ]); + + return; + } + + try { + /** @var AMQPStreamConnection|null $connection */ + $connection = $this->connection?->getConnection(); + if ((null === $connection) + || (false === $connection->isConnected()) + || $connection->isWriting() + || $connection->isBlocked() + ) { + $this->logWarning('startHeartbeatCheck.incorrect_connection', [ + 'has_connection' => (null !== $connection) ? 'Y' : 'N', + 'is_connected' => $connection?->isConnected() ? 'Y' : 'N', + 'is_writing' => $connection->isWriting() ? 'Y' : 'N', + 'is_blocked' => $connection->isBlocked() ? 'Y' : 'N', + ]); + + return; + } + + $this->connectionMutex->lock(static::HEALTHCHECK_HANDLER_LOCK, 3); + $connection->checkHeartBeat(); + } catch (MutexTimeout) { + $this->logWarning('startHeartbeatCheck.mutex_timeout'); + } catch (\Throwable $exception) { + $this->logError('startHeartbeatCheck.exception', [ + 'error' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), + ]); + + $this->shouldQuit = true; + } finally { + $this->connectionMutex->unlock(static::HEALTHCHECK_HANDLER_LOCK); + } + }; + + \go(function () use ($heartbeatHandler, $heartbeatInterval) { + $this->logInfo('startHeartbeatCheck.started'); + + while (true) { + sleep($heartbeatInterval); + $heartbeatHandler(); + if ($this->shouldQuit || !is_null($this->stopStatusCode)) { + $this->logWarning('startHeartbeatCheck.go_quit', [ + 'should_quit' => $this->shouldQuit, + 'stop_status_code' => $this->stopStatusCode, + ]); + + return; + } + } + }); + } + /** * @return string */ @@ -638,7 +867,17 @@ protected function getTagName(): string */ protected function logInfo(string $message, array $data = []): void { - $this->log($message, $data, false); + $this->log($message, $data, 'info'); + } + + /** + * @param string $message + * @param array $data + * @return void + */ + protected function logWarning(string $message, array $data = []): void + { + $this->log($message, $data, 'warning'); } /** @@ -648,19 +887,23 @@ protected function logInfo(string $message, array $data = []): void */ protected function logError(string $message, array $data = []): void { - $this->log($message, $data, true); + $this->log($message, $data, 'error'); } /** * @param string $message * @param array $data - * @param bool $isError + * @param string $logType * @return void */ - protected function log(string $message, array $data = [], bool $isError = false): void + protected function log(string $message, array $data = [], string $logType = 'info'): void { - $data['vhost_name'] = $this->currentVhostName; - $data['queue_name'] = $this->currentQueueName; + if (null !== $this->currentVhostName) { + $data['vhost_name'] = $this->currentVhostName; + } + if (null !== $this->currentQueueName) { + $data['queue_name'] = $this->currentQueueName; + } $outputMessage = $message; foreach ($data as $key => $value) { @@ -669,15 +912,17 @@ protected function log(string $message, array $data = [], bool $isError = false) } $outputMessage .= '. ' . ucfirst(str_replace('_', ' ', $key)) . ': ' . $value; } - if ($isError) { - $this->output->error($outputMessage); - } else { - $this->output->info($outputMessage); - } + + match ($logType) { + 'error' => $this->output->error($outputMessage), + 'warning' => $this->output->warning($outputMessage), + default => $this->output->info($outputMessage) + }; $processingData = [ 'uuid' => $this->processingUuid, 'started_at' => $this->processingStartedAt, + 'total_processed_jobs_count' => $this->totalJobsProcessed, ]; if ($this->processingStartedAt) { $processingData['executive_time_seconds'] = microtime(true) - $this->processingStartedAt; @@ -687,10 +932,11 @@ protected function log(string $message, array $data = [], bool $isError = false) $logMessage = 'Salesmessage.LibRabbitMQ.VhostsConsumers.'; $logMessage .= class_basename(static::class) . '.'; $logMessage .= $message; - if ($isError) { - $this->logger->error($logMessage, $data); - } else { - $this->logger->info($logMessage, $data); - } + + match ($logType) { + 'error' => $this->logger->error($logMessage, $data), + 'warning' => $this->logger->warning($logMessage, $data), + default => $this->logger->info($logMessage, $data) + }; } } diff --git a/src/VhostsConsumers/DirectConsumer.php b/src/VhostsConsumers/DirectConsumer.php index 205a78cd..a17322a2 100644 --- a/src/VhostsConsumers/DirectConsumer.php +++ b/src/VhostsConsumers/DirectConsumer.php @@ -2,27 +2,28 @@ namespace Salesmessage\LibRabbitMQ\VhostsConsumers; -use Illuminate\Console\OutputStyle; -use Illuminate\Contracts\Debug\ExceptionHandler; -use Illuminate\Contracts\Events\Dispatcher; -use Illuminate\Queue\QueueManager; use Illuminate\Queue\WorkerOptions; -use Illuminate\Support\Str; use PhpAmqpLib\Exception\AMQPChannelClosedException; -use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Exception\AMQPRuntimeException; -use PhpAmqpLib\Message\AMQPMessage; -use Psr\Log\LoggerInterface; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; class DirectConsumer extends AbstractVhostsConsumer { + /** + * @param $connectionName + * @param WorkerOptions $options + * @return int + * @throws \Throwable + */ protected function vhostDaemon($connectionName, WorkerOptions $options) { + $this->logInfo('daemon.start'); + $lastRestart = $this->getTimestampOfLastQueueRestart(); - [$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0]; + $startTime = hrtime(true) / 1e9; + $this->totalJobsProcessed = 0; $connection = $this->startConsuming(); @@ -31,7 +32,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) // if it is we will just pause this worker for a given amount of time and // make sure we do not need to kill this worker process off completely. if (! $this->daemonShouldRun($this->workerOptions, $this->configConnectionName, $this->currentQueueName)) { - $this->output->info('Consuming pause worker...'); + $this->logInfo('daemon.consuming_pause_worker'); $this->pauseWorker($this->workerOptions, $lastRestart); @@ -41,7 +42,6 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) try { $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); $amqpMessage = $this->channel->basic_get($this->currentQueueName); - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } catch (AMQPProtocolChannelException|AMQPChannelClosedException $exception) { $amqpMessage = null; @@ -59,7 +59,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->exceptions->report($exception); $this->kill(self::EXIT_SUCCESS, $this->workerOptions); - } catch (Exception|Throwable $exception) { + } catch (\Throwable $exception) { $this->logError('daemon.exception', [ 'message' => $exception->getMessage(), 'trace' => $exception->getTraceAsString(), @@ -69,10 +69,12 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->exceptions->report($exception); $this->stopWorkerIfLostConnection($exception); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } - if (null === $amqpMessage) { - $this->output->info('Consuming sleep. No job...'); + if (!isset($amqpMessage)) { + $this->logInfo('daemon.consuming_sleep_no_job'); $this->stopConsuming(); @@ -87,7 +89,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->processAmqpMessage($amqpMessage, $connection); if ($this->jobsProcessed >= $this->batchSize) { - $this->output->info('Consuming batch full...'); + $this->logInfo('daemon.consuming_batch_full'); $this->stopConsuming(); @@ -98,12 +100,29 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) continue; } + + // Finally, we will check to see if we have exceeded our memory limits or if + // the queue should restart based on other indications. If so, we'll stop + // this worker and let whatever is "monitoring" it restart the process. + $this->stopStatusCode = $this->getStopStatus( + $this->workerOptions, + $lastRestart, + $startTime, + $this->totalJobsProcessed, + true + ); + if (! is_null($this->stopStatusCode)) { + $this->logWarning('daemon.consuming_stop', [ + 'status_code' => $this->stopStatusCode, + ]); + + return $this->stop($this->stopStatusCode, $this->workerOptions); + } } } /** * @return RabbitMQQueue - * @throws Exceptions\MutexTimeout */ protected function startConsuming(): RabbitMQQueue { @@ -112,11 +131,6 @@ protected function startConsuming(): RabbitMQQueue $this->logInfo('startConsuming.init'); - $arguments = []; - if ($this->maxPriority) { - $arguments['priority'] = ['I', $this->maxPriority]; - } - $this->jobsProcessed = 0; $connection = $this->initConnection(); @@ -131,4 +145,3 @@ protected function stopConsuming(): void return; } } - diff --git a/src/VhostsConsumers/QueueConsumer.php b/src/VhostsConsumers/QueueConsumer.php index f1ecaadb..700bf8a1 100644 --- a/src/VhostsConsumers/QueueConsumer.php +++ b/src/VhostsConsumers/QueueConsumer.php @@ -2,29 +2,33 @@ namespace Salesmessage\LibRabbitMQ\VhostsConsumers; -use Illuminate\Console\OutputStyle; -use Illuminate\Contracts\Debug\ExceptionHandler; -use Illuminate\Contracts\Events\Dispatcher; -use Illuminate\Queue\QueueManager; use Illuminate\Queue\WorkerOptions; -use Illuminate\Support\Str; use PhpAmqpLib\Exception\AMQPChannelClosedException; -use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Exception\AMQPRuntimeException; use PhpAmqpLib\Message\AMQPMessage; -use Psr\Log\LoggerInterface; +use Salesmessage\LibRabbitMQ\Exceptions\MutexTimeout; use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue; class QueueConsumer extends AbstractVhostsConsumer { protected bool $hasJob = false; + /** + * @param $connectionName + * @param WorkerOptions $options + * @return int|void + * @throws MutexTimeout + * @throws \Throwable + */ protected function vhostDaemon($connectionName, WorkerOptions $options) { + $this->logInfo('daemon.start'); + $lastRestart = $this->getTimestampOfLastQueueRestart(); - [$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0]; + $startTime = hrtime(true) / 1e9; + $this->totalJobsProcessed = 0; $connection = $this->startConsuming(); @@ -33,7 +37,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) // if it is we will just pause this worker for a given amount of time and // make sure we do not need to kill this worker process off completely. if (! $this->daemonShouldRun($this->workerOptions, $this->configConnectionName, $this->currentQueueName)) { - $this->output->info('Consuming pause worker...'); + $this->logInfo('daemon.consuming_pause_worker'); $this->pauseWorker($this->workerOptions, $lastRestart); @@ -44,7 +48,6 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) try { $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); $this->channel->wait(null, true, (int) $this->workerOptions->timeout); - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } catch (AMQPRuntimeException $exception) { $this->logError('daemon.amqp_runtime_exception', [ 'message' => $exception->getMessage(), @@ -54,7 +57,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->exceptions->report($exception); $this->kill(self::EXIT_SUCCESS, $this->workerOptions); - } catch (Exception|Throwable $exception) { + } catch (\Throwable $exception) { $this->logError('daemon.exception', [ 'message' => $exception->getMessage(), 'trace' => $exception->getTraceAsString(), @@ -64,11 +67,15 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->exceptions->report($exception); $this->stopWorkerIfLostConnection($exception); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } // If no job is got off the queue, we will need to sleep the worker. if (false === $this->hasJob) { - $this->output->info('Consuming sleep. No job...'); + $this->logInfo('daemon.consuming_sleep_no_job', [ + 'sleep_seconds' => $this->workerOptions->sleep, + ]); $this->stopConsuming(); @@ -83,55 +90,28 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) // Finally, we will check to see if we have exceeded our memory limits or if // the queue should restart based on other indications. If so, we'll stop // this worker and let whatever is "monitoring" it restart the process. - $status = $this->getStopStatus( + $this->stopStatusCode = $this->getStopStatus( $this->workerOptions, $lastRestart, $startTime, - $jobsProcessed, + $this->totalJobsProcessed, $this->hasJob ); - if (! is_null($status)) { - $this->logInfo('consuming_stop', [ - 'status' => $status, + if (! is_null($this->stopStatusCode)) { + $this->logWarning('daemon.consuming_stop', [ + 'status_code' => $this->stopStatusCode, ]); - return $this->stop($status, $this->workerOptions); + return $this->stop($this->stopStatusCode, $this->workerOptions); } $this->hasJob = false; } } - /** - * @param WorkerOptions $options - * @param $lastRestart - * @param $startTime - * @param $jobsProcessed - * @param $hasJob - * @return int|null - */ - protected function getStopStatus( - WorkerOptions $options, - $lastRestart, - $startTime = 0, - $jobsProcessed = 0, - bool $hasJob = false - ): ?int - { - return match (true) { - $this->shouldQuit => static::EXIT_SUCCESS, - $this->memoryExceeded($options->memory) => static::EXIT_MEMORY_LIMIT, - $this->queueShouldRestart($lastRestart) => static::EXIT_SUCCESS, - $options->stopWhenEmpty && !$hasJob => static::EXIT_SUCCESS, - $options->maxTime && hrtime(true) / 1e9 - $startTime >= $options->maxTime => static::EXIT_SUCCESS, - $options->maxJobs && $jobsProcessed >= $options->maxJobs => static::EXIT_SUCCESS, - default => null - }; - } - /** * @return RabbitMQQueue - * @throws Exceptions\MutexTimeout + * @throws MutexTimeout */ protected function startConsuming(): RabbitMQQueue { @@ -164,6 +144,10 @@ protected function startConsuming(): RabbitMQQueue } if ($this->workerOptions->rest > 0) { + $this->logInfo('startConsuming.rest', [ + 'rest_seconds' => $this->workerOptions->rest, + ]); + $this->sleep($this->workerOptions->rest); } }; @@ -191,10 +175,10 @@ protected function startConsuming(): RabbitMQQueue 'trace' => $exception->getTraceAsString(), 'error_class' => get_class($exception), ]); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); } - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); - $this->updateLastProcessedAt(); if (false === $isSuccess) { @@ -209,13 +193,15 @@ protected function startConsuming(): RabbitMQQueue /** * @return void - * @throws Exceptions\MutexTimeout + * @throws MutexTimeout */ protected function stopConsuming(): void { $this->connectionMutex->lock(self::MAIN_HANDLER_LOCK); - $this->channel->basic_cancel($this->getTagName(), true); - $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); + try { + $this->channel->basic_cancel($this->getTagName(), true); + } finally { + $this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK); + } } } -