Skip to content

Commit 23d9f87

Browse files
Merge pull request #11 from salesmessage/SWR-19885
SWR-19885 #comment Implement Async Heartbeat For Vhosts Consumers
2 parents d7d088d + 971f65b commit 23d9f87

File tree

5 files changed

+139
-15
lines changed

5 files changed

+139
-15
lines changed

README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@ RabbitMQ Queue driver for Laravel
99

1010
Only the latest version will get new features. Bug fixes will be provided using the following scheme:
1111

12-
| Package Version | Laravel Version | Bug Fixes Until | |
13-
|-----------------|-----------------|------------------|---------------------------------------------------------------------------------------------|
14-
| 1 | 20 | April 23th, 2025 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) |
12+
| Package Version | Laravel Version | Bug Fixes Until | |
13+
|-----------------|-----------------|----------------------|---------------------------------------------------------------------------------------------|
14+
| 1 | 27 | September 16th, 2025 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) |
1515

1616
## Installation
1717

1818
You can install this package via composer using this command:
1919

2020
```
21-
composer require salesmessage/php-lib-rabbitmq:^1.20 --ignore-platform-reqs
21+
composer require salesmessage/php-lib-rabbitmq:^1.27 --ignore-platform-reqs
2222
```
2323

2424
The package will automatically register itself.
@@ -632,7 +632,7 @@ There are two ways of consuming messages.
632632

633633
Example:
634634
```bash
635-
php artisan lib-rabbitmq:consume-vhosts test-group-1 rabbitmq_vhosts --name=mq-vhosts-test-name --sleep=3 --memory=300 --max-jobs=5000 --max-time=600 --timeout=0
635+
php artisan lib-rabbitmq:consume-vhosts test-group-1 rabbitmq_vhosts --name=mq-vhosts-test-name --sleep=3 --memory=300 --max-jobs=5000 --max-time=600 --timeout=0 --async-mode=1
636636
```
637637

638638
## Testing

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
},
3535
"extra": {
3636
"branch-alias": {
37-
"dev-master": "1.26-dev"
37+
"dev-master": "1.27-dev"
3838
},
3939
"laravel": {
4040
"providers": [

src/Console/ConsumeVhostsCommand.php

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class ConsumeVhostsCommand extends WorkCommand
3030
{--timeout=60 : The number of seconds a child process can run}
3131
{--tries=1 : Number of times to attempt a job before logging it failed}
3232
{--rest=0 : Number of seconds to rest between jobs}
33+
{--async-mode=0 : Async processing for some functionality (now only "heartbeat" is supported)}
3334
3435
{--max-priority=}
3536
{--consumer-tag}
@@ -84,6 +85,7 @@ public function handle(): void
8485
$consumer->setPrefetchSize((int) $this->option('prefetch-size'));
8586
$consumer->setPrefetchCount((int) ($groupConfigData['prefetch_count'] ?? 1000));
8687
$consumer->setBatchSize((int) ($groupConfigData['batch_size'] ?? 1000));
88+
$consumer->setAsyncMode((bool) $this->option('async-mode'));
8789

8890
if ($this->downForMaintenance() && $this->option('once')) {
8991
$consumer->sleep($this->option('sleep'));
@@ -95,8 +97,10 @@ public function handle(): void
9597
// which jobs are coming through a queue and be informed on its progress.
9698
$this->listenForEvents();
9799

98-
$connection = $this->argument('connection')
99-
?: $this->laravel['config']['queue.default'];
100+
$queueConfigData = $this->laravel['config']['queue'];
101+
$connectionName = $this->argument('connection') ?: ($queueConfigData['default'] ?? '');
102+
103+
$consumer->setConfig((array) ($queueConfigData['connections'][$connectionName] ?? []));
100104

101105
if (Terminal::hasSttyAvailable()) {
102106
$this->components->info(sprintf(
@@ -107,7 +111,7 @@ public function handle(): void
107111
}
108112

109113
$this->runWorker(
110-
$connection,
114+
$connectionName,
111115
''
112116
);
113117
}

src/VhostsConsumers/AbstractVhostsConsumer.php

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use Illuminate\Queue\QueueManager;
99
use Illuminate\Queue\WorkerOptions;
1010
use Illuminate\Support\Str;
11+
use PhpAmqpLib\Connection\AMQPStreamConnection;
1112
use PhpAmqpLib\Exception\AMQPChannelClosedException;
1213
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
1314
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
@@ -19,6 +20,7 @@
1920
use Salesmessage\LibRabbitMQ\Dto\ConsumeVhostsFiltersDto;
2021
use Salesmessage\LibRabbitMQ\Dto\QueueApiDto;
2122
use Salesmessage\LibRabbitMQ\Dto\VhostApiDto;
23+
use Salesmessage\LibRabbitMQ\Exceptions\MutexTimeout;
2224
use Salesmessage\LibRabbitMQ\Interfaces\RabbitMQBatchable;
2325
use Salesmessage\LibRabbitMQ\Mutex;
2426
use Salesmessage\LibRabbitMQ\Queue\Jobs\RabbitMQJob;
@@ -29,6 +31,8 @@ abstract class AbstractVhostsConsumer extends Consumer
2931
{
3032
protected const MAIN_HANDLER_LOCK = 'vhost_handler';
3133

34+
protected const HEALTHCHECK_HANDLER_LOCK = 'healthcheck_vhost_handler';
35+
3236
protected ?OutputStyle $output = null;
3337

3438
protected ?ConsumeVhostsFiltersDto $filtersDto = null;
@@ -59,6 +63,12 @@ abstract class AbstractVhostsConsumer extends Consumer
5963

6064
protected bool $hadJobs = false;
6165

66+
protected ?int $stopStatusCode = null;
67+
68+
protected array $config = [];
69+
70+
protected bool $asyncMode = false;
71+
6272
/**
6373
* @param InternalStorageManager $internalStorageManager
6474
* @param LoggerInterface $logger
@@ -110,19 +120,71 @@ public function setBatchSize(int $batchSize): self
110120
return $this;
111121
}
112122

123+
/**
124+
* @param array $config
125+
* @return $this
126+
*/
127+
public function setConfig(array $config): self
128+
{
129+
$this->config = $config;
130+
return $this;
131+
}
132+
133+
/**
134+
* @param bool $asyncMode
135+
* @return $this
136+
*/
137+
public function setAsyncMode(bool $asyncMode): self
138+
{
139+
$this->asyncMode = $asyncMode;
140+
return $this;
141+
}
142+
113143
public function daemon($connectionName, $queue, WorkerOptions $options)
114144
{
115145
$this->goAheadOrWait();
116146

117-
$this->connectionMutex = new Mutex(false);
118-
119147
$this->configConnectionName = (string) $connectionName;
120148
$this->workerOptions = $options;
121149

122150
if ($this->supportsAsyncSignals()) {
123151
$this->listenForSignals();
124152
}
125153

154+
if ($this->asyncMode) {
155+
$this->logInfo('daemon.AsyncMode.On');
156+
157+
$coroutineContextHandler = function () use ($connectionName, $options) {
158+
$this->logInfo('daemon.AsyncMode.Coroutines.Running');
159+
160+
// we can't move it outside since Mutex should be created within coroutine context
161+
$this->connectionMutex = new Mutex(true);
162+
$this->startHeartbeatCheck();
163+
\go(function () use ($connectionName, $options) {
164+
$this->vhostDaemon($connectionName, $options);
165+
});
166+
};
167+
168+
if (extension_loaded('swoole')) {
169+
$this->logInfo('daemon.AsyncMode.Swoole');
170+
171+
\Co\run($coroutineContextHandler);
172+
} elseif (extension_loaded('openswoole')) {
173+
$this->logInfo('daemon.AsyncMode.OpenSwoole');
174+
175+
\OpenSwoole\Runtime::enableCoroutine(true, \OpenSwoole\Runtime::HOOK_ALL);
176+
\co::run($coroutineContextHandler);
177+
} else {
178+
throw new \Exception('Async mode is not supported. Check if Swoole extension is installed');
179+
}
180+
181+
return;
182+
}
183+
184+
$this->logInfo('daemon.AsyncMode.Off');
185+
186+
$this->connectionMutex = new Mutex(false);
187+
$this->startHeartbeatCheck();
126188
$this->vhostDaemon($connectionName, $options);
127189
}
128190

@@ -623,6 +685,64 @@ protected function initConnection(): RabbitMQQueue
623685
return $connection;
624686
}
625687

688+
/**
689+
* @return void
690+
*/
691+
protected function startHeartbeatCheck(): void
692+
{
693+
if (false === $this->asyncMode) {
694+
return;
695+
}
696+
697+
$heartbeatInterval = (int) ($this->config['options']['heartbeat'] ?? 0);
698+
if (!$heartbeatInterval) {
699+
return;
700+
}
701+
702+
$heartbeatHandler = function () {
703+
if ($this->shouldQuit || (null !== $this->stopStatusCode)) {
704+
return;
705+
}
706+
707+
try {
708+
/** @var AMQPStreamConnection $connection */
709+
$connection = $this->connection?->getConnection();
710+
if ((null === $connection)
711+
|| (false === $connection->isConnected())
712+
|| $connection->isWriting()
713+
|| $connection->isBlocked()
714+
) {
715+
return;
716+
}
717+
718+
$this->connectionMutex->lock(static::HEALTHCHECK_HANDLER_LOCK, 3);
719+
$connection->checkHeartBeat();
720+
} catch (MutexTimeout) {
721+
} catch (Throwable $exception) {
722+
$this->logError('startHeartbeatCheck.exception', [
723+
'eroor' => $exception->getMessage(),
724+
'trace' => $e->getTraceAsString(),
725+
]);
726+
727+
$this->shouldQuit = true;
728+
} finally {
729+
$this->connectionMutex->unlock(static::HEALTHCHECK_HANDLER_LOCK);
730+
}
731+
};
732+
733+
\go(function () use ($heartbeatHandler, $heartbeatInterval) {
734+
$this->logInfo('startHeartbeatCheck.started');
735+
736+
while (true) {
737+
sleep($heartbeatInterval);
738+
$heartbeatHandler();
739+
if ($this->shouldQuit || !is_null($this->stopStatusCode)) {
740+
return;
741+
}
742+
}
743+
});
744+
}
745+
626746
/**
627747
* @return string
628748
*/

src/VhostsConsumers/QueueConsumer.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,19 +83,19 @@ protected function vhostDaemon($connectionName, WorkerOptions $options)
8383
// Finally, we will check to see if we have exceeded our memory limits or if
8484
// the queue should restart based on other indications. If so, we'll stop
8585
// this worker and let whatever is "monitoring" it restart the process.
86-
$status = $this->getStopStatus(
86+
$this->stopStatusCode = $this->getStopStatus(
8787
$this->workerOptions,
8888
$lastRestart,
8989
$startTime,
9090
$jobsProcessed,
9191
$this->hasJob
9292
);
93-
if (! is_null($status)) {
93+
if (! is_null($this->stopStatusCode)) {
9494
$this->logInfo('consuming_stop', [
95-
'status' => $status,
95+
'status' => $this->stopStatusCode,
9696
]);
9797

98-
return $this->stop($status, $this->workerOptions);
98+
return $this->stop($this->stopStatusCode, $this->workerOptions);
9999
}
100100

101101
$this->hasJob = false;

0 commit comments

Comments
 (0)