|
9 | 9 | use Illuminate\Contracts\Queue\Queue as QueueContract; |
10 | 10 | use Illuminate\Queue\Queue; |
11 | 11 | use Illuminate\Support\Arr; |
| 12 | +use Illuminate\Support\Facades\Redis; |
12 | 13 | use Illuminate\Support\Str; |
13 | 14 | use JsonException; |
14 | 15 | use PhpAmqpLib\Channel\AMQPChannel; |
@@ -120,9 +121,9 @@ public function size($queue = null): int |
120 | 121 | * |
121 | 122 | * @throws AMQPProtocolChannelException |
122 | 123 | */ |
123 | | - public function push($job, $data = '', $queue = null) |
| 124 | + public function push($job, $data = '', $queue = null, array $options = []) |
124 | 125 | { |
125 | | - return $this->pushRaw($this->createPayload($job, $queue, $data), $queue, []); |
| 126 | + return $this->pushRaw($this->createPayload($job, $queue, $data), $queue, $options); |
126 | 127 | } |
127 | 128 |
|
128 | 129 | /** |
@@ -215,7 +216,7 @@ public function laterRaw($delay, $payload, $queue = null, $attempts = 0) |
215 | 216 | [$message, $correlationId] = $this->createMessage($payload, $attempts); |
216 | 217 |
|
217 | 218 | // Publish directly on the delayQueue, no need to publish trough an exchange. |
218 | | - $this->channel->basic_publish($message, null, $destination, true, false); |
| 219 | + $this->channel->basic_publish($message, null, $destination, false, false); |
219 | 220 |
|
220 | 221 | return $correlationId; |
221 | 222 | } |
@@ -586,7 +587,7 @@ public function reject(RabbitMQJob $job, bool $requeue = false): void |
586 | 587 | * @return array |
587 | 588 | * @throws JsonException |
588 | 589 | */ |
589 | | - protected function createMessage($payload, int $attempts = 0): array |
| 590 | + protected function createMessage($payload, int $attempts = 0, $options = []): array |
590 | 591 | { |
591 | 592 | $properties = [ |
592 | 593 | 'content_type' => 'application/json', |
@@ -892,7 +893,7 @@ protected function publishProperties($queue, array $options = []): array |
892 | 893 | $queue = $this->getQueue($queue); |
893 | 894 | $attempts = Arr::get($options, 'attempts') ?: 0; |
894 | 895 |
|
895 | | - $destination = $this->getRoutingKey($queue); |
| 896 | + $destination = $this->getRoutingKey($queue, $options); |
896 | 897 | $exchange = $this->getExchange(Arr::get($options, 'exchange')); |
897 | 898 | $exchangeType = $this->getExchangeType(Arr::get($options, 'exchange_type')); |
898 | 899 |
|
|
0 commit comments