Skip to content

Commit 7e206e8

Browse files
committed
Merge branch 'v5.3' into v5.4
# Conflicts: # composer.json # src/VladimirYuldashev/LaravelQueueRabbitMQ/Queue/RabbitMQQueue.php
2 parents cc2d8e1 + f5bdc5f commit 7e206e8

File tree

8 files changed

+166
-49
lines changed

8 files changed

+166
-49
lines changed

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
],
1212
"require": {
1313
"php": ">=5.6.4",
14+
"illuminate/database": "5.4.*",
1415
"illuminate/support": "5.4.*",
1516
"illuminate/queue": "5.4.*",
1617
"php-amqplib/php-amqplib": "2.6.*"

src/VladimirYuldashev/LaravelQueueRabbitMQ/LaravelQueueRabbitMQServiceProvider.php

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace VladimirYuldashev\LaravelQueueRabbitMQ;
44

5+
use Illuminate\Queue\QueueManager;
56
use Illuminate\Support\ServiceProvider;
67
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors\RabbitMQConnector;
78

@@ -26,8 +27,16 @@ public function register()
2627
*/
2728
public function boot()
2829
{
29-
app('queue')->addConnector('rabbitmq', function () {
30-
return new RabbitMQConnector();
30+
/** @var QueueManager $queue */
31+
$queue = $this->app['queue'];
32+
$connector = new RabbitMQConnector;
33+
34+
$queue->stopping(function () use ($connector) {
35+
$connector->connection()->close();
36+
});
37+
38+
$queue->addConnector('rabbitmq', function () use ($connector) {
39+
return $connector;
3140
});
3241
}
3342
}

src/VladimirYuldashev/LaravelQueueRabbitMQ/Queue/Connectors/RabbitMQConnector.php

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88

99
class RabbitMQConnector implements ConnectorInterface
1010
{
11+
12+
/** @var AMQPStreamConnection */
13+
private $connection;
14+
1115
/**
1216
* Establish a queue connection.
1317
*
@@ -18,7 +22,7 @@ class RabbitMQConnector implements ConnectorInterface
1822
public function connect(array $config)
1923
{
2024
// create connection with AMQP
21-
$connection = new AMQPStreamConnection(
25+
$this->connection = new AMQPStreamConnection(
2226
$config['host'],
2327
$config['port'],
2428
$config['login'],
@@ -27,8 +31,14 @@ public function connect(array $config)
2731
);
2832

2933
return new RabbitMQQueue(
30-
$connection,
34+
$this->connection,
3135
$config
3236
);
3337
}
38+
39+
public function connection()
40+
{
41+
return $this->connection;
42+
}
43+
3444
}

src/VladimirYuldashev/LaravelQueueRabbitMQ/Queue/Jobs/RabbitMQJob.php

Lines changed: 68 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,21 @@
22

33
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs;
44

5+
use Exception;
56
use Illuminate\Container\Container;
67
use Illuminate\Contracts\Queue\Job as JobContract;
8+
use Illuminate\Database\DetectsDeadlocks;
79
use Illuminate\Queue\Jobs\Job;
10+
use Illuminate\Support\Str;
811
use PhpAmqpLib\Channel\AMQPChannel;
912
use PhpAmqpLib\Message\AMQPMessage;
1013
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
1114

1215
class RabbitMQJob extends Job implements JobContract
1316
{
17+
18+
use DetectsDeadlocks;
19+
1420
/**
1521
* Same as RabbitMQQueue, used for attempt counts.
1622
*/
@@ -24,26 +30,56 @@ class RabbitMQJob extends Job implements JobContract
2430
/**
2531
* Creates a new instance of RabbitMQJob.
2632
*
27-
* @param \Illuminate\Container\Container $container
33+
* @param \Illuminate\Container\Container $container
2834
* @param \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue $connection
29-
* @param \PhpAmqpLib\Channel\AMQPChannel $channel
30-
* @param string $queue
31-
* @param \PhpAmqpLib\Message\AMQPMessage $message
35+
* @param \PhpAmqpLib\Channel\AMQPChannel $channel
36+
* @param string $queue
37+
* @param \PhpAmqpLib\Message\AMQPMessage $message
3238
*/
3339
public function __construct(
3440
Container $container,
3541
RabbitMQQueue $connection,
3642
AMQPChannel $channel,
3743
$queue,
3844
AMQPMessage $message
39-
) {
45+
)
46+
{
4047
$this->container = $container;
4148
$this->connection = $connection;
4249
$this->channel = $channel;
4350
$this->queue = $queue;
4451
$this->message = $message;
4552
}
4653

54+
/**
55+
* Fire the job.
56+
* @return void
57+
* @throws Exception
58+
*/
59+
public function fire()
60+
{
61+
$payload = $this->payload();
62+
63+
list($class, $method) = $this->parseJob($payload['job']);
64+
65+
$this->instance = $this->resolve($class);
66+
67+
try {
68+
$this->instance->{$method}($this, $payload['data']);
69+
} catch (Exception $exception) {
70+
if (
71+
$this->causedByDeadlock($exception) ||
72+
Str::contains($exception->getMessage(), ['detected deadlock'])
73+
) {
74+
sleep(2);
75+
$this->fire();
76+
return;
77+
}
78+
79+
throw $exception;
80+
}
81+
}
82+
4783
/**
4884
* Get the number of times the job has been attempted.
4985
*
@@ -88,8 +124,8 @@ public function delete()
88124
* Release the job back into the queue.
89125
*
90126
* @param int $delay
91-
*
92127
* @return void
128+
* @throws Exception
93129
*/
94130
public function release($delay = 0)
95131
{
@@ -104,7 +140,7 @@ public function release($delay = 0)
104140
* Some jobs don't have the command set, so fall back to just sending it the job name string
105141
*/
106142
if (isset($body['data']['command']) === true) {
107-
$job = unserialize($body['data']['command']);
143+
$job = $this->unserialize($body);
108144
} else {
109145
$job = $this->getName();
110146
}
@@ -151,4 +187,29 @@ public function setJobId($id)
151187
{
152188
$this->connection->setCorrelationId($id);
153189
}
190+
191+
/**
192+
* Unserialize job
193+
*
194+
* @param array $body
195+
* @return mixed
196+
* @throws Exception
197+
*/
198+
private function unserialize(array $body)
199+
{
200+
try {
201+
return unserialize($body['data']['command']);
202+
} catch (Exception $exception) {
203+
if (
204+
$this->causedByDeadlock($exception) ||
205+
Str::contains($exception->getMessage(), ['detected deadlock'])
206+
) {
207+
sleep(2);
208+
return $this->unserialize($body);
209+
}
210+
211+
throw $exception;
212+
}
213+
}
214+
154215
}

src/VladimirYuldashev/LaravelQueueRabbitMQ/Queue/RabbitMQQueue.php

Lines changed: 62 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue;
44

55
use DateTime;
6+
use ErrorException;
7+
use Exception;
8+
use Log;
69
use Illuminate\Contracts\Queue\Queue as QueueContract;
710
use Illuminate\Queue\Queue;
811
use PhpAmqpLib\Channel\AMQPChannel;
@@ -24,6 +27,8 @@ class RabbitMQQueue extends Queue implements QueueContract
2427
protected $declareExchange;
2528
protected $declaredExchanges = [];
2629
protected $declareBindQueue;
30+
protected $sleepOnError;
31+
2732
protected $declaredQueues = [];
2833

2934
protected $defaultQueue;
@@ -52,6 +57,7 @@ public function __construct(AMQPStreamConnection $amqpConnection, $config)
5257
$this->configExchange = $config['exchange_params'];
5358
$this->declareExchange = $config['exchange_declare'];
5459
$this->declareBindQueue = $config['queue_declare_bind'];
60+
$this->sleepOnError = isset($config['sleep_on_error']) ? $config['sleep_on_error'] : 5;
5561

5662
$this->channel = $this->getChannel();
5763
}
@@ -93,33 +99,39 @@ public function push($job, $data = '', $queue = null)
9399
*/
94100
public function pushRaw($payload, $queue = null, array $options = [])
95101
{
96-
$queue = $this->getQueueName($queue);
97-
$this->declareQueue($queue);
98-
if (isset($options['delay']) && $options['delay'] > 0) {
99-
list($queue, $exchange) = $this->declareDelayedQueue($queue, $options['delay']);
100-
} else {
101-
list($queue, $exchange) = $this->declareQueue($queue);
102-
}
103-
104-
$headers = [
105-
'Content-Type' => 'application/json',
106-
'delivery_mode' => 2,
107-
];
108-
109-
if (isset($this->retryAfter) === true) {
110-
$headers['application_headers'] = [self::ATTEMPT_COUNT_HEADERS_KEY => ['I', $this->retryAfter]];
102+
try {
103+
$queue = $this->getQueueName($queue);
104+
$this->declareQueue($queue);
105+
if (isset($options['delay']) && $options['delay'] > 0) {
106+
list($queue, $exchange) = $this->declareDelayedQueue($queue, $options['delay']);
107+
} else {
108+
list($queue, $exchange) = $this->declareQueue($queue);
109+
}
110+
111+
$headers = [
112+
'Content-Type' => 'application/json',
113+
'delivery_mode' => 2,
114+
];
115+
116+
if (isset($this->retryAfter) === true) {
117+
$headers['application_headers'] = [self::ATTEMPT_COUNT_HEADERS_KEY => ['I', $this->retryAfter]];
118+
}
119+
120+
// push job to a queue
121+
$message = new AMQPMessage($payload, $headers);
122+
123+
$correlationId = $this->getCorrelationId();
124+
$message->set('correlation_id', $correlationId);
125+
126+
// push task to a queue
127+
$this->channel->basic_publish($message, $exchange, $queue);
128+
129+
return $correlationId;
130+
} catch (ErrorException $exception) {
131+
$this->reportConnectionError('pushRaw', $exception);
111132
}
112133

113-
// push job to a queue
114-
$message = new AMQPMessage($payload, $headers);
115-
116-
$correlationId = $this->getCorrelationId();
117-
$message->set('correlation_id', $correlationId);
118-
119-
// push task to a queue
120-
$this->channel->basic_publish($message, $exchange, $queue);
121-
122-
return $correlationId;
134+
return null;
123135
}
124136

125137
/**
@@ -148,15 +160,22 @@ public function pop($queue = null)
148160
{
149161
$queue = $this->getQueueName($queue);
150162

151-
// declare queue if not exists
152-
$this->declareQueue($queue);
163+
try {
164+
// declare queue if not exists
165+
$this->declareQueue($queue);
153166

154-
// get envelope
155-
$message = $this->channel->basic_get($queue);
167+
// get envelope
168+
$message = $this->channel->basic_get($queue);
156169

157-
if ($message instanceof AMQPMessage) {
158-
return new RabbitMQJob($this->container, $this, $this->channel, $queue, $message);
170+
if ($message instanceof AMQPMessage) {
171+
return new RabbitMQJob($this->container, $this, $this->channel, $queue, $message);
172+
}
159173
}
174+
catch(ErrorException $exception) {
175+
$this->reportConnectionError('pop', $exception);
176+
}
177+
178+
return null;
160179
}
161180

162181
/**
@@ -300,4 +319,16 @@ public function getCorrelationId()
300319
{
301320
return $this->correlationId ?: uniqid();
302321
}
322+
323+
/**
324+
* @param string $action
325+
* @param Exception $e
326+
*/
327+
private function reportConnectionError($action, Exception $e)
328+
{
329+
Log::error('AMQP error while attempting ' . $action . ': ' . $e->getMessage());
330+
// Sleep so that we don't flood the log file
331+
sleep($this->sleepOnError);
332+
}
333+
303334
}

src/config/rabbitmq.php

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
1313
'port' => env('RABBITMQ_PORT', 5672),
1414

15-
'vhost' => env('RABBITMQ_VHOST', '/'),
16-
'login' => env('RABBITMQ_LOGIN', 'guest'),
15+
'vhost' => env('RABBITMQ_VHOST', '/'),
16+
'login' => env('RABBITMQ_LOGIN', 'guest'),
1717
'password' => env('RABBITMQ_PASSWORD', 'guest'),
1818

1919
'queue' => env('RABBITMQ_QUEUE'),
@@ -25,9 +25,9 @@
2525
// create the queue if not exists and bind to the exchange
2626

2727
'queue_params' => [
28-
'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
29-
'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
30-
'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
28+
'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
29+
'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
30+
'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
3131
'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
3232
],
3333
'exchange_params' => [
@@ -40,4 +40,6 @@
4040
'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
4141
],
4242

43+
'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', 5), // the number of seconds to sleep if there's an error communicating with rabbitmq
44+
4345
];

tests/RabbitMQConnectorTest.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
<?php
22

3+
use PhpAmqpLib\Connection\AMQPStreamConnection;
34
use PHPUnit\Framework\TestCase;
45
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors\RabbitMQConnector;
56
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
@@ -38,5 +39,6 @@ public function test_connect()
3839
$queue = $connector->connect($config);
3940

4041
$this->assertInstanceOf(RabbitMQQueue::class, $queue);
42+
$this->assertInstanceOf(AMQPStreamConnection::class, $connector->connection());
4143
}
4244
}

0 commit comments

Comments
 (0)