Skip to content

Commit 3d1836c

Browse files
committed
Changes from pull request vyuldashev#94
1 parent 3765473 commit 3d1836c

File tree

1 file changed

+62
-33
lines changed

1 file changed

+62
-33
lines changed

src/VladimirYuldashev/LaravelQueueRabbitMQ/Queue/RabbitMQQueue.php

Lines changed: 62 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue;
44

55
use DateTime;
6+
use ErrorException;
7+
use Exception;
8+
use Log;
69
use Illuminate\Contracts\Queue\Queue as QueueContract;
710
use Illuminate\Queue\Queue;
811
use PhpAmqpLib\Channel\AMQPChannel;
@@ -30,6 +33,7 @@ class RabbitMQQueue extends Queue implements QueueContract
3033
protected $defaultQueue;
3134
protected $configQueue;
3235
protected $configExchange;
36+
protected $sleepOnError;
3337

3438
/**
3539
* @var int
@@ -53,6 +57,7 @@ public function __construct(AMQPStreamConnection $amqpConnection, $config)
5357
$this->configExchange = $config['exchange_params'];
5458
$this->declareExchange = $config['exchange_declare'];
5559
$this->declareBindQueue = $config['queue_declare_bind'];
60+
$this->sleepOnError = isset($config['sleep_on_error']) ? $config['sleep_on_error'] : 5;
5661

5762
$this->channel = $this->getChannel();
5863
}
@@ -83,32 +88,38 @@ public function push($job, $data = '', $queue = null)
8388
public function pushRaw($payload, $queue = null, array $options = [])
8489
{
8590
$queue = $this->getQueueName($queue);
86-
$this->declareQueue($queue);
87-
if (isset($options['delay']) && $options['delay'] > 0) {
88-
list($queue, $exchange) = $this->declareDelayedQueue($queue, $options['delay']);
89-
} else {
90-
list($queue, $exchange) = $this->declareQueue($queue);
91+
try {
92+
$this->declareQueue($queue);
93+
if (isset($options['delay']) && $options['delay'] > 0) {
94+
list($queue, $exchange) = $this->declareDelayedQueue($queue, $options['delay']);
95+
} else {
96+
list($queue, $exchange) = $this->declareQueue($queue);
97+
}
98+
99+
$headers = [
100+
'Content-Type' => 'application/json',
101+
'delivery_mode' => 2,
102+
];
103+
104+
if (isset($this->attempts) === true) {
105+
$headers['application_headers'] = [self::ATTEMPT_COUNT_HEADERS_KEY => ['I', $this->attempts]];
106+
}
107+
108+
// push job to a queue
109+
$message = new AMQPMessage($payload, $headers);
110+
111+
$correlationId = $this->getCorrelationId();
112+
$message->set('correlation_id', $correlationId);
113+
114+
// push task to a queue
115+
$this->channel->basic_publish($message, $exchange, $queue);
116+
117+
return $correlationId;
118+
} catch (ErrorException $exception) {
119+
$this->reportConnectionError('pushRaw', $exception);
91120
}
92121

93-
$headers = [
94-
'Content-Type' => 'application/json',
95-
'delivery_mode' => 2,
96-
];
97-
98-
if (isset($this->attempts) === true) {
99-
$headers['application_headers'] = [self::ATTEMPT_COUNT_HEADERS_KEY => ['I', $this->attempts]];
100-
}
101-
102-
// push job to a queue
103-
$message = new AMQPMessage($payload, $headers);
104-
105-
$correlationId = $this->getCorrelationId();
106-
$message->set('correlation_id', $correlationId);
107-
108-
// push task to a queue
109-
$this->channel->basic_publish($message, $exchange, $queue);
110-
111-
return $correlationId;
122+
return null;
112123
}
113124

114125
/**
@@ -137,15 +148,21 @@ public function pop($queue = null)
137148
{
138149
$queue = $this->getQueueName($queue);
139150

140-
// declare queue if not exists
141-
$this->declareQueue($queue);
151+
try {
152+
// declare queue if not exists
153+
$this->declareQueue($queue);
142154

143-
// get envelope
144-
$message = $this->channel->basic_get($queue);
155+
// get envelope
156+
$message = $this->channel->basic_get($queue);
145157

146-
if ($message instanceof AMQPMessage) {
147-
return new RabbitMQJob($this->container, $this, $this->channel, $queue, $message);
158+
if ($message instanceof AMQPMessage) {
159+
return new RabbitMQJob($this->container, $this, $this->channel, $queue, $message);
160+
}
161+
} catch (ErrorException $exception) {
162+
$this->reportConnectionError('pop', $exception);
148163
}
164+
165+
return null;
149166
}
150167

151168
/**
@@ -175,7 +192,7 @@ private function declareQueue($name)
175192
$name = $this->getQueueName($name);
176193
$exchange = $this->configExchange['name'] ?: $name;
177194

178-
if ($this->declareExchange && ! in_array($exchange, $this->declaredExchanges)) {
195+
if ($this->declareExchange && !in_array($exchange, $this->declaredExchanges)) {
179196
$this->declaredExchanges[] = $exchange;
180197
// declare exchange
181198
$this->channel->exchange_declare(
@@ -187,7 +204,7 @@ private function declareQueue($name)
187204
);
188205
}
189206

190-
if ($this->declareBindQueue && ! in_array($name, $this->declaredQueues)) {
207+
if ($this->declareBindQueue && !in_array($name, $this->declaredQueues)) {
191208
$this->declaredQueues[] = $name;
192209
// declare queue
193210
$this->channel->queue_declare(
@@ -216,7 +233,7 @@ private function declareDelayedQueue($destination, $delay)
216233
$delay = $this->getSeconds($delay);
217234
$destination = $this->getQueueName($destination);
218235
$destinationExchange = $this->configExchange['name'] ?: $destination;
219-
$name = $this->getQueueName($destination).'_deferred_'.$delay;
236+
$name = $this->getQueueName($destination) . '_deferred_' . $delay;
220237
$exchange = $this->configExchange['name'] ?: $destination;
221238

222239
// declare exchange
@@ -282,4 +299,16 @@ public function getCorrelationId()
282299
{
283300
return $this->correlationId ?: uniqid();
284301
}
302+
303+
/**
304+
* @param string $action
305+
* @param Exception $e
306+
*/
307+
private function reportConnectionError($action, Exception $e)
308+
{
309+
Log::error('AMQP error while attempting ' . $action . ': ' . $e->getMessage());
310+
// Sleep so that we don't flood the log file
311+
sleep($this->sleepOnError);
312+
}
313+
285314
}

0 commit comments

Comments
 (0)