@@ -110,6 +110,8 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
110110 {
111111 $ this ->goAheadOrWait ();
112112
113+ $ this ->connectionMutex = new Mutex (false );
114+
113115 $ this ->configConnectionName = (string ) $ connectionName ;
114116 $ this ->workerOptions = $ options ;
115117
@@ -121,9 +123,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
121123
122124 [$ startTime , $ jobsProcessed ] = [hrtime (true ) / 1e9 , 0 ];
123125
124- $ connection = $ this ->initConnection ();
125-
126- $ this ->startConsuming ();
126+ $ connection = $ this ->startConsuming ();
127127
128128 while ($ this ->channel ->is_consuming ()) {
129129 // Before reserving any jobs, we will make sure this queue is not paused and
@@ -175,9 +175,10 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
175175 if (false === $ this ->hasJob ) {
176176 $ this ->output ->info ('Consuming sleep. No job... ' );
177177
178+ $ this ->stopConsuming ();
179+
178180 $ this ->processBatch ($ connection );
179181
180- $ this ->stopConsuming ();
181182 $ this ->goAheadOrWait ();
182183 $ this ->startConsuming ();
183184
@@ -231,7 +232,11 @@ protected function getStopStatus(
231232 };
232233 }
233234
234- private function startConsuming ()
235+ /**
236+ * @return RabbitMQQueue
237+ * @throws Exceptions\MutexTimeout
238+ */
239+ private function startConsuming (): RabbitMQQueue
235240 {
236241 $ this ->output ->info (sprintf (
237242 'Start consuming. Vhost: "%s". Queue: "%s" ' ,
@@ -276,9 +281,10 @@ private function startConsuming()
276281 ]);
277282
278283 if ($ jobsProcessed >= $ this ->batchSize ) {
284+ $ this ->stopConsuming ();
285+
279286 $ this ->processBatch ($ connection );
280287
281- $ this ->stopConsuming ();
282288 $ this ->goAheadOrWait ();
283289 $ this ->startConsuming ();
284290 }
@@ -294,7 +300,7 @@ private function startConsuming()
294300 try {
295301 $ this ->channel ->basic_consume (
296302 $ this ->currentQueueName ,
297- $ this ->consumerTag ,
303+ $ this ->getTagName () ,
298304 false ,
299305 false ,
300306 false ,
@@ -328,9 +334,13 @@ private function startConsuming()
328334 $ this ->updateLastProcessedAt ();
329335
330336 if (false === $ isSuccess ) {
337+ $ this ->stopConsuming ();
338+
331339 $ this ->goAheadOrWait ();
332340 return $ this ->startConsuming ();
333341 }
342+
343+ return $ connection ;
334344 }
335345
336346 /**
@@ -507,10 +517,10 @@ private function ackMessage(AMQPMessage $message, bool $multiple = false): void
507517 * @return void
508518 * @throws Exceptions\MutexTimeout
509519 */
510- private function stopConsuming ()
520+ private function stopConsuming (): void
511521 {
512522 $ this ->connectionMutex ->lock (self ::MAIN_HANDLER_LOCK );
513- $ this ->channel ->basic_cancel ($ this ->consumerTag , true );
523+ $ this ->channel ->basic_cancel ($ this ->getTagName () , true );
514524 $ this ->connectionMutex ->unlock (self ::MAIN_HANDLER_LOCK );
515525 }
516526
@@ -717,7 +727,7 @@ private function updateLastProcessedAt()
717727 /**
718728 * @return RabbitMQQueue
719729 */
720- private function initConnection ()
730+ private function initConnection (): RabbitMQQueue
721731 {
722732 $ connection = $ this ->manager ->connection (
723733 ConnectionNameDto::getVhostConnectionName ($ this ->currentVhostName , $ this ->configConnectionName )
@@ -751,19 +761,26 @@ private function initConnection()
751761 }
752762
753763 $ this ->currentConnectionName = $ connection ->getConnectionName ();
754- $ this ->channel = $ channel ;
755-
756- $ this ->connectionMutex = new Mutex (false );
757764
758765 $ this ->connectionMutex ->lock (self ::MAIN_HANDLER_LOCK );
759- $ this -> channel ->basic_qos (
766+ $ channel ->basic_qos (
760767 $ this ->prefetchSize ,
761768 $ this ->prefetchCount ,
762769 false
763770 );
764771 $ this ->connectionMutex ->unlock (self ::MAIN_HANDLER_LOCK );
765772
773+ $ this ->channel = $ channel ;
774+
766775 return $ connection ;
767776 }
777+
778+ /**
779+ * @return string
780+ */
781+ private function getTagName (): string
782+ {
783+ return $ this ->consumerTag . '_ ' . $ this ->currentVhostName ;
784+ }
768785}
769786
0 commit comments