diff --git a/README.md b/README.md index 8f0081f1..9498aa38 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Only the latest version will get new features. Bug fixes will be provided using | Package Version | Laravel Version | Bug Fixes Until | | |-----------------|-----------------|------------------|---------------------------------------------------------------------------------------------| -| 13 | 9 | August 8th, 2023 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | +| 13 | 49 | August 8th, 2023 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | ## Installation diff --git a/composer.json b/composer.json index 20cf5310..41dc957a 100644 --- a/composer.json +++ b/composer.json @@ -1,5 +1,5 @@ { - "name": "vladimir-yuldashev/laravel-queue-rabbitmq", + "name": "salesmessage/laravel-queue-rabbitmq", "description": "RabbitMQ driver for Laravel Queue. Supports Laravel Horizon.", "license": "MIT", "authors": [ @@ -11,7 +11,7 @@ "require": { "php": "^8.0", "ext-json": "*", - "illuminate/queue": "^9.0|^10.0", + "illuminate/queue": "^9.0|^10.0|^11.0|^12.0", "php-amqplib/php-amqplib": "^v3.2" }, "require-dev": { @@ -20,7 +20,7 @@ "laravel/horizon": "^5.0", "orchestra/testbench": "^7.0|^8.0", "laravel/pint": "^1.2", - "laravel/framework": "^9.0|^10.0" + "laravel/framework": "^10.0|^11.0|^12.0" }, "autoload": { "psr-4": { @@ -34,7 +34,7 @@ }, "extra": { "branch-alias": { - "dev-master": "13.0-dev" + "dev-master": "13.55-dev" }, "laravel": { "providers": [ diff --git a/src/BatchableConsumer.php b/src/BatchableConsumer.php new file mode 100644 index 00000000..3990bd0d --- /dev/null +++ b/src/BatchableConsumer.php @@ -0,0 +1,876 @@ + $currentQueues runtime list of queues */ + protected array $currentQueues = []; + /** @var int $currentQueue cursor for round-robin */ + protected int $currentQueue = 0; + + /** @var int $processedJob counter for batches */ + protected int $processedJob = 0; + + /** @var array $currentMessages current batch */ + protected array $currentMessages = []; + + /** @var string $mask mask of queues to listen */ + protected string $mask = ''; + + /** @var bool $preCheck check queue before switch using internal rabbitmq API */ + protected bool $preCheck = true; + + /** @var bool $roundRobin switch between given queues by round-round */ + protected bool $roundRobin = true; + + /** @var int $processed counter processed jobs by current worker */ + private int $processed = 0; + + /** @var Queue */ + private Queue $queueConnection; + + /** @var string */ + private string $connectionName; + /** @var string */ + private string $queue; + /** @var WorkerOptions */ + private WorkerOptions $options; + + /** @var int $currentPrefetch */ + private int $currentPrefetch = 0; + + /** @var int $currentConsumeInterval */ + private int $currentConsumeInterval = 0; + + /** @var bool $autoPrefetch */ + private bool $autoPrefetch = false; + + /** @var null|array $consumeIntervalMapping */ + private ?array $consumeIntervalMapping = null; + + /** @var AMQPMessage[] $pastQueuesMessages */ + private array $pastQueuesMessages = []; + + private ?int $workerExitCode = null; + + /** + * The name and signature of the console command. + * + * @var string + */ + protected string $signature = 'rabbitmq:work'; + + /** + * The console command description. + * + * @var string + */ + protected string $description = 'Consumer for rabbitmq queues using queue:work'; + + /** + * @param string $value + */ + public function setMask(string $value): void + { + $this->mask = $value; + } + + /** + * @param bool $value + */ + public function setPreCheck(bool $value): void + { + $this->preCheck = $value; + } + + /** + * @param bool $value + */ + public function setRoundRobin(bool $value): void + { + $this->roundRobin = $value; + } + + /** + * @param bool $value + */ + public function setAutoPrefetch(bool $value): void + { + $this->autoPrefetch = $value; + } + + /** + * @param array $value + */ + public function setConsumeIntervalMapping(array $value): void + { + $this->consumeIntervalMapping = $value; + } + + /** + * @return string + */ + public function getMask(): string + { + return $this->mask; + } + + /** + * Consumer logic + * + * @param string $connectionName + * @param string $queue + * @param WorkerOptions $options + * @return int + * @throws \Throwable + */ + public function daemon($connectionName, $queue, WorkerOptions $options): int + { + if ($this->supportsAsyncSignals()) { + $this->listenForSignals(); + } + + $this->queue = $queue; + $this->options = $options; + $this->connectionName = $connectionName; + + $lastRestart = $this->getTimestampOfLastQueueRestart(); + + [$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0]; + + /** @var Queue $connection */ + $this->queueConnection = $this->manager->connection($this->connectionName); + + $heartbeatHandler = function () { + $this->startHeartbeatCheck(); + }; + $mainHandler = function () use ($options, $queue, $lastRestart, $startTime, $jobsProcessed) { + $this->start(); + while (true) { + // Before reserving any jobs, we will make sure this queue is not paused and + // if it is we will just pause this worker for a given amount of time and + // make sure we do not need to kill this worker process off completely. + if (!$this->daemonShouldRun($options, $this->connectionName, $queue)) { + $this->pauseWorker($options, $lastRestart); + continue; + } + + // If the daemon should run (not in maintenance mode, etc.), then we can wait for a job. + try { + if (!$this->channel?->getConnection()) { + logger()->info('RabbitMQConsumer.connection.broken.kill', [ + 'workerName' => $this->name, + ]); + $this->kill(self::EXIT_ERROR, $options); + } + + $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); + $this->channel->wait(null, false, $this->currentConsumeInterval); + $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); + } catch (AMQPRuntimeException $exception) { + $this->exceptions->report($exception); + + $this->kill(self::EXIT_ERROR, $options); + } catch (AMQPTimeoutException) { + if ($this->currentPrefetch > 1) { + logger()->info('RabbitMQConsumer.prefetch.currentConsumeInterval.triggered', [ + 'consumeInterval' => $this->currentConsumeInterval, + 'workerName' => $this->name, + 'messagesReady' => count($this->currentMessages), + 'currentPrefetchCount' => $this->currentPrefetch, + ]); + if ($this->roundRobin) { + $this->stopConsume(); + $this->processBatch(); + $this->switchToNextQueue(); + } else { + $this->processBatch(); + } + } else { + if ($this->roundRobin) { + logger()->info('RabbitMQConsumer.singlePrefetch.currentConsumeInterval.triggered', [ + 'workerName' => $this->name, + 'messagesReady' => count($this->currentMessages), + 'currentPrefetchCount' => $this->currentPrefetch, + ]); + + $this->stopConsume(); + $this->switchToNextQueue(); + } + } + } catch (\Exception | \Throwable $exception) { + $this->exceptions->report($exception); + + $this->stopWorkerIfLostConnection($exception); + } + + // Finally, we will check to see if we have exceeded our memory limits or if + // the queue should restart based on other indications. If so, we'll stop + // this worker and let whatever is "monitoring" it restart the process. + $this->workerExitCode = $this->stopIfNecessary( + $options, + $lastRestart, + $startTime, + $jobsProcessed, + $this->currentJob + ); + + if (!is_null($this->workerExitCode)) { + return $this->stop($this->workerExitCode, $options); + } + } + }; + + $this->channel = $this->queueConnection->getChannel(); + + if ($this->isAsyncMode()) { + logger()->info('RabbitMQConsumer.AsyncMode.On'); + $resultStatus = 0; + $coroutineContextHandler = function () use ($heartbeatHandler, $mainHandler, &$resultStatus) { + logger()->info('RabbitMQConsumer.AsyncMode.Coroutines.Running'); + // we can't move it outside since Mutex should be created within coroutine context + $this->connectionMutex = new Mutex(true); + $heartbeatHandler(); + \go(function () use ($mainHandler, &$resultStatus) { + $resultStatus = $mainHandler(); + }); + }; + + if (extension_loaded('swoole')) { + logger()->info('RabbitMQConsumer.AsyncMode.Swoole'); + \Co\run($coroutineContextHandler); + } elseif (extension_loaded('openswoole')) { + logger()->info('RabbitMQConsumer.AsyncMode.OpenSwoole'); + \OpenSwoole\Runtime::enableCoroutine(true, \OpenSwoole\Runtime::HOOK_ALL); + \co::run($coroutineContextHandler); + } else { + throw new \Exception('Async mode is not supported. Check if Swoole extension is installed'); + } + return $resultStatus; + } else { + logger()->info('RabbitMQConsumer.AsyncMode.Off'); + $this->connectionMutex = new Mutex(false); + } + + $heartbeatHandler(); + return $mainHandler(); + } + + /** + * Start consuming the rabbitmq queues + */ + private function start(): void + { + $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); + $this->channel->basic_qos( + $this->prefetchSize, + $this->prefetchCount, + true + ); + $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); + $this->discoverQueues(); + if ($this->roundRobin) { + $this->switchToNextQueue(true); + } else { + foreach ($this->currentQueues as $queue) { + $this->startConsuming($queue); + } + } + } + + /** + * Switching to next queue(round-robin) + * + * @param bool $first + */ + private function switchToNextQueue(bool $first = false) + { + logger()->info('RabbitMQConsumer.switchToNextQueue.before', [ + 'workerName' => $this->name, + 'currentPrefetch' => $this->currentPrefetch, + 'currentMessagesCount' => count($this->currentMessages), + ]); + + $nextQueue = $this->discoverNextQueue($first); + $this->startConsuming($nextQueue); + + logger()->info('RabbitMQConsumer.switchToNextQueue.after', [ + 'workerName' => $this->name, + 'currentPrefetch' => $this->currentPrefetch, + 'currentMessagesCount' => count($this->currentMessages), + ]); + } + + /** + * Gets the next round-robin queue + * + * @param bool $first + * @return string + * @throws \Exception + */ + private function nextQueueRoundRobin(bool $first = false): string + { + if (!$first) { + $this->currentQueue++; + } else { + $this->currentQueue = 0; + } + $nextQueue = $this->currentQueues[$this->currentQueue] ?? null; + if (!$nextQueue) { + $this->discoverQueues(); + $this->currentQueue = 0; + $nextQueue = $this->currentQueues[$this->currentQueue] ?? null; + if (!$nextQueue) { + throw new \Exception('Error next queue'); + } + } + + return $nextQueue; + } + + /** + * Validates next queue if needed + * + * @param bool $first + * @return string + * @throws \GuzzleHttp\Exception\GuzzleException + */ + private function discoverNextQueue(bool $first = false): string + { + do { + $nextQueue = $this->nextQueueRoundRobin($first); + $first = false; + $queueIsNotReady = false; + $this->currentPrefetch = $this->prefetchCount; + $this->currentConsumeInterval = $this->consumeInterval; + + if ($this->preCheck || $this->autoPrefetch || $this->consumeIntervalMapping) { + $client = $this->getHttpClient(); + + $host = $this->config['hosts'][0]['api_host']; + $port = $this->config['hosts'][0]['api_port']; + $username = $this->config['hosts'][0]['user']; + $password = $this->config['hosts'][0]['password']; + + $scheme = $this->config['secure'] ? 'https://' : 'http://'; + + $url = $scheme . $host . ':' . $port; + + try { + $res = $client->get( + "$url/api/queues/%2F/$nextQueue", // %2F stands for / + [ + 'headers' => [ + 'Authorization' => 'Basic ' . base64_encode( + $username . ':' . $password + ) + ] + ] + ); + } catch (RequestException $e) { + if ((int) $e->getCode() === 404) { + logger()->warning('RabbitMQConsumer.discoverNextQueue.queueNotFound', [ + 'queue' => $nextQueue, + ]); + $queueIsNotReady = true; + continue; + } + throw $e; + } + + $queueData = json_decode($res->getBody()); + + $messages = $queueData->messages_ready ?? 0; + + logger()->info('RabbitMQConsumer.queues.dataRetrieved', [ + 'queue' => $nextQueue, + 'messagesReady' => $messages, + 'totalMessages' => $queueData->messages ?? 0, + ]); + + if ($this->preCheck) { + if ($messages === 0 || ($queueData->consumers ?? 0) >= 2) { + $queueIsNotReady = true; + logger()->info('RabbitMQConsumer.queues.precheck.failed', [ + 'queue' => $nextQueue, + 'workerName' => $this->name, + 'messagesReady' => $messages, + 'active-consumers' => $queueData->consumers ?? 0, + ]); + } + } + + if ($this->autoPrefetch && $messages > 0) { + $this->currentPrefetch = $messages >= $this->prefetchCount ? $this->prefetchCount : $messages; + logger()->info('RabbitMQConsumer.queues.currentPrefetch.set', [ + 'queue' => $nextQueue, + 'workerName' => $this->name, + 'prefetchCount' => $this->prefetchCount, + 'currentPrefetch' => $this->currentPrefetch, + 'messagesReady' => $messages + ]); + } + + if ($this->consumeIntervalMapping) { + foreach ($this->consumeIntervalMapping as $mapping) { + if ($mapping['range'] >= $messages) { + $this->currentConsumeInterval = (int) $mapping['interval']; + logger()->info('RabbitMQConsumer.queues.currentConsumeInterval.set', [ + 'queue' => $nextQueue, + 'workerName' => $this->name, + 'currentConsumeInterval' => $this->currentConsumeInterval, + 'messagesReady' => $queueData->messages_ready + ]); + break; + } + } + } + } + } while ($queueIsNotReady); + + return $nextQueue; + } + + /** + * Batch handler + * + * @param AMQPMessage $message + */ + private function batchHandler(AMQPMessage $message) + { + logger()->info('RabbitMQConsumer.batchHandler.addingCurrentMessage', [ + 'workerName' => $this->name, + 'existingMessagesCount' => count($this->currentMessages), + 'routingKey' => $message->getRoutingKey(), + 'currentPrefetch' => $this->currentPrefetch, + 'processedJob' => $this->processedJob, + ]); + + $this->currentMessages[] = $message; + $this->processedJob++; + if ($this->processedJob >= $this->currentPrefetch) { + if ($this->roundRobin) { + $this->stopConsume(); + $this->processBatch(); + $this->switchToNextQueue(); + } else { + $this->processBatch(); + } + } + } + + /** + * Single message handler + * + * @param AMQPMessage $message + */ + private function singleHandler(AMQPMessage $message) + { + if ($this->roundRobin) { + $this->stopConsume(); + $this->processMessage($message); + $this->switchToNextQueue(); + } else { + $this->processMessage($message); + } + } + + /** + * AMQP consuming logic + * + * @param string $queue + */ + private function startConsuming(string $queue) + { + $callback = function (AMQPMessage $message) use ($queue, &$callback): void { + if (!$this->isValidMessage($message, $queue)) { + return; + } + + if ($this->currentPrefetch > 1) { + $this->batchHandler($message); + } else { + $this->singleHandler($message); + } + }; + + logger()->info('RabbitMQConsumer.queues.startConsuming', [ + 'workerName' => $this->name, + 'newQueue' => $queue + ]); + + $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); + $this->channel->basic_consume( + $queue, + $this->consumerTag, + false, + false, + false, + false, + $callback + ); + $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); + } + + /** + * StopConsume command (switch queue/exit) + */ + private function stopConsume() + { + logger()->info('RabbitMQConsumer.StopConsume', [ + 'workerName' => $this->name, + 'currentMessagesCount' => count($this->currentMessages), + 'currentPrefetch' => $this->currentPrefetch, + ]); + + $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); + $this->channel->basic_cancel($this->consumerTag, true); + $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); + } + + /** + * Discovers existing queues via API + * + * @throws \GuzzleHttp\Exception\GuzzleException + */ + private function discoverQueues() + { + do { + $host = $this->config['hosts'][0]['api_host']; + $port = $this->config['hosts'][0]['api_port']; + $username = $this->config['hosts'][0]['user']; + $password = $this->config['hosts'][0]['password']; + $client = $this->getHttpClient(); + $scheme = $this->config['secure'] ? 'https://' : 'http://'; + $url = $scheme . $host . ':' . $port; + $res = $client->get( + "$url/api/queues/%2F?disable_stats=true&enable_queue_totals=true", + [ + 'headers' => [ + 'Authorization' => 'Basic ' . base64_encode( + $username . ':' . $password + ) + ] + ] + ); + $queues = json_decode($res->getBody()); + $queues = collect($queues) + ->filter(function ($queue) { + $readyMessages = $queue->messages_ready ?? 0; + return str_starts_with( + $queue->name, + $this->mask + ) && (($this->roundRobin && $readyMessages > 0) || !$this->roundRobin); + }) + ->pluck('name') + ->values(); + + // Shuffle $queues collection randomly + if ($this->roundRobin) { + $queues = $queues->shuffle(); + } + + $this->currentQueues = $queues->toArray(); + + if (count($this->currentQueues) === 0) { + $this->sleep($this->options->sleep); + } else { + logger()->info('RabbitMQConsumer.queues.discovered', [ + 'workerName' => $this->name, + 'queues' => collect($queues)->implode(', ') + ]); + } + } while (count($this->currentQueues) === 0); + } + + /** + * Process batch messages + */ + private function processBatch() + { + $this->processJobs(); + $this->processedJob = 0; + $this->currentMessages = []; + + $this->requeuePastQueuesMessages(); + + logger()->info('RabbitMQConsumer.BatchProcessed.ClearData', [ + 'workerName' => $this->name, + ]); + } + + /** + * Gets job class by AMQPMessage + * + * @param AMQPMessage $message + * @return mixed + */ + private function getJobByMessage(AMQPMessage $message): mixed + { + $jobClass = $this->queueConnection->getJobClass(); + /** @var RabbitMQJob $job */ + return new $jobClass( + $this->container, + $this->queueConnection, + $message, + $this->connectionName, + $this->queue + ); + } + + /** + * Single message process logic + * + * @param AMQPMessage $message + */ + private function processMessage(AMQPMessage $message) + { + $job = $this->getJobByMessage($message); + + if ($this->supportsAsyncSignals()) { + $this->registerTimeoutHandler($job, $this->options); + } + + logger()->info('RabbitMQConsumer.processMessage.before', [ + 'workerName' => $this->name, + 'jobsClass' => $job->getPayloadClass(), + 'routingKey' => $message->getRoutingKey(), + 'currentMessagesCount' => count($this->currentMessages), + 'prefetchCount' => $this->currentPrefetch, + ]); + + $this->runJob($job, $this->connectionName, $this->options); + + if ($this->supportsAsyncSignals()) { + $this->resetTimeoutHandler(); + } + + $this->requeuePastQueuesMessages(); + } + + private function isValidMessage(AMQPMessage $message, string $currentQueue): bool + { + // case when we receive messages from previously consumed queue + // it might be possible since for cancelling consuming we don't wait for response + if ($message->getRoutingKey() !== $currentQueue) { + logger()->warning('RabbitMQConsumer.messageFromPastQueue', [ + 'workerName' => $this->name, + 'currentQueue' => $currentQueue, + 'receivedQueue' => $message->getRoutingKey(), + ]); + // we don't send message back immediately to not process it again (in the theory) + // instead we collect it and send when we finished messages processing + $this->pastQueuesMessages[] = $message; + // if for some reason we collect a lot of messages from the different queues + if (count($this->pastQueuesMessages) >= $this->prefetchCount) { + logger()->warning('RabbitMQConsumer.messageFromPastQueue.moreThanPrefetchCount', [ + 'workerName' => $this->name, + 'currentQueue' => $currentQueue, + 'prefetchCount' => $this->prefetchCount, + 'pastQueuesMessagesCount' => count($this->pastQueuesMessages), + ]); + } + return false; + } + + return true; + } + + private function requeuePastQueuesMessages(): void + { + if (!$this->pastQueuesMessages) { + return; + } + + logger()->warning('RabbitMQConsumer.requeuePastQueuesMessages', [ + 'workerName' => $this->name, + 'pastQueuesMessagesCount' => count($this->pastQueuesMessages), + 'pastQueues' => array_unique( + array_map(fn (AMQPMessage $pastQueueMessage) => $pastQueueMessage->getRoutingKey(), $this->pastQueuesMessages) + ), + ]); + + $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); + foreach ($this->pastQueuesMessages as $message) { + try { + $message->nack(true); + } catch (\Throwable $e) { + logger()->error('RabbitMQConsumer.requeuePastQueuesMessages.failed', [ + 'workerName' => $this->name, + 'messageQueue' => $message->getRoutingKey(), + 'errorMessage' => $e->getMessage(), + 'errorTrace' => $e->getTraceAsString(), + ]); + } + } + $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); + $this->pastQueuesMessages = []; + } + + /** + * Process current batch + */ + private function processJobs() + { + if (count($this->currentMessages) === 0) { + logger()->warning('RabbitMQConsumer.jobs.empty', [ + 'workerName' => $this->name, + ]); + return; + } + + $class = $this->getJobByMessage($this->currentMessages[0])->getPayloadClass(); + + $sameClass = collect($this->currentMessages)->every(function (AMQPMessage $message) use ($class) { + return $this->getJobByMessage($message)->getPayloadClass() === $class; + }); + + $reflection = new \ReflectionClass($class); + + if (count($this->currentMessages) > 1 && $sameClass && $reflection->implementsInterface( + RabbitMQBatchable::class + )) { + $batchData = []; + foreach ($this->currentMessages as $message) { + $job = $this->getJobByMessage($message); + $batchData[] = $job->getPayloadData(); + } + + $routingKeys = array_map(fn ($currentMessage) => $currentMessage->getRoutingKey(), $this->currentMessages); + $routingKeys = array_unique($routingKeys); + if (count($routingKeys) > 1) { + logger()->warning('RabbitMQConsumer.IncorrectGroupedRoutingKeys', [ + 'workerName' => $this->name, + 'routingKeys' => $routingKeys, + ]); + } + + $failed = false; + try { + $class::collection($batchData); + logger()->info('RabbitMQConsumer.jobs.process.done', [ + 'workerName' => $this->name, + 'jobsCount' => count($this->currentMessages), + 'jobsClass' => $class, + 'routingKey' => $this->currentMessages[0]->getRoutingKey(), + 'currentPrefetchCount' => $this->currentPrefetch, + 'currentQueue' => $this->currentQueues[$this->currentQueue] ?? null, + ]); + } catch (\Throwable $exception) { + $failed = true; + logger()->error('RabbitMQConsumer.batch.process.failed', [ + 'workerName' => $this->name, + 'message' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), + 'jobsClass' => $class, + 'routingKey' => $this->currentMessages[0]->getRoutingKey(), + 'currentPrefetchCount' => $this->currentPrefetch, + 'currentQueue' => $this->currentQueues[$this->currentQueue] ?? null, + ]); + } + $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); + /** @var AMQPMessage $message */ + foreach ($this->currentMessages as $message) { + if ($failed) { + $this->processMessage($message); + } else { + try { + $message->ack(); // TODO group ack + } catch (\Throwable $exception) { + logger()->error('RabbitMQConsumer.ack.failed', [ + 'workerName' => $this->name, + 'message' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), + 'jobsClass' => $class, + 'routingKey' => $message->getRoutingKey() + ]); + } + } + } + $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); + $this->processed += count($this->currentMessages); + return; + } + + /** @var AMQPMessage $message */ + foreach ($this->currentMessages as $message) { + $this->processMessage($message); + } + $this->processed += count($this->currentMessages); + } + + private function startHeartbeatCheck(): void + { + if (!$this->isAsyncMode()) { + return; + } + $heartbeatInterval = (int) ($this->config['options']['heartbeat'] ?? 0); + if (!$heartbeatInterval) { + return; + } + + $heartbeatHandler = function () { + if ($this->shouldQuit || !is_null($this->workerExitCode)) { + return; + } + + try { + $connection = $this->channel->getConnection(); + if (!$connection?->isConnected() + || $connection->isWriting() + || $connection->isBlocked() + ) { + return; + } + + $this->connectionMutex->lock(static::HEALTHCHECK_HANDLER_LOCK, 3); + $connection->checkHeartBeat(); + } catch (MutexTimeout) { + } catch (\Throwable $e) { + logger()->error('RabbitMQConsumer.heartbeatCheck.error', [ + 'message' => $e->getMessage(), + 'trace' => $e->getTraceAsString(), + 'workerName' => $this->name, + ]); + $this->shouldQuit = true; + } finally { + $this->connectionMutex->unlock(static::HEALTHCHECK_HANDLER_LOCK); + } + }; + + \go(function () use ($heartbeatHandler, $heartbeatInterval) { + logger()->info('RabbitMQConsumer.heartbeatCheck.started'); + while (true) { + sleep($heartbeatInterval); + $heartbeatHandler(); + if ($this->shouldQuit || !is_null($this->workerExitCode)) { + return; + } + } + }); + } + + /** + * @return Client + */ + private function getHttpClient(): Client + { + return new Client([ + RequestOptions::TIMEOUT => 30, + RequestOptions::CONNECT_TIMEOUT => 30, + ]); + } +} diff --git a/src/Console/BatchableConsumeCommand.php b/src/Console/BatchableConsumeCommand.php new file mode 100644 index 00000000..587de810 --- /dev/null +++ b/src/Console/BatchableConsumeCommand.php @@ -0,0 +1,121 @@ +setContainer($this->laravel); + $consumer->setName($this->option('name')); + $consumer->setConsumerTag($this->consumerTag()); + $consumer->setMaxPriority((int) $this->option('max-priority')); + $consumer->setPrefetchSize((int) $this->option('prefetch-size')); + $consumer->setPrefetchCount((int) $this->option('prefetch-count')); + $consumer->setMask((string) $this->option('mask')); + $consumer->setPreCheck((bool) $this->option('precheck')); + $consumer->setRoundRobin((bool) $this->option('roundrobin')); + $consumer->setAutoPrefetch((bool) $this->option('auto-prefetch')); + $consumer->setAsyncMode((bool) $this->option('async-mode')); + $consumer->setConsumeInterval((int) $this->option('consume-interval')); + + $consumeIntervalMapping = $this->option('consume-interval-mapping'); + if ($consumeIntervalMapping !== 'false') { + $intervals = explode(';', $consumeIntervalMapping); + foreach ($intervals as &$interval) { + $values = explode(':', $interval); + $interval = [ + 'range' => $values[0], + 'interval' => $values[1], + ]; + } + $intervals = collect($intervals)->sortBy('range')->values()->toArray(); + $consumer->setConsumeIntervalMapping($intervals); + } + + if ($this->downForMaintenance() && $this->option('once')) { + $this->worker->sleep($this->option('sleep')); + return 0; + } + + $this->listenForEvents(); + + $connection = $this->argument('connection') + ?: $this->laravel['config']['queue.default']; + + $queue = $this->getQueue($connection); + + if (Terminal::hasSttyAvailable()) { + $this->components->info( + sprintf('Processing jobs using the [%s] %s.', $consumer->getMask(), str('mask')->plural(explode(',', $queue))) + ); + } + + return $this->runWorker( + $connection, $queue + ); + } + + /** + * @return string + */ + protected function consumerTag(): string + { + if ($consumerTag = $this->option('consumer-tag')) { + return $consumerTag; + } + + $consumerTag = implode('_', [ + Str::slug(config('app.name', 'laravel')), + Str::slug($this->option('name')), + md5(serialize($this->options()).Str::random(16).getmypid()), + ]); + + return Str::substr($consumerTag, 0, 255); + } +} diff --git a/src/Console/GarbageCollector.php b/src/Console/GarbageCollector.php new file mode 100644 index 00000000..15323c2c --- /dev/null +++ b/src/Console/GarbageCollector.php @@ -0,0 +1,140 @@ +config = $config; + parent::__construct(); + } + + /** + * Execute the console command. + * + * @return mixed + * @throws \GuzzleHttp\Exception\GuzzleException + */ + public function handle() + { + $scheme = $this->config['secure'] ? 'https://' : 'http://'; + $host = $this->config['hosts'][0]['api_host']; + $port = $this->config['hosts'][0]['api_port']; + $username = $this->config['hosts'][0]['user']; + $password = $this->config['hosts'][0]['password']; + $client = new Client(); + $url = $host . ':' . $port; + $tries = 0; + while ($tries < 5) { + $tries++; + try { + $res = $client->get( + "{$scheme}{$url}/api/queues/%2F?disable_stats=true&enable_queue_totals=true", + [ + 'headers' => [ + 'Authorization' => 'Basic ' . base64_encode( + $username . ':' . $password + ) + ] + ] + ); + $queues = json_decode($res->getBody()); + } catch (\Throwable $exception) { + logger()->warning('RabbitMQ Garbage Collector failed to get queues', [ + 'message' => $exception->getMessage() + ]); + $queues = []; + } + } + + if (!isset($queues)) { + $queues = []; + } + + $dlqTargets = []; + foreach ($queues as $queue) { + $arguments = $queue->arguments ?? null; + if (!empty($arguments)) { + $dlx = $arguments->{'x-dead-letter-exchange'} ?? null; + $dlk = $arguments->{'x-dead-letter-routing-key'} ?? null; + + if (empty($dlk) || 0 === ($queue->messages ?? 0)) { + continue; + } + + $dlqTargets[$dlk] = !empty($dlx) + ? $dlx + : $dlk; + } + } + + + $queuesToRemove = collect($queues) + ->filter(function ($queue) use ($dlqTargets) { + $messages = $queue->messages ?? 0; + return ($queue->name !== 'default') + && !str_contains($queue->name, 'failed') + && !str_contains($queue->name, 'dlq') + && !isset($dlqTargets[$queue->name]) + && $messages === 0; + }) + ->pluck('name') + ->values() + ->toArray(); + + logger()->info('RabbitMQ Garbage Collector loaded queues filtered', [ + 'queues_count' => count($queues ?? 0), + 'queues_filtered' => count($queuesToRemove ?? 0) + ]); + + foreach ($queuesToRemove as $queue) { + try { + $client->delete( + "{$scheme}$url/api/queues/%2F/{$queue}?if-empty=true&if-unused=true", // %2F stands for / + [ + 'headers' => [ + 'Authorization' => 'Basic ' . base64_encode( + $username . ':' . $password + ) + ] + ] + ); + $this->info("RabbitMQ. Delete $queue queue"); + } catch (\Throwable $exception) { + $this->warn("Was not able to remove $queue with error {$exception->getMessage()}"); + logger()->warning('RabbitMQ Garbage Collector failed to remove queue', [ + 'queue' => $queue, + 'message' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString() + ]); + } + } + + $this->info('Garbage collector finished'); + } +} diff --git a/src/Consumer.php b/src/Consumer.php index ed3d8099..b71297d9 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -4,6 +4,9 @@ use Exception; use Illuminate\Container\Container; +use Illuminate\Contracts\Debug\ExceptionHandler; +use Illuminate\Contracts\Events\Dispatcher; +use Illuminate\Contracts\Queue\Factory as QueueManager; use Illuminate\Queue\Worker; use Illuminate\Queue\WorkerOptions; use PhpAmqpLib\Channel\AMQPChannel; @@ -14,6 +17,9 @@ class Consumer extends Worker { + protected const MAIN_HANDLER_LOCK = 'main_handler'; + protected const HEALTHCHECK_HANDLER_LOCK = 'healthcheck_handler'; + /** @var Container */ protected $container; @@ -35,6 +41,38 @@ class Consumer extends Worker /** @var object|null */ protected $currentJob; + /** @var array */ + protected array $config; + + protected bool $asyncMode = false; + + protected int $consumeInterval = 60; + + protected ?Mutex $connectionMutex = null; + + /** + * Create a new queue worker. + * + * @param \Illuminate\Contracts\Queue\Factory $manager + * @param \Illuminate\Contracts\Events\Dispatcher $events + * @param \Illuminate\Contracts\Debug\ExceptionHandler $exceptions + * @param callable $isDownForMaintenance + * @param callable|null $resetScope + * @return void + */ + public function __construct( + QueueManager $manager, + Dispatcher $events, + ExceptionHandler $exceptions, + callable $isDownForMaintenance, + callable $resetScope = null, + array $config = [] + ) { + $this->config = $config; + + parent::__construct($manager, $events, $exceptions, $isDownForMaintenance, $resetScope); + } + public function setContainer(Container $value): void { $this->container = $value; @@ -60,6 +98,30 @@ public function setPrefetchCount(int $value): void $this->prefetchCount = $value; } + /** + * @return bool + */ + public function isAsyncMode(): bool + { + return $this->asyncMode; + } + + /** + * @param bool $asyncMode + */ + public function setAsyncMode(bool $asyncMode): void + { + $this->asyncMode = $asyncMode; + } + + /** + * @param int $consumeInterval + */ + public function setConsumeInterval(int $consumeInterval): void + { + $this->consumeInterval = $consumeInterval; + } + /** * Listen to the given queue in a loop. * @@ -83,12 +145,15 @@ public function daemon($connectionName, $queue, WorkerOptions $options) $connection = $this->manager->connection($connectionName); $this->channel = $connection->getChannel(); + $this->connectionMutex = new Mutex(false); + $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); $this->channel->basic_qos( $this->prefetchSize, $this->prefetchCount, false ); + $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); $jobClass = $connection->getJobClass(); $arguments = []; @@ -96,6 +161,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options) $arguments['priority'] = ['I', $this->maxPriority]; } + $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); $this->channel->basic_consume( $queue, $this->consumerTag, @@ -133,6 +199,7 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu null, $arguments ); + $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); while ($this->channel->is_consuming()) { // Before reserving any jobs, we will make sure this queue is not paused and @@ -146,7 +213,9 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu // If the daemon should run (not in maintenance mode, etc.), then we can wait for a job. try { + $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); $this->channel->wait(null, true, (int) $options->timeout); + $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); } catch (AMQPRuntimeException $exception) { $this->exceptions->report($exception); @@ -201,10 +270,15 @@ protected function daemonShouldRun(WorkerOptions $options, $connectionName, $que */ public function stop($status = 0, $options = null) { + $this->connectionMutex->lock(static::MAIN_HANDLER_LOCK); + // Tell the server you are going to stop consuming. // It will finish up the last message and not send you any more. $this->channel->basic_cancel($this->consumerTag, false, true); + $stoppingStatus = parent::stop($status, $options); + + $this->connectionMutex->unlock(static::MAIN_HANDLER_LOCK); - return parent::stop($status, $options); + return $stoppingStatus; } } diff --git a/src/Exceptions/MutexTimeout.php b/src/Exceptions/MutexTimeout.php new file mode 100644 index 00000000..5f454bff --- /dev/null +++ b/src/Exceptions/MutexTimeout.php @@ -0,0 +1,7 @@ + $batch + * @return mixed + */ + public static function collection(array $batch): void; +} diff --git a/src/LaravelQueueRabbitMQServiceProvider.php b/src/LaravelQueueRabbitMQServiceProvider.php index ee46d6cd..161a75fd 100644 --- a/src/LaravelQueueRabbitMQServiceProvider.php +++ b/src/LaravelQueueRabbitMQServiceProvider.php @@ -5,7 +5,9 @@ use Illuminate\Contracts\Debug\ExceptionHandler; use Illuminate\Queue\QueueManager; use Illuminate\Support\ServiceProvider; +use VladimirYuldashev\LaravelQueueRabbitMQ\Console\BatchableConsumeCommand; use VladimirYuldashev\LaravelQueueRabbitMQ\Console\ConsumeCommand; +use VladimirYuldashev\LaravelQueueRabbitMQ\Console\GarbageCollector; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors\RabbitMQConnector; class LaravelQueueRabbitMQServiceProvider extends ServiceProvider @@ -21,7 +23,29 @@ public function register(): void ); if ($this->app->runningInConsole()) { - $this->app->singleton('rabbitmq.consumer', function () { + $this->app->singleton(BatchableConsumer::class, function () { + $isDownForMaintenance = function () { + return $this->app->isDownForMaintenance(); + }; + + return new BatchableConsumer( + $this->app['queue'], + $this->app['events'], + $this->app[ExceptionHandler::class], + $isDownForMaintenance, + null, + $this->app['config']['queue']['connections']['rabbitmq'] + ); + }); + + $this->app->singleton(BatchableConsumeCommand::class, static function ($app) { + return new BatchableConsumeCommand( + $app[BatchableConsumer::class], + $app['cache.store'] + ); + }); + + $this->app->singleton(Consumer::class, function () { $isDownForMaintenance = function () { return $this->app->isDownForMaintenance(); }; @@ -30,22 +54,31 @@ public function register(): void $this->app['queue'], $this->app['events'], $this->app[ExceptionHandler::class], - $isDownForMaintenance + $isDownForMaintenance, + null, + $this->app['config']['queue']['connections']['rabbitmq'] ); }); $this->app->singleton(ConsumeCommand::class, static function ($app) { return new ConsumeCommand( - $app['rabbitmq.consumer'], + $app[Consumer::class], $app['cache.store'] ); }); $this->commands([ Console\ConsumeCommand::class, + Console\BatchableConsumeCommand::class, ]); } + $this->app->singleton(GarbageCollector::class, static function ($app) { + return new GarbageCollector( + $app['config']['queue']['connections']['rabbitmq'] + ); + }); + $this->commands([ Console\ExchangeDeclareCommand::class, Console\ExchangeDeleteCommand::class, @@ -53,6 +86,7 @@ public function register(): void Console\QueueDeclareCommand::class, Console\QueueDeleteCommand::class, Console\QueuePurgeCommand::class, + Console\GarbageCollector::class, ]); } diff --git a/src/Mutex.php b/src/Mutex.php new file mode 100644 index 00000000..150eb994 --- /dev/null +++ b/src/Mutex.php @@ -0,0 +1,67 @@ +availableLocksPool = new \Swoole\Coroutine\Channel(); + } elseif (extension_loaded('openswoole')) { + $this->availableLocksPool = new \OpenSwoole\Coroutine\Channel(); + } + + // initialize pool with 1 available lock + $this->availableLocksPool?->push(true); + } + + public function unlock(string $initiator): void + { + if ($this->availableLocksPool === null) { + return; + } + + if ($this->currentLockInitiator !== $initiator) { + return; + } + $this->currentLockInitiator = null; + if (!$this->hasAvailableLock()) { + $this->availableLocksPool->push(true); + } + } + + public function lock(string $initiator, float $timeout = null): void + { + if ($this->availableLocksPool === null) { + return; + } + + // if the same initiator tries to lock, we allow it and ignore + if (!$this->hasAvailableLock() && $this->currentLockInitiator === $initiator) { + return; + } + + $hasAvailableLock = $this->availableLocksPool->pop($timeout ?: self::DEFAULT_WAITING_CHANNEL_TIMEOUT); + if (!$hasAvailableLock) { + throw new MutexTimeout('Mutex error on trying to acquire lock'); + } + $this->currentLockInitiator = $initiator; + } + + public function hasAvailableLock(): bool + { + return !$this->availableLocksPool?->isEmpty(); + } +} diff --git a/src/Queue/Connection/ConnectionFactory.php b/src/Queue/Connection/ConnectionFactory.php index df19f223..6d9dbd56 100644 --- a/src/Queue/Connection/ConnectionFactory.php +++ b/src/Queue/Connection/ConnectionFactory.php @@ -39,6 +39,11 @@ public static function make(array $config = []): AbstractConnection $connection = self::getConnectionFromConfig($config); $connectionConfig = ConfigFactory::make($config); + if ($connectionConfig->isSecure()) { + $connectionConfig->setNetworkProtocol('ssl'); + $connectionConfig->setSslVerify(false); + } + /** * Todo [Major]: * - Remove if statement and contents. diff --git a/src/Queue/Jobs/RabbitMQJob.php b/src/Queue/Jobs/RabbitMQJob.php index abcdfab4..541cd37b 100644 --- a/src/Queue/Jobs/RabbitMQJob.php +++ b/src/Queue/Jobs/RabbitMQJob.php @@ -127,7 +127,7 @@ public function release($delay = 0): void parent::release(); // Always create a new message when this Job is released - $this->rabbitmq->laterRaw($delay, $this->message->getBody(), $this->queue, $this->attempts()); + $this->rabbitmq->laterRaw($delay, $this->message->getBody(), $this->message->getRoutingKey(), $this->attempts()); // Releasing a Job means the message was failed to process. // Because this Job message is always recreated and pushed as new message, this Job message is correctly handled. diff --git a/src/Queue/Jobs/RabbitMQJobBatchable.php b/src/Queue/Jobs/RabbitMQJobBatchable.php new file mode 100644 index 00000000..3303413f --- /dev/null +++ b/src/Queue/Jobs/RabbitMQJobBatchable.php @@ -0,0 +1,60 @@ +payload(); + + [$class, $method] = JobName::parse($payload['job']); + + ($this->instance = $this->resolve($class))->{$method}($this, $payload['data']); + } + + /** + * Returns target class name + * + * @return mixed + */ + public function getPayloadClass(): string + { + $payload = $this->payload(); + + return $payload['data']['commandName']; + } + + /** + * @return object + * @throws \RuntimeException + */ + public function getPayloadData(): object + { + $payload = $this->payload(); + + $data = $payload['data']; + + if (str_starts_with($data['command'], 'O:')) { + return unserialize($data['command']); + } + + if ($this->container->bound(Encrypter::class)) { + return unserialize($this->container[Encrypter::class]->decrypt($data['command'])); + } + + throw new \RuntimeException('Unable to extract job data.'); + } +} diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 57c23e64..f43747ff 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -54,7 +54,7 @@ class RabbitMQQueue extends Queue implements QueueContract, RabbitMQQueueContrac /** * Current job being processed. */ - protected RabbitMQJob $currentJob; + protected ?RabbitMQJob $currentJob = null; /** * Holds the Configuration @@ -164,7 +164,7 @@ public function laterRaw($delay, string $payload, $queue = null, int $attempts = [$mainDestination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); $this->declareDestination($mainDestination, $exchange, $exchangeType); - $destination = $this->getQueue($queue).'.delay.'.$ttl; + $destination = 'delay.' . $ttl . '.' . $this->getQueue($queue); $this->declareQueue($destination, true, false, $this->getDelayQueueArguments($this->getQueue($queue), $ttl)); @@ -400,10 +400,6 @@ public function isQueueExists(string $name = null): bool { $queueName = $this->getQueue($name); - if ($this->isQueueDeclared($queueName)) { - return true; - } - try { // create a temporary channel, so the main channel will not be closed on exception $channel = $this->createChannel(); @@ -431,11 +427,8 @@ public function declareQueue( bool $autoDelete = false, array $arguments = [] ): void { - if ($this->isQueueDeclared($name)) { - return; - } - - $this->getChannel()->queue_declare( + $channel = $this->createChannel(); + $channel->queue_declare( $name, false, $durable, @@ -444,6 +437,7 @@ public function declareQueue( false, new AMQPTable($arguments) ); + $channel->close(); } /** @@ -504,6 +498,10 @@ public function ack(RabbitMQJob $job): void */ public function reject(RabbitMQJob $job, bool $requeue = false): void { + logger()->error('RabbitMQJob.markAsFailed.reject', [ + 'trace' => debug_backtrace(options: DEBUG_BACKTRACE_IGNORE_ARGS), + ]); + $this->getChannel()->basic_reject($job->getRabbitMQMessage()->getDeliveryTag(), $requeue); } @@ -577,13 +575,13 @@ protected function getRandomId(): string */ public function close(): void { - if (! $this->currentJob->isDeletedOrReleased()) { + if ($this->currentJob && ! $this->currentJob->isDeletedOrReleased()) { $this->reject($this->currentJob, true); } try { $this->getConnection()->close(); - } catch (ErrorException) { + } catch (\Throwable) { // Ignore the exception } } @@ -699,16 +697,11 @@ protected function declareDestination(string $destination, ?string $exchange = n if ($exchange && ! $this->isExchangeExists($exchange)) { $this->declareExchange($exchange, $exchangeType); } - - // When an exchange is provided, just return. - if ($exchange) { - return; - } - - // When the queue already exists, just return. - if ($this->isQueueExists($destination)) { - return; - } +// +// // When the queue already exists, just return. +// if ($this->isQueueExists($destination)) { +// return; +// } // Create a queue for amq.direct publishing. $this->declareQueue($destination, true, false, $this->getQueueArguments($destination)); diff --git a/src/Queue/RabbitMQQueueBatchable.php b/src/Queue/RabbitMQQueueBatchable.php new file mode 100644 index 00000000..497c3eee --- /dev/null +++ b/src/Queue/RabbitMQQueueBatchable.php @@ -0,0 +1,54 @@ +reconnect(); + parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket); + } + } + + protected function publishBatch($jobs, $data = '', $queue = null): void + { + try { + parent::publishBatch($jobs, $data, $queue); + } catch (AMQPConnectionClosedException|AMQPChannelClosedException) { + $this->reconnect(); + parent::publishBatch($jobs, $data, $queue); + } + } + + protected function createChannel(): AMQPChannel + { + try { + return parent::createChannel(); + } catch (AMQPConnectionClosedException) { + $this->reconnect(); + return parent::createChannel(); + } + } + + public function push($job, $data = '', $queue = null) + { + $queue = $queue ?: $job->onQueue(); + return parent::push($job, $data, $queue); + } + + + public function pushRaw($payload, $queue = null, array $options = []): int|string|null + { + return parent::pushRaw($payload, $queue, $options); + } +}