Skip to content

Commit 167a4b1

Browse files
authored
Merge pull request vyuldashev#130 from lephleg/v4.2
Fixes push performance issue on L4.2.
2 parents 52fe570 + 0ac92a9 commit 167a4b1

File tree

2 files changed

+23
-2
lines changed

2 files changed

+23
-2
lines changed

src/VladimirYuldashev/LaravelQueueRabbitMQ/Queue/Connectors/RabbitMQConnector.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors;
44

55
use Illuminate\Queue\Connectors\ConnectorInterface;
6-
use PhpAmqpLib\Connection\AMQPConnection;
6+
use PhpAmqpLib\Connection\AMQPStreamConnection as AMQPConnection;
77
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
88

99
class RabbitMQConnector implements ConnectorInterface

src/VladimirYuldashev/LaravelQueueRabbitMQ/Queue/RabbitMQQueue.php

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use Illuminate\Queue\Queue;
77
use Illuminate\Queue\QueueInterface;
88
use PhpAmqpLib\Channel\AMQPChannel;
9-
use PhpAmqpLib\Connection\AMQPConnection;
9+
use PhpAmqpLib\Connection\AMQPStreamConnection as AMQPConnection;
1010
use PhpAmqpLib\Message\AMQPMessage;
1111
use PhpAmqpLib\Wire\AMQPTable;
1212
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;
@@ -18,6 +18,8 @@ class RabbitMQQueue extends Queue implements QueueInterface
1818
protected $channel;
1919

2020
protected $defaultQueue;
21+
protected $declaredQueues = [];
22+
protected $declaredDelayedQueues = [];
2123
protected $configQueue;
2224
protected $configExchange;
2325

@@ -138,11 +140,20 @@ public function getChannel()
138140

139141
/**
140142
* @param string $name
143+
*
144+
* @return string
141145
*/
142146
public function declareQueue($name)
143147
{
144148
$name = $this->getQueueName($name);
145149

150+
// if the current queue has been already declared, skip this
151+
if (!in_array($name, $this->declaredQueues)) {
152+
$this->declaredQueues[]= $name;
153+
} else {
154+
return $name;
155+
}
156+
146157
// declare queue
147158
$this->channel->queue_declare(
148159
$name,
@@ -163,6 +174,9 @@ public function declareQueue($name)
163174

164175
// bind queue to the exchange
165176
$this->channel->queue_bind($name, $name, $name);
177+
178+
return $name;
179+
166180
}
167181

168182
/**
@@ -177,6 +191,13 @@ public function declareDelayedQueue($destination, $delay)
177191
$destination = $this->getQueueName($destination);
178192
$name = $this->getQueueName($destination) . '_deferred_' . $delay;
179193

194+
// if the current delayed queue has been already declared, skip this
195+
if (!in_array($name, $this->declaredDelayedQueues)) {
196+
$this->declaredDelayedQueues[]= $name;
197+
} else {
198+
return $name;
199+
}
200+
180201
// declare exchange
181202
$this->channel->exchange_declare(
182203
$name,

0 commit comments

Comments
 (0)