Skip to content

Commit ecd02f2

Browse files
committed
fix errors in Consumer
1 parent 768d496 commit ecd02f2

File tree

3 files changed

+27
-23
lines changed

3 files changed

+27
-23
lines changed

src/Consumer.php

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,11 @@
44

55
use Exception;
66
use Illuminate\Container\Container;
7-
use Illuminate\Contracts\Queue\Job;
87
use Illuminate\Queue\Worker;
98
use Illuminate\Queue\WorkerOptions;
109
use PhpAmqpLib\Channel\AMQPChannel;
1110
use PhpAmqpLib\Exception\AMQPRuntimeException;
1211
use PhpAmqpLib\Message\AMQPMessage;
13-
use Symfony\Component\Debug\Exception\FatalThrowableError;
1412
use Throwable;
1513
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
1614

@@ -31,8 +29,8 @@ class Consumer extends Worker
3129
/** @var AMQPChannel */
3230
protected $channel;
3331

34-
/** @var bool */
35-
protected $gotJob = false;
32+
/** @var object|null */
33+
protected $currentJob;
3634

3735
public function setContainer(Container $value): void
3836
{
@@ -57,10 +55,11 @@ public function setPrefetchCount(int $value): void
5755
/**
5856
* Listen to the given queue in a loop.
5957
*
60-
* @param string $connectionName
61-
* @param string $queue
62-
* @param \Illuminate\Queue\WorkerOptions $options
58+
* @param string $connectionName
59+
* @param string $queue
60+
* @param WorkerOptions $options
6361
* @return int
62+
* @throws Throwable
6463
*/
6564
public function daemon($connectionName, $queue, WorkerOptions $options)
6665
{
@@ -70,6 +69,8 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
7069

7170
$lastRestart = $this->getTimestampOfLastQueueRestart();
7271

72+
[$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0];
73+
7374
/** @var RabbitMQQueue $connection */
7475
$connection = $this->manager->connection($connectionName);
7576

@@ -90,9 +91,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
9091
false,
9192
false,
9293
false,
93-
function (AMQPMessage $message) use ($connection, $options, $connectionName, $queue, $jobClass): void {
94-
$this->gotJob = true;
95-
94+
function (AMQPMessage $message) use ($connection, $options, $connectionName, $queue, $jobClass, &$jobsProcessed): void {
9695
$job = new $jobClass(
9796
$this->container,
9897
$connection,
@@ -101,10 +100,14 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu
101100
$queue
102101
);
103102

103+
$this->currentJob = $job;
104+
104105
if ($this->supportsAsyncSignals()) {
105106
$this->registerTimeoutHandler($job, $options);
106107
}
107108

109+
$jobsProcessed++;
110+
108111
$this->runJob($job, $connectionName, $options);
109112

110113
if ($this->supportsAsyncSignals()) {
@@ -129,33 +132,33 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu
129132
$this->exceptions->report($exception);
130133

131134
$this->kill(1);
132-
} catch (Exception $exception) {
135+
} catch (Exception | Throwable $exception) {
133136
$this->exceptions->report($exception);
134137

135-
$this->stopWorkerIfLostConnection($exception);
136-
} catch (Throwable $exception) {
137-
$this->exceptions->report($exception = new FatalThrowableError($exception));
138-
139138
$this->stopWorkerIfLostConnection($exception);
140139
}
141140

142141
// If no job is got off the queue, we will need to sleep the worker.
143-
if (! $this->gotJob) {
142+
if ($this->currentJob === null) {
144143
$this->sleep($options->sleep);
145144
}
146145

147146
// Finally, we will check to see if we have exceeded our memory limits or if
148147
// the queue should restart based on other indications. If so, we'll stop
149148
// this worker and let whatever is "monitoring" it restart the process.
150149
$status = $this->stopIfNecessary(
151-
$options, $lastRestart, $startTime, $jobsProcessed, $job
150+
$options,
151+
$lastRestart,
152+
$startTime,
153+
$jobsProcessed,
154+
$this->currentJob
152155
);
153156

154157
if (! is_null($status)) {
155158
return $this->stop($status);
156159
}
157160

158-
$this->gotJob = false;
161+
$this->currentJob = null;
159162
}
160163
}
161164

@@ -176,14 +179,14 @@ protected function daemonShouldRun(WorkerOptions $options, $connectionName, $que
176179
* Stop listening and bail out of the script.
177180
*
178181
* @param int $status
179-
* @return void
182+
* @return int
180183
*/
181-
public function stop($status = 0): void
184+
public function stop($status = 0): int
182185
{
183186
// Tell the server you are going to stop consuming.
184187
// It will finish up the last message and not send you any more.
185188
$this->channel->basic_cancel($this->consumerTag, false, true);
186189

187-
parent::stop($status);
190+
return parent::stop($status);
188191
}
189192
}

src/Queue/RabbitMQQueue.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ public function getChannel(): AMQPChannel
279279
* Job class to use.
280280
*
281281
* @return string
282-
* @throws \Throwable
282+
* @throws Throwable
283283
*/
284284
public function getJobClass(): string
285285
{

tests/Feature/TestCase.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Illuminate\Support\Facades\Queue;
66
use Illuminate\Support\Str;
77
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
8+
use RuntimeException;
89
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;
910
use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestJob;
1011
use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase as BaseTestCase;
@@ -310,7 +311,7 @@ public function testFailed(): void
310311

311312
$job = Queue::pop();
312313

313-
$job->fail(new \RuntimeException($job->resolveName().' has an exception.'));
314+
$job->fail(new RuntimeException($job->resolveName().' has an exception.'));
314315

315316
sleep(1);
316317

0 commit comments

Comments
 (0)