diff --git a/composer.json b/composer.json index 1a91d725..6c1a93d4 100644 --- a/composer.json +++ b/composer.json @@ -11,16 +11,16 @@ "require": { "php": "^8.0", "ext-json": "*", - "illuminate/queue": "^10.0|^11.0", + "illuminate/queue": "^10.0|^11.0|^12.0", "php-amqplib/php-amqplib": "^v3.6" }, "require-dev": { - "phpunit/phpunit": "^10.0|^11.0", + "phpunit/phpunit": "^10.0|^11.0|^12.0", "mockery/mockery": "^1.0", "laravel/horizon": "^5.0", "orchestra/testbench": "^7.0|^8.0|^9.0", "laravel/pint": "^1.2", - "laravel/framework": "^9.0|^10.0|^11.0" + "laravel/framework": "^9.0|^10.0|^11.0|12.0" }, "autoload": { "psr-4": { diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 4c102ce8..8299fe24 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -21,8 +21,7 @@ ], ], - 'options' => [ - ], + 'options' => [], /* * Set to "horizon" if you wish to use Laravel Horizon. diff --git a/src/Queue/QueueFactory.php b/src/Queue/QueueFactory.php index 75bde1ed..ab362a35 100644 --- a/src/Queue/QueueFactory.php +++ b/src/Queue/QueueFactory.php @@ -4,6 +4,7 @@ use Illuminate\Support\Arr; use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue; +use VladimirYuldashev\LaravelQueueRabbitMQ\SpiralRoadrunner\RabbitMQQueue as SpiralRoadRunnerRabbitMQQueue; class QueueFactory { @@ -20,6 +21,10 @@ public static function make(array $config = []): RabbitMQQueue return new HorizonRabbitMQQueue($queueConfig); } + if (strtolower($worker) == 'spiral/roadrunner') { + return new SpiralRoadRunnerRabbitMQQueue($queueConfig); + } + return new $worker($queueConfig); } } diff --git a/src/SpiralRoadrunner/RabbitMQQueue.php b/src/SpiralRoadrunner/RabbitMQQueue.php new file mode 100644 index 00000000..94d67b42 --- /dev/null +++ b/src/SpiralRoadrunner/RabbitMQQueue.php @@ -0,0 +1,122 @@ +enqueueUsing( + $job, + $this->createPayload($job, $this->getQueue($queue), $data), + $queue, + null, + function ($payload, $queue) use ($job) { + return $this->pushRaw($payload, $queue, ['jobClass' => $job]); + } + ); + } + + /** + * Create a AMQP message. + */ + protected function createMessage($payload, int $attempts = 0, $jobClass = null): array + { + $properties = [ + 'content_type' => 'application/json', + 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, + ]; + + $currentPayload = json_decode($payload, true); + if ($correlationId = $currentPayload['id'] ?? null) { + $properties['correlation_id'] = $correlationId; + } + + if ($this->getConfig()->isPrioritizeDelayed()) { + $properties['priority'] = $attempts; + } + + if (isset($currentPayload['data']['command'])) { + // If the command data is encrypted, decrypt it first before attempting to unserialize + if (is_subclass_of($currentPayload['data']['commandName'], ShouldBeEncrypted::class)) { + $currentPayload['data']['command'] = Crypt::decrypt($currentPayload['data']['command']); + } + + $commandData = unserialize($currentPayload['data']['command']); + if (property_exists($commandData, 'priority')) { + $properties['priority'] = $commandData->priority; + } + } + + $properties['payload'] = $payload; + + $message = new AMQPMessage($payload, $properties); + + $message->set('application_headers', new AMQPTable([ + 'laravel' => [ + 'attempts' => $attempts, + ], + 'rr_id' => $correlationId, + 'rr_job' => $jobClass + ])); + + return [ + $message, + $correlationId, + ]; + } + + /** + * {@inheritdoc} + * + * @throws AMQPProtocolChannelException + */ + public function pushRaw($payload, $queue = null, array $options = []): int|string|null + { + [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); + + $this->declareDestination($destination, $exchange, $exchangeType); + + [$message, $correlationId] = $this->createMessage($payload, $attempts, $options['jobClass']); + + $this->publishBasic($message, $exchange, $destination, true); + + return $correlationId; + } + + /** + * {@inheritdoc} + */ + protected function getRandomId(): string + { + return Str::uuid(); + } +}