diff --git a/README.md b/README.md index 6415ca98..c4048569 100644 --- a/README.md +++ b/README.md @@ -11,14 +11,14 @@ Only the latest version will get new features. Bug fixes will be provided using | Package Version | Laravel Version | Bug Fixes Until | | |-----------------|-----------------|----------------------|---------------------------------------------------------------------------------------------| -| 1 | 28 | September 23th, 2025 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | +| 1 | 29 | September 25th, 2025 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | ## Installation You can install this package via composer using this command: ``` -composer require salesmessage/php-lib-rabbitmq:^1.28 --ignore-platform-reqs +composer require salesmessage/php-lib-rabbitmq:^1.29 --ignore-platform-reqs ``` The package will automatically register itself. diff --git a/composer.json b/composer.json index c7fadad7..b74f81ac 100644 --- a/composer.json +++ b/composer.json @@ -34,7 +34,7 @@ }, "extra": { "branch-alias": { - "dev-master": "1.28-dev" + "dev-master": "1.29-dev" }, "laravel": { "providers": [ diff --git a/src/VhostsConsumers/AbstractVhostsConsumer.php b/src/VhostsConsumers/AbstractVhostsConsumer.php index b3f0941b..7d99e10e 100644 --- a/src/VhostsConsumers/AbstractVhostsConsumer.php +++ b/src/VhostsConsumers/AbstractVhostsConsumer.php @@ -60,6 +60,8 @@ abstract class AbstractVhostsConsumer extends Consumer protected int|float $processingStartedAt = 0; + protected int $totalJobsProcessed = 0; + protected int $jobsProcessed = 0; protected bool $hadJobs = false; @@ -176,6 +178,8 @@ public function daemon($connectionName, $queue, WorkerOptions $options) \OpenSwoole\Runtime::enableCoroutine(true, \OpenSwoole\Runtime::HOOK_ALL); \co::run($coroutineContextHandler); } else { + $this->logError('daemon.AsyncMode.IsNotSupported'); + throw new \Exception('Async mode is not supported. Check if Swoole extension is installed'); } @@ -240,10 +244,11 @@ protected function processAmqpMessage(AMQPMessage $message, RabbitMQQueue $conne } $this->jobsProcessed++; + $this->totalJobsProcessed++; $this->logInfo('processAMQPMessage.message_consumed', [ 'processed_jobs_count' => $this->jobsProcessed, - 'is_support_batching' => $isSupportBatching, + 'is_support_batching' => $isSupportBatching ? 'Y' :'N', ]); } @@ -410,6 +415,10 @@ protected function processSingleJob(RabbitMQJob $job): void */ protected function ackMessage(AMQPMessage $message, bool $multiple = false): void { + $this->logInfo('ackMessage.start', [ + 'multiple' => $multiple, + ]); + try { $message->ack($multiple); } catch (Throwable $exception) { @@ -432,6 +441,8 @@ abstract protected function stopConsuming(): void; */ protected function loadVhosts(): void { + $this->logInfo('loadVhosts.start'); + $group = $this->filtersDto->getGroup(); $lastProcessedAtKey = $this->internalStorageManager->getLastProcessedAtKeyName($group); @@ -478,6 +489,9 @@ protected function switchToNextVhost(): bool } $this->currentQueueName = $nextQueue; + + $this->logInfo('switchToNextVhost.success'); + return true; } @@ -503,6 +517,8 @@ protected function getNextVhost(): ?string */ protected function loadVhostQueues(): void { + $this->logInfo('loadVhostQueues.start'); + $group = $this->filtersDto->getGroup(); $lastProcessedAtKey = $this->internalStorageManager->getLastProcessedAtKeyName($group); @@ -539,6 +555,9 @@ protected function switchToNextQueue(): bool } $this->currentQueueName = $nextQueue; + + $this->logInfo('switchToNextQueue.success'); + return true; } @@ -567,20 +586,26 @@ protected function goAheadOrWait(int $waitSeconds = 1): bool { if (false === $this->goAhead()) { if (!$this->hadJobs) { - $this->output->warning(sprintf('No jobs during iteration. Wait %d seconds...', $waitSeconds)); + $this->logWarning('goAheadOrWait.no_jobs_during_iteration', [ + 'wait_seconds' => $waitSeconds, + ]); + $this->sleep($waitSeconds); } $this->loadVhosts(); $this->hadJobs = false; if (empty($this->vhosts)) { - $this->output->warning(sprintf('No active vhosts. Wait %d seconds...', $waitSeconds)); + $this->logWarning('goAheadOrWait.no_active_vhosts', [ + 'wait_seconds' => $waitSeconds, + ]); + $this->sleep($waitSeconds); return $this->goAheadOrWait($waitSeconds); } - $this->output->info('Starting from the first vhost...'); + $this->logInfo('goAheadOrWait.starting_from_the_first_vhost'); return $this->goAheadOrWait($waitSeconds); } @@ -612,6 +637,8 @@ protected function updateLastProcessedAt() return; } + $this->logInfo('updateLastProcessedAt.start'); + $group = $this->filtersDto->getGroup(); $timestamp = time(); @@ -701,25 +728,42 @@ protected function startHeartbeatCheck(): void return; } + $this->logInfo('startHeartbeatCheck.start', [ + 'heartbeat_interval' => $heartbeatInterval, + ]); + $heartbeatHandler = function () { if ($this->shouldQuit || (null !== $this->stopStatusCode)) { + $this->logWarning('startHeartbeatCheck.quit', [ + 'should_quit' => $this->shouldQuit, + 'stop_status_code' => $this->stopStatusCode, + ]); + return; } try { - /** @var AMQPStreamConnection $connection */ + /** @var AMQPStreamConnection|null $connection */ $connection = $this->connection?->getConnection(); if ((null === $connection) || (false === $connection->isConnected()) || $connection->isWriting() || $connection->isBlocked() ) { + $this->logWarning('startHeartbeatCheck.incorrect_connection', [ + 'has_connection' => (null !== $connection) ? 'Y' : 'N', + 'is_connected' => $connection?->isConnected() ? 'Y' : 'N', + 'is_writing' => $connection->isWriting() ? 'Y' : 'N', + 'is_blocked' => $connection->isBlocked() ? 'Y' : 'N', + ]); + return; } $this->connectionMutex->lock(static::HEALTHCHECK_HANDLER_LOCK, 3); $connection->checkHeartBeat(); } catch (MutexTimeout) { + $this->logWarning('startHeartbeatCheck.mutex_timeout'); } catch (Throwable $exception) { $this->logError('startHeartbeatCheck.exception', [ 'eroor' => $exception->getMessage(), @@ -739,6 +783,11 @@ protected function startHeartbeatCheck(): void sleep($heartbeatInterval); $heartbeatHandler(); if ($this->shouldQuit || !is_null($this->stopStatusCode)) { + $this->logWarning('startHeartbeatCheck.go_quit', [ + 'should_quit' => $this->shouldQuit, + 'stop_status_code' => $this->stopStatusCode, + ]); + return; } } @@ -760,7 +809,17 @@ protected function getTagName(): string */ protected function logInfo(string $message, array $data = []): void { - $this->log($message, $data, false); + $this->log($message, $data, 'info'); + } + + /** + * @param string $message + * @param array $data + * @return void + */ + protected function logWarning(string $message, array $data = []): void + { + $this->log($message, $data, 'warning'); } /** @@ -770,19 +829,23 @@ protected function logInfo(string $message, array $data = []): void */ protected function logError(string $message, array $data = []): void { - $this->log($message, $data, true); + $this->log($message, $data, 'error'); } /** * @param string $message * @param array $data - * @param bool $isError + * @param string $logType * @return void */ - protected function log(string $message, array $data = [], bool $isError = false): void + protected function log(string $message, array $data = [], string $logType = 'info'): void { - $data['vhost_name'] = $this->currentVhostName; - $data['queue_name'] = $this->currentQueueName; + if (null !== $this->currentVhostName) { + $data['vhost_name'] = $this->currentVhostName; + } + if (null !== $this->currentQueueName) { + $data['queue_name'] = $this->currentQueueName; + } $outputMessage = $message; foreach ($data as $key => $value) { @@ -791,15 +854,17 @@ protected function log(string $message, array $data = [], bool $isError = false) } $outputMessage .= '. ' . ucfirst(str_replace('_', ' ', $key)) . ': ' . $value; } - if ($isError) { - $this->output->error($outputMessage); - } else { - $this->output->info($outputMessage); - } + + match ($logType) { + 'error' => $this->output->error($outputMessage), + 'warning' => $this->output->warning($outputMessage), + default => $this->output->info($outputMessage) + }; $processingData = [ 'uuid' => $this->processingUuid, 'started_at' => $this->processingStartedAt, + 'total_processed_jobs_count' => $this->totalJobsProcessed, ]; if ($this->processingStartedAt) { $processingData['executive_time_seconds'] = microtime(true) - $this->processingStartedAt; @@ -809,10 +874,11 @@ protected function log(string $message, array $data = [], bool $isError = false) $logMessage = 'Salesmessage.LibRabbitMQ.VhostsConsumers.'; $logMessage .= class_basename(static::class) . '.'; $logMessage .= $message; - if ($isError) { - $this->logger->error($logMessage, $data); - } else { - $this->logger->info($logMessage, $data); - } + + match ($logType) { + 'error' => $this->logger->error($logMessage, $data), + 'warning' => $this->logger->warning($logMessage, $data), + default => $this->logger->info($logMessage, $data) + }; } } diff --git a/src/VhostsConsumers/DirectConsumer.php b/src/VhostsConsumers/DirectConsumer.php index ecdae23c..af3e7d67 100644 --- a/src/VhostsConsumers/DirectConsumer.php +++ b/src/VhostsConsumers/DirectConsumer.php @@ -18,11 +18,21 @@ class DirectConsumer extends AbstractVhostsConsumer { + /** + * @param $connectionName + * @param WorkerOptions $options + * @return int + * @throws \Salesmessage\LibRabbitMQ\Exceptions\MutexTimeout + * @throws \Throwable + */ protected function vhostDaemon($connectionName, WorkerOptions $options) { + $this->logInfo('daemon.start'); + $lastRestart = $this->getTimestampOfLastQueueRestart(); - [$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0]; + $startTime = hrtime(true) / 1e9; + $this->totalJobsProcessed = 0; $connection = $this->startConsuming(); @@ -31,7 +41,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) // if it is we will just pause this worker for a given amount of time and // make sure we do not need to kill this worker process off completely. if (! $this->daemonShouldRun($this->workerOptions, $this->configConnectionName, $this->currentQueueName)) { - $this->output->info('Consuming pause worker...'); + $this->logInfo('daemon.consuming_pause_worker'); $this->pauseWorker($this->workerOptions, $lastRestart); @@ -75,7 +85,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) } if (null === $amqpMessage) { - $this->output->info('Consuming sleep. No job...'); + $this->logInfo('daemon.consuming_sleep_no_job'); $this->stopConsuming(); @@ -90,7 +100,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->processAmqpMessage($amqpMessage, $connection); if ($this->jobsProcessed >= $this->batchSize) { - $this->output->info('Consuming batch full...'); + $this->logInfo('daemon.consuming_batch_full'); $this->stopConsuming(); @@ -101,6 +111,24 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) continue; } + + // Finally, we will check to see if we have exceeded our memory limits or if + // the queue should restart based on other indications. If so, we'll stop + // this worker and let whatever is "monitoring" it restart the process. + $this->stopStatusCode = $this->getStopStatus( + $this->workerOptions, + $lastRestart, + $startTime, + $this->totalJobsProcessed, + true + ); + if (! is_null($this->stopStatusCode)) { + $this->logWarning('daemon.consuming_stop', [ + 'status_code' => $this->stopStatusCode, + ]); + + return $this->stop($this->stopStatusCode, $this->workerOptions); + } } } diff --git a/src/VhostsConsumers/QueueConsumer.php b/src/VhostsConsumers/QueueConsumer.php index 74076ae8..b9be89ea 100644 --- a/src/VhostsConsumers/QueueConsumer.php +++ b/src/VhostsConsumers/QueueConsumer.php @@ -20,11 +20,21 @@ class QueueConsumer extends AbstractVhostsConsumer { protected bool $hasJob = false; + /** + * @param $connectionName + * @param WorkerOptions $options + * @return int|void + * @throws \Salesmessage\LibRabbitMQ\Exceptions\MutexTimeout + * @throws \Throwable + */ protected function vhostDaemon($connectionName, WorkerOptions $options) { + $this->logInfo('daemon.start'); + $lastRestart = $this->getTimestampOfLastQueueRestart(); - [$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0]; + $startTime = hrtime(true) / 1e9; + $this->totalJobsProcessed = 0; $connection = $this->startConsuming(); @@ -33,7 +43,7 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) // if it is we will just pause this worker for a given amount of time and // make sure we do not need to kill this worker process off completely. if (! $this->daemonShouldRun($this->workerOptions, $this->configConnectionName, $this->currentQueueName)) { - $this->output->info('Consuming pause worker...'); + $this->logInfo('daemon.consuming_pause_worker'); $this->pauseWorker($this->workerOptions, $lastRestart); @@ -68,7 +78,9 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) // If no job is got off the queue, we will need to sleep the worker. if (false === $this->hasJob) { - $this->output->info('Consuming sleep. No job...'); + $this->logInfo('daemon.consuming_sleep_no_job', [ + 'sleep_seconds' => $this->workerOptions->sleep, + ]); $this->stopConsuming(); @@ -87,12 +99,12 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) $this->workerOptions, $lastRestart, $startTime, - $jobsProcessed, + $this->totalJobsProcessed, $this->hasJob ); if (! is_null($this->stopStatusCode)) { - $this->logInfo('consuming_stop', [ - 'status' => $this->stopStatusCode, + $this->logWarning('daemon.consuming_stop', [ + 'status_code' => $this->stopStatusCode, ]); return $this->stop($this->stopStatusCode, $this->workerOptions); @@ -102,33 +114,6 @@ protected function vhostDaemon($connectionName, WorkerOptions $options) } } - /** - * @param WorkerOptions $options - * @param $lastRestart - * @param $startTime - * @param $jobsProcessed - * @param $hasJob - * @return int|null - */ - protected function getStopStatus( - WorkerOptions $options, - $lastRestart, - $startTime = 0, - $jobsProcessed = 0, - bool $hasJob = false - ): ?int - { - return match (true) { - $this->shouldQuit => static::EXIT_SUCCESS, - $this->memoryExceeded($options->memory) => static::EXIT_MEMORY_LIMIT, - $this->queueShouldRestart($lastRestart) => static::EXIT_SUCCESS, - $options->stopWhenEmpty && !$hasJob => static::EXIT_SUCCESS, - $options->maxTime && hrtime(true) / 1e9 - $startTime >= $options->maxTime => static::EXIT_SUCCESS, - $options->maxJobs && $jobsProcessed >= $options->maxJobs => static::EXIT_SUCCESS, - default => null - }; - } - /** * @return RabbitMQQueue * @throws Exceptions\MutexTimeout @@ -164,6 +149,10 @@ protected function startConsuming(): RabbitMQQueue } if ($this->workerOptions->rest > 0) { + $this->logInfo('startConsuming.rest', [ + 'rest_seconds' => $this->workerOptions->rest, + ]); + $this->sleep($this->workerOptions->rest); } };