33namespace VladimirYuldashev \LaravelQueueRabbitMQ \Queue ;
44
55use DateTime ;
6+ use ErrorException ;
7+ use Exception ;
68use Illuminate \Contracts \Queue \Queue as QueueContract ;
79use Illuminate \Queue \Queue ;
10+ use Log ;
811use PhpAmqpLib \Channel \AMQPChannel ;
912use PhpAmqpLib \Connection \AMQPConnection ;
1013use PhpAmqpLib \Message \AMQPMessage ;
@@ -23,6 +26,7 @@ class RabbitMQQueue extends Queue implements QueueContract
2326 protected $ defaultQueue ;
2427 protected $ configQueue ;
2528 protected $ configExchange ;
29+ protected $ sleepOnError ;
2630
2731 /**
2832 * @param AMQPConnection $amqpConnection
@@ -36,6 +40,7 @@ public function __construct(AMQPConnection $amqpConnection, $config)
3640 $ this ->configExchange = $ config ['exchange_params ' ];
3741 $ this ->declareExchange = $ config ['exchange_declare ' ];
3842 $ this ->declareBindQueue = $ config ['queue_declare_bind ' ];
43+ $ this ->sleepOnError = $ config ['sleep_on_error ' ];
3944
4045 $ this ->channel = $ this ->getChannel ();
4146 }
@@ -66,20 +71,24 @@ public function push($job, $data = '', $queue = null)
6671 public function pushRaw ($ payload , $ queue = null , array $ options = [])
6772 {
6873 $ queue = $ this ->getQueueName ($ queue );
69- $ this ->declareQueue ($ queue );
70- if (isset ($ options ['delay ' ])) {
71- $ queue = $ this ->declareDelayedQueue ($ queue , $ options ['delay ' ]);
74+ try {
75+ $ this ->declareQueue ($ queue );
76+ if (isset ($ options ['delay ' ])) {
77+ $ queue = $ this ->declareDelayedQueue ($ queue , $ options ['delay ' ]);
78+ }
79+
80+ // push job to a queue
81+ $ message = new AMQPMessage ($ payload , [
82+ 'Content-Type ' => 'application/json ' ,
83+ 'delivery_mode ' => 2 ,
84+ ]);
85+
86+ // push task to a queue
87+ $ this ->channel ->basic_publish ($ message , $ queue , $ queue );
88+ } catch (ErrorException $ e ) {
89+ $ this ->reportConnectionError ('pushRaw ' , $ e );
7290 }
7391
74- // push job to a queue
75- $ message = new AMQPMessage ($ payload , [
76- 'Content-Type ' => 'application/json ' ,
77- 'delivery_mode ' => 2 ,
78- ]);
79-
80- // push task to a queue
81- $ this ->channel ->basic_publish ($ message , $ queue , $ queue );
82-
8392 return true ;
8493 }
8594
@@ -109,14 +118,18 @@ public function pop($queue = null)
109118 {
110119 $ queue = $ this ->getQueueName ($ queue );
111120
112- // declare queue if not exists
113- $ this ->declareQueue ($ queue );
121+ try {
122+ // declare queue if not exists
123+ $ this ->declareQueue ($ queue );
114124
115- // get envelope
116- $ message = $ this ->channel ->basic_get ($ queue );
125+ // get envelope
126+ $ message = $ this ->channel ->basic_get ($ queue );
117127
118- if ($ message instanceof AMQPMessage) {
119- return new RabbitMQJob ($ this ->container , $ this , $ this ->channel , $ queue , $ message );
128+ if ($ message instanceof AMQPMessage) {
129+ return new RabbitMQJob ($ this ->container , $ this , $ this ->channel , $ queue , $ message );
130+ }
131+ } catch (ErrorException $ e ) {
132+ $ this ->reportConnectionError ('pop ' , $ e );
120133 }
121134
122135 return null ;
@@ -215,4 +228,15 @@ private function declareDelayedQueue($destination, $delay)
215228 return $ name ;
216229 }
217230
231+ /**
232+ * @param string $action
233+ * @param Exception $e
234+ */
235+ private function reportConnectionError ($ action , Exception $ e )
236+ {
237+ Log::error ('AMQP error while attempting ' . $ action . ': ' . $ e ->getMessage ());
238+
239+ // Sleep so that we don't flood the log file
240+ sleep ($ this ->sleepOnError );
241+ }
218242}
0 commit comments