Skip to content

Commit acc827b

Browse files
authored
Merge pull request vyuldashev#167 from net53/master
Fix chained jobs attempts
2 parents 6ba9bd1 + a4b80a0 commit acc827b

File tree

3 files changed

+25
-40
lines changed

3 files changed

+25
-40
lines changed

src/Queue/Jobs/RabbitMQJob.php

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ public function release($delay = 0)
107107
parent::release($delay);
108108

109109
$this->delete();
110-
$this->connection->setAttempts($this->attempts() + 1);
111110

112111
$body = $this->payload();
113112

@@ -122,11 +121,7 @@ public function release($delay = 0)
122121

123122
$data = $body['data'];
124123

125-
if ($delay > 0) {
126-
$this->connection->later($delay, $job, $data, $this->getQueue());
127-
} else {
128-
$this->connection->push($job, $data, $this->getQueue());
129-
}
124+
$this->connection->release($delay, $job, $data, $this->getQueue(), $this->attempts() + 1);
130125
}
131126

132127
/**

src/Queue/RabbitMQQueue.php

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,6 @@
1515

1616
class RabbitMQQueue extends Queue implements QueueContract
1717
{
18-
/**
19-
* Used for retry logic, to set the retries on the message metadata instead of the message body.
20-
*/
21-
const ATTEMPT_COUNT_HEADERS_KEY = 'attempts_count';
22-
2318
protected $sleepOnError;
2419

2520
protected $queueOptions;
@@ -32,7 +27,6 @@ class RabbitMQQueue extends Queue implements QueueContract
3227
* @var AmqpContext
3328
*/
3429
private $context;
35-
private $retryAfter;
3630
private $correlationId;
3731

3832
public function __construct(AmqpContext $context, array $config)
@@ -78,8 +72,8 @@ public function pushRaw($payload, $queueName = null, array $options = [])
7872
$message->setContentType('application/json');
7973
$message->setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT);
8074

81-
if ($this->retryAfter !== null) {
82-
$message->setProperty(self::ATTEMPT_COUNT_HEADERS_KEY, $this->retryAfter);
75+
if (isset($options['attempts'])) {
76+
$message->setProperty(RabbitMQJob::ATTEMPT_COUNT_HEADERS_KEY, $options['attempts']);
8377
}
8478

8579
$producer = $this->context->createProducer();
@@ -103,6 +97,24 @@ public function later($delay, $job, $data = '', $queue = null)
10397
return $this->pushRaw($this->createPayload($job, $data), $queue, ['delay' => $this->secondsUntil($delay)]);
10498
}
10599

100+
/**
101+
* Release a reserved job back onto the queue.
102+
*
103+
* @param \DateTimeInterface|\DateInterval|int $delay
104+
* @param string|object $job
105+
* @param mixed $data
106+
* @param string $queue
107+
* @param int $attempts
108+
* @return mixed
109+
*/
110+
public function release($delay, $job, $data, $queue, $attempts = 0)
111+
{
112+
return $this->pushRaw($this->createPayload($job, $data), $queue, [
113+
'delay' => $this->secondsUntil($delay),
114+
'attempts' => $attempts
115+
]);
116+
}
117+
106118
/** @inheritdoc */
107119
public function pop($queueName = null)
108120
{
@@ -122,18 +134,6 @@ public function pop($queueName = null)
122134
return null;
123135
}
124136

125-
/**
126-
* Sets the attempts member variable to be used in message generation.
127-
*
128-
* @param int $count
129-
*
130-
* @return void
131-
*/
132-
public function setAttempts(int $count)
133-
{
134-
$this->retryAfter = $count;
135-
}
136-
137137
/**
138138
* Retrieves the correlation id, or a unique id.
139139
*

tests/Queue/RabbitMQQueueTest.php

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,6 @@ public function testCouldBeConstructedWithExpectedArguments()
3434
new RabbitMQQueue($this->createAmqpContext(), $this->createDummyConfig());
3535
}
3636

37-
public function testShouldAllowSetAttempts()
38-
{
39-
$queue = new RabbitMQQueue($this->createAmqpContext(), $this->createDummyConfig());
40-
41-
$queue->setAttempts(123);
42-
43-
$this->assertAttributeSame(123, 'retryAfter', $queue);
44-
}
45-
4637
public function testShouldGenerateNewCorrelationIdIfNotSet()
4738
{
4839
$queue = new RabbitMQQueue($this->createAmqpContext(), $this->createDummyConfig());
@@ -128,7 +119,7 @@ public function testShouldSendExpectedMessageOnPushRaw()
128119
$this->assertSame('application/json', $message->getContentType());
129120
$this->assertSame(AmqpMessage::DELIVERY_MODE_PERSISTENT, $message->getDeliveryMode());
130121
$this->assertNotEmpty($message->getCorrelationId());
131-
$this->assertNull($message->getProperty(RabbitMQQueue::ATTEMPT_COUNT_HEADERS_KEY));
122+
$this->assertNull($message->getProperty(RabbitMQJob::ATTEMPT_COUNT_HEADERS_KEY));
132123
})
133124
;
134125
$producer
@@ -167,7 +158,7 @@ public function testShouldSendExpectedMessageOnPushRaw()
167158
$queue->pushRaw('thePayload', $expectedQueueName);
168159
}
169160

170-
public function testShouldSetAttemptCountHeaderIfNotNull()
161+
public function testShouldSetAttemptCountPropIfNotNull()
171162
{
172163
$expectedAttempts = 54321;
173164

@@ -179,7 +170,7 @@ public function testShouldSetAttemptCountHeaderIfNotNull()
179170
->method('send')
180171
->with($this->identicalTo($topic), $this->isInstanceOf(AmqpMessage::class))
181172
->willReturnCallback(function ($actualTopic, AmqpMessage $message) use ($expectedAttempts) {
182-
$this->assertSame($expectedAttempts, $message->getProperty(RabbitMQQueue::ATTEMPT_COUNT_HEADERS_KEY));
173+
$this->assertSame($expectedAttempts, $message->getProperty(RabbitMQJob::ATTEMPT_COUNT_HEADERS_KEY));
183174
})
184175
;
185176
$producer
@@ -212,9 +203,8 @@ public function testShouldSetAttemptCountHeaderIfNotNull()
212203

213204
$queue = new RabbitMQQueue($context, $this->createDummyConfig());
214205
$queue->setContainer($this->createDummyContainer());
215-
$queue->setAttempts($expectedAttempts);
216206

217-
$queue->pushRaw('thePayload', 'aQueue');
207+
$queue->pushRaw('thePayload', 'aQueue', ['attempts' => $expectedAttempts]);
218208
}
219209

220210
public function testShouldSetDeliveryDelayIfDelayOptionPresent()

0 commit comments

Comments
 (0)