Skip to content

Commit 3dd712f

Browse files
authored
Merge pull request vyuldashev#154 from formapro-forks/amqp-interop
Amqp interop based version.
2 parents 3be5e3e + 6472e05 commit 3dd712f

21 files changed

+1442
-466
lines changed

README.md

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,15 @@ RabbitMQ Queue driver for Laravel
1111
1. Install this package via composer using:
1212

1313
```
14-
composer require vladimir-yuldashev/laravel-queue-rabbitmq:5.5
14+
composer require vladimir-yuldashev/laravel-queue-rabbitmq:^6
1515
```
1616

1717
2. Add these properties to `.env` with proper values:
1818

1919
```
2020
QUEUE_DRIVER=rabbitmq
2121
22-
RABBITMQ_HOST=127.0.0.1
23-
RABBITMQ_PORT=5672
24-
RABBITMQ_VHOST=/
25-
RABBITMQ_LOGIN=guest
26-
RABBITMQ_PASSWORD=guest
22+
RABBITMQ_DSN=amqp://guest:guest@127.0.0.1:5672/%2F
2723
RABBITMQ_QUEUE=queue_name
2824
```
2925

composer.json

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,23 @@
1313
"illuminate/database": "5.5.*",
1414
"illuminate/support": "5.5.*",
1515
"illuminate/queue": "5.5.*",
16-
"php-amqplib/php-amqplib": "2.6.*"
16+
"enqueue/amqp-lib": "0.8.5"
1717
},
1818
"require-dev": {
1919
"phpunit/phpunit": "~6.0",
20+
"illuminate/events": "5.5.*",
2021
"mockery/mockery": "^0.9.5"
2122
},
2223
"autoload": {
2324
"psr-4": {
2425
"VladimirYuldashev\\LaravelQueueRabbitMQ\\": "src/"
2526
}
2627
},
28+
"autoload-dev": {
29+
"psr-4": {
30+
"VladimirYuldashev\\LaravelQueueRabbitMQ\\Tests\\": "tests/"
31+
}
32+
},
2733
"extra": {
2834
"laravel": {
2935
"providers": [

config/rabbitmq.php

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,16 @@
99

1010
'driver' => 'rabbitmq',
1111

12+
'dsn' => env('RABBITMQ_DSN', null),
13+
14+
/*
15+
* Could be one a class that implements \Interop\Amqp\AmqpConnectionFactory for example:
16+
* - \EnqueueAmqpExt\AmqpConnectionFactory if you install enqueue/amqp-ext
17+
* - \EnqueueAmqpLib\AmqpConnectionFactory if you install enqueue/amqp-lib
18+
* - \EnqueueAmqpBunny\AmqpConnectionFactory if you install enqueue/amqp-bunny
19+
*/
20+
'factory_class' => \Enqueue\AmqpLib\AmqpConnectionFactory::class,
21+
1222
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
1323
'port' => env('RABBITMQ_PORT', 5672),
1424

@@ -26,6 +36,11 @@
2636
*/
2737
'exchange_declare' => env('RABBITMQ_EXCHANGE_DECLARE', true),
2838

39+
/*
40+
* Determine if queue should be created if it does not exist.
41+
*/
42+
'queue_declare' => env('RABBITMQ_QUEUE_DECLARE', true),
43+
2944
/*
3045
* Determine if queue should be created and binded to the exchange if it does not exist.
3146
*/
@@ -43,7 +58,7 @@
4358
],
4459
'exchange_params' => [
4560
'name' => env('RABBITMQ_EXCHANGE_NAME'),
46-
'type' => env('RABBITMQ_EXCHANGE_TYPE', 'direct'),
61+
'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT),
4762
'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
4863
'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
4964
'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
@@ -62,6 +77,7 @@
6277
'ssl_on' => env('RABBITMQ_SSL', false),
6378
'cafile' => env('RABBITMQ_SSL_CAFILE', null),
6479
'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
80+
'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
6581
'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
6682
'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
6783
]

src/LaravelQueueRabbitMQServiceProvider.php

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@
44

55
use Illuminate\Queue\QueueManager;
66
use Illuminate\Support\ServiceProvider;
7-
use PhpAmqpLib\Connection\AMQPStreamConnection;
87
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors\RabbitMQConnector;
9-
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors\RabbitMQConnectorSSL;
108

119
class LaravelQueueRabbitMQServiceProvider extends ServiceProvider
1210
{
@@ -32,20 +30,8 @@ public function boot()
3230
/** @var QueueManager $queue */
3331
$queue = $this->app['queue'];
3432

35-
if ($this->app['config']['rabbitmq']['ssl_params']['ssl_on'] === true) {
36-
$connector = new RabbitMQConnectorSSL();
37-
} else {
38-
$connector = new RabbitMQConnector();
39-
}
40-
41-
$queue->stopping(function () use ($connector) {
42-
if ($connector->connection() instanceof AMQPStreamConnection) {
43-
$connector->connection()->close();
44-
}
45-
});
46-
47-
$queue->addConnector('rabbitmq', function () use ($connector) {
48-
return $connector;
33+
$queue->addConnector('rabbitmq', function () {
34+
return new RabbitMQConnector($this->app['events']);
4935
});
5036
}
5137
}

src/Queue/Connectors/RabbitMQConnector.php

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,27 @@
22

33
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors;
44

5+
use Enqueue\AmqpTools\DelayStrategyAware;
6+
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
7+
use Illuminate\Contracts\Events\Dispatcher;
58
use Illuminate\Contracts\Queue\Queue;
69
use Illuminate\Queue\Connectors\ConnectorInterface;
7-
use PhpAmqpLib\Connection\AMQPStreamConnection;
10+
use Illuminate\Queue\Events\WorkerStopping;
11+
use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory;
12+
use Interop\Amqp\AmqpConnectionFactory;
813
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
914

1015
class RabbitMQConnector implements ConnectorInterface
1116
{
12-
/** @var AMQPStreamConnection */
13-
private $connection;
17+
/**
18+
* @var Dispatcher
19+
*/
20+
private $dispatcher;
21+
22+
public function __construct(Dispatcher $dispatcher)
23+
{
24+
$this->dispatcher = $dispatcher;
25+
}
1426

1527
/**
1628
* Establish a queue connection.
@@ -21,23 +33,41 @@ class RabbitMQConnector implements ConnectorInterface
2133
*/
2234
public function connect(array $config): Queue
2335
{
24-
// create connection with AMQP
25-
$this->connection = new AMQPStreamConnection(
26-
$config['host'],
27-
$config['port'],
28-
$config['login'],
29-
$config['password'],
30-
$config['vhost']
31-
);
32-
33-
return new RabbitMQQueue(
34-
$this->connection,
35-
$config
36-
);
37-
}
36+
if (false == array_key_exists('factory_class', $config)) {
37+
throw new \LogicException('The factory_class option is missing though it is required.');
38+
}
3839

39-
public function connection()
40-
{
41-
return $this->connection;
40+
$factoryClass = $config['factory_class'];
41+
if (false == class_exists($factoryClass) || false == (new \ReflectionClass($factoryClass))->implementsInterface(InteropAmqpConnectionFactory::class)) {
42+
throw new \LogicException(sprintf('The factory_class option has to be valid class that implements "%s"', InteropAmqpConnectionFactory::class));
43+
}
44+
45+
/** @var AmqpConnectionFactory $factory */
46+
$factory = new $factoryClass([
47+
'dsn' => $config['dsn'],
48+
'host' => $config['host'],
49+
'port' => $config['port'],
50+
'user' => $config['login'],
51+
'pass' => $config['password'],
52+
'vhost' => $config['vhost'],
53+
'ssl_on' => $config['ssl_params']['ssl_on'],
54+
'ssl_verify' => $config['ssl_params']['verify_peer'],
55+
'ssl_cacert' => $config['ssl_params']['cafile'],
56+
'ssl_cert' => $config['ssl_params']['local_cert'],
57+
'ssl_key' => $config['ssl_params']['local_key'],
58+
'ssl_passphrase' => $config['ssl_params']['passphrase'],
59+
]);
60+
61+
if ($factory instanceof DelayStrategyAware) {
62+
$factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
63+
}
64+
65+
$context = $factory->createContext();
66+
67+
$this->dispatcher->listen(WorkerStopping::class, function () use ($context) {
68+
$context->close();
69+
});
70+
71+
return new RabbitMQQueue($context, $config);
4272
}
4373
}

src/Queue/Connectors/RabbitMQConnectorSSL.php

Lines changed: 0 additions & 50 deletions
This file was deleted.

src/Queue/Jobs/RabbitMQJob.php

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
use Illuminate\Queue\Jobs\Job;
1010
use Illuminate\Queue\Jobs\JobName;
1111
use Illuminate\Support\Str;
12-
use PhpAmqpLib\Channel\AMQPChannel;
13-
use PhpAmqpLib\Message\AMQPMessage;
12+
use Interop\Amqp\AmqpConsumer;
13+
use Interop\Amqp\AmqpMessage;
1414
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
1515

1616
class RabbitMQJob extends Job implements JobContract
@@ -23,23 +23,20 @@ class RabbitMQJob extends Job implements JobContract
2323
const ATTEMPT_COUNT_HEADERS_KEY = 'attempts_count';
2424

2525
protected $connection;
26-
protected $channel;
26+
protected $consumer;
2727
protected $message;
2828

2929
public function __construct(
3030
Container $container,
3131
RabbitMQQueue $connection,
32-
AMQPChannel $channel,
33-
string $queue,
34-
AMQPMessage $message,
35-
string $connectionName = null
32+
AmqpConsumer $consumer,
33+
AmqpMessage $message
3634
) {
3735
$this->container = $container;
3836
$this->connection = $connection;
39-
$this->channel = $channel;
40-
$this->queue = $queue;
37+
$this->consumer = $consumer;
4138
$this->message = $message;
42-
$this->connectionName = $connectionName;
39+
$this->queue = $consumer->getQueue()->getQueueName();
4340
}
4441

4542
/**
@@ -79,16 +76,10 @@ public function fire()
7976
*/
8077
public function attempts(): int
8178
{
82-
if ($this->message->has('application_headers') === true) {
83-
$headers = $this->message->get('application_headers')->getNativeData();
84-
85-
if (isset($headers[self::ATTEMPT_COUNT_HEADERS_KEY]) === true) {
86-
return $headers[self::ATTEMPT_COUNT_HEADERS_KEY];
87-
}
88-
}
89-
9079
// set default job attempts to 1 so that jobs can run without retry
91-
return 1;
80+
$defaultAttempts = 1;
81+
82+
return $this->message->getProperty(self::ATTEMPT_COUNT_HEADERS_KEY, $defaultAttempts);
9283
}
9384

9485
/**
@@ -98,14 +89,15 @@ public function attempts(): int
9889
*/
9990
public function getRawBody(): string
10091
{
101-
return $this->message->body;
92+
return $this->message->getBody();
10293
}
10394

10495
/** @inheritdoc */
10596
public function delete()
10697
{
10798
parent::delete();
108-
$this->channel->basic_ack($this->message->get('delivery_tag'));
99+
100+
$this->consumer->acknowledge($this->message);
109101
}
110102

111103
/** @inheritdoc */
@@ -143,7 +135,7 @@ public function release($delay = 0)
143135
*/
144136
public function getJobId(): string
145137
{
146-
return $this->message->get('correlation_id');
138+
return $this->message->getCorrelationId();
147139
}
148140

149141
/**

0 commit comments

Comments
 (0)