Skip to content

Commit 14e740f

Browse files
committed
Merge branch 'fix/refactor-creation-of-connections-and-queues' into feature/support-octane-reconnect
2 parents 514d6dc + b43ac84 commit 14e740f

File tree

1 file changed

+22
-6
lines changed

1 file changed

+22
-6
lines changed

src/Queue/RabbitMQQueue.php

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public function size($queue = null): int
8484
}
8585

8686
// create a temporary channel, so the main channel will not be closed on exception
87-
$channel = $this->getConnection()->channel();
87+
$channel = $this->createChannel();
8888
[, $size] = $channel->queue_declare($queue, true);
8989
$channel->close();
9090

@@ -277,7 +277,7 @@ public function setConnection(AbstractConnection $connection): RabbitMQQueue
277277
public function getChannel($forceNew = false): AMQPChannel
278278
{
279279
if (! $this->channel || $forceNew) {
280-
$this->channel = $this->getConnection()->channel();
280+
$this->channel = $this->createChannel();
281281
}
282282

283283
return $this->channel;
@@ -333,7 +333,7 @@ public function isExchangeExists(string $exchange): bool
333333

334334
try {
335335
// create a temporary channel, so the main channel will not be closed on exception
336-
$channel = $this->getConnection()->channel();
336+
$channel = $this->createChannel();
337337
$channel->exchange_declare($exchange, '', true);
338338
$channel->close();
339339

@@ -405,12 +405,20 @@ public function deleteExchange(string $name, bool $unused = false): void
405405
*/
406406
public function isQueueExists(string $name = null): bool
407407
{
408+
$queueName = $this->getQueue($name);
409+
410+
if ($this->isQueueDeclared($queueName)) {
411+
return true;
412+
}
413+
408414
try {
409415
// create a temporary channel, so the main channel will not be closed on exception
410-
$channel = $this->getConnection()->channel();
411-
$channel->queue_declare($this->getQueue($name), true);
416+
$channel = $this->createChannel();
417+
$channel->queue_declare($queueName, true);
412418
$channel->close();
413419

420+
$this->queues[] = $queueName;
421+
414422
return true;
415423
} catch (AMQPProtocolChannelException $exception) {
416424
if ($exception->amqp_reply_code === 404) {
@@ -457,6 +465,9 @@ public function deleteQueue(string $name, bool $if_unused = false, bool $if_empt
457465
return;
458466
}
459467

468+
$idx = array_search($name, $this->queues);
469+
unset($this->queues[$idx]);
470+
460471
$this->getChannel()->queue_delete($name, $if_unused, $if_empty);
461472
}
462473

@@ -482,7 +493,7 @@ public function bindQueue(string $queue, string $exchange, string $routingKey =
482493
public function purge(string $queue = null): void
483494
{
484495
// create a temporary channel, so the main channel will not be closed on exception
485-
$channel = $this->getConnection()->channel();
496+
$channel = $this->createChannel();
486497
$channel->queue_purge($this->getQueue($queue));
487498
$channel->close();
488499
}
@@ -739,4 +750,9 @@ protected function publishBatch(): void
739750
{
740751
$this->getChannel()->publish_batch();
741752
}
753+
754+
protected function createChannel(): AMQPChannel
755+
{
756+
return $this->getConnection()->channel();
757+
}
742758
}

0 commit comments

Comments
 (0)