Skip to content

Commit a91976d

Browse files
committed
Changes from pull request vyuldashev#94
1 parent 1252d6c commit a91976d

File tree

2 files changed

+69
-36
lines changed

2 files changed

+69
-36
lines changed

src/VladimirYuldashev/LaravelQueueRabbitMQ/Queue/RabbitMQQueue.php

Lines changed: 62 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue;
44

55
use DateTime;
6+
use ErrorException;
7+
use Exception;
8+
use Log;
69
use Illuminate\Contracts\Queue\Queue as QueueContract;
710
use Illuminate\Queue\Queue;
811
use PhpAmqpLib\Channel\AMQPChannel;
@@ -24,6 +27,8 @@ class RabbitMQQueue extends Queue implements QueueContract
2427
protected $declareExchange;
2528
protected $declaredExchanges = [];
2629
protected $declareBindQueue;
30+
protected $sleepOnError;
31+
2732
protected $declaredQueues = [];
2833

2934
protected $defaultQueue;
@@ -52,6 +57,7 @@ public function __construct(AMQPStreamConnection $amqpConnection, $config)
5257
$this->configExchange = $config['exchange_params'];
5358
$this->declareExchange = $config['exchange_declare'];
5459
$this->declareBindQueue = $config['queue_declare_bind'];
60+
$this->sleepOnError = isset($config['sleep_on_error']) ? $config['sleep_on_error'] : 5;
5561

5662
$this->channel = $this->getChannel();
5763
}
@@ -92,33 +98,39 @@ public function push($job, $data = '', $queue = null)
9298
*/
9399
public function pushRaw($payload, $queue = null, array $options = [])
94100
{
95-
$queue = $this->getQueueName($queue);
96-
$this->declareQueue($queue);
97-
if (isset($options['delay']) && $options['delay'] > 0) {
98-
list($queue, $exchange) = $this->declareDelayedQueue($queue, $options['delay']);
99-
} else {
100-
list($queue, $exchange) = $this->declareQueue($queue);
101-
}
102-
103-
$headers = [
104-
'Content-Type' => 'application/json',
105-
'delivery_mode' => 2,
106-
];
107-
108-
if (isset($this->attempts) === true) {
109-
$headers['application_headers'] = [self::ATTEMPT_COUNT_HEADERS_KEY => ['I', $this->attempts]];
101+
try {
102+
$queue = $this->getQueueName($queue);
103+
$this->declareQueue($queue);
104+
if (isset($options['delay']) && $options['delay'] > 0) {
105+
list($queue, $exchange) = $this->declareDelayedQueue($queue, $options['delay']);
106+
} else {
107+
list($queue, $exchange) = $this->declareQueue($queue);
108+
}
109+
110+
$headers = [
111+
'Content-Type' => 'application/json',
112+
'delivery_mode' => 2,
113+
];
114+
115+
if (isset($this->attempts) === true) {
116+
$headers['application_headers'] = [self::ATTEMPT_COUNT_HEADERS_KEY => ['I', $this->attempts]];
117+
}
118+
119+
// push job to a queue
120+
$message = new AMQPMessage($payload, $headers);
121+
122+
$correlationId = $this->getCorrelationId();
123+
$message->set('correlation_id', $correlationId);
124+
125+
// push task to a queue
126+
$this->channel->basic_publish($message, $exchange, $queue);
127+
128+
return $correlationId;
129+
} catch (ErrorException $exception) {
130+
$this->reportConnectionError('pushRaw', $exception);
110131
}
111132

112-
// push job to a queue
113-
$message = new AMQPMessage($payload, $headers);
114-
115-
$correlationId = $this->getCorrelationId();
116-
$message->set('correlation_id', $correlationId);
117-
118-
// push task to a queue
119-
$this->channel->basic_publish($message, $exchange, $queue);
120-
121-
return $correlationId;
133+
return null;
122134
}
123135

124136
/**
@@ -147,15 +159,22 @@ public function pop($queue = null)
147159
{
148160
$queue = $this->getQueueName($queue);
149161

150-
// declare queue if not exists
151-
$this->declareQueue($queue);
162+
try {
163+
// declare queue if not exists
164+
$this->declareQueue($queue);
152165

153-
// get envelope
154-
$message = $this->channel->basic_get($queue);
166+
// get envelope
167+
$message = $this->channel->basic_get($queue);
155168

156-
if ($message instanceof AMQPMessage) {
157-
return new RabbitMQJob($this->container, $this, $this->channel, $queue, $message);
169+
if ($message instanceof AMQPMessage) {
170+
return new RabbitMQJob($this->container, $this, $this->channel, $queue, $message);
171+
}
158172
}
173+
catch(ErrorException $exception) {
174+
$this->reportConnectionError('pop', $exception);
175+
}
176+
177+
return null;
159178
}
160179

161180
/**
@@ -299,4 +318,16 @@ public function getCorrelationId()
299318
{
300319
return $this->correlationId ?: uniqid();
301320
}
321+
322+
/**
323+
* @param string $action
324+
* @param Exception $e
325+
*/
326+
private function reportConnectionError($action, Exception $e)
327+
{
328+
Log::error('AMQP error while attempting ' . $action . ': ' . $e->getMessage());
329+
// Sleep so that we don't flood the log file
330+
sleep($this->sleepOnError);
331+
}
332+
302333
}

src/config/rabbitmq.php

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
1313
'port' => env('RABBITMQ_PORT', 5672),
1414

15-
'vhost' => env('RABBITMQ_VHOST', '/'),
16-
'login' => env('RABBITMQ_LOGIN', 'guest'),
15+
'vhost' => env('RABBITMQ_VHOST', '/'),
16+
'login' => env('RABBITMQ_LOGIN', 'guest'),
1717
'password' => env('RABBITMQ_PASSWORD', 'guest'),
1818

1919
'queue' => env('RABBITMQ_QUEUE'),
@@ -25,9 +25,9 @@
2525
// create the queue if not exists and bind to the exchange
2626

2727
'queue_params' => [
28-
'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
29-
'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
30-
'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
28+
'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
29+
'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
30+
'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
3131
'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
3232
],
3333
'exchange_params' => [
@@ -40,4 +40,6 @@
4040
'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
4141
],
4242

43+
'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', 5), // the number of seconds to sleep if there's an error communicating with rabbitmq
44+
4345
];

0 commit comments

Comments
 (0)