Skip to content

Commit 78f2c93

Browse files
author
Devon Weller
committed
use consumer and declarations cache
1 parent b565914 commit 78f2c93

File tree

1 file changed

+33
-5
lines changed

1 file changed

+33
-5
lines changed

src/Queue/RabbitMQQueue.php

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ class RabbitMQQueue extends Queue implements QueueContract
2424
private $declaredExchanges = [];
2525
private $declaredQueues = [];
2626

27+
private $declarationsCache = [];
28+
private $consumerCache = [];
29+
2730
/**
2831
* @var AmqpContext
2932
*/
@@ -42,7 +45,7 @@ public function __construct(AmqpContext $context, array $config)
4245
$this->exchangeOptions['arguments'] = isset($this->exchangeOptions['arguments']) ?
4346
json_decode($this->exchangeOptions['arguments'], true) : [];
4447

45-
$this->receiveConfig = $config['receive'] ?? ;
48+
$this->receiveConfig = $config['receive'] ?? [];
4649

4750
$this->sleepOnError = $config['sleep_on_error'] ?? 5;
4851
}
@@ -51,7 +54,7 @@ public function __construct(AmqpContext $context, array $config)
5154
public function size($queueName = null): int
5255
{
5356
/** @var AmqpQueue $queue */
54-
list($queue) = $this->declareEverything($queueName);
57+
list($queue) = $this->declareEverythingOnce($queueName);
5558

5659
return $this->context->declareQueue($queue);
5760
}
@@ -70,7 +73,7 @@ public function pushRaw($payload, $queueName = null, array $options = [])
7073
* @var AmqpTopic $topic
7174
* @var AmqpQueue $queue
7275
*/
73-
list($queue, $topic) = $this->declareEverything($queueName);
76+
list($queue, $topic) = $this->declareEverythingOnce($queueName);
7477

7578
$message = $this->context->createMessage($payload);
7679
$message->setRoutingKey($queue->getQueueName());
@@ -126,9 +129,10 @@ public function pop($queueName = null)
126129
{
127130
try {
128131
/** @var AmqpQueue $queue */
129-
list($queue) = $this->declareEverything($queueName);
132+
list($queue) = $this->declareEverythingOnce($queueName);
130133

131-
$consumer = $this->context->createConsumer($queue);
134+
// create the consumer once and cache it
135+
$consumer = $this->createConsumerOnce($queue);
132136

133137
if (isset($this->receiveConfig['method']) AND $this->receiveConfig['method'] == 'basic_consume') {
134138
$message = $consumer->receive($this->receiveConfig['timeout']);
@@ -176,6 +180,20 @@ public function getContext(): AmqpContext
176180
return $this->context;
177181
}
178182

183+
/**
184+
* @param string $queueName
185+
*
186+
* @return array [Interop\Amqp\AmqpQueue, Interop\Amqp\AmqpTopic]
187+
*/
188+
private function declareEverythingOnce(string $queueName = null): array
189+
{
190+
$queueName = $queueName ?: $this->queueOptions['name'];
191+
if (!isset($this->declarationsCache[$queueName])) {
192+
$this->declarationsCache[$queueName] = $this->declareEverything($queueName);
193+
}
194+
return $this->declarationsCache[$queueName];
195+
}
196+
179197
/**
180198
* @param string $queueName
181199
*
@@ -233,6 +251,15 @@ private function declareEverything(string $queueName = null): array
233251
return [$queue, $topic];
234252
}
235253

254+
protected function createConsumerOnce($queue)
255+
{
256+
$cache_key = spl_object_hash($queue);
257+
if (!isset($this->consumerCache[$cache_key])) {
258+
$this->consumerCache[$cache_key] = $this->context->createConsumer($queue);
259+
}
260+
return $this->consumerCache[$cache_key];
261+
}
262+
236263
/**
237264
* @param string $action
238265
* @param \Exception $e
@@ -253,4 +280,5 @@ protected function reportConnectionError($action, \Exception $e)
253280
// Sleep so that we don't flood the log file
254281
sleep($this->sleepOnError);
255282
}
283+
256284
}

0 commit comments

Comments
 (0)