File tree Expand file tree Collapse file tree 2 files changed +11
-10
lines changed Expand file tree Collapse file tree 2 files changed +11
-10
lines changed Original file line number Diff line number Diff line change 66use Psr \Log \LoggerInterface ;
77use Salesmessage \LibRabbitMQ \Dto \VhostApiDto ;
88use Salesmessage \LibRabbitMQ \Services \Api \RabbitApiClient ;
9+ use Throwable ;
910
1011class QueueService
1112{
Original file line number Diff line number Diff line change @@ -365,15 +365,14 @@ private function processBatch(RabbitMQQueue $connection): void
365365 }
366366
367367 $ this ->connectionMutex ->lock (static ::MAIN_HANDLER_LOCK );
368- foreach ($ batchJobMessages as $ batchMessage ) {
369- if ($ isBatchSuccess ) {
370- $ this ->ackMessage ($ batchMessage );
371-
372- continue ;
368+ if ($ isBatchSuccess ) {
369+ $ lastBatchMessage = end ($ batchJobMessages );
370+ $ this ->ackMessage ($ lastBatchMessage , true );
371+ } else {
372+ foreach ($ batchJobMessages as $ batchMessage ) {
373+ $ job = $ this ->getJobByMessage ($ batchMessage , $ connection );
374+ $ this ->processSingleJob ($ job );
373375 }
374-
375- $ job = $ this ->getJobByMessage ($ batchMessage , $ connection );
376- $ this ->processSingleJob ($ job , $ connection );
377376 }
378377 $ this ->connectionMutex ->unlock (static ::MAIN_HANDLER_LOCK );
379378 }
@@ -423,12 +422,13 @@ private function processSingleJob(RabbitMQJob $job): void
423422
424423 /**
425424 * @param AMQPMessage $message
425+ * @param bool $multiple
426426 * @return void
427427 */
428- private function ackMessage (AMQPMessage $ message ): void
428+ private function ackMessage (AMQPMessage $ message, bool $ multiple = false ): void
429429 {
430430 try {
431- $ message ->ack ();
431+ $ message ->ack ($ multiple );
432432 } catch (Throwable $ exception ) {
433433 $ this ->output ->error ('Ack message error: ' . $ exception ->getMessage ());
434434 }
You can’t perform that action at this time.
0 commit comments