From 0c413f8ad93af80f6ba73184e4f3def34319bef6 Mon Sep 17 00:00:00 2001 From: Haziman Hashim <2503209+jimanx2@users.noreply.github.com> Date: Sat, 15 Mar 2025 15:47:27 +0800 Subject: [PATCH 1/6] Update composer.json --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 1a91d725..c712f524 100644 --- a/composer.json +++ b/composer.json @@ -11,7 +11,7 @@ "require": { "php": "^8.0", "ext-json": "*", - "illuminate/queue": "^10.0|^11.0", + "illuminate/queue": "^10.0|^11.0|^12.0", "php-amqplib/php-amqplib": "^v3.6" }, "require-dev": { From 5191a2b06e7b7f01adae9b3d8facc79014de3cab Mon Sep 17 00:00:00 2001 From: Haziman Hashim <2503209+jimanx2@users.noreply.github.com> Date: Sat, 15 Mar 2025 15:49:24 +0800 Subject: [PATCH 2/6] Update composer.json --- composer.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/composer.json b/composer.json index c712f524..6c1a93d4 100644 --- a/composer.json +++ b/composer.json @@ -15,12 +15,12 @@ "php-amqplib/php-amqplib": "^v3.6" }, "require-dev": { - "phpunit/phpunit": "^10.0|^11.0", + "phpunit/phpunit": "^10.0|^11.0|^12.0", "mockery/mockery": "^1.0", "laravel/horizon": "^5.0", "orchestra/testbench": "^7.0|^8.0|^9.0", "laravel/pint": "^1.2", - "laravel/framework": "^9.0|^10.0|^11.0" + "laravel/framework": "^9.0|^10.0|^11.0|12.0" }, "autoload": { "psr-4": { From 903ddc042a468207845b364d6e469dd621af7ad6 Mon Sep 17 00:00:00 2001 From: Haziman Hashim <2503209+jimanx2@users.noreply.github.com> Date: Sun, 16 Mar 2025 10:30:51 +0800 Subject: [PATCH 3/6] Update RabbitMQQueue.php --- src/Queue/RabbitMQQueue.php | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 957620ef..6f24c90b 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -106,8 +106,10 @@ public function push($job, $data = '', $queue = null) $this->createPayload($job, $this->getQueue($queue), $data), $queue, null, - function ($payload, $queue) { - return $this->pushRaw($payload, $queue); + function ($payload, $queue) use ($job) { + return $this->pushRaw($payload, $queue, [ + 'job' => $job + ]); } ); } @@ -119,11 +121,11 @@ function ($payload, $queue) { */ public function pushRaw($payload, $queue = null, array $options = []): int|string|null { - [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); + [$destination, $exchange, $exchangeType, $attempts, $job] = $this->publishProperties($queue, $options); $this->declareDestination($destination, $exchange, $exchangeType); - [$message, $correlationId] = $this->createMessage($payload, $attempts); + [$message, $correlationId] = $this->createMessage($payload, $attempts, $job); $this->publishBasic($message, $exchange, $destination, true); @@ -511,7 +513,7 @@ public function reject(RabbitMQJob $job, bool $requeue = false): void /** * Create a AMQP message. */ - protected function createMessage($payload, int $attempts = 0): array + protected function createMessage($payload, int $attempts = 0, string $job = null): array { $properties = [ 'content_type' => 'application/json', @@ -542,6 +544,8 @@ protected function createMessage($payload, int $attempts = 0): array $message = new AMQPMessage($payload, $properties); $message->set('application_headers', new AMQPTable([ + 'rr_job' => $job, + 'rr_id' => $correlationId, 'laravel' => [ 'attempts' => $attempts, ], From 08be1cda2d62a2e766a5125e18018c078ae3b5ad Mon Sep 17 00:00:00 2001 From: Haziman Date: Sun, 16 Mar 2025 10:39:16 +0800 Subject: [PATCH 4/6] Revert "Update RabbitMQQueue.php" This reverts commit 903ddc042a468207845b364d6e469dd621af7ad6. --- src/Queue/RabbitMQQueue.php | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 6f24c90b..957620ef 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -106,10 +106,8 @@ public function push($job, $data = '', $queue = null) $this->createPayload($job, $this->getQueue($queue), $data), $queue, null, - function ($payload, $queue) use ($job) { - return $this->pushRaw($payload, $queue, [ - 'job' => $job - ]); + function ($payload, $queue) { + return $this->pushRaw($payload, $queue); } ); } @@ -121,11 +119,11 @@ function ($payload, $queue) use ($job) { */ public function pushRaw($payload, $queue = null, array $options = []): int|string|null { - [$destination, $exchange, $exchangeType, $attempts, $job] = $this->publishProperties($queue, $options); + [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); $this->declareDestination($destination, $exchange, $exchangeType); - [$message, $correlationId] = $this->createMessage($payload, $attempts, $job); + [$message, $correlationId] = $this->createMessage($payload, $attempts); $this->publishBasic($message, $exchange, $destination, true); @@ -513,7 +511,7 @@ public function reject(RabbitMQJob $job, bool $requeue = false): void /** * Create a AMQP message. */ - protected function createMessage($payload, int $attempts = 0, string $job = null): array + protected function createMessage($payload, int $attempts = 0): array { $properties = [ 'content_type' => 'application/json', @@ -544,8 +542,6 @@ protected function createMessage($payload, int $attempts = 0, string $job = null $message = new AMQPMessage($payload, $properties); $message->set('application_headers', new AMQPTable([ - 'rr_job' => $job, - 'rr_id' => $correlationId, 'laravel' => [ 'attempts' => $attempts, ], From d87656c76e4e3ec5eac2e2735fff8c4c7117c480 Mon Sep 17 00:00:00 2001 From: Haziman Date: Sun, 16 Mar 2025 17:10:29 +0800 Subject: [PATCH 5/6] enhancement: add support for spiral/roadrunner --- config/rabbitmq.php | 3 +- src/Queue/QueueFactory.php | 4 + src/SpiralRoadrunner/RabbitMQQueue.php | 108 +++++++++++++++++++++++++ 3 files changed, 113 insertions(+), 2 deletions(-) create mode 100644 src/SpiralRoadrunner/RabbitMQQueue.php diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 4c102ce8..8299fe24 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -21,8 +21,7 @@ ], ], - 'options' => [ - ], + 'options' => [], /* * Set to "horizon" if you wish to use Laravel Horizon. diff --git a/src/Queue/QueueFactory.php b/src/Queue/QueueFactory.php index 75bde1ed..4acf9a3f 100644 --- a/src/Queue/QueueFactory.php +++ b/src/Queue/QueueFactory.php @@ -20,6 +20,10 @@ public static function make(array $config = []): RabbitMQQueue return new HorizonRabbitMQQueue($queueConfig); } + if (strtolower($worker) == 'spiral/roadrunner') { + return new SpiralRoadRunnerRabbitMQQueue($queueConfig); + } + return new $worker($queueConfig); } } diff --git a/src/SpiralRoadrunner/RabbitMQQueue.php b/src/SpiralRoadrunner/RabbitMQQueue.php new file mode 100644 index 00000000..18234e24 --- /dev/null +++ b/src/SpiralRoadrunner/RabbitMQQueue.php @@ -0,0 +1,108 @@ +enqueueUsing( + $job, + $this->createPayload($job, $this->getQueue($queue), $data), + $queue, + null, + function ($payload, $queue) use ($job) { + return $this->pushRaw($payload, $queue, ['jobClass' => $job]); + } + ); + } + + /** + * Create a AMQP message. + */ + protected function createMessage($payload, int $attempts = 0): array + { + $properties = [ + 'content_type' => 'application/json', + 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, + ]; + + $currentPayload = json_decode($payload, true); + if ($correlationId = $currentPayload['id'] ?? null) { + $properties['correlation_id'] = $correlationId; + } + + if ($this->getConfig()->isPrioritizeDelayed()) { + $properties['priority'] = $attempts; + } + + if (isset($currentPayload['data']['command'])) { + // If the command data is encrypted, decrypt it first before attempting to unserialize + if (is_subclass_of($currentPayload['data']['commandName'], ShouldBeEncrypted::class)) { + $currentPayload['data']['command'] = Crypt::decrypt($currentPayload['data']['command']); + } + + $commandData = unserialize($currentPayload['data']['command']); + if (property_exists($commandData, 'priority')) { + $properties['priority'] = $commandData->priority; + } + } + + $message = new AMQPMessage($payload, $properties); + + $message->set('application_headers', new AMQPTable([ + 'laravel' => [ + 'attempts' => $attempts, + ], + ])); + + return [ + $message, + $correlationId, + ]; + } + + /** + * {@inheritdoc} + * + * @throws AMQPProtocolChannelException + */ + public function pushRaw($payload, $queue = null, array $options = []): int|string|null + { + [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); + + $this->declareDestination($destination, $exchange, $exchangeType); + + [$message, $correlationId] = $this->createMessage($payload, $attempts); + + dump($message); + + $this->publishBasic($message, $exchange, $destination, true); + + return $correlationId; + } + + /** + * {@inheritdoc} + */ + protected function getRandomId(): string + { + return Str::uuid(); + } +} From 4b86963212c631df35c8b190ffc88c4ed5ccf14a Mon Sep 17 00:00:00 2001 From: Haziman Date: Sun, 16 Mar 2025 18:29:47 +0800 Subject: [PATCH 6/6] fixing: blocker for spiral/roadrunner support --- src/Queue/QueueFactory.php | 1 + src/SpiralRoadrunner/RabbitMQQueue.php | 24 +++++++++++++++++++----- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/Queue/QueueFactory.php b/src/Queue/QueueFactory.php index 4acf9a3f..ab362a35 100644 --- a/src/Queue/QueueFactory.php +++ b/src/Queue/QueueFactory.php @@ -4,6 +4,7 @@ use Illuminate\Support\Arr; use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue; +use VladimirYuldashev\LaravelQueueRabbitMQ\SpiralRoadrunner\RabbitMQQueue as SpiralRoadRunnerRabbitMQQueue; class QueueFactory { diff --git a/src/SpiralRoadrunner/RabbitMQQueue.php b/src/SpiralRoadrunner/RabbitMQQueue.php index 18234e24..94d67b42 100644 --- a/src/SpiralRoadrunner/RabbitMQQueue.php +++ b/src/SpiralRoadrunner/RabbitMQQueue.php @@ -4,15 +4,27 @@ use Illuminate\Contracts\Container\BindingResolutionException; use Illuminate\Contracts\Events\Dispatcher; +use Illuminate\Contracts\Queue\ShouldBeEncrypted; use Illuminate\Support\Str; +use Illuminate\Support\Facades\Crypt; use Laravel\Horizon\Events\JobDeleted; use Laravel\Horizon\Events\JobPushed; use Laravel\Horizon\Events\JobReserved; use Laravel\Horizon\JobPayload; -use PhpAmqpLib\Exception\AMQPProtocolChannelException; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue; +use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Connection\AbstractConnection; +use PhpAmqpLib\Exception\AMQPChannelClosedException; +use PhpAmqpLib\Exception\AMQPConnectionBlockedException; +use PhpAmqpLib\Exception\AMQPConnectionClosedException; +use PhpAmqpLib\Exception\AMQPProtocolChannelException; +use PhpAmqpLib\Exception\AMQPRuntimeException; +use PhpAmqpLib\Exchange\AMQPExchangeType; +use PhpAmqpLib\Message\AMQPMessage; +use PhpAmqpLib\Wire\AMQPTable; + class RabbitMQQueue extends BaseRabbitMQQueue { /** @@ -36,7 +48,7 @@ function ($payload, $queue) use ($job) { /** * Create a AMQP message. */ - protected function createMessage($payload, int $attempts = 0): array + protected function createMessage($payload, int $attempts = 0, $jobClass = null): array { $properties = [ 'content_type' => 'application/json', @@ -64,12 +76,16 @@ protected function createMessage($payload, int $attempts = 0): array } } + $properties['payload'] = $payload; + $message = new AMQPMessage($payload, $properties); $message->set('application_headers', new AMQPTable([ 'laravel' => [ 'attempts' => $attempts, ], + 'rr_id' => $correlationId, + 'rr_job' => $jobClass ])); return [ @@ -89,9 +105,7 @@ public function pushRaw($payload, $queue = null, array $options = []): int|strin $this->declareDestination($destination, $exchange, $exchangeType); - [$message, $correlationId] = $this->createMessage($payload, $attempts); - - dump($message); + [$message, $correlationId] = $this->createMessage($payload, $attempts, $options['jobClass']); $this->publishBasic($message, $exchange, $destination, true);