Skip to content

Commit cf35ef2

Browse files
Merge pull request #5 from salesmessage/SWR-15367-fix-prefetch-count
SWR-15367 #comment Bandwidth Splitting
2 parents 3bf9f39 + 2357dc2 commit cf35ef2

File tree

4 files changed

+20
-18
lines changed

4 files changed

+20
-18
lines changed

README.md

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ Only the latest version will get new features. Bug fixes will be provided using
1111

1212
| Package Version | Laravel Version | Bug Fixes Until | |
1313
|-----------------|-----------------|---------------------|---------------------------------------------------------------------------------------------|
14-
| 1 | 08 | December 26th, 2024 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) |
14+
| 1 | 09 | December 26th, 2024 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) |
1515

1616
## Installation
1717

1818
You can install this package via composer using this command:
1919

2020
```
21-
composer require salesmessage/php-lib-rabbitmq:^1.08 --ignore-platform-reqs
21+
composer require salesmessage/php-lib-rabbitmq:^1.09 --ignore-platform-reqs
2222
```
2323

2424
The package will automatically register itself.
@@ -42,6 +42,7 @@ groups:
4242
- test-queue-11
4343
queues_mask: test
4444
batch_size: 100
45+
prefetch_count: 100
4546
test-group-2:
4647
vhosts:
4748
- organization_20
@@ -53,6 +54,7 @@ groups:
5354
- test-queue-22
5455
queues_mask: test
5556
batch_size: 100
57+
prefetch_count: 100
5658
test-group-3:
5759
vhosts:
5860
- organization_30
@@ -63,7 +65,8 @@ groups:
6365
- test-queue-3
6466
- test-queue-33
6567
queues_mask: test
66-
batch_size: 100,
68+
batch_size: 100
69+
prefetch_count: 100
6770
```
6871

6972
### Configuration
@@ -628,7 +631,7 @@ There are two ways of consuming messages.
628631

629632
Example:
630633
```bash
631-
php artisan lib-rabbitmq:consume-vhosts test-group-1 rabbitmq_vhosts --name=mq-vhosts-test-name --sleep=3 --memory=300 --max-jobs=5000 --max-time=600 --prefetch-count=100 --timeout=0
634+
php artisan lib-rabbitmq:consume-vhosts test-group-1 rabbitmq_vhosts --name=mq-vhosts-test-name --sleep=3 --memory=300 --max-jobs=5000 --max-time=600 --timeout=0
632635
```
633636

634637
## Testing

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
},
3535
"extra": {
3636
"branch-alias": {
37-
"dev-master": "1.08-dev"
37+
"dev-master": "1.09-dev"
3838
},
3939
"laravel": {
4040
"providers": [

src/Console/ConsumeVhostsCommand.php

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ class ConsumeVhostsCommand extends WorkCommand
3434
{--max-priority=}
3535
{--consumer-tag}
3636
{--prefetch-size=0}
37-
{--prefetch-count=1000}
3837
';
3938

4039
protected $description = 'Consume messages';
@@ -83,8 +82,8 @@ public function handle(): void
8382
$consumer->setConsumerTag($this->consumerTag());
8483
$consumer->setMaxPriority((int) $this->option('max-priority'));
8584
$consumer->setPrefetchSize((int) $this->option('prefetch-size'));
86-
$consumer->setPrefetchCount((int) $this->option('prefetch-count'));
87-
$consumer->setBatchSize((int) ($groupConfigData['batch_size'] ?? 100));
85+
$consumer->setPrefetchCount((int) ($groupConfigData['prefetch_count'] ?? 1000));
86+
$consumer->setBatchSize((int) ($groupConfigData['batch_size'] ?? 1000));
8887

8988
if ($this->downForMaintenance() && $this->option('once')) {
9089
$consumer->sleep($this->option('sleep'));

src/VhostsConsumer.php

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -120,16 +120,6 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
120120

121121
$connection = $this->initConnection();
122122

123-
$this->connectionMutex = new Mutex(false);
124-
125-
$this->connectionMutex->lock(self::MAIN_HANDLER_LOCK);
126-
$this->channel->basic_qos(
127-
$this->prefetchSize,
128-
$this->prefetchCount,
129-
false
130-
);
131-
$this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK);
132-
133123
$this->startConsuming();
134124

135125
while ($this->channel->is_consuming()) {
@@ -693,6 +683,16 @@ private function initConnection()
693683
$this->currentConnectionName = $connection->getConnectionName();
694684
$this->channel = $channel;
695685

686+
$this->connectionMutex = new Mutex(false);
687+
688+
$this->connectionMutex->lock(self::MAIN_HANDLER_LOCK);
689+
$this->channel->basic_qos(
690+
$this->prefetchSize,
691+
$this->prefetchCount,
692+
false
693+
);
694+
$this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK);
695+
696696
return $connection;
697697
}
698698
}

0 commit comments

Comments
 (0)