diff --git a/README.md b/README.md index 16db9cd3..4bb8f78c 100644 --- a/README.md +++ b/README.md @@ -9,16 +9,16 @@ RabbitMQ Queue driver for Laravel Only the latest version will get new features. Bug fixes will be provided using the following scheme: -| Package Version | Laravel Version | Bug Fixes Until | | -|-----------------|-----------------|------------------|---------------------------------------------------------------------------------------------| -| 1 | 20 | April 23th, 2025 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | +| Package Version | Laravel Version | Bug Fixes Until | | +|-----------------|-----------------|----------------------|---------------------------------------------------------------------------------------------| +| 1 | 27 | September 16th, 2025 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | ## Installation You can install this package via composer using this command: ``` -composer require salesmessage/php-lib-rabbitmq:^1.20 --ignore-platform-reqs +composer require salesmessage/php-lib-rabbitmq:^1.27 --ignore-platform-reqs ``` The package will automatically register itself. @@ -632,7 +632,7 @@ There are two ways of consuming messages. Example: ```bash -php artisan lib-rabbitmq:consume-vhosts test-group-1 rabbitmq_vhosts --name=mq-vhosts-test-name --sleep=3 --memory=300 --max-jobs=5000 --max-time=600 --timeout=0 +php artisan lib-rabbitmq:consume-vhosts test-group-1 rabbitmq_vhosts --name=mq-vhosts-test-name --sleep=3 --memory=300 --max-jobs=5000 --max-time=600 --timeout=0 --async-mode=1 ``` ## Testing diff --git a/composer.json b/composer.json index e7aacf7e..899884d4 100644 --- a/composer.json +++ b/composer.json @@ -34,7 +34,7 @@ }, "extra": { "branch-alias": { - "dev-master": "1.26-dev" + "dev-master": "1.27-dev" }, "laravel": { "providers": [ diff --git a/src/Console/ConsumeVhostsCommand.php b/src/Console/ConsumeVhostsCommand.php index d73a66c3..1288a9c5 100644 --- a/src/Console/ConsumeVhostsCommand.php +++ b/src/Console/ConsumeVhostsCommand.php @@ -30,6 +30,7 @@ class ConsumeVhostsCommand extends WorkCommand {--timeout=60 : The number of seconds a child process can run} {--tries=1 : Number of times to attempt a job before logging it failed} {--rest=0 : Number of seconds to rest between jobs} + {--async-mode=0 : Async processing for some functionality (now only "heartbeat" is supported)} {--max-priority=} {--consumer-tag} @@ -84,6 +85,7 @@ public function handle(): void $consumer->setPrefetchSize((int) $this->option('prefetch-size')); $consumer->setPrefetchCount((int) ($groupConfigData['prefetch_count'] ?? 1000)); $consumer->setBatchSize((int) ($groupConfigData['batch_size'] ?? 1000)); + $consumer->setAsyncMode((bool) $this->option('async-mode')); if ($this->downForMaintenance() && $this->option('once')) { $consumer->sleep($this->option('sleep')); @@ -95,8 +97,10 @@ public function handle(): void // which jobs are coming through a queue and be informed on its progress. $this->listenForEvents(); - $connection = $this->argument('connection') - ?: $this->laravel['config']['queue.default']; + $queueConfigData = $this->laravel['config']['queue']; + $connectionName = $this->argument('connection') ?: ($queueConfigData['default'] ?? ''); + + $consumer->setConfig((array) ($queueConfigData['connections'][$connectionName] ?? [])); if (Terminal::hasSttyAvailable()) { $this->components->info(sprintf( @@ -107,7 +111,7 @@ public function handle(): void } $this->runWorker( - $connection, + $connectionName, '' ); } diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php index 2365db94..8394177c 100644 --- a/src/VhostsConsumers/AbstractVhostsConsumer.php +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -8,6 +8,7 @@ use Illuminate\Queue\QueueManager; use Illuminate\Queue\WorkerOptions; use Illuminate\Support\Str; +use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Exception\AMQPChannelClosedException; use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exception\AMQPProtocolChannelException; @@ -19,6 +20,7 @@ use Salesmessage\LibRabbitMQ\Dto\ConsumeVhostsFiltersDto; use Salesmessage\LibRabbitMQ\Dto\QueueApiDto; use Salesmessage\LibRabbitMQ\Dto\VhostApiDto; +use Salesmessage\LibRabbitMQ\Exceptions\MutexTimeout; use Salesmessage\LibRabbitMQ\Interfaces\RabbitMQBatchable; use Salesmessage\LibRabbitMQ\Mutex; use Salesmessage\LibRabbitMQ\Queue\Jobs\RabbitMQJob; @@ -29,6 +31,8 @@ abstract class AbstractVhostsConsumer extends Consumer { protected const MAIN_HANDLER_LOCK = 'vhost_handler'; + protected const HEALTHCHECK_HANDLER_LOCK = 'healthcheck_vhost_handler'; + protected ?OutputStyle $output = null; protected ?ConsumeVhostsFiltersDto $filtersDto = null; @@ -59,6 +63,12 @@ abstract class AbstractVhostsConsumer extends Consumer protected bool $hadJobs = false; + protected ?int $stopStatusCode = null; + + protected array $config = []; + + protected bool $asyncMode = false; + /** * @param InternalStorageManager $internalStorageManager * @param LoggerInterface $logger @@ -110,12 +120,30 @@ public function setBatchSize(int $batchSize): self return $this; } + /** + * @param array $config + * @return $this + */ + public function setConfig(array $config): self + { + $this->config = $config; + return $this; + } + + /** + * @param bool $asyncMode + * @return $this + */ + public function setAsyncMode(bool $asyncMode): self + { + $this->asyncMode = $asyncMode; + return $this; + } + public function daemon($connectionName, $queue, WorkerOptions $options) { $this->goAheadOrWait(); - $this->connectionMutex = new Mutex(false); - $this->configConnectionName = (string) $connectionName; $this->workerOptions = $options; @@ -123,6 +151,40 @@ public function daemon($connectionName, $queue, WorkerOptions $options) $this->listenForSignals(); } + if ($this->asyncMode) { + $this->logInfo('daemon.AsyncMode.On'); + + $coroutineContextHandler = function () use ($connectionName, $options) { + $this->logInfo('daemon.AsyncMode.Coroutines.Running'); + + // we can't move it outside since Mutex should be created within coroutine context + $this->connectionMutex = new Mutex(true); + $this->startHeartbeatCheck(); + \go(function () use ($connectionName, $options) { + $this->vhostDaemon($connectionName, $options); + }); + }; + + if (extension_loaded('swoole')) { + $this->logInfo('daemon.AsyncMode.Swoole'); + + \Co\run($coroutineContextHandler); + } elseif (extension_loaded('openswoole')) { + $this->logInfo('daemon.AsyncMode.OpenSwoole'); + + \OpenSwoole\Runtime::enableCoroutine(true, \OpenSwoole\Runtime::HOOK_ALL); + \co::run($coroutineContextHandler); + } else { + throw new \Exception('Async mode is not supported. Check if Swoole extension is installed'); + } + + return; + } + + $this->logInfo('daemon.AsyncMode.Off'); + + $this->connectionMutex = new Mutex(false); + $this->startHeartbeatCheck(); $this->vhostDaemon($connectionName, $options); } @@ -623,6 +685,64 @@ protected function initConnection(): RabbitMQQueue return $connection; } + /** + * @return void + */ + protected function startHeartbeatCheck(): void + { + if (false === $this->asyncMode) { + return; + } + + $heartbeatInterval = (int) ($this->config['options']['heartbeat'] ?? 0); + if (!$heartbeatInterval) { + return; + } + + $heartbeatHandler = function () { + if ($this->shouldQuit || (null !== $this->stopStatusCode)) { + return; + } + + try { + /** @var AMQPStreamConnection $connection */ + $connection = $this->connection?->getConnection(); + if ((null === $connection) + || (false === $connection->isConnected()) + || $connection->isWriting() + || $connection->isBlocked() + ) { + return; + } + + $this->connectionMutex->lock(static::HEALTHCHECK_HANDLER_LOCK, 3); + $connection->checkHeartBeat(); + } catch (MutexTimeout) { + } catch (Throwable $exception) { + $this->logError('startHeartbeatCheck.exception', [ + 'eroor' => $exception->getMessage(), + 'trace' => $e->getTraceAsString(), + ]); + + $this->shouldQuit = true; + } finally { + $this->connectionMutex->unlock(static::HEALTHCHECK_HANDLER_LOCK); + } + }; + + \go(function () use ($heartbeatHandler, $heartbeatInterval) { + $this->logInfo('startHeartbeatCheck.started'); + + while (true) { + sleep($heartbeatInterval); + $heartbeatHandler(); + if ($this->shouldQuit || !is_null($this->stopStatusCode)) { + return; + } + } + }); + } + /** * @return string */ diff --git a/src/VhostsConsumers/QueueConsumer.php b/src/VhostsConsumers/QueueConsumer.php index f1ecaadb..74076ae8 100644 --- a/src/VhostsConsumers/QueueConsumer.php +++ b/src/VhostsConsumers/QueueConsumer.php @@ -83,19 +83,19 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) // Finally, we will check to see if we have exceeded our memory limits or if // the queue should restart based on other indications. If so, we'll stop // this worker and let whatever is "monitoring" it restart the process. - $status = $this->getStopStatus( + $this->stopStatusCode = $this->getStopStatus( $this->workerOptions, $lastRestart, $startTime, $jobsProcessed, $this->hasJob ); - if (! is_null($status)) { + if (! is_null($this->stopStatusCode)) { $this->logInfo('consuming_stop', [ - 'status' => $status, + 'status' => $this->stopStatusCode, ]); - return $this->stop($status, $this->workerOptions); + return $this->stop($this->stopStatusCode, $this->workerOptions); } $this->hasJob = false;