Skip to content

Commit 820e511

Browse files
authored
SWR-20483 Server: RabbitMQ Improvements: Quorum Queues (#17)
* SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20482 Server: RabbitMQ Improvements: Deduplication * SWR-20483 Server: RabbitMQ Improvements: Quorum Queues * SWR-20483 Server: RabbitMQ Improvements: Quorum Queues * SWR-20483 Server: RabbitMQ Improvements: Quorum Queues
1 parent ab1a4eb commit 820e511

File tree

11 files changed

+184
-82
lines changed

11 files changed

+184
-82
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ Only the latest version will get new features. Bug fixes will be provided using
1818
You can install this package via composer using this command:
1919

2020
```
21-
composer require salesmessage/php-lib-rabbitmq:^1.31 --ignore-platform-reqs
21+
composer require salesmessage/php-lib-rabbitmq:^1.32 --ignore-platform-reqs
2222
```
2323

2424
The package will automatically register itself.

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
},
3535
"extra": {
3636
"branch-alias": {
37-
"dev-master": "1.31-dev"
37+
"dev-master": "1.32-dev"
3838
},
3939
"laravel": {
4040
"providers": [

src/Console/QueueDeclareCommand.php

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@ class QueueDeclareCommand extends Command
1111
protected $signature = 'lib-rabbitmq:queue-declare
1212
{name : The name of the queue to declare}
1313
{connection=rabbitmq : The name of the queue connection to use}
14-
{--max-priority}
14+
{--max-priority : Set x-max-priority (ignored for quorum)}
1515
{--durable=1}
1616
{--auto-delete=0}
17-
{--quorum=0}';
17+
{--quorum=0 : Declare quorum queue (x-queue-type=quorum)}
18+
{--quorum-initial-group-size= : x-quorum-initial-group-size when quorum is enabled}';
1819

1920
protected $description = 'Declare queue';
2021

@@ -36,12 +37,24 @@ public function handle(RabbitMQConnector $connector): void
3637
$arguments = [];
3738

3839
$maxPriority = (int) $this->option('max-priority');
39-
if ($maxPriority) {
40-
$arguments['x-max-priority'] = $maxPriority;
41-
}
40+
$isQuorum = (bool) $this->option('quorum');
4241

43-
if ($this->option('quorum')) {
42+
if ($isQuorum) {
4443
$arguments['x-queue-type'] = 'quorum';
44+
45+
$initialGroupSize = (int) $this->option('quorum-initial-group-size');
46+
if ($initialGroupSize > 0) {
47+
$arguments['x-quorum-initial-group-size'] = $initialGroupSize;
48+
}
49+
50+
if ($maxPriority) {
51+
// quorum queues do not support priority; ignore and warn
52+
$this->warn('Ignoring --max-priority for quorum queue.');
53+
}
54+
} else {
55+
if ($maxPriority) {
56+
$arguments['x-max-priority'] = $maxPriority;
57+
}
4558
}
4659

4760
$queue->declareQueue(

src/Contracts/RabbitMQConsumable.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,14 @@
77
*/
88
interface RabbitMQConsumable
99
{
10+
public const MQ_TYPE_CLASSIC = 'classic';
11+
public const MQ_TYPE_QUORUM = 'quorum';
12+
1013
/**
1114
* Check duplications on the application side.
1215
* It's mostly represented as an idempotency checker.
1316
*/
1417
public function isDuplicated(): bool;
18+
19+
public function getQueueType(): string;
1520
}

src/Horizon/RabbitMQQueue.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
1313
use Salesmessage\LibRabbitMQ\Queue\Jobs\RabbitMQJob;
1414
use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue;
15+
use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable;
1516

1617
class RabbitMQQueue extends BaseRabbitMQQueue
1718
{
@@ -50,6 +51,10 @@ public function pushRaw($payload, $queue = null, array $options = []): int|strin
5051
{
5152
$payload = (new JobPayload($payload))->prepare($this->lastPushed ?? null)->value;
5253

54+
if (!isset($options['queue_type']) && isset($this->lastPushed) && is_object($this->lastPushed) && $this->lastPushed instanceof RabbitMQConsumable) {
55+
$options['queue_type'] = $this->lastPushed->getQueueType();
56+
}
57+
5358
return tap(parent::pushRaw($payload, $queue, $options), function () use ($queue, $payload): void {
5459
$this->event($this->getQueue($queue), new JobPushed($payload));
5560
});
@@ -64,7 +69,9 @@ public function later($delay, $job, $data = '', $queue = null): mixed
6469
{
6570
$payload = (new JobPayload($this->createPayload($job, $data)))->prepare($job)->value;
6671

67-
return tap(parent::laterRaw($delay, $payload, $queue), function () use ($payload, $queue): void {
72+
$queueType = ($job instanceof RabbitMQConsumable) ? $job->getQueueType() : null;
73+
74+
return tap(parent::laterRaw($delay, $payload, $queue, queueType: $queueType), function () use ($payload, $queue): void {
6875
$this->event($this->getQueue($queue), new JobPushed($payload));
6976
});
7077
}

src/Queue/Jobs/RabbitMQJob.php

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44

55
use Illuminate\Container\Container;
66
use Illuminate\Contracts\Container\BindingResolutionException;
7+
use Illuminate\Contracts\Encryption\Encrypter;
78
use Illuminate\Contracts\Queue\Job as JobContract;
89
use Illuminate\Queue\Jobs\Job;
910
use Illuminate\Support\Arr;
1011
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
1112
use PhpAmqpLib\Message\AMQPMessage;
1213
use PhpAmqpLib\Wire\AMQPTable;
14+
use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable;
1315
use Salesmessage\LibRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue;
1416
use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue;
1517

@@ -126,15 +128,53 @@ public function release($delay = 0): void
126128
{
127129
parent::release();
128130

131+
$consumableJob = $this->getPayloadData();
132+
if (!($consumableJob instanceof RabbitMQConsumable)) {
133+
throw new \RuntimeException('Job must be an instance of RabbitMQJobBatchable');
134+
}
135+
129136
// Always create a new message when this Job is released
130-
$this->rabbitmq->laterRaw($delay, $this->message->getBody(), $this->queue, $this->attempts());
137+
$this->rabbitmq->laterRaw($delay, $this->message->getBody(), $this->queue, $this->attempts(), $consumableJob->getQueueType());
131138

132139
// Releasing a Job means the message was failed to process.
133140
// Because this Job message is always recreated and pushed as new message, this Job message is correctly handled.
134141
// We must tell rabbitMQ this job message can be removed by acknowledging the message.
135142
$this->rabbitmq->ack($this);
136143
}
137144

145+
/**
146+
* @return object
147+
* @throws \RuntimeException
148+
*/
149+
public function getPayloadData(): object
150+
{
151+
$payload = $this->payload();
152+
153+
$data = $payload['data'];
154+
155+
if (str_starts_with($data['command'], 'O:')) {
156+
return unserialize($data['command']);
157+
}
158+
159+
if ($this->container->bound(Encrypter::class)) {
160+
return unserialize($this->container[Encrypter::class]->decrypt($data['command']));
161+
}
162+
163+
throw new \RuntimeException('Unable to extract job data.');
164+
}
165+
166+
/**
167+
* Returns target class name
168+
*
169+
* @return mixed
170+
*/
171+
public function getPayloadClass(): string
172+
{
173+
$payload = $this->payload();
174+
175+
return $payload['data']['commandName'];
176+
}
177+
138178
/**
139179
* Get the underlying RabbitMQ connection.
140180
*/

src/Queue/Jobs/RabbitMQJobBatchable.php

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,59 +2,11 @@
22

33
namespace Salesmessage\LibRabbitMQ\Queue\Jobs;
44

5-
use Illuminate\Contracts\Encryption\Encrypter;
6-
use Illuminate\Queue\Jobs\JobName;
75
use Salesmessage\LibRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob;
86

97
/**
108
* SQS Job wrapper for RabbitMQ
119
*/
1210
class RabbitMQJobBatchable extends BaseJob
1311
{
14-
/**
15-
* Fire the job.
16-
*
17-
* @return void
18-
*/
19-
public function fire()
20-
{
21-
$payload = $this->payload();
22-
23-
[$class, $method] = JobName::parse($payload['job']);
24-
25-
($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
26-
}
27-
28-
/**
29-
* Returns target class name
30-
*
31-
* @return mixed
32-
*/
33-
public function getPayloadClass(): string
34-
{
35-
$payload = $this->payload();
36-
37-
return $payload['data']['commandName'];
38-
}
39-
40-
/**
41-
* @return object
42-
* @throws \RuntimeException
43-
*/
44-
public function getPayloadData(): object
45-
{
46-
$payload = $this->payload();
47-
48-
$data = $payload['data'];
49-
50-
if (str_starts_with($data['command'], 'O:')) {
51-
return unserialize($data['command']);
52-
}
53-
54-
if ($this->container->bound(Encrypter::class)) {
55-
return unserialize($this->container[Encrypter::class]->decrypt($data['command']));
56-
}
57-
58-
throw new \RuntimeException('Unable to extract job data.');
59-
}
6012
}

src/Queue/QueueConfig.php

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ class QueueConfig
3030

3131
protected bool $quorum = false;
3232

33+
protected ?int $quorumInitialGroupSize = null;
34+
35+
protected string $quorumQueuePostfix = '';
36+
3337
protected array $options = [];
3438

3539
/**
@@ -247,6 +251,41 @@ public function setQuorum($quorum): QueueConfig
247251
return $this;
248252
}
249253

254+
/**
255+
* When set, used to declare quorum queues with a specific initial group size.
256+
*/
257+
public function getQuorumInitialGroupSize(): ?int
258+
{
259+
return $this->quorumInitialGroupSize;
260+
}
261+
262+
public function setQuorumInitialGroupSize(?int $size): self
263+
{
264+
if ($size === null) {
265+
$this->quorumInitialGroupSize = null;
266+
return $this;
267+
}
268+
269+
if ($size <= 0) {
270+
throw new \InvalidArgumentException('Invalid quorum group size');
271+
}
272+
273+
$this->quorumInitialGroupSize = $size;
274+
275+
return $this;
276+
}
277+
278+
public function getQuorumQueuePostfix(): string
279+
{
280+
return $this->quorumQueuePostfix;
281+
}
282+
283+
public function setQuorumQueuePostfix(string $postfix): self
284+
{
285+
$this->quorumQueuePostfix = $postfix;
286+
return $this;
287+
}
288+
250289
/**
251290
* Holds all unknown queue options provided in the connection config
252291
*/

src/Queue/QueueConfigFactory.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,15 @@ protected static function getOptionsFromConfig(QueueConfig $queueConfig, array $
6868
$queueConfig->setQuorum($quorum);
6969
}
7070

71+
// Feature: Quorum initial group size
72+
if (Arr::has($queueOptions, 'quorum_initial_group_size')) {
73+
$queueConfig->setQuorumInitialGroupSize((int) Arr::pull($queueOptions, 'quorum_initial_group_size'));
74+
}
75+
76+
if ($quorumPostfix = (string) Arr::pull($queueOptions, 'quorum_queue_postfix')) {
77+
$queueConfig->setQuorumQueuePostfix($quorumPostfix);
78+
}
79+
7180
// All extra options not defined
7281
$queueConfig->setOptions($queueOptions);
7382
}

0 commit comments

Comments
 (0)