Skip to content

Commit 4b86963

Browse files
author
Haziman
committed
fixing: blocker for spiral/roadrunner support
1 parent d87656c commit 4b86963

File tree

2 files changed

+20
-5
lines changed

2 files changed

+20
-5
lines changed

src/Queue/QueueFactory.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Illuminate\Support\Arr;
66
use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue;
7+
use VladimirYuldashev\LaravelQueueRabbitMQ\SpiralRoadrunner\RabbitMQQueue as SpiralRoadRunnerRabbitMQQueue;
78

89
class QueueFactory
910
{

src/SpiralRoadrunner/RabbitMQQueue.php

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,27 @@
44

55
use Illuminate\Contracts\Container\BindingResolutionException;
66
use Illuminate\Contracts\Events\Dispatcher;
7+
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
78
use Illuminate\Support\Str;
9+
use Illuminate\Support\Facades\Crypt;
810
use Laravel\Horizon\Events\JobDeleted;
911
use Laravel\Horizon\Events\JobPushed;
1012
use Laravel\Horizon\Events\JobReserved;
1113
use Laravel\Horizon\JobPayload;
12-
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
1314
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;
1415
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue;
1516

17+
use PhpAmqpLib\Channel\AMQPChannel;
18+
use PhpAmqpLib\Connection\AbstractConnection;
19+
use PhpAmqpLib\Exception\AMQPChannelClosedException;
20+
use PhpAmqpLib\Exception\AMQPConnectionBlockedException;
21+
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
22+
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
23+
use PhpAmqpLib\Exception\AMQPRuntimeException;
24+
use PhpAmqpLib\Exchange\AMQPExchangeType;
25+
use PhpAmqpLib\Message\AMQPMessage;
26+
use PhpAmqpLib\Wire\AMQPTable;
27+
1628
class RabbitMQQueue extends BaseRabbitMQQueue
1729
{
1830
/**
@@ -36,7 +48,7 @@ function ($payload, $queue) use ($job) {
3648
/**
3749
* Create a AMQP message.
3850
*/
39-
protected function createMessage($payload, int $attempts = 0): array
51+
protected function createMessage($payload, int $attempts = 0, $jobClass = null): array
4052
{
4153
$properties = [
4254
'content_type' => 'application/json',
@@ -64,12 +76,16 @@ protected function createMessage($payload, int $attempts = 0): array
6476
}
6577
}
6678

79+
$properties['payload'] = $payload;
80+
6781
$message = new AMQPMessage($payload, $properties);
6882

6983
$message->set('application_headers', new AMQPTable([
7084
'laravel' => [
7185
'attempts' => $attempts,
7286
],
87+
'rr_id' => $correlationId,
88+
'rr_job' => $jobClass
7389
]));
7490

7591
return [
@@ -89,9 +105,7 @@ public function pushRaw($payload, $queue = null, array $options = []): int|strin
89105

90106
$this->declareDestination($destination, $exchange, $exchangeType);
91107

92-
[$message, $correlationId] = $this->createMessage($payload, $attempts);
93-
94-
dump($message);
108+
[$message, $correlationId] = $this->createMessage($payload, $attempts, $options['jobClass']);
95109

96110
$this->publishBasic($message, $exchange, $destination, true);
97111

0 commit comments

Comments
 (0)