99use Illuminate \Contracts \Events \Dispatcher ;
1010use Illuminate \Queue \QueueManager ;
1111use Illuminate \Queue \WorkerOptions ;
12+ use Illuminate \Support \Str ;
1213use PhpAmqpLib \Channel \AMQPChannel ;
1314use PhpAmqpLib \Exception \AMQPChannelClosedException ;
1415use PhpAmqpLib \Exception \AMQPConnectionClosedException ;
@@ -54,6 +55,10 @@ class VhostsConsumer extends Consumer
5455
5556 private array $ batchMessages = [];
5657
58+ private ?string $ processingUuid = null ;
59+
60+ private int |float $ processingStartedAt = 0 ;
61+
5762 /**
5863 * @param InternalStorageManager $internalStorageManager
5964 * @param LoggerInterface $logger
@@ -143,11 +148,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
143148 $ this ->channel ->wait (null , true , (int ) $ this ->workerOptions ->timeout );
144149 $ this ->connectionMutex ->unlock (self ::MAIN_HANDLER_LOCK );
145150 } catch (AMQPRuntimeException $ exception ) {
146- $ this ->output ->error ('Consuming AMQP Runtime exception. Error: ' . $ exception ->getMessage ());
147-
148- $ this ->logger ->error ('Salesmessage.LibRabbitMQ.VhostsConsumer.daemon.amqp_runtime_exception ' , [
149- 'vhost_name ' => $ this ->currentVhostName ,
150- 'queue_name ' => $ this ->currentQueueName ,
151+ $ this ->logError ('daemon.amqp_runtime_exception ' , [
151152 'message ' => $ exception ->getMessage (),
152153 'trace ' => $ exception ->getTraceAsString (),
153154 ]);
@@ -156,14 +157,10 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
156157
157158 $ this ->kill (self ::EXIT_ERROR , $ this ->workerOptions );
158159 } catch (Exception |Throwable $ exception ) {
159- $ this ->output ->error ('Consuming exception. Error: ' . $ exception ->getMessage ());
160-
161- $ this ->logger ->error ('Salesmessage.LibRabbitMQ.VhostsConsumer.daemon.exception ' , [
162- 'vhost_name ' => $ this ->currentVhostName ,
163- 'queue_name ' => $ this ->currentQueueName ,
164- 'class ' => get_class ($ exception ),
160+ $ this ->logError ('daemon.exception ' , [
165161 'message ' => $ exception ->getMessage (),
166162 'trace ' => $ exception ->getTraceAsString (),
163+ 'error_class ' => get_class ($ exception ),
167164 ]);
168165
169166 $ this ->exceptions ->report ($ exception );
@@ -196,7 +193,9 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
196193 $ this ->hasJob
197194 );
198195 if (! is_null ($ status )) {
199- $ this ->output ->info (['Consuming stop. ' , $ status ]);
196+ $ this ->logInfo ('consuming_stop ' , [
197+ 'status ' => $ status ,
198+ ]);
200199
201200 return $ this ->stop ($ status , $ this ->workerOptions );
202201 }
@@ -238,11 +237,10 @@ protected function getStopStatus(
238237 */
239238 private function startConsuming (): RabbitMQQueue
240239 {
241- $ this ->output ->info (sprintf (
242- 'Start consuming. Vhost: "%s". Queue: "%s" ' ,
243- $ this ->currentVhostName ,
244- $ this ->currentQueueName
245- ));
240+ $ this ->processingUuid = $ this ->generateProcessingUuid ();
241+ $ this ->processingStartedAt = microtime (true );
242+
243+ $ this ->logInfo ('startConsuming.init ' );
246244
247245 $ arguments = [];
248246 if ($ this ->maxPriority ) {
@@ -266,17 +264,8 @@ private function startConsuming(): RabbitMQQueue
266264
267265 $ jobsProcessed ++;
268266
269- $ this ->output ->info (sprintf (
270- 'Consume message. Vhost: "%s". Queue: "%s". Num: %s ' ,
271- $ this ->currentVhostName ,
272- $ this ->currentQueueName ,
273- $ jobsProcessed
274- ));
275-
276- $ this ->logger ->info ('Salesmessage.LibRabbitMQ.VhostsConsumer.startConsuming.consume_message ' , [
277- 'vhost_name ' => $ this ->currentVhostName ,
278- 'queue_name ' => $ this ->currentQueueName ,
279- 'num ' => $ jobsProcessed ,
267+ $ this ->logInfo ('startConsuming.message_consumed ' , [
268+ 'processed_jobs_count ' => $ jobsProcessed ,
280269 'is_support_batching ' => $ isSupportBatching ,
281270 ]);
282271
@@ -312,20 +301,10 @@ private function startConsuming(): RabbitMQQueue
312301 } catch (AMQPProtocolChannelException |AMQPChannelClosedException $ exception ) {
313302 $ isSuccess = false ;
314303
315- $ this ->output ->error (sprintf (
316- 'Start consuming. Vhost: "%s". Queue: "%s". Error: "%s". Code: %d ' ,
317- $ this ->currentVhostName ,
318- $ this ->currentQueueName ,
319- $ exception ->getMessage (),
320- $ exception ->getCode ()
321- ));
322-
323- $ this ->logger ->error ('Salesmessage.LibRabbitMQ.VhostsConsumer.startConsuming.exception ' , [
324- 'vhost_name ' => $ this ->currentVhostName ,
325- 'queue_name ' => $ this ->currentQueueName ,
326- 'class ' => get_class ($ exception ),
304+ $ this ->logError ('startConsuming.exception ' , [
327305 'message ' => $ exception ->getMessage (),
328306 'trace ' => $ exception ->getTraceAsString (),
307+ 'error_class ' => get_class ($ exception ),
329308 ]);
330309 }
331310
@@ -343,6 +322,14 @@ private function startConsuming(): RabbitMQQueue
343322 return $ connection ;
344323 }
345324
325+ /**
326+ * @return string
327+ */
328+ private function generateProcessingUuid (): string
329+ {
330+ return sprintf ('%s:%d:%s ' , $ this ->filtersDto ->getGroup (), time (), Str::random (16 ));
331+ }
332+
346333 /**
347334 * @param AMQPMessage $message
348335 * @return string
@@ -393,37 +380,37 @@ private function processBatch(RabbitMQQueue $connection): void
393380
394381 $ batchSize = count ($ batchJobMessages );
395382 if ($ batchSize > 1 ) {
383+ $ batchTimeStarted = microtime (true );
384+
396385 $ batchData = [];
397386 /** @var AMQPMessage $batchMessage */
398387 foreach ($ batchJobMessages as $ batchMessage ) {
399388 $ job = $ this ->getJobByMessage ($ batchMessage , $ connection );
400389 $ batchData [] = $ job ->getPayloadData ();
401390 }
402391
392+ $ this ->logInfo ('processBatch.start ' , [
393+ 'batch_job_class ' => $ batchJobClass ,
394+ 'batch_size ' => $ batchSize ,
395+ ]);
396+
403397 try {
404398 $ batchJobClass ::collection ($ batchData );
405399 $ isBatchSuccess = true ;
406400
407- $ this ->output ->comment ('Process batch jobs success. Job class: ' . $ batchJobClass . 'Size: ' . $ batchSize );
408-
409- $ this ->logger ->info ('Salesmessage.LibRabbitMQ.VhostsConsumer.processBatch.process_batch_jobs_success ' , [
410- 'vhost_name ' => $ this ->currentVhostName ,
411- 'queue_name ' => $ this ->currentQueueName ,
401+ $ this ->logInfo ('processBatch.finish ' , [
412402 'batch_job_class ' => $ batchJobClass ,
413403 'batch_size ' => $ batchSize ,
404+ 'executive_batch_time_seconds ' => microtime (true ) - $ batchTimeStarted ,
414405 ]);
415406 } catch (Throwable $ exception ) {
416407 $ isBatchSuccess = false ;
417408
418- $ this ->output ->error ('Process batch jobs error. Job class: ' . $ batchJobClass . ' Error: ' . $ exception ->getMessage ());
419-
420- $ this ->logger ->error ('Salesmessage.LibRabbitMQ.VhostsConsumer.processBatch.exception ' , [
421- 'vhost_name ' => $ this ->currentVhostName ,
422- 'queue_name ' => $ this ->currentQueueName ,
409+ $ this ->logError ('processBatch.exception ' , [
423410 'batch_job_class ' => $ batchJobClass ,
424- 'class ' => get_class ($ exception ),
425411 'message ' => $ exception ->getMessage (),
426412 'trace ' => $ exception ->getTraceAsString (),
413+ 'error_class ' => get_class ($ exception ),
427414 ]);
428415 }
429416
@@ -472,23 +459,23 @@ private function getJobByMessage(AMQPMessage $message, RabbitMQQueue $connection
472459 */
473460 private function processSingleJob (RabbitMQJob $ job ): void
474461 {
462+ $ timeStarted = microtime (true );
463+ $ this ->logInfo ('processSingleJob.start ' );
464+
475465 if ($ this ->supportsAsyncSignals ()) {
476466 $ this ->registerTimeoutHandler ($ job , $ this ->workerOptions );
477467 }
478468
479469 $ this ->runJob ($ job , $ this ->currentConnectionName , $ this ->workerOptions );
480470 $ this ->updateLastProcessedAt ();
481471
482- $ this ->output ->info ('Process single job... ' );
483-
484- $ this ->logger ->info ('Salesmessage.LibRabbitMQ.VhostsConsumer.processSingleJob.success ' , [
485- 'vhost_name ' => $ this ->currentVhostName ,
486- 'queue_name ' => $ this ->currentQueueName ,
487- ]);
488-
489472 if ($ this ->supportsAsyncSignals ()) {
490473 $ this ->resetTimeoutHandler ();
491474 }
475+
476+ $ this ->logInfo ('processSingleJob.finish ' , [
477+ 'executive_job_time_seconds ' => microtime (true ) - $ timeStarted ,
478+ ]);
492479 }
493480
494481 /**
@@ -501,14 +488,10 @@ private function ackMessage(AMQPMessage $message, bool $multiple = false): void
501488 try {
502489 $ message ->ack ($ multiple );
503490 } catch (Throwable $ exception ) {
504- $ this ->output ->error ('Ack message error: ' . $ exception ->getMessage ());
505-
506- $ this ->logger ->error ('Salesmessage.LibRabbitMQ.VhostsConsumer.ackMessage.exception ' , [
507- 'vhost_name ' => $ this ->currentVhostName ,
508- 'queue_name ' => $ this ->currentQueueName ,
509- 'class ' => get_class ($ exception ),
491+ $ this ->logError ('ackMessage.exception ' , [
510492 'message ' => $ exception ->getMessage (),
511493 'trace ' => $ exception ->getTraceAsString (),
494+ 'error_class ' => get_class ($ exception ),
512495 ]);
513496 }
514497 }
@@ -736,15 +719,7 @@ private function initConnection(): RabbitMQQueue
736719 try {
737720 $ channel = $ connection ->getChannel ();
738721 } catch (AMQPConnectionClosedException $ exception ) {
739- $ this ->output ->error (sprintf (
740- 'Init Connection Error: "%s". Vhost: "%s" ' ,
741- $ exception ->getMessage (),
742- $ this ->currentVhostName
743- ));
744-
745- $ this ->logger ->error ('Salesmessage.LibRabbitMQ.VhostsConsumer.initConnection.exception ' , [
746- 'vhost_name ' => $ this ->currentVhostName ,
747- 'queue_name ' => $ this ->currentQueueName ,
722+ $ this ->logError ('initConnection.exception ' , [
748723 'message ' => $ exception ->getMessage (),
749724 'trace ' => $ exception ->getTraceAsString (),
750725 ]);
@@ -782,5 +757,66 @@ private function getTagName(): string
782757 {
783758 return $ this ->consumerTag . '_ ' . $ this ->currentVhostName ;
784759 }
760+
761+ /**
762+ * @param string $message
763+ * @param array $data
764+ * @return void
765+ */
766+ private function logInfo (string $ message , array $ data = []): void
767+ {
768+ $ this ->log ($ message , $ data , false );
769+ }
770+
771+ /**
772+ * @param string $message
773+ * @param array $data
774+ * @return void
775+ */
776+ private function logError (string $ message , array $ data = []): void
777+ {
778+ $ this ->log ($ message , $ data , true );
779+ }
780+
781+ /**
782+ * @param string $message
783+ * @param array $data
784+ * @param bool $isError
785+ * @return void
786+ */
787+ private function log (string $ message , array $ data = [], bool $ isError = false ): void
788+ {
789+ $ data ['vhost_name ' ] = $ this ->currentVhostName ;
790+ $ data ['queue_name ' ] = $ this ->currentQueueName ;
791+
792+ $ outputMessage = $ message ;
793+ foreach ($ data as $ key => $ value ) {
794+ if (in_array ($ key , ['trace ' , 'error_class ' ])) {
795+ continue ;
796+ }
797+ $ outputMessage .= '. ' . ucfirst (str_replace ('_ ' , ' ' , $ key )) . ': ' . $ value ;
798+ }
799+ if ($ isError ) {
800+ $ this ->output ->error ($ outputMessage );
801+ } else {
802+ $ this ->output ->info ($ outputMessage );
803+ }
804+
805+ $ processingData = [
806+ 'uuid ' => $ this ->processingUuid ,
807+ 'started_at ' => $ this ->processingStartedAt ,
808+ ];
809+ if ($ this ->processingStartedAt ) {
810+ $ processingData ['executive_time_seconds ' ] = microtime (true ) - $ this ->processingStartedAt ;
811+ }
812+ $ data ['processing ' ] = $ processingData ;
813+
814+ $ logMessage = 'Salesmessage.LibRabbitMQ.VhostsConsumer. ' . $ message ;
815+ if ($ isError ) {
816+ $ this ->logger ->error ($ logMessage , $ data );
817+ } else {
818+ $ this ->logger ->info ($ logMessage , $ data );
819+ }
820+ }
785821}
786822
0 commit comments