Skip to content

Commit 8bbd5f9

Browse files
SWR-15367
1 parent 54162ee commit 8bbd5f9

File tree

3 files changed

+157
-22
lines changed

3 files changed

+157
-22
lines changed

src/Console/ConsumeVhostsCommand.php

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Illuminate\Queue\Console\WorkCommand;
77
use Illuminate\Queue\Worker;
88
use Illuminate\Support\Str;
9+
use Salesmessage\LibRabbitMQ\Dto\ConsumeVhostsFiltersDto;
910
use Symfony\Component\Console\Terminal;
1011
use Salesmessage\LibRabbitMQ\VhostsConsumer;
1112

@@ -14,7 +15,8 @@ class ConsumeVhostsCommand extends WorkCommand
1415
protected $signature = 'lib-rabbitmq:consume-vhosts
1516
{connection? : The name of the queue connection to work}
1617
{--name=default : The name of the consumer}
17-
{--queue= : The name of the queue to work. Please notice that there is no support for multiple queues}
18+
{--vhosts= : The list of the vhosts to work}
19+
{--queues= : The list of the queues to work}
1820
{--once : Only process the next job on the queue}
1921
{--stop-when-empty : Stop when the queue is empty}
2022
{--delay=0 : The number of seconds to delay failed jobs (Deprecated)}
@@ -50,6 +52,12 @@ public function handle(): void
5052
$consumer->setPrefetchSize((int) $this->option('prefetch-size'));
5153
$consumer->setPrefetchCount((int) $this->option('prefetch-count'));
5254

55+
$filtersDto = new ConsumeVhostsFiltersDto(
56+
trim($this->option('vhosts', '')),
57+
trim($this->option('queues', ''))
58+
);
59+
$consumer->setFiltersDto($filtersDto);
60+
5361
if ($this->downForMaintenance() && $this->option('once')) {
5462
$consumer->sleep($this->option('sleep'));
5563
return;
@@ -63,20 +71,17 @@ public function handle(): void
6371
$connection = $this->argument('connection')
6472
?: $this->laravel['config']['queue.default'];
6573

66-
// We need to get the right queue for the connection which is set in the queue
67-
// configuration file for the application. We will pull it based on the set
68-
// connection being run for the queue operation currently being executed.
69-
$queue = $this->getQueue($connection);
70-
7174
if (Terminal::hasSttyAvailable()) {
72-
$this->components->info(
73-
sprintf('Processing jobs from the [%s] %s.', $queue, str('queue')->plural(explode(',', $queue)))
74-
);
75+
$this->components->info(sprintf(
76+
'Processing vhosts: [%s]. Queues: [%s].',
77+
($filtersDto->hasVhosts() ? implode(', ', $filtersDto->getVhosts()) : 'all'),
78+
($filtersDto->hasQueues() ? implode(', ', $filtersDto->getQueues()) : 'all')
79+
));
7580
}
7681

7782
$this->runWorker(
7883
$connection,
79-
$queue
84+
''
8085
);
8186
}
8287

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
<?php
2+
3+
namespace Salesmessage\LibRabbitMQ\Dto;
4+
5+
class ConsumeVhostsFiltersDto
6+
{
7+
/**
8+
* @var array
9+
*/
10+
private array $vhosts = [];
11+
12+
/**
13+
* @var array
14+
*/
15+
private array $queues = [];
16+
17+
/**
18+
* @param string $vhosts
19+
* @param string $queues
20+
*/
21+
public function __construct(string $vhosts, string $queues)
22+
{
23+
$this->vhosts = $this->stringToArray($vhosts);
24+
$this->queues = $this->stringToArray($queues);
25+
}
26+
27+
/**
28+
* @param string $string
29+
* @return array
30+
*/
31+
private function stringToArray(string $string): array
32+
{
33+
if ('' === $string) {
34+
return [];
35+
}
36+
37+
$array = explode(',', $string);
38+
$array = array_map(fn($value) => trim($value), $array);
39+
40+
return array_filter($array);
41+
}
42+
43+
/**
44+
* @return array
45+
*/
46+
public function getVhosts(): array
47+
{
48+
return $this->vhosts;
49+
}
50+
51+
/**
52+
* @return bool
53+
*/
54+
public function hasVhosts(): bool
55+
{
56+
return !empty($this->vhosts);
57+
}
58+
59+
/**
60+
* @return array
61+
*/
62+
public function getQueues(): array
63+
{
64+
return $this->queues;
65+
}
66+
67+
/**
68+
* @return bool
69+
*/
70+
public function hasQueues(): bool
71+
{
72+
return !empty($this->queues);
73+
}
74+
}
75+

src/VhostsConsumer.php

Lines changed: 67 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Illuminate\Queue\WorkerOptions;
1212
use PhpAmqpLib\Exception\AMQPRuntimeException;
1313
use PhpAmqpLib\Message\AMQPMessage;
14+
use Salesmessage\LibRabbitMQ\Dto\ConsumeVhostsFiltersDto;
1415
use Salesmessage\LibRabbitMQ\Dto\QueueApiDto;
1516
use Salesmessage\LibRabbitMQ\Dto\VhostApiDto;
1617
use Salesmessage\LibRabbitMQ\Interfaces\RabbitMQBatchable;
@@ -30,6 +31,8 @@ class VhostsConsumer extends Consumer
3031

3132
private ?OutputStyle $output = null;
3233

34+
private ?ConsumeVhostsFiltersDto $filtersDto = null;
35+
3336
private string $configConnectionName = '';
3437

3538
private string $currentConnectionName = '';
@@ -70,19 +73,30 @@ public function __construct(
7073

7174
/**
7275
* @param OutputStyle $output
73-
* @return void
76+
* @return $this
7477
*/
75-
public function setOutput(OutputStyle $output)
78+
public function setOutput(OutputStyle $output): self
7679
{
7780
$this->output = $output;
81+
return $this;
82+
}
83+
84+
/**
85+
* @param ConsumeVhostsFiltersDto $filtersDto
86+
* @return $this
87+
*/
88+
public function setFiltersDto(ConsumeVhostsFiltersDto $filtersDto): self
89+
{
90+
$this->filtersDto = $filtersDto;
91+
return $this;
7892
}
7993

8094
public function daemon($connectionName, $queue, WorkerOptions $options)
8195
{
8296
$this->loadVhosts();
8397
if (false === $this->switchToNextVhost()) {
8498
// @todo load vhosts again
85-
$this->output->warning('No active vhosts....');
99+
$this->output->warning('No active vhosts... Exit');
86100

87101
return;
88102
}
@@ -99,7 +113,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
99113
[$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0];
100114

101115
/** @var RabbitMQQueue $connection */
102-
$connection = $this->manager->rabbitConnectionByVhost('/', $this->configConnectionName);
116+
$connection = $this->manager->rabbitConnectionByVhost($this->currentVhostName, $this->configConnectionName);
103117
$this->currentConnectionName = $connection->getConnectionName();
104118

105119
$this->channel = $connection->getChannel();
@@ -153,7 +167,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
153167
$this->processBatch($connection);
154168

155169
$this->stopConsuming();
156-
$this->goAhead();
170+
$this->goAheadOrWait();
157171
$this->startConsuming();
158172

159173
$this->sleep($this->workerOptions->sleep);
@@ -249,7 +263,7 @@ private function startConsuming()
249263
$this->processBatch($connection);
250264

251265
$this->stopConsuming();
252-
$this->goAhead();
266+
$this->goAheadOrWait();
253267
$this->startConsuming();
254268
}
255269

@@ -431,7 +445,15 @@ private function stopConsuming()
431445
*/
432446
private function loadVhosts(): void
433447
{
434-
$this->vhosts = $this->internalStorageManager->getVhosts();
448+
$vhosts = $this->internalStorageManager->getVhosts();
449+
450+
// filter vhosts
451+
$filterVhosts = $this->filtersDto->getVhosts();
452+
if (!empty($filterVhosts)) {
453+
$vhosts = array_filter($vhosts, fn($value) => in_array($value, $filterVhosts, true));
454+
}
455+
$this->vhosts = $vhosts;
456+
$this->vhostQueues = [];
435457

436458
$this->currentVhostName = null;
437459
$this->currentQueueName = null;
@@ -444,6 +466,8 @@ private function switchToNextVhost(): bool
444466
{
445467
$nextVhost = $this->getNextVhost();
446468
if (null === $nextVhost) {
469+
$this->currentVhostName = null;
470+
$this->currentQueueName = null;
447471
return false;
448472
}
449473

@@ -452,6 +476,7 @@ private function switchToNextVhost(): bool
452476

453477
$nextQueue = $this->getNextQueue();
454478
if (null === $nextQueue) {
479+
$this->currentQueueName = null;
455480
return $this->switchToNextVhost();
456481
}
457482

@@ -465,7 +490,7 @@ private function switchToNextVhost(): bool
465490
private function getNextVhost(): ?string
466491
{
467492
if (null === $this->currentVhostName) {
468-
return (string) reset($this->vhosts);
493+
return !empty($this->vhosts) ? (string) reset($this->vhosts) : null;
469494
}
470495

471496
$currentIndex = array_search($this->currentVhostName, $this->vhosts, true);
@@ -481,10 +506,18 @@ private function getNextVhost(): ?string
481506
*/
482507
private function loadVhostQueues(): void
483508
{
484-
$this->vhostQueues = (null !== $this->currentVhostName)
509+
$vhostQueues = (null !== $this->currentVhostName)
485510
? $this->internalStorageManager->getVhostQueues($this->currentVhostName)
486511
: [];
487512

513+
// filter queues
514+
$filterQueues = $this->filtersDto->getQueues();
515+
if (!empty($vhostQueues) && !empty($filterQueues)) {
516+
$vhostQueues = array_filter($vhostQueues, fn($value) => in_array($value, $filterQueues, true));
517+
}
518+
519+
$this->vhostQueues = $vhostQueues;
520+
488521
$this->currentQueueName = null;
489522
}
490523

@@ -495,6 +528,7 @@ private function switchToNextQueue(): bool
495528
{
496529
$nextQueue = $this->getNextQueue();
497530
if (null === $nextQueue) {
531+
$this->currentQueueName = null;
498532
return false;
499533
}
500534

@@ -508,7 +542,7 @@ private function switchToNextQueue(): bool
508542
private function getNextQueue(): ?string
509543
{
510544
if (null === $this->currentQueueName) {
511-
return (string) reset($this->vhostQueues);
545+
return !empty($this->vhostQueues) ? (string) reset($this->vhostQueues) : null;
512546
}
513547

514548
$currentIndex = array_search($this->currentQueueName, $this->vhostQueues, true);
@@ -519,6 +553,28 @@ private function getNextQueue(): ?string
519553
return null;
520554
}
521555

556+
/**
557+
* @param int $waitSeconds
558+
* @return bool
559+
*/
560+
private function goAheadOrWait(int $waitSeconds = 3): bool
561+
{
562+
if (false === $this->goAhead()) {
563+
$this->loadVhosts();
564+
if (empty($this->vhosts)) {
565+
$this->output->warning(sprintf('No active vhosts. Wait %d seconds...', $waitSeconds));
566+
$this->sleep($waitSeconds);
567+
568+
return $this->goAheadOrWait($waitSeconds);
569+
}
570+
571+
$this->output->info('Starting from the first vhost...');
572+
return $this->goAheadOrWait($waitSeconds);
573+
}
574+
575+
return true;
576+
}
577+
522578
/**
523579
* @return bool
524580
*/
@@ -532,8 +588,7 @@ private function goAhead(): bool
532588
return true;
533589
}
534590

535-
$this->loadVhosts();
536-
return $this->switchToNextVhost();
591+
return false;
537592
}
538593

539594
/**

0 commit comments

Comments
 (0)