Skip to content

Commit 47ad591

Browse files
committed
fix 100% CPU usage and stop-when-empty flag of rabbitmq:consume command
1 parent 83255a2 commit 47ad591

File tree

2 files changed

+19
-4
lines changed

2 files changed

+19
-4
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
All notable changes to this project will be documented in this file.
44

5+
## [10.1.3 (2020-01-12)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v10.1.2...v10.1.3)
6+
7+
- Fix 100% CPU usage of `rabbitmq:consume` command by adding sleep to consumer when no messages are got from the queue.
8+
- Fix `stop-on-empty` flag for `rabbitmq:consume` command.
9+
510
## [10.1.2 (2019-12-24)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v10.1.1...v10.1.2)
611

712
- Fix `rabbitmq:queue-bind` command. [#294](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/294)

src/Consumer.php

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ class Consumer extends Worker
3232
/** @var AMQPChannel */
3333
protected $channel;
3434

35+
/** @var boolean */
36+
protected $gotJob = false;
37+
3538
public function setContainer(Container $value): void
3639
{
3740
$this->container = $value;
@@ -79,6 +82,8 @@ public function daemon($connectionName, $queue, WorkerOptions $options): void
7982
false,
8083
false,
8184
function (AMQPMessage $message) use ($connection, $options, $connectionName, $queue): void {
85+
$this->gotJob = true;
86+
8287
$job = new RabbitMQJob(
8388
$this->container,
8489
$connection,
@@ -104,9 +109,7 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu
104109
continue;
105110
}
106111

107-
// If the daemon should run (not in maintenance mode, etc.), then we can run
108-
// fire off this job for processing. Otherwise, we will need to sleep the
109-
// worker so no more jobs are processed until they should be processed.
112+
// If the daemon should run (not in maintenance mode, etc.), then we can wait for a job.
110113
try {
111114
$this->channel->wait(null, true, (int) $options->timeout);
112115
} catch (AMQPRuntimeException $exception) {
@@ -123,10 +126,17 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu
123126
$this->stopWorkerIfLostConnection($exception);
124127
}
125128

129+
// If no job is got off the queue, we will need to sleep the worker.
130+
if (! $this->gotJob) {
131+
$this->sleep($options->sleep);
132+
}
133+
126134
// Finally, we will check to see if we have exceeded our memory limits or if
127135
// the queue should restart based on other indications. If so, we'll stop
128136
// this worker and let whatever is "monitoring" it restart the process.
129-
$this->stopIfNecessary($options, $lastRestart, null);
137+
$this->stopIfNecessary($options, $lastRestart, $this->gotJob ? true : null);
138+
139+
$this->gotJob = false;
130140
}
131141
}
132142

0 commit comments

Comments
 (0)