99use Interop \Amqp \AmqpQueue ;
1010use Interop \Amqp \AmqpTopic ;
1111use Interop \Amqp \Impl \AmqpBind ;
12- use Log ;
12+ use Psr \ Log \ LoggerInterface ;
1313use RuntimeException ;
1414use VladimirYuldashev \LaravelQueueRabbitMQ \Queue \Jobs \RabbitMQJob ;
1515
@@ -20,12 +20,8 @@ class RabbitMQQueue extends Queue implements QueueContract
2020 */
2121 const ATTEMPT_COUNT_HEADERS_KEY = 'attempts_count ' ;
2222
23- /**
24- * @var AmqpContext
25- */
26- protected $ context ;
27-
2823 protected $ declareExchange ;
24+ protected $ declareQueue ;
2925 protected $ declareBindQueue ;
3026 protected $ sleepOnError ;
3127
@@ -37,6 +33,10 @@ class RabbitMQQueue extends Queue implements QueueContract
3733 private $ declaredExchanges = [];
3834 private $ declaredQueues = [];
3935
36+ /**
37+ * @var AmqpContext
38+ */
39+ private $ context ;
4040 private $ retryAfter ;
4141 private $ correlationId ;
4242
@@ -48,15 +48,16 @@ public function __construct(AmqpContext $context, array $config)
4848 $ this ->queueArguments = isset ($ this ->queueParameters ['arguments ' ]) ? json_decode ($ this ->queueParameters ['arguments ' ], true ) : [];
4949 $ this ->configExchange = $ config ['exchange_params ' ];
5050 $ this ->declareExchange = $ config ['exchange_declare ' ];
51+ $ this ->declareQueue = $ config ['queue_declare ' ];
5152 $ this ->declareBindQueue = $ config ['queue_declare_bind ' ];
5253 $ this ->sleepOnError = $ config ['sleep_on_error ' ] ?? 5 ;
5354 }
5455
5556 /** @inheritdoc */
5657 public function size ($ queueName = null ): int
5758 {
58- $ queue = $ this -> context -> createQueue ( $ this -> getQueueName ( $ queueName ));
59- $ queue-> addFlag (AmqpQueue:: FLAG_PASSIVE );
59+ /** @var AmqpQueue $queue */
60+ list ( $ queue) = $ this -> declareEverything ( $ queueName );
6061
6162 return $ this ->context ->declareQueue ($ queue );
6263 }
@@ -71,10 +72,8 @@ public function push($job, $data = '', $queue = null)
7172 public function pushRaw ($ payload , $ queueName = null , array $ options = [])
7273 {
7374 try {
74- $ queueName = $ this ->getQueueName ($ queueName );
75- list ($ queueName , $ exchangeName ) = $ this ->declareQueue ($ queueName );
76-
77- $ topic = $ this ->context ->createTopic ($ exchangeName );
75+ /** @var AmqpTopic $topic */
76+ list (, $ topic ) = $ this ->declareEverything ($ queueName );
7877
7978 $ message = $ this ->context ->createMessage ($ payload );
8079 $ message ->setRoutingKey ($ queueName );
@@ -110,19 +109,13 @@ public function later($delay, $job, $data = '', $queue = null)
110109 /** @inheritdoc */
111110 public function pop ($ queueName = null )
112111 {
113- $ queueName = $ this ->getQueueName ($ queueName );
114-
115112 try {
116- // declare queue if not exists
117- $ this ->declareQueue ($ queueName );
113+ /** @var AmqpQueue $ queue */
114+ list ( $ queue ) = $ this ->declareEverything ($ queueName );
118115
119-
120- $ queue = $ this ->context ->createQueue ($ queueName );
121116 $ consumer = $ this ->context ->createConsumer ($ queue );
122117
123- $ message = $ consumer ->receiveNoWait ();
124-
125- if ($ message ) {
118+ if ($ message = $ consumer ->receiveNoWait ()) {
126119 return new RabbitMQJob (
127120 $ this ->container ,
128121 $ this ,
@@ -173,67 +166,68 @@ public function setCorrelationId(string $id)
173166 $ this ->correlationId = $ id ;
174167 }
175168
176- private function getQueueName (string $ queue = null ): string
169+ /**
170+ * @return AmqpContext
171+ */
172+ public function getContext (): AmqpContext
177173 {
178- return $ queue ?: $ this ->defaultQueue ;
174+ return $ this ->context ;
179175 }
180176
181- private function declareQueue (string $ queueName ): array
177+ /**
178+ * @param string $queueName
179+ *
180+ * @return array [Interop\Amqp\AmqpQueue, Interop\Amqp\AmqpTopic]
181+ */
182+ private function declareEverything (string $ queueName ): array
182183 {
183- $ queueName = $ this ->getQueueName ( $ queueName ) ;
184+ $ queueName = $ queueName ?: $ this ->defaultQueue ;
184185 $ exchangeName = $ this ->configExchange ['name ' ] ?: $ queueName ;
185186
186- if ($ this ->declareExchange && !in_array ($ exchangeName , $ this ->declaredExchanges , true )) {
187- $ topic = $ this ->context ->createTopic ($ exchangeName );
188- $ topic ->setType ($ this ->configExchange ['type ' ]);
189-
190- if ($ this ->configExchange ['passive ' ]) {
191- $ topic ->addFlag (AmqpTopic::FLAG_PASSIVE );
192- }
193- if ($ this ->configExchange ['durable ' ]) {
194- $ topic ->addFlag (AmqpTopic::FLAG_DURABLE );
195- }
196- if ($ this ->configExchange ['auto_delete ' ]) {
197- $ topic ->addFlag (AmqpTopic::FLAG_AUTODELETE );
198- }
187+ $ topic = $ this ->context ->createTopic ($ exchangeName );
188+ $ topic ->setType ($ this ->configExchange ['type ' ]);
189+ if ($ this ->configExchange ['passive ' ]) {
190+ $ topic ->addFlag (AmqpTopic::FLAG_PASSIVE );
191+ }
192+ if ($ this ->configExchange ['durable ' ]) {
193+ $ topic ->addFlag (AmqpTopic::FLAG_DURABLE );
194+ }
195+ if ($ this ->configExchange ['auto_delete ' ]) {
196+ $ topic ->addFlag (AmqpTopic::FLAG_AUTODELETE );
197+ }
199198
199+ if ($ this ->declareExchange && !in_array ($ exchangeName , $ this ->declaredExchanges , true )) {
200200 $ this ->context ->declareTopic ($ topic );
201201
202202 $ this ->declaredExchanges [] = $ exchangeName ;
203203 }
204204
205- if ($ this ->declareBindQueue && !in_array ($ queueName , $ this ->declaredQueues , true )) {
206- $ queue = $ this ->context ->createQueue ($ queueName );
207-
208-
209- if ($ this ->queueParameters ['passive ' ]) {
210- $ queue ->addFlag (AmqpQueue::FLAG_PASSIVE );
211- }
212- if ($ this ->queueParameters ['durable ' ]) {
213- $ queue ->addFlag (AmqpQueue::FLAG_DURABLE );
214- }
215- if ($ this ->queueParameters ['exclusive ' ]) {
216- $ queue ->addFlag (AmqpQueue::FLAG_EXCLUSIVE );
217- }
218- if ($ this ->queueParameters ['auto_delete ' ]) {
219- $ queue ->addFlag (AmqpQueue::FLAG_AUTODELETE );
220- }
221-
222- $ queue ->setArguments ($ this ->queueArguments );
205+ $ queue = $ this ->context ->createQueue ($ queueName );
206+ $ queue ->setArguments ($ this ->queueArguments );
207+ if ($ this ->queueParameters ['passive ' ]) {
208+ $ queue ->addFlag (AmqpQueue::FLAG_PASSIVE );
209+ }
210+ if ($ this ->queueParameters ['durable ' ]) {
211+ $ queue ->addFlag (AmqpQueue::FLAG_DURABLE );
212+ }
213+ if ($ this ->queueParameters ['exclusive ' ]) {
214+ $ queue ->addFlag (AmqpQueue::FLAG_EXCLUSIVE );
215+ }
216+ if ($ this ->queueParameters ['auto_delete ' ]) {
217+ $ queue ->addFlag (AmqpQueue::FLAG_AUTODELETE );
218+ }
223219
220+ if ($ this ->declareQueue && !in_array ($ queueName , $ this ->declaredQueues , true )) {
224221 $ this ->context ->declareQueue ($ queue );
225222
226-
227- $ this ->context ->bind (new AmqpBind (
228- $ queue ,
229- $ this ->context ->createTopic ($ exchangeName ),
230- $ queueName )
231- );
232-
233223 $ this ->declaredQueues [] = $ queueName ;
234224 }
235225
236- return [$ queueName , $ exchangeName ];
226+ if ($ this ->declareBindQueue ) {
227+ $ this ->context ->bind (new AmqpBind ($ queue , $ topic , $ queueName ));
228+ }
229+
230+ return [$ queue , $ topic ];
237231 }
238232
239233 /**
@@ -243,11 +237,14 @@ private function declareQueue(string $queueName): array
243237 */
244238 protected function reportConnectionError ($ action , \Exception $ e )
245239 {
246- Log::error ('AMQP error while attempting ' . $ action . ': ' . $ e ->getMessage ());
240+ /** @var LoggerInterface $logger */
241+ $ logger = $ this ->container ['log ' ];
242+
243+ $ logger ->error ('AMQP error while attempting ' . $ action . ': ' . $ e ->getMessage ());
247244
248245 // If it's set to false, throw an error rather than waiting
249246 if ($ this ->sleepOnError === false ) {
250- throw new RuntimeException ('Error writing data to the connection with RabbitMQ ' );
247+ throw new RuntimeException ('Error writing data to the connection with RabbitMQ ' , null , $ e );
251248 }
252249
253250 // Sleep so that we don't flood the log file
0 commit comments