Skip to content

Commit a5f3f1e

Browse files
committed
vyuldashev#225 laravel horizon support
1 parent ddb655d commit a5f3f1e

File tree

7 files changed

+175
-16
lines changed

7 files changed

+175
-16
lines changed

config/rabbitmq.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@
99

1010
'driver' => 'rabbitmq',
1111

12+
/*
13+
* Set to horizon if you wish to use Laravel Horizon.
14+
*/
15+
'worker' => env('RABBITMQ_WORKER', 'default'),
16+
1217
'dsn' => env('RABBITMQ_DSN', null),
1318

1419
/*

src/Horizon/RabbitMQQueue.php

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
<?php
2+
3+
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Horizon;
4+
5+
use Illuminate\Contracts\Events\Dispatcher;
6+
use Laravel\Horizon\Events\JobDeleted;
7+
use Laravel\Horizon\Events\JobPushed;
8+
use Laravel\Horizon\Events\JobReserved;
9+
use Laravel\Horizon\JobId;
10+
use Laravel\Horizon\JobPayload;
11+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;
12+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue;
13+
14+
class RabbitMQQueue extends BaseRabbitMQQueue
15+
{
16+
/**
17+
* The job that last pushed to queue via the "push" method.
18+
*
19+
* @var object|string
20+
*/
21+
protected $lastPushed;
22+
23+
/**
24+
* Get the number of queue jobs that are ready to process.
25+
*
26+
* @param string|null $queue
27+
* @return int
28+
*/
29+
public function readyNow($queue = null): int
30+
{
31+
return $this->size($queue);
32+
}
33+
34+
/** {@inheritdoc} */
35+
public function push($job, $data = '', $queue = null)
36+
{
37+
$this->lastPushed = $job;
38+
39+
return parent::push($job, $data, $queue);
40+
}
41+
42+
/** {@inheritdoc} */
43+
public function pushRaw($payload, $queueName = null, array $options = [])
44+
{
45+
$payload = (new JobPayload($payload))->prepare($this->lastPushed)->value;
46+
47+
return tap(parent::pushRaw($payload, $queueName, $options), function () use ($queueName, $payload) {
48+
$this->event($queueName ?: $this->queueName, new JobPushed($payload));
49+
});
50+
}
51+
52+
/** {@inheritdoc} */
53+
public function later($delay, $job, $data = '', $queueName = null)
54+
{
55+
$payload = (new JobPayload($this->createPayload($job, $data)))->prepare($job)->value;
56+
57+
return tap(parent::pushRaw($payload, $queueName, ['delay' => $this->secondsUntil($delay)]), function () use ($payload, $queueName) {
58+
$this->event($queueName ?: $this->queueName, new JobPushed($payload));
59+
});
60+
}
61+
62+
/** {@inheritdoc} */
63+
public function pop($queueName = null)
64+
{
65+
return tap(parent::pop($queueName), function ($result) use ($queueName) {
66+
if ($result instanceof RabbitMQJob) {
67+
$this->event($queueName ?: $this->queueName, new JobReserved($result->getRawBody()));
68+
}
69+
});
70+
}
71+
72+
/** {@inheritdoc} */
73+
public function release($delay, $job, $data, $queue, $attempts = 0)
74+
{
75+
$this->lastPushed = $job;
76+
77+
return parent::release($delay, $job, $data, $queue, $attempts);
78+
}
79+
80+
/**
81+
* Fire the job deleted event.
82+
*
83+
* @param string $queueName
84+
* @param RabbitMQJob $job
85+
* @return void
86+
* @throws \Illuminate\Contracts\Container\BindingResolutionException
87+
*/
88+
public function deleteReserved($queueName, $job): void
89+
{
90+
$this->event($queueName ?: $this->queueName, new JobDeleted($job, $job->getRawBody()));
91+
}
92+
93+
/**
94+
* Fire the given event if a dispatcher is bound.
95+
*
96+
* @param string $queue
97+
* @param mixed $event
98+
* @return void
99+
* @throws \Illuminate\Contracts\Container\BindingResolutionException
100+
*/
101+
protected function event($queue, $event): void
102+
{
103+
if ($this->container && $this->container->bound(Dispatcher::class)) {
104+
$this->container->make(Dispatcher::class)->dispatch(
105+
$event->connection($this->getConnectionName())->queue($queue)
106+
);
107+
}
108+
}
109+
110+
/** {@inheritdoc} */
111+
protected function getRandomId(): string
112+
{
113+
return JobId::generate();
114+
}
115+
}

src/LaravelQueueRabbitMQServiceProvider.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class LaravelQueueRabbitMQServiceProvider extends ServiceProvider
1313
*
1414
* @return void
1515
*/
16-
public function register()
16+
public function register(): void
1717
{
1818
$this->mergeConfigFrom(
1919
__DIR__.'/../config/rabbitmq.php', 'queue.connections.rabbitmq'
@@ -25,7 +25,7 @@ public function register()
2525
*
2626
* @return void
2727
*/
28-
public function boot()
28+
public function boot(): void
2929
{
3030
/** @var QueueManager $queue */
3131
$queue = $this->app['queue'];

src/Queue/Connectors/RabbitMQConnector.php

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors;
44

5+
use Illuminate\Support\Arr;
56
use Interop\Amqp\AmqpContext;
67
use Illuminate\Contracts\Queue\Queue;
78
use Interop\Amqp\AmqpConnectionFactory;
@@ -10,6 +11,7 @@
1011
use Illuminate\Queue\Events\WorkerStopping;
1112
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
1213
use Illuminate\Queue\Connectors\ConnectorInterface;
14+
use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue;
1315
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
1416
use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory;
1517

@@ -71,6 +73,14 @@ public function connect(array $config): Queue
7173
$context->close();
7274
});
7375

74-
return new RabbitMQQueue($context, $config);
76+
$worker = Arr::get($config, 'worker', 'default');
77+
78+
if($worker === 'default') {
79+
return new RabbitMQQueue($context, $config);
80+
}
81+
82+
if($worker === 'horizon') {
83+
return new HorizonRabbitMQQueue($context, $config);
84+
}
7585
}
7686
}

src/Queue/Jobs/RabbitMQJob.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Illuminate\Container\Container;
1212
use Illuminate\Database\DetectsDeadlocks;
1313
use Illuminate\Contracts\Queue\Job as JobContract;
14+
use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue;
1415
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
1516

1617
class RabbitMQJob extends Job implements JobContract
@@ -31,7 +32,8 @@ public function __construct(
3132
RabbitMQQueue $connection,
3233
AmqpConsumer $consumer,
3334
AmqpMessage $message
34-
) {
35+
)
36+
{
3537
$this->container = $container;
3638
$this->connection = $connection;
3739
$this->consumer = $consumer;
@@ -99,6 +101,11 @@ public function delete(): void
99101
parent::delete();
100102

101103
$this->consumer->acknowledge($this->message);
104+
105+
// required for Laravel Horizon
106+
if($this->connection instanceof HorizonRabbitMQQueue) {
107+
$this->connection->deleteReserved($this->queue, $this);
108+
}
102109
}
103110

104111
/** {@inheritdoc}

src/Queue/RabbitMQQueue.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue;
44

5+
use Illuminate\Support\Str;
56
use RuntimeException;
67
use Illuminate\Queue\Queue;
78
use Interop\Amqp\AmqpQueue;
@@ -254,4 +255,21 @@ protected function reportConnectionError($action, \Throwable $e)
254255
// Sleep so that we don't flood the log file
255256
sleep($this->sleepOnError);
256257
}
258+
259+
protected function createPayloadArray($job, $queue, $data = '')
260+
{
261+
return array_merge(parent::createPayloadArray($job, $queue, $data), [
262+
'id' => $this->getRandomId(),
263+
]);
264+
}
265+
266+
/**
267+
* Get a random ID string.
268+
*
269+
* @return string
270+
*/
271+
protected function getRandomId(): string
272+
{
273+
return Str::random(32);
274+
}
257275
}

tests/Functional/SendAndReceiveMessageTest.php

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ public function test()
2020
{
2121
$config = [
2222
'factory_class' => AmqpConnectionFactory::class,
23-
'dsn' => null,
24-
'host' => getenv('HOST'),
25-
'port' => getenv('PORT'),
26-
'login' => 'guest',
23+
'dsn' => null,
24+
'host' => getenv('HOST'),
25+
'port' => getenv('PORT'),
26+
'login' => 'guest',
2727
'password' => 'guest',
28-
'vhost' => '/',
28+
'vhost' => '/',
2929
'options' => [
3030
'exchange' => [
3131
'name' => null,
@@ -48,12 +48,12 @@ public function test()
4848
],
4949
],
5050
'ssl_params' => [
51-
'ssl_on' => false,
52-
'cafile' => null,
53-
'local_cert' => null,
54-
'local_key' => null,
55-
'verify_peer' => true,
56-
'passphrase' => null,
51+
'ssl_on' => false,
52+
'cafile' => null,
53+
'local_cert' => null,
54+
'local_key' => null,
55+
'verify_peer' => true,
56+
'passphrase' => null,
5757
],
5858
];
5959

@@ -67,18 +67,22 @@ public function test()
6767

6868
$queue->getContext()->purgeQueue($queue->getContext()->createQueue('default'));
6969

70-
$expectedPayload = __METHOD__.microtime(true);
70+
$expectedPayload = __METHOD__ . microtime(true);
7171

7272
$queue->pushRaw($expectedPayload);
7373

7474
sleep(1);
7575

76+
$this->assertEquals(1, $queue->size());
77+
7678
$job = $queue->pop();
7779

7880
$this->assertInstanceOf(RabbitMQJob::class, $job);
7981
$this->assertSame($expectedPayload, $job->getRawBody());
8082

8183
$job->delete();
84+
85+
$this->assertEquals(0, $queue->size());
8286
}
8387

8488
private function createDummyContainer()

0 commit comments

Comments
 (0)