Skip to content

Commit 61cc707

Browse files
committed
Added better error handling to rabbitmq connectors
If the RabbitMQ connector was unavailable, and the queue was running in daemon mode, the error log would flood within seconds. This reports the connection error responsibly, and sleeps to stop the log file from being instantly flooded.
1 parent b3038bc commit 61cc707

File tree

2 files changed

+43
-18
lines changed

2 files changed

+43
-18
lines changed

src/VladimirYuldashev/LaravelQueueRabbitMQ/Queue/RabbitMQQueue.php

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue;
44

55
use DateTime;
6+
use ErrorException;
7+
use Exception;
68
use Illuminate\Contracts\Queue\Queue as QueueContract;
79
use Illuminate\Queue\Queue;
10+
use Log;
811
use PhpAmqpLib\Channel\AMQPChannel;
912
use PhpAmqpLib\Connection\AMQPConnection;
1013
use PhpAmqpLib\Message\AMQPMessage;
@@ -23,6 +26,7 @@ class RabbitMQQueue extends Queue implements QueueContract
2326
protected $defaultQueue;
2427
protected $configQueue;
2528
protected $configExchange;
29+
protected $sleepOnError;
2630

2731
/**
2832
* @param AMQPConnection $amqpConnection
@@ -36,6 +40,7 @@ public function __construct(AMQPConnection $amqpConnection, $config)
3640
$this->configExchange = $config['exchange_params'];
3741
$this->declareExchange = $config['exchange_declare'];
3842
$this->declareBindQueue = $config['queue_declare_bind'];
43+
$this->sleepOnError = $config['sleep_on_error'];
3944

4045
$this->channel = $this->getChannel();
4146
}
@@ -66,20 +71,24 @@ public function push($job, $data = '', $queue = null)
6671
public function pushRaw($payload, $queue = null, array $options = [])
6772
{
6873
$queue = $this->getQueueName($queue);
69-
$this->declareQueue($queue);
70-
if (isset($options['delay'])) {
71-
$queue = $this->declareDelayedQueue($queue, $options['delay']);
74+
try {
75+
$this->declareQueue($queue);
76+
if (isset($options['delay'])) {
77+
$queue = $this->declareDelayedQueue($queue, $options['delay']);
78+
}
79+
80+
// push job to a queue
81+
$message = new AMQPMessage($payload, [
82+
'Content-Type' => 'application/json',
83+
'delivery_mode' => 2,
84+
]);
85+
86+
// push task to a queue
87+
$this->channel->basic_publish($message, $queue, $queue);
88+
} catch (ErrorException $e) {
89+
$this->reportConnectionError('pushRaw', $e);
7290
}
7391

74-
// push job to a queue
75-
$message = new AMQPMessage($payload, [
76-
'Content-Type' => 'application/json',
77-
'delivery_mode' => 2,
78-
]);
79-
80-
// push task to a queue
81-
$this->channel->basic_publish($message, $queue, $queue);
82-
8392
return true;
8493
}
8594

@@ -109,14 +118,18 @@ public function pop($queue = null)
109118
{
110119
$queue = $this->getQueueName($queue);
111120

112-
// declare queue if not exists
113-
$this->declareQueue($queue);
121+
try {
122+
// declare queue if not exists
123+
$this->declareQueue($queue);
114124

115-
// get envelope
116-
$message = $this->channel->basic_get($queue);
125+
// get envelope
126+
$message = $this->channel->basic_get($queue);
117127

118-
if ($message instanceof AMQPMessage) {
119-
return new RabbitMQJob($this->container, $this, $this->channel, $queue, $message);
128+
if ($message instanceof AMQPMessage) {
129+
return new RabbitMQJob($this->container, $this, $this->channel, $queue, $message);
130+
}
131+
} catch (ErrorException $e) {
132+
$this->reportConnectionError('pop', $e);
120133
}
121134

122135
return null;
@@ -215,4 +228,15 @@ private function declareDelayedQueue($destination, $delay)
215228
return $name;
216229
}
217230

231+
/**
232+
* @param string $action
233+
* @param Exception $e
234+
*/
235+
private function reportConnectionError($action, Exception $e)
236+
{
237+
Log::error('AMQP error while attempting ' . $action . ': ' . $e->getMessage());
238+
239+
// Sleep so that we don't flood the log file
240+
sleep($this->sleepOnError);
241+
}
218242
}

src/config/rabbitmq.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@
3333
'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
3434
],
3535

36+
'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', 5), // the number of seconds to sleep if there's an error communicating with rabbitmq
3637
];

0 commit comments

Comments
 (0)