77use Illuminate \Contracts \Queue \Job as JobContract ;
88use Illuminate \Queue \Jobs \Job ;
99use Illuminate \Support \Arr ;
10+ use PhpAmqpLib \Exception \AMQPProtocolChannelException ;
1011use PhpAmqpLib \Message \AMQPMessage ;
1112use PhpAmqpLib \Wire \AMQPTable ;
1213use VladimirYuldashev \LaravelQueueRabbitMQ \Horizon \RabbitMQQueue as HorizonRabbitMQQueue ;
@@ -55,7 +56,7 @@ public function __construct(
5556 */
5657 public function getJobId ()
5758 {
58- return json_decode ( $ this ->message -> getBody (), true ) ['id ' ] ?? null ;
59+ return $ this ->decoded ['id ' ] ?? null ;
5960 }
6061
6162 /**
@@ -71,31 +72,26 @@ public function getRawBody(): string
7172 */
7273 public function attempts (): int
7374 {
74- /** @var AMQPTable|null $headers */
75- $ headers = Arr::get ($ this ->message ->get_properties (), 'application_headers ' );
76-
77- if (! $ headers ) {
75+ if (! $ data = $ this ->getRabbitMQMessageHeaders ()) {
7876 return 1 ;
7977 }
8078
81- $ data = $ headers ->getNativeData ();
82-
8379 $ laravelAttempts = (int ) Arr::get ($ data , 'laravel.attempts ' , 0 );
8480
85- return ( $ laravelAttempts) + 1 ;
81+ return $ laravelAttempts + 1 ;
8682 }
8783
8884 /**
89- * @param null $e
85+ * {@inheritdoc}
9086 */
91- public function fail ( $ e = null ): void
87+ public function markAsFailed ( ): void
9288 {
89+ parent ::markAsFailed ();
90+
9391 // We must tel rabbitMQ this Job is failed
9492 // The message must be rejected when the Job marked as failed, in case rabbitMQ wants to do some extra magic.
9593 // like: Death lettering the message to an other exchange/routing-key.
9694 $ this ->rabbitmq ->reject ($ this );
97-
98- parent ::fail ($ e );
9995 }
10096
10197 /**
@@ -120,18 +116,21 @@ public function delete(): void
120116 }
121117
122118 /**
123- * {@inheritdoc}
119+ * Release the job back into the queue.
120+ *
121+ * @param int $delay
122+ * @throws AMQPProtocolChannelException
124123 */
125124 public function release ($ delay = 0 ): void
126125 {
127126 parent ::release ();
128127
129128 // Always create a new message when this Job is released
130- $ this ->rabbitmq ->laterRaw ($ delay , $ this ->message ->body , $ this ->queue , $ this ->attempts ());
129+ $ this ->rabbitmq ->laterRaw ($ delay , $ this ->message ->getBody () , $ this ->queue , $ this ->attempts ());
131130
132131 // Releasing a Job means the message was failed to process.
133- // Because this Job is always recreated and pushed as new message, this Job is correctly handled.
134- // We must tell rabbitMQ this fact .
132+ // Because this Job message is always recreated and pushed as new message, this Job message is correctly handled.
133+ // We must tell rabbitMQ this job message can be removed by acknowledging the message .
135134 $ this ->rabbitmq ->ack ($ this );
136135 }
137136
@@ -154,4 +153,19 @@ public function getRabbitMQMessage(): AMQPMessage
154153 {
155154 return $ this ->message ;
156155 }
156+
157+ /**
158+ * Get the headers from the rabbitMQ message.
159+ *
160+ * @return array|null
161+ */
162+ protected function getRabbitMQMessageHeaders (): ?array
163+ {
164+ /** @var AMQPTable|null $headers */
165+ if (! $ headers = Arr::get ($ this ->message ->get_properties (), 'application_headers ' )) {
166+ return null ;
167+ }
168+
169+ return $ headers ->getNativeData ();
170+ }
157171}
0 commit comments