Skip to content

Commit 503608b

Browse files
committed
amqp interop based version.
1 parent 3be5e3e commit 503608b

File tree

4 files changed

+116
-158
lines changed

4 files changed

+116
-158
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
"illuminate/database": "5.5.*",
1414
"illuminate/support": "5.5.*",
1515
"illuminate/queue": "5.5.*",
16-
"php-amqplib/php-amqplib": "2.6.*"
16+
"enqueue/amqp-lib": "^0.8"
1717
},
1818
"require-dev": {
1919
"phpunit/phpunit": "~6.0",

src/Queue/Connectors/RabbitMQConnector.php

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,14 @@
22

33
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors;
44

5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
57
use Illuminate\Contracts\Queue\Queue;
68
use Illuminate\Queue\Connectors\ConnectorInterface;
7-
use PhpAmqpLib\Connection\AMQPStreamConnection;
89
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
910

1011
class RabbitMQConnector implements ConnectorInterface
1112
{
12-
/** @var AMQPStreamConnection */
13-
private $connection;
14-
1513
/**
1614
* Establish a queue connection.
1715
*
@@ -21,23 +19,19 @@ class RabbitMQConnector implements ConnectorInterface
2119
*/
2220
public function connect(array $config): Queue
2321
{
24-
// create connection with AMQP
25-
$this->connection = new AMQPStreamConnection(
26-
$config['host'],
27-
$config['port'],
28-
$config['login'],
29-
$config['password'],
30-
$config['vhost']
31-
);
22+
$factory = new AmqpConnectionFactory([
23+
'host' => $config['host'],
24+
'port' => $config['port'],
25+
'user' => $config['login'],
26+
'pass' => $config['password'],
27+
'vhost' => $config['vhost'],
28+
]);
29+
30+
$factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
3231

3332
return new RabbitMQQueue(
34-
$this->connection,
33+
$factory->createContext(),
3534
$config
3635
);
3736
}
38-
39-
public function connection()
40-
{
41-
return $this->connection;
42-
}
4337
}

src/Queue/Jobs/RabbitMQJob.php

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
use Illuminate\Queue\Jobs\Job;
1010
use Illuminate\Queue\Jobs\JobName;
1111
use Illuminate\Support\Str;
12-
use PhpAmqpLib\Channel\AMQPChannel;
13-
use PhpAmqpLib\Message\AMQPMessage;
12+
use Interop\Amqp\AmqpConsumer;
13+
use Interop\Amqp\AmqpMessage;
1414
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
1515

1616
class RabbitMQJob extends Job implements JobContract
@@ -23,20 +23,20 @@ class RabbitMQJob extends Job implements JobContract
2323
const ATTEMPT_COUNT_HEADERS_KEY = 'attempts_count';
2424

2525
protected $connection;
26-
protected $channel;
26+
protected $consumer;
2727
protected $message;
2828

2929
public function __construct(
3030
Container $container,
3131
RabbitMQQueue $connection,
32-
AMQPChannel $channel,
32+
AmqpConsumer $consumer,
3333
string $queue,
34-
AMQPMessage $message,
34+
AmqpMessage $message,
3535
string $connectionName = null
3636
) {
3737
$this->container = $container;
3838
$this->connection = $connection;
39-
$this->channel = $channel;
39+
$this->consumer = $consumer;
4040
$this->queue = $queue;
4141
$this->message = $message;
4242
$this->connectionName = $connectionName;
@@ -79,16 +79,10 @@ public function fire()
7979
*/
8080
public function attempts(): int
8181
{
82-
if ($this->message->has('application_headers') === true) {
83-
$headers = $this->message->get('application_headers')->getNativeData();
84-
85-
if (isset($headers[self::ATTEMPT_COUNT_HEADERS_KEY]) === true) {
86-
return $headers[self::ATTEMPT_COUNT_HEADERS_KEY];
87-
}
88-
}
89-
9082
// set default job attempts to 1 so that jobs can run without retry
91-
return 1;
83+
$defaultAttempts = 1;
84+
85+
return $this->message->getProperty(self::ATTEMPT_COUNT_HEADERS_KEY, $defaultAttempts);
9286
}
9387

9488
/**
@@ -98,14 +92,15 @@ public function attempts(): int
9892
*/
9993
public function getRawBody(): string
10094
{
101-
return $this->message->body;
95+
return $this->message->getBody();
10296
}
10397

10498
/** @inheritdoc */
10599
public function delete()
106100
{
107101
parent::delete();
108-
$this->channel->basic_ack($this->message->get('delivery_tag'));
102+
103+
$this->consumer->acknowledge($this->message);
109104
}
110105

111106
/** @inheritdoc */
@@ -143,7 +138,7 @@ public function release($delay = 0)
143138
*/
144139
public function getJobId(): string
145140
{
146-
return $this->message->get('correlation_id');
141+
return $this->message->getCorrelationId();
147142
}
148143

149144
/**

0 commit comments

Comments
 (0)