Skip to content

Commit 1aa4378

Browse files
committed
Fixes for Attempt/retry logic
Small cleanups and helper method adjustments
1 parent e501804 commit 1aa4378

File tree

2 files changed

+86
-22
lines changed

2 files changed

+86
-22
lines changed

src/VladimirYuldashev/LaravelQueueRabbitMQ/Queue/Jobs/RabbitMQJob.php

Lines changed: 55 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,26 @@
1111

1212
class RabbitMQJob extends Job implements JobContract
1313
{
14+
/**
15+
* Same as RabbitMQQueue, used for attempt counts
16+
*/
17+
const ATTEMPT_COUNT_HEADERS_KEY = 'attempts_count';
1418

1519
protected $connection;
1620
protected $channel;
1721
protected $queue;
1822
protected $message;
1923

24+
/**
25+
* Creates a new instance of RabbitMQJob.
26+
*
27+
* @param Illuminate\Container\Container $container
28+
* @param VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue $connection
29+
* @param PhpAmqpLib\Channel\AMQPChannel $channel
30+
* @param string $queue
31+
* @param PhpAmqpLib\Message\AMQPMessage $message
32+
*
33+
*/
2034
public function __construct(
2135
Container $container,
2236
RabbitMQQueue $connection,
@@ -38,7 +52,7 @@ public function __construct(
3852
*/
3953
public function fire()
4054
{
41-
$this->resolveAndFire(json_decode($this->message->body, true));
55+
$this->resolveAndFire($this->getParsedBody());
4256
}
4357

4458
/**
@@ -51,6 +65,16 @@ public function getRawBody()
5165
return $this->message->body;
5266
}
5367

68+
/**
69+
* Retrieves the parsed body for the job.
70+
*
71+
* @return array|false
72+
*/
73+
public function getParsedBody()
74+
{
75+
return json_decode($this->getRawBody(), true);
76+
}
77+
5478
/**
5579
* Delete the job from the queue.
5680
*
@@ -59,12 +83,11 @@ public function getRawBody()
5983
public function delete()
6084
{
6185
parent::delete();
62-
63-
$this->channel->basic_ack($this->message->delivery_info['delivery_tag']);
86+
$this->channel->basic_ack($this->message->get('delivery_tag'));
6487
}
6588

6689
/**
67-
* Get queue name
90+
* Get the queue name.
6891
*
6992
* @return string
7093
*/
@@ -83,16 +106,18 @@ public function getQueue()
83106
public function release($delay = 0)
84107
{
85108
$this->delete();
109+
$this->setAttempts($this->attempts() + 1);
86110

87-
$body = $this->message->body;
88-
$body = json_decode($body, true);
89-
90-
$attempts = $this->attempts();
111+
$body = $this->getParsedBody();
91112

92-
$job = unserialize($body['data']['command']);
93-
94-
// write attempts to job
95-
$job->attempts = $attempts + 1;
113+
/**
114+
* Some jobs don't have the command set, so fall back to just sending it the job name string
115+
*/
116+
if (isset($body['data']['command']) === true) {
117+
$job = unserialize($body['data']['command']);
118+
} else {
119+
$job = $this->getName();
120+
}
96121

97122
$data = $body['data'];
98123

@@ -110,15 +135,28 @@ public function release($delay = 0)
110135
*/
111136
public function attempts()
112137
{
113-
$body = json_decode($this->message->body, true);
114-
$job = unserialize($body['data']['command']);
115-
if (is_object($job) && property_exists($job, 'attempts'))
116-
{
117-
return (int) $job->attempts;
138+
if ($this->message->has('application_headers') === true) {
139+
$headers = $this->message->get('application_headers')->getNativeData();
140+
141+
if (isset($headers[self::ATTEMPT_COUNT_HEADERS_KEY]) === true) {
142+
return $headers[self::ATTEMPT_COUNT_HEADERS_KEY];
143+
}
118144
}
119145
return 0;
120146
}
121147

148+
/**
149+
* Sets the count of attempts at processing this job.
150+
*
151+
* @param int $count
152+
*
153+
* @return void
154+
*/
155+
private function setAttempts($count)
156+
{
157+
$this->connection->setAttempts($count);
158+
}
159+
122160
/**
123161
* Get the job identifier.
124162
*
@@ -128,5 +166,4 @@ public function getJobId()
128166
{
129167
return $this->message->get('correlation_id');
130168
}
131-
132169
}

src/VladimirYuldashev/LaravelQueueRabbitMQ/Queue/RabbitMQQueue.php

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313

1414
class RabbitMQQueue extends Queue implements QueueContract
1515
{
16+
/**
17+
* Used for retry logic, to set the retries on the message metadata instead of the message body
18+
*/
19+
const ATTEMPT_COUNT_HEADERS_KEY = 'attempts_count';
1620

1721
protected $connection;
1822
protected $channel;
@@ -24,6 +28,11 @@ class RabbitMQQueue extends Queue implements QueueContract
2428
protected $configQueue;
2529
protected $configExchange;
2630

31+
/**
32+
* @var int
33+
*/
34+
private $attempts;
35+
2736
/**
2837
* @param AMQPStreamConnection $amqpConnection
2938
* @param array $config
@@ -72,11 +81,18 @@ public function pushRaw($payload, $queue = null, array $options = [])
7281
} else {
7382
list($queue, $exchange) = $this->declareQueue($queue);
7483
}
75-
// push job to a queue
76-
$message = new AMQPMessage($payload, [
84+
85+
$headers = [
7786
'Content-Type' => 'application/json',
7887
'delivery_mode' => 2,
79-
]);
88+
];
89+
90+
if (isset($this->attempts) === true) {
91+
$headers['application_headers'] = [self::ATTEMPT_COUNT_HEADERS_KEY => ['I', $this->attempts]];
92+
}
93+
94+
// push job to a queue
95+
$message = new AMQPMessage($payload, $headers);
8096

8197
// push task to a queue
8298
$this->channel->basic_publish($message, $exchange, $queue);
@@ -222,4 +238,15 @@ private function declareDelayedQueue($destination, $delay)
222238
return [$name, $exchange];
223239
}
224240

225-
}
241+
/**
242+
* Sets the attempts member variable to be used in message generation
243+
*
244+
* @param int $count
245+
*
246+
* @return void
247+
*/
248+
public function setAttempts($count)
249+
{
250+
$this->attempts = $count;
251+
}
252+
}

0 commit comments

Comments
 (0)