22
33namespace VladimirYuldashev \LaravelQueueRabbitMQ \Queue ;
44
5- use DateTime ;
65use ErrorException ;
76use Exception ;
87use Illuminate \Contracts \Queue \Queue as QueueContract ;
1211use PhpAmqpLib \Connection \AMQPStreamConnection ;
1312use PhpAmqpLib \Message \AMQPMessage ;
1413use PhpAmqpLib \Wire \AMQPTable ;
14+ use RuntimeException ;
1515use VladimirYuldashev \LaravelQueueRabbitMQ \Queue \Jobs \RabbitMQJob ;
1616
1717class RabbitMQQueue extends Queue implements QueueContract
@@ -25,41 +25,30 @@ class RabbitMQQueue extends Queue implements QueueContract
2525 protected $ channel ;
2626
2727 protected $ declareExchange ;
28- protected $ declaredExchanges = [];
2928 protected $ declareBindQueue ;
3029 protected $ sleepOnError ;
3130
32- protected $ declaredQueues = [];
33-
3431 protected $ defaultQueue ;
3532 protected $ configQueue ;
3633 protected $ configQueueArguments ;
3734 protected $ configExchange ;
3835
39- /**
40- * @var int
41- */
42- private $ retryAfter ;
36+ private $ declaredExchanges = [];
37+ private $ declaredQueues = [];
4338
44- /**
45- * @var string
46- */
39+ private $ retryAfter ;
4740 private $ correlationId ;
4841
49- /**
50- * @param AMQPStreamConnection $amqpConnection
51- * @param array $config
52- */
53- public function __construct (AMQPStreamConnection $ amqpConnection , $ config )
42+ public function __construct (AMQPStreamConnection $ connection , array $ config )
5443 {
55- $ this ->connection = $ amqpConnection ;
44+ $ this ->connection = $ connection ;
5645 $ this ->defaultQueue = $ config ['queue ' ];
5746 $ this ->configQueue = $ config ['queue_params ' ];
5847 $ this ->configQueueArguments = json_decode ($ this ->configQueue ['arguments ' ], 1 ) ?: [];
5948 $ this ->configExchange = $ config ['exchange_params ' ];
6049 $ this ->declareExchange = $ config ['exchange_declare ' ];
6150 $ this ->declareBindQueue = $ config ['queue_declare_bind ' ];
62- $ this ->sleepOnError = isset ( $ config ['sleep_on_error ' ]) ? $ config [ ' sleep_on_error ' ] : 5 ;
51+ $ this ->sleepOnError = $ config ['sleep_on_error ' ] ?? 5 ;
6352
6453 $ this ->channel = $ this ->getChannel ();
6554 }
@@ -71,7 +60,7 @@ public function __construct(AMQPStreamConnection $amqpConnection, $config)
7160 *
7261 * @return int
7362 */
74- public function size ($ queue = null )
63+ public function size ($ queue = null ): int
7564 {
7665 list (, $ messageCount ) = $ this ->channel ->queue_declare ($ this ->getQueueName ($ queue ), true );
7766
@@ -82,12 +71,12 @@ public function size($queue = null)
8271 * Push a new job onto the queue.
8372 *
8473 * @param string $job
85- * @param mixed $data
74+ * @param mixed $data
8675 * @param string $queue
8776 *
8877 * @return bool
8978 */
90- public function push ($ job , $ data = '' , $ queue = null )
79+ public function push ($ job , $ data = '' , $ queue = null ): bool
9180 {
9281 return $ this ->pushRaw ($ this ->createPayload ($ job , $ data ), $ queue , []);
9382 }
@@ -97,7 +86,7 @@ public function push($job, $data = '', $queue = null)
9786 *
9887 * @param string $payload
9988 * @param string $queue
100- * @param array $options
89+ * @param array $options
10190 *
10291 * @return mixed
10392 */
@@ -112,7 +101,7 @@ public function pushRaw($payload, $queue = null, array $options = [])
112101 }
113102
114103 $ headers = [
115- 'Content-Type ' => 'application/json ' ,
104+ 'Content-Type ' => 'application/json ' ,
116105 'delivery_mode ' => 2 ,
117106 ];
118107
@@ -132,16 +121,18 @@ public function pushRaw($payload, $queue = null, array $options = [])
132121 return $ correlationId ;
133122 } catch (ErrorException $ exception ) {
134123 $ this ->reportConnectionError ('pushRaw ' , $ exception );
124+
125+ return null ;
135126 }
136127 }
137128
138129 /**
139130 * Push a new job onto the queue after a delay.
140131 *
141132 * @param \DateTime|int $delay
142- * @param string $job
143- * @param mixed $data
144- * @param string $queue
133+ * @param string $job
134+ * @param mixed $data
135+ * @param string $queue
145136 *
146137 * @return mixed
147138 */
@@ -181,32 +172,26 @@ public function pop($queue = null)
181172 } catch (ErrorException $ exception ) {
182173 $ this ->reportConnectionError ('pop ' , $ exception );
183174 }
175+
176+ return null ;
184177 }
185178
186179 /**
187180 * @param string $queue
188181 *
189182 * @return string
190183 */
191- private function getQueueName ($ queue )
184+ private function getQueueName ($ queue ): string
192185 {
193186 return $ queue ?: $ this ->defaultQueue ;
194187 }
195188
196- /**
197- * @return AMQPChannel
198- */
199- private function getChannel ()
189+ private function getChannel (): AMQPChannel
200190 {
201191 return $ this ->connection ->channel ();
202192 }
203193
204- /**
205- * @param $name
206- *
207- * @return array
208- */
209- private function declareQueue ($ name )
194+ private function declareQueue (string $ name ): array
210195 {
211196 $ name = $ this ->getQueueName ($ name );
212197 $ exchange = $ this ->configExchange ['name ' ] ?: $ name ;
@@ -245,18 +230,12 @@ private function declareQueue($name)
245230 return [$ name , $ exchange ];
246231 }
247232
248- /**
249- * @param string $destination
250- * @param DateTime|int $delay
251- *
252- * @return array
253- */
254- private function declareDelayedQueue ($ destination , $ delay )
233+ private function declareDelayedQueue (string $ destination , $ delay ): array
255234 {
256235 $ delay = $ this ->secondsUntil ($ delay );
257236 $ destination = $ this ->getQueueName ($ destination );
258237 $ destinationExchange = $ this ->configExchange ['name ' ] ?: $ destination ;
259- $ name = $ this ->getQueueName ($ destination ). '_deferred_ ' . $ delay ;
238+ $ name = $ this ->getQueueName ($ destination ) . '_deferred_ ' . $ delay ;
260239 $ exchange = $ this ->configExchange ['name ' ] ?: $ destination ;
261240
262241 // declare exchange
@@ -273,11 +252,11 @@ private function declareDelayedQueue($destination, $delay)
273252 // declare queue
274253 if (!in_array ($ name , $ this ->declaredQueues , true )) {
275254 $ queueArguments = array_merge ([
276- 'x-dead-letter-exchange ' => $ destinationExchange ,
277- 'x-dead-letter-routing-key ' => $ destination ,
278- 'x-message-ttl ' => $ delay * 1000 ,
279- ], (array )$ this ->configQueueArguments );
280-
255+ 'x-dead-letter-exchange ' => $ destinationExchange ,
256+ 'x-dead-letter-routing-key ' => $ destination ,
257+ 'x-message-ttl ' => $ delay * 1000 ,
258+ ], (array )$ this ->configQueueArguments );
259+
281260 $ this ->channel ->queue_declare (
282261 $ name ,
283262 $ this ->configQueue ['passive ' ],
@@ -302,7 +281,7 @@ private function declareDelayedQueue($destination, $delay)
302281 *
303282 * @return void
304283 */
305- public function setAttempts ($ count )
284+ public function setAttempts (int $ count )
306285 {
307286 $ this ->retryAfter = $ count ;
308287 }
@@ -314,7 +293,7 @@ public function setAttempts($count)
314293 *
315294 * @return void
316295 */
317- public function setCorrelationId ($ id )
296+ public function setCorrelationId (string $ id )
318297 {
319298 $ this ->correlationId = $ id ;
320299 }
@@ -324,23 +303,23 @@ public function setCorrelationId($id)
324303 *
325304 * @return string
326305 */
327- public function getCorrelationId ()
306+ public function getCorrelationId (): string
328307 {
329308 return $ this ->correlationId ?: uniqid ('' , true );
330309 }
331310
332311 /**
333- * @param string $action
312+ * @param string $action
334313 * @param Exception $e
335314 * @throws Exception
336315 */
337316 protected function reportConnectionError ($ action , Exception $ e )
338317 {
339- Log::error ('AMQP error while attempting ' . $ action. ': ' . $ e ->getMessage ());
318+ Log::error ('AMQP error while attempting ' . $ action . ': ' . $ e ->getMessage ());
340319
341320 // If it's set to false, throw an error rather than waiting
342321 if ($ this ->sleepOnError === false ) {
343- throw new \ RuntimeException ('Error writing data to the connection with RabbitMQ ' );
322+ throw new RuntimeException ('Error writing data to the connection with RabbitMQ ' );
344323 }
345324
346325 // Sleep so that we don't flood the log file
0 commit comments