Skip to content

Commit ae7de07

Browse files
MerhnferrMerhnferr
andauthored
[10.2.2] Possibility to declare quorum queue. (vyuldashev#359)
* Added: possibility to declare quorum queue using connection config or artisan command * Ignore priority argument for quorum queue. * Fixed style Co-authored-by: Merhnferr <i.gaevskiy@andersenlab.com>
1 parent fed393d commit ae7de07

File tree

4 files changed

+90
-3
lines changed

4 files changed

+90
-3
lines changed

src/Console/QueueDeclareCommand.php

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ class QueueDeclareCommand extends Command
1212
{name : The name of the queue to declare}
1313
{connection=rabbitmq : The name of the queue connection to use}
1414
{--durable=1}
15-
{--auto-delete=0}';
15+
{--auto-delete=0}
16+
{--quorum=0}';
1617

1718
protected $description = 'Declare queue';
1819

@@ -32,10 +33,15 @@ public function handle(RabbitMQConnector $connector): void
3233
return;
3334
}
3435

36+
$arguments = (bool) $this->option('quorum')
37+
? ['x-queue-type' => 'quorum']
38+
: [];
39+
3540
$queue->declareQueue(
3641
$this->argument('name'),
3742
(bool) $this->option('durable'),
38-
(bool) $this->option('auto-delete')
43+
(bool) $this->option('auto-delete'),
44+
$arguments
3945
);
4046

4147
$this->info('Queue declared successfully.');

src/Queue/RabbitMQQueue.php

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,8 @@ protected function getQueueArguments(string $destination): array
611611
// Messages without a priority property are treated as if their priority were 0.
612612
// Messages with a priority which is higher than the queue's maximum, are treated as if they were
613613
// published with the maximum priority.
614-
if ($this->isPrioritizeDelayed()) {
614+
// Quorum queues does not support priority.
615+
if ($this->isPrioritizeDelayed() && ! $this->isQuorum()) {
615616
$arguments['x-max-priority'] = $this->getQueueMaxPriority();
616617
}
617618

@@ -620,6 +621,10 @@ protected function getQueueArguments(string $destination): array
620621
$arguments['x-dead-letter-routing-key'] = $this->getFailedRoutingKey($destination);
621622
}
622623

624+
if ($this->isQuorum()) {
625+
$arguments['x-queue-type'] = 'quorum';
626+
}
627+
623628
return $arguments;
624629
}
625630

@@ -710,6 +715,16 @@ protected function isRerouteFailed(): bool
710715
return (bool) (Arr::get($this->options, 'reroute_failed') ?: false);
711716
}
712717

718+
/**
719+
* Returns &true;, if declared queue must be quorum queue.
720+
*
721+
* @return bool
722+
*/
723+
protected function isQuorum(): bool
724+
{
725+
return (bool) (Arr::get($this->options, 'quorum') ?: false);
726+
}
727+
713728
/**
714729
* Get the exchange for failed messages.
715730
*

tests/Functional/RabbitMQQueueTest.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,22 @@ public function testFailedRoutingKey(): void
139139
$this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test']));
140140
}
141141

142+
public function testQuorum(): void
143+
{
144+
/** @var $queue RabbitMQQueue */
145+
$queue = $this->connection();
146+
$this->assertFalse($this->callMethod($queue, 'isQuorum'));
147+
148+
$queue = $this->connection('rabbitmq-with-options');
149+
$this->assertFalse($this->callMethod($queue, 'isQuorum'));
150+
151+
$queue = $this->connection('rabbitmq-with-quorum-options');
152+
$this->assertTrue($this->callMethod($queue, 'isQuorum'));
153+
154+
$queue = $this->connection('rabbitmq-with-options-empty');
155+
$this->assertFalse($this->callMethod($queue, 'isQuorum'));
156+
}
157+
142158
public function testDeclareDeleteExchange(): void
143159
{
144160
/** @var $queue RabbitMQQueue */
@@ -193,6 +209,17 @@ public function testQueueArguments(): void
193209
$this->assertEquals(array_keys($expected), array_keys($actual));
194210
$this->assertEquals(array_values($expected), array_values($actual));
195211

212+
$queue = $this->connection('rabbitmq-with-quorum-options');
213+
$actual = $this->callMethod($queue, 'getQueueArguments', [$name]);
214+
$expected = [
215+
'x-dead-letter-exchange' => 'failed-exchange',
216+
'x-dead-letter-routing-key' => sprintf('application-x.%s.failed', $name),
217+
'x-queue-type' => 'quorum',
218+
];
219+
220+
$this->assertEquals(array_keys($expected), array_keys($actual));
221+
$this->assertEquals(array_values($expected), array_values($actual));
222+
196223
$queue = $this->connection('rabbitmq-with-options-empty');
197224
$actual = $this->callMethod($queue, 'getQueueArguments', [$name]);
198225
$expected = [];

tests/Functional/TestCase.php

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,45 @@ protected function getEnvironmentSetUp($app): void
110110
'reroute_failed' => '',
111111
'failed_exchange' => '',
112112
'failed_routing_key' => '',
113+
'quorum' => '',
114+
],
115+
],
116+
117+
'worker' => 'default',
118+
119+
]);
120+
$app['config']->set('queue.connections.rabbitmq-with-quorum-options', [
121+
'driver' => 'rabbitmq',
122+
'queue' => 'order',
123+
'connection' => AMQPLazyConnection::class,
124+
125+
'hosts' => [
126+
[
127+
'host' => getenv('HOST'),
128+
'port' => getenv('PORT'),
129+
'vhost' => '/',
130+
'user' => 'guest',
131+
'password' => 'guest',
132+
],
133+
],
134+
135+
'options' => [
136+
'ssl_options' => [
137+
'cafile' => null,
138+
'local_cert' => null,
139+
'local_key' => null,
140+
'verify_peer' => true,
141+
'passphrase' => null,
142+
],
143+
144+
'queue' => [
145+
'exchange' => 'application-x',
146+
'exchange_type' => 'topic',
147+
'exchange_routing_key' => 'process.%s',
148+
'reroute_failed' => true,
149+
'failed_exchange' => 'failed-exchange',
150+
'failed_routing_key' => 'application-x.%s.failed',
151+
'quorum' => true,
113152
],
114153
],
115154

0 commit comments

Comments
 (0)