Skip to content

Commit e731be3

Browse files
authored
Merge pull request vyuldashev#502 from shuqingzai/dev-13.0.1
fix vyuldashev#449 vyuldashev#499 vyuldashev#501
2 parents e15f988 + 87bf82e commit e731be3

File tree

3 files changed

+15
-7
lines changed

3 files changed

+15
-7
lines changed

src/Consumer.php

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
8888
$this->channel->basic_qos(
8989
$this->prefetchSize,
9090
$this->prefetchCount,
91-
null
91+
false
9292
);
9393

9494
$jobClass = $connection->getJobClass();
@@ -147,7 +147,7 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu
147147
} catch (AMQPRuntimeException $exception) {
148148
$this->exceptions->report($exception);
149149

150-
$this->kill(1);
150+
$this->kill(self::EXIT_ERROR, $options);
151151
} catch (Exception|Throwable $exception) {
152152
$this->exceptions->report($exception);
153153

@@ -171,7 +171,7 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu
171171
);
172172

173173
if (! is_null($status)) {
174-
return $this->stop($status);
174+
return $this->stop($status, $options);
175175
}
176176

177177
$this->currentJob = null;
@@ -195,14 +195,15 @@ protected function daemonShouldRun(WorkerOptions $options, $connectionName, $que
195195
* Stop listening and bail out of the script.
196196
*
197197
* @param int $status
198+
* @param WorkerOptions|null $options
198199
* @return int
199200
*/
200-
public function stop($status = 0): int
201+
public function stop($status = 0, $options = null)
201202
{
202203
// Tell the server you are going to stop consuming.
203204
// It will finish up the last message and not send you any more.
204205
$this->channel->basic_cancel($this->consumerTag, false, true);
205206

206-
return parent::stop($status);
207+
return parent::stop($status, $options);
207208
}
208209
}

src/Horizon/RabbitMQQueue.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public function later($delay, $job, $data = '', $queue = null)
6464
{
6565
$payload = (new JobPayload($this->createPayload($job, $data)))->prepare($job)->value;
6666

67-
return tap(parent::pushRaw($payload, $queue, ['delay' => $this->secondsUntil($delay)]), function () use ($payload, $queue): void {
67+
return tap(parent::laterRaw($delay, $payload, $queue), function () use ($payload, $queue): void {
6868
$this->event($this->getQueue($queue), new JobPushed($payload));
6969
});
7070
}

src/Queue/RabbitMQQueue.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,11 +188,18 @@ public function laterRaw($delay, $payload, $queue = null, $attempts = 0)
188188
{
189189
$ttl = $this->secondsUntil($delay) * 1000;
190190

191+
// default options
192+
$options = ['delay' => $delay, 'attempts' => $attempts];
193+
191194
// When no ttl just publish a new message to the exchange or queue
192195
if ($ttl <= 0) {
193-
return $this->pushRaw($payload, $queue, ['delay' => $delay, 'attempts' => $attempts]);
196+
return $this->pushRaw($payload, $queue, $options);
194197
}
195198

199+
// Create a main queue to handle delayed messages
200+
[$mainDestination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options);
201+
$this->declareDestination($mainDestination, $exchange, $exchangeType);
202+
196203
$destination = $this->getQueue($queue).'.delay.'.$ttl;
197204

198205
$this->declareQueue($destination, true, false, $this->getDelayQueueArguments($this->getQueue($queue), $ttl));

0 commit comments

Comments
 (0)