Skip to content

Commit 503158a

Browse files
committed
wip
1 parent 6883475 commit 503158a

File tree

4 files changed

+69
-36
lines changed

4 files changed

+69
-36
lines changed

README.md

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ Add connection to `config/queue.php`:
6161
'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
6262
'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
6363
],
64+
'queue' => [
65+
'job' => VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class,
66+
],
6467
],
6568

6669
/*
@@ -161,15 +164,17 @@ When you want to instruct RabbitMQ to reroute failed messages to a exchange or a
161164
// ...
162165
],
163166
```
167+
164168
### Use your own RabbitMQJob class
165169
Sometimes you have to work with messages published by another application.
166-
Those messages probably won't respect the laravel QueueApi payload schema.
167-
The problem with these messages is that, laravel workers won't be able to determine the actual job or class to execute.
170+
Those messages probably won't respect Laravel's job payload schema.
171+
The problem with these messages is that, Laravel workers won't be able to determine the actual job or class to execute.
168172

169-
You can extend the build-in `RabbitMQJob::class` and within the queue connection config, you can add your own class.
173+
You can extend the build-in `RabbitMQJob::class` and within the queue connection config, you can define your own class.
170174
When you specify an `job` key in the config, with your own class name, every message retrieved from the broker will get wrapped by your own class.
171175

172176
An example for the config:
177+
173178
```php
174179
'connections' => [
175180
// ...
@@ -189,7 +194,9 @@ An example for the config:
189194
// ...
190195
],
191196
```
197+
192198
An example of your own job class:
199+
193200
```php
194201
<?php
195202

@@ -217,7 +224,9 @@ class RabbitMQJob extends BaseJob
217224
}
218225

219226
```
220-
Or maybe you want to add extra properties to the payload.
227+
228+
Or maybe you want to add extra properties to the payload:
229+
221230
```php
222231
<?php
223232

@@ -241,6 +250,7 @@ class RabbitMQJob extends BaseJob
241250
}
242251
}
243252
```
253+
244254
## Laravel Usage
245255

246256
Once you completed the configuration you can use Laravel Queue API. If you used other queue drivers you do not need to change anything else. If you do not know how to use Queue API, please refer to the official Laravel documentation: http://laravel.com/docs/queues

config/rabbitmq.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
3030
'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
3131
],
32+
'queue' => [
33+
'job' => VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class,
34+
],
3235
],
3336

3437
/*

src/Horizon/RabbitMQQueue.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public function later($delay, $job, $data = '', $queue = null)
7474
public function pop($queue = null)
7575
{
7676
return tap(parent::pop($queue), function ($result) use ($queue): void {
77-
if ($result instanceof RabbitMQJob) {
77+
if (is_a($result, RabbitMQJob::class, true)) {
7878
$this->event($this->getQueue($queue), new JobReserved($result->getRawBody()));
7979
}
8080
});

src/Queue/RabbitMQQueue.php

Lines changed: 51 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,15 @@
1010
use Illuminate\Queue\Queue;
1111
use Illuminate\Support\Arr;
1212
use Illuminate\Support\Str;
13+
use JsonException;
1314
use PhpAmqpLib\Channel\AMQPChannel;
1415
use PhpAmqpLib\Connection\AbstractConnection;
1516
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
1617
use PhpAmqpLib\Exchange\AMQPExchangeType;
1718
use PhpAmqpLib\Message\AMQPMessage;
1819
use PhpAmqpLib\Wire\AMQPTable;
20+
use Throwable;
21+
use Throwabler;
1922
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;
2023

2124
class RabbitMQQueue extends Queue implements QueueContract
@@ -101,7 +104,7 @@ public function size($queue = null): int
101104
{
102105
$queue = $this->getQueue($queue);
103106

104-
if (! $this->isQueueExists($queue)) {
107+
if (!$this->isQueueExists($queue)) {
105108
return 0;
106109
}
107110

@@ -172,7 +175,7 @@ public function laterRaw($delay, $payload, $queue = null, $attempts = 0)
172175
return $this->pushRaw($payload, $queue, ['delay' => $delay, 'attempts' => $attempts]);
173176
}
174177

175-
$destination = $this->getQueue($queue).'.delay.'.$ttl;
178+
$destination = $this->getQueue($queue) . '.delay.' . $ttl;
176179

177180
$this->declareQueue($destination, true, false, $this->getDelayQueueArguments($this->getQueue($queue), $ttl));
178181

@@ -191,7 +194,7 @@ public function laterRaw($delay, $payload, $queue = null, $attempts = 0)
191194
*/
192195
public function bulk($jobs, $data = '', $queue = null): void
193196
{
194-
foreach ((array) $jobs as $job) {
197+
foreach ((array)$jobs as $job) {
195198
$this->bulkRaw($this->createPayload($job, $queue, $data), $queue, ['job' => $job]);
196199
}
197200

@@ -221,7 +224,7 @@ public function bulkRaw(string $payload, $queue = null, array $options = [])
221224
/**
222225
* {@inheritdoc}
223226
*
224-
* @throws Exception
227+
* @throws Throwable
225228
*/
226229
public function pop($queue = null)
227230
{
@@ -244,7 +247,6 @@ public function pop($queue = null)
244247
// If there is not exchange or queue AMQP will throw exception with code 404
245248
// We need to catch it and return null
246249
if ($exception->amqp_reply_code === 404) {
247-
248250
// Because of the channel exception the channel was closed and removed.
249251
// We have to open a new channel. Because else the worker(s) are stuck in a loop, without processing.
250252
$this->channel = $this->connection->channel();
@@ -275,16 +277,20 @@ public function getChannel(): AMQPChannel
275277
}
276278

277279
/**
278-
* Gets the Job class from config or returns the default job class
279-
* when the job class does not extend the default job class an exception is thrown.
280+
* Job class to use.
280281
*
281282
* @return string
282283
* @throws \Throwable
283284
*/
284-
public function getJobClass()
285+
public function getJobClass(): string
285286
{
286287
$job = Arr::get($this->options, 'job', RabbitMQJob::class);
287-
throw_if(! is_a($job, RabbitMQJob::class, true), Exception::class, sprintf('Class %s must extend: %s', $job, RabbitMQJob::class));
288+
289+
throw_if(
290+
!is_a($job, RabbitMQJob::class, true),
291+
Exception::class,
292+
sprintf('Class %s must extend: %s', $job, RabbitMQJob::class)
293+
);
288294

289295
return $job;
290296
}
@@ -295,7 +301,7 @@ public function getJobClass()
295301
* @param null $queue
296302
* @return string
297303
*/
298-
public function getQueue($queue = null)
304+
public function getQueue($queue = null): string
299305
{
300306
return $queue ?: $this->default;
301307
}
@@ -336,8 +342,13 @@ public function isExchangeExists(string $exchange): bool
336342
* @param array $arguments
337343
* @return void
338344
*/
339-
public function declareExchange(string $name, string $type = AMQPExchangeType::DIRECT, bool $durable = true, bool $autoDelete = false, array $arguments = []): void
340-
{
345+
public function declareExchange(
346+
string $name,
347+
string $type = AMQPExchangeType::DIRECT,
348+
bool $durable = true,
349+
bool $autoDelete = false,
350+
array $arguments = []
351+
): void {
341352
if ($this->isExchangeDeclared($name)) {
342353
return;
343354
}
@@ -364,7 +375,7 @@ public function declareExchange(string $name, string $type = AMQPExchangeType::D
364375
*/
365376
public function deleteExchange(string $name, bool $unused = false): void
366377
{
367-
if (! $this->isExchangeExists($name)) {
378+
if (!$this->isExchangeExists($name)) {
368379
return;
369380
}
370381

@@ -378,7 +389,7 @@ public function deleteExchange(string $name, bool $unused = false): void
378389
* Checks if the given queue already present/defined in RabbitMQ.
379390
* Returns false when when the queue is missing.
380391
*
381-
* @param string $name
392+
* @param string|null $name
382393
* @return bool
383394
* @throws AMQPProtocolChannelException
384395
*/
@@ -409,8 +420,12 @@ public function isQueueExists(string $name = null): bool
409420
* @param array $arguments
410421
* @return void
411422
*/
412-
public function declareQueue(string $name, bool $durable = true, bool $autoDelete = false, array $arguments = []): void
413-
{
423+
public function declareQueue(
424+
string $name,
425+
bool $durable = true,
426+
bool $autoDelete = false,
427+
array $arguments = []
428+
): void {
414429
if ($this->isQueueDeclared($name)) {
415430
return;
416431
}
@@ -437,7 +452,7 @@ public function declareQueue(string $name, bool $durable = true, bool $autoDelet
437452
*/
438453
public function deleteQueue(string $name, bool $if_unused = false, bool $if_empty = false): void
439454
{
440-
if (! $this->isQueueExists($name)) {
455+
if (!$this->isQueueExists($name)) {
441456
return;
442457
}
443458

@@ -468,7 +483,7 @@ public function bindQueue(string $queue, string $exchange, string $routingKey =
468483
/**
469484
* Purge the queue of messages.
470485
*
471-
* @param string $queue
486+
* @param string|null $queue
472487
* @return void
473488
*/
474489
public function purge(string $queue = null): void
@@ -509,6 +524,7 @@ public function reject(RabbitMQJob $job, bool $requeue = false): void
509524
* @param $payload
510525
* @param int $attempts
511526
* @return array
527+
* @throws JsonException
512528
*/
513529
protected function createMessage($payload, int $attempts = 0): array
514530
{
@@ -517,7 +533,7 @@ protected function createMessage($payload, int $attempts = 0): array
517533
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
518534
];
519535

520-
if ($correlationId = json_decode($payload, true)['id'] ?? null) {
536+
if ($correlationId = json_decode($payload, true, 512)['id'] ?? null) {
521537
$properties['correlation_id'] = $correlationId;
522538
}
523539

@@ -542,12 +558,12 @@ protected function createMessage($payload, int $attempts = 0): array
542558
/**
543559
* Create a payload array from the given job and data.
544560
*
545-
* @param object|string $job
561+
* @param string|object $job
546562
* @param string $queue
547-
* @param string $data
563+
* @param mixed $data
548564
* @return array
549565
*/
550-
protected function createPayloadArray($job, $queue, $data = '')
566+
protected function createPayloadArray($job, $queue, $data = ''): array
551567
{
552568
return array_merge(parent::createPayloadArray($job, $queue, $data), [
553569
'id' => $this->getRandomId(),
@@ -572,7 +588,7 @@ protected function getRandomId(): string
572588
*/
573589
public function close(): void
574590
{
575-
if ($this->currentJob && ! $this->currentJob->isDeletedOrReleased()) {
591+
if ($this->currentJob && !$this->currentJob->isDeletedOrReleased()) {
576592
$this->reject($this->currentJob, true);
577593
}
578594

@@ -632,7 +648,7 @@ protected function getDelayQueueArguments(string $destination, int $ttl): array
632648
*/
633649
protected function isPrioritizeDelayed(): bool
634650
{
635-
return boolval(Arr::get($this->options, 'prioritize_delayed') ?: false);
651+
return (bool)(Arr::get($this->options, 'prioritize_delayed') ?: false);
636652
}
637653

638654
/**
@@ -645,13 +661,13 @@ protected function isPrioritizeDelayed(): bool
645661
*/
646662
protected function getQueueMaxPriority(): int
647663
{
648-
return intval(Arr::get($this->options, 'queue_max_priority') ?: 2);
664+
return (int)(Arr::get($this->options, 'queue_max_priority') ?: 2);
649665
}
650666

651667
/**
652668
* Get the exchange name, or &null; as default value.
653669
*
654-
* @param string $exchange
670+
* @param string|null $exchange
655671
* @return string|null
656672
*/
657673
protected function getExchange(string $exchange = null): ?string
@@ -679,7 +695,8 @@ protected function getRoutingKey(string $destination): string
679695
*/
680696
protected function getExchangeType(?string $type = null): string
681697
{
682-
return @constant(AMQPExchangeType::class.'::'.Str::upper($type ?: Arr::get($this->options, 'exchange_type') ?: 'direct')) ?: AMQPExchangeType::DIRECT;
698+
return @constant(AMQPExchangeType::class . '::' . Str::upper($type ?: Arr::get($this->options,
699+
'exchange_type') ?: 'direct')) ?: AMQPExchangeType::DIRECT;
683700
}
684701

685702
/**
@@ -689,7 +706,7 @@ protected function getExchangeType(?string $type = null): string
689706
*/
690707
protected function isRerouteFailed(): bool
691708
{
692-
return boolval(Arr::get($this->options, 'reroute_failed') ?: false);
709+
return (bool)(Arr::get($this->options, 'reroute_failed') ?: false);
693710
}
694711

695712
/**
@@ -746,10 +763,13 @@ protected function isQueueDeclared(string $name): bool
746763
* @return void
747764
* @throws AMQPProtocolChannelException
748765
*/
749-
protected function declareDestination(string $destination, ?string $exchange = null, string $exchangeType = AMQPExchangeType::DIRECT): void
750-
{
766+
protected function declareDestination(
767+
string $destination,
768+
?string $exchange = null,
769+
string $exchangeType = AMQPExchangeType::DIRECT
770+
): void {
751771
// When a exchange is provided and no exchange is present in RabbitMQ, create an exchange.
752-
if ($exchange && ! $this->isExchangeExists($exchange)) {
772+
if ($exchange && !$this->isExchangeExists($exchange)) {
753773
$this->declareExchange($exchange, $exchangeType);
754774
}
755775

0 commit comments

Comments
 (0)