Skip to content

Commit ea57d84

Browse files
SWR-15367
1 parent 609a271 commit ea57d84

11 files changed

+333
-104
lines changed

README.md

Lines changed: 76 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,63 @@ RabbitMQ Queue driver for Laravel
99

1010
Only the latest version will get new features. Bug fixes will be provided using the following scheme:
1111

12-
| Package Version | Laravel Version | Bug Fixes Until | |
13-
|-----------------|-----------------|------------------|---------------------------------------------------------------------------------------------|
14-
| 13 | 9 | August 8th, 2023 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) |
12+
| Package Version | Laravel Version | Bug Fixes Until | |
13+
|-----------------|-----------------|-------------------|---------------------------------------------------------------------------------------------|
14+
| 1 | 05 | December 24th, 2024 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) |
1515

1616
## Installation
1717

1818
You can install this package via composer using this command:
1919

2020
```
21-
composer require salesmessage/php-lib-rabbitmq
21+
composer require salesmessage/php-lib-rabbitmq:^1.05 --ignore-platform-reqs
2222
```
2323

2424
The package will automatically register itself.
2525

26+
### Groups configuration
27+
28+
Add groups configuration to file `rabbit-groups.yml`:
29+
30+
> This config file is required.
31+
32+
```php
33+
groups:
34+
test-group-1:
35+
vhosts:
36+
- organization_10
37+
- organization_11
38+
- organization_12
39+
vhosts_mask: organization
40+
queues:
41+
- test-queue-1
42+
- test-queue-11
43+
queues_mask: test
44+
batch_size: 100
45+
test-group-2:
46+
vhosts:
47+
- organization_20
48+
- organization_21
49+
- organization_22
50+
vhosts_mask: organization
51+
queues:
52+
- test-queue-2
53+
- test-queue-22
54+
queues_mask: test
55+
batch_size: 100
56+
test-group-3:
57+
vhosts:
58+
- organization_30
59+
- organization_31
60+
- organization_32
61+
vhosts_mask: organization
62+
queues:
63+
- test-queue-3
64+
- test-queue-33
65+
queues_mask: test
66+
batch_size: 100,
67+
```
68+
2669
### Configuration
2770

2871
Add connection to `config/queue.php`:
@@ -33,9 +76,9 @@ Add connection to `config/queue.php`:
3376
'connections' => [
3477
// ...
3578

36-
'rabbitmq' => [
79+
'rabbitmq_vhosts' => [
3780

38-
'driver' => 'rabbitmq',
81+
'driver' => 'rabbitmq_vhosts',
3982
'hosts' => [
4083
[
4184
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
@@ -67,7 +110,7 @@ When you want to prioritize messages when they were delayed, then this is possib
67110
'connections' => [
68111
// ...
69112

70-
'rabbitmq' => [
113+
'rabbitmq_vhosts' => [
71114
// ...
72115

73116
'options' => [
@@ -96,7 +139,7 @@ When you want to publish messages against an exchange with routing-keys, then th
96139
'connections' => [
97140
// ...
98141

99-
'rabbitmq' => [
142+
'rabbitmq_vhosts' => [
100143
// ...
101144

102145
'options' => [
@@ -130,7 +173,7 @@ by adding extra options.
130173
'connections' => [
131174
// ...
132175

133-
'rabbitmq' => [
176+
'rabbitmq_vhosts' => [
134177
// ...
135178

136179
'options' => [
@@ -162,7 +205,7 @@ This Library supports Horizon, but in the config you have to inform Laravel to u
162205
'connections' => [
163206
// ...
164207

165-
'rabbitmq' => [
208+
'rabbitmq_vhosts' => [
166209
// ...
167210

168211
/* Set to "horizon" if you wish to use Laravel Horizon. */
@@ -189,14 +232,14 @@ An example for the config:
189232
'connections' => [
190233
// ...
191234

192-
'rabbitmq' => [
235+
'rabbitmq_vhosts' => [
193236
// ...
194237

195238
'options' => [
196239
'queue' => [
197240
// ...
198241

199-
'job' => \App\Queue\Jobs\RabbitMQJob::class,
242+
'job' => \Salesmessage\LibRabbitMQ\Queue\Jobs\RabbitMQJobBatchable::class,
200243
],
201244
],
202245
],
@@ -303,7 +346,7 @@ An example for the config:
303346
'connections' => [
304347
// ...
305348

306-
'rabbitmq' => [
349+
'rabbitmq_vhosts' => [
307350
// ...
308351

309352
'connection' = > \App\Queue\Connection\MyRabbitMQConnection::class,
@@ -325,11 +368,11 @@ and inform laravel to use your class by setting `RABBITMQ_WORKER` to `\App\Queue
325368
'connections' => [
326369
// ...
327370

328-
'rabbitmq' => [
371+
'rabbitmq_vhosts' => [
329372
// ...
330373

331374
/* Set to a class if you wish to use your own. */
332-
'worker' => \App\Queue\RabbitMQQueue::class,
375+
'worker' => \Salesmessage\LibRabbitMQ\Queue\RabbitMQQueueBatchable::class
333376
],
334377

335378
// ...
@@ -409,7 +452,7 @@ It is possible to change te default queue by adding an extra parameter in the co
409452
'connections' => [
410453
// ...
411454

412-
'rabbitmq' => [
455+
'rabbitmq_vhosts' => [
413456
// ...
414457

415458
'queue' => env('RABBITMQ_QUEUE', 'default'),
@@ -429,7 +472,7 @@ You can alter the heartbeat settings by changing the config.
429472
'connections' => [
430473
// ...
431474

432-
'rabbitmq' => [
475+
'rabbitmq_vhosts' => [
433476
// ...
434477

435478
'options' => [
@@ -451,7 +494,7 @@ If you need a secure connection to rabbitMQ server(s), you will need to add thes
451494
'connections' => [
452495
// ...
453496

454-
'rabbitmq' => [
497+
'rabbitmq_vhosts' => [
455498
// ...
456499

457500
'secure' = > true,
@@ -480,7 +523,7 @@ To instruct Laravel workers to dispatch events after all database commits are co
480523
'connections' => [
481524
// ...
482525

483-
'rabbitmq' => [
526+
'rabbitmq_vhosts' => [
484527
// ...
485528

486529
'after_commit' => true,
@@ -499,7 +542,7 @@ If for some reason you don't want the connection lazy you can turn it off by set
499542
'connections' => [
500543
// ...
501544

502-
'rabbitmq' => [
545+
'rabbitmq_vhosts' => [
503546
// ...
504547

505548
'lazy' = > false,
@@ -519,7 +562,7 @@ Available protocols : `tcp`, `ssl`, `tls`
519562
'connections' => [
520563
// ...
521564

522-
'rabbitmq' => [
565+
'rabbitmq_vhosts' => [
523566
// ...
524567

525568
'network_protocol' => 'tcp',
@@ -549,13 +592,24 @@ For Lumen usage the service provider should be registered manually as follow in
549592
$app->register(Salesmessage\LibRabbitMQ\LaravelLibRabbitMQServiceProvider::class);
550593
```
551594

595+
## Scan Vhosts
596+
597+
```bash
598+
php artisan lib-rabbitmq:scan-vhosts --sleep=10
599+
```
600+
552601
## Consuming Messages
553602

554603
There are two ways of consuming messages.
555604

556605
1. `queue:work` command which is Laravel's built-in command. This command utilizes `basic_get`. Use this if you want to consume multiple queues.
557606

558-
2. `rabbitmq:consume` command which is provided by this package. This command utilizes `basic_consume` and is more performant than `basic_get` by ~2x, but does not support multiple queues.
607+
2. `lib-rabbitmq:consume-vhosts` command which is provided by this package. This command utilizes `basic_consume` and is more performant than `basic_get` by ~2x, but does not support multiple queues.
608+
609+
Example:
610+
```bash
611+
php artisan lib-rabbitmq:consume-vhosts test-group-1 rabbitmq_vhosts --name=mq-vhosts-test-name --sleep=3 --memory=300 --max-jobs=5000 --max-time=600 --prefetch-count=100 --timeout=0
612+
```
559613

560614
## Testing
561615

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
},
3535
"extra": {
3636
"branch-alias": {
37-
"dev-master": "1.04-dev"
37+
"dev-master": "1.05-dev"
3838
},
3939
"laravel": {
4040
"providers": [

src/Console/ConsumeVhostsCommand.php

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,17 @@
77
use Illuminate\Queue\Worker;
88
use Illuminate\Support\Str;
99
use Salesmessage\LibRabbitMQ\Dto\ConsumeVhostsFiltersDto;
10+
use Salesmessage\LibRabbitMQ\Services\GroupsService;
1011
use Symfony\Component\Console\Terminal;
1112
use Salesmessage\LibRabbitMQ\VhostsConsumer;
13+
use Throwable;
1214

1315
class ConsumeVhostsCommand extends WorkCommand
1416
{
1517
protected $signature = 'lib-rabbitmq:consume-vhosts
16-
{connection? : The name of the queue connection to work}
18+
{group : The name of the group}
19+
{connection=rabbitmq_vhosts : The name of the queue connection to work}
1720
{--name=default : The name of the consumer}
18-
{--vhosts= : The list of the vhosts to work}
19-
{--vhosts-mask= : The vhosts mask}
20-
{--queues= : The list of the queues to work}
21-
{--queues-mask= : The queues mask}
22-
{--batch-size=100 : The number of jobs for batch}
2321
{--once : Only process the next job on the queue}
2422
{--stop-when-empty : Stop when the queue is empty}
2523
{--delay=0 : The number of seconds to delay failed jobs (Deprecated)}
@@ -41,11 +39,43 @@ class ConsumeVhostsCommand extends WorkCommand
4139

4240
protected $description = 'Consume messages';
4341

42+
/**
43+
* @param GroupsService $groupsService
44+
* @param Worker $worker
45+
* @param Cache $cache
46+
*/
47+
public function __construct(
48+
private GroupsService $groupsService,
49+
Worker $worker,
50+
Cache $cache
51+
)
52+
{
53+
parent::__construct($worker, $cache);
54+
}
55+
4456
public function handle(): void
4557
{
58+
$group = trim($this->argument('group'));
59+
60+
$groupConfigData = $this->groupsService->getGroupConfig($group);
61+
if (empty($groupConfigData)) {
62+
$this->error(sprintf('Config for consumer group "%s" is not specified', $group));
63+
return;
64+
}
65+
66+
$filtersDto = new ConsumeVhostsFiltersDto(
67+
$group,
68+
(array) ($groupConfigData['vhosts'] ?? []),
69+
trim($groupConfigData['vhosts_mask'] ?? ''),
70+
(array) ($groupConfigData['queues'] ?? []),
71+
trim($groupConfigData['queues_mask'] ?? '')
72+
);
73+
4674
/** @var VhostsConsumer $consumer */
4775
$consumer = $this->worker;
4876

77+
$consumer->setFiltersDto($filtersDto);
78+
4979
$consumer->setOutput($this->getOutput());
5080

5181
$consumer->setContainer($this->laravel);
@@ -54,15 +84,7 @@ public function handle(): void
5484
$consumer->setMaxPriority((int) $this->option('max-priority'));
5585
$consumer->setPrefetchSize((int) $this->option('prefetch-size'));
5686
$consumer->setPrefetchCount((int) $this->option('prefetch-count'));
57-
$consumer->setBatchSize((int) $this->option('batch-size'));
58-
59-
$filtersDto = new ConsumeVhostsFiltersDto(
60-
trim($this->option('vhosts', '')),
61-
trim($this->option('vhosts-mask', '')),
62-
trim($this->option('queues', '')),
63-
trim($this->option('queues-mask', ''))
64-
);
65-
$consumer->setFiltersDto($filtersDto);
87+
$consumer->setBatchSize((int) ($groupConfigData['batch_size'] ?? 100));
6688

6789
if ($this->downForMaintenance() && $this->option('once')) {
6890
$consumer->sleep($this->option('sleep'));

0 commit comments

Comments
 (0)