Skip to content

Commit 313aea2

Browse files
committed
SWR-18338: Close connections on switch
1 parent 943000b commit 313aea2

File tree

3 files changed

+24
-25
lines changed

3 files changed

+24
-25
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
},
3535
"extra": {
3636
"branch-alias": {
37-
"dev-master": "1.21-dev"
37+
"dev-master": "1.22-dev"
3838
},
3939
"laravel": {
4040
"providers": [

src/Consumer.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ class Consumer extends Worker
3232
/** @var AMQPChannel */
3333
protected $channel;
3434

35+
protected $connection;
36+
3537
/** @var object|null */
3638
protected $currentJob;
3739

src/VhostsConsumers/AbstractVhostsConsumer.php

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,7 @@ public function __construct(
7474
ExceptionHandler $exceptions,
7575
callable $isDownForMaintenance,
7676
callable $resetScope = null
77-
)
78-
{
77+
) {
7978
parent::__construct($manager, $events, $exceptions, $isDownForMaintenance, $resetScope);
8079
}
8180

@@ -124,7 +123,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
124123

125124
$this->vhostDaemon($connectionName, $options);
126125
}
127-
126+
128127
abstract protected function vhostDaemon($connectionName, WorkerOptions $options);
129128

130129
/**
@@ -141,8 +140,7 @@ protected function getStopStatus(
141140
$startTime = 0,
142141
$jobsProcessed = 0,
143142
bool $hasJob = false
144-
): ?int
145-
{
143+
): ?int {
146144
return match (true) {
147145
$this->shouldQuit => static::EXIT_SUCCESS,
148146
$this->memoryExceeded($options->memory) => static::EXIT_MEMORY_LIMIT,
@@ -566,16 +564,14 @@ protected function updateLastProcessedAt()
566564
/**
567565
* @return RabbitMQQueue
568566
*/
569-
protected function initConnection(): RabbitMQQueue
567+
protected function initConnection(): RabbitMQQueue
570568
{
571-
// Close any existing connection/channel
572-
if ($this->channel) {
569+
if ($this->connection) {
573570
try {
574-
$this->channel->close();
571+
$this->connection->close();
575572
} catch (\Exception $e) {
576-
// Ignore close errors
577573
}
578-
$this->channel = null;
574+
$this->connection = null;
579575
}
580576

581577
$connection = $this->manager->connection(
@@ -584,6 +580,19 @@ protected function initConnection(): RabbitMQQueue
584580

585581
try {
586582
$channel = $connection->getChannel();
583+
584+
$this->currentConnectionName = $connection->getConnectionName();
585+
586+
$this->connectionMutex->lock(self::MAIN_HANDLER_LOCK);
587+
$channel->basic_qos(
588+
$this->prefetchSize,
589+
$this->prefetchCount,
590+
false
591+
);
592+
$this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK);
593+
594+
$this->channel = $channel;
595+
$this->connection = $connection;
587596
} catch (AMQPConnectionClosedException $exception) {
588597
$this->logError('initConnection.exception', [
589598
'message' => $exception->getMessage(),
@@ -601,18 +610,6 @@ protected function initConnection(): RabbitMQQueue
601610
return $this->initConnection();
602611
}
603612

604-
$this->currentConnectionName = $connection->getConnectionName();
605-
606-
$this->connectionMutex->lock(self::MAIN_HANDLER_LOCK);
607-
$channel->basic_qos(
608-
$this->prefetchSize,
609-
$this->prefetchCount,
610-
false
611-
);
612-
$this->connectionMutex->unlock(self::MAIN_HANDLER_LOCK);
613-
614-
$this->channel = $channel;
615-
616613
return $connection;
617614
}
618615

@@ -686,4 +683,4 @@ protected function log(string $message, array $data = [], bool $isError = false)
686683
$this->logger->info($logMessage, $data);
687684
}
688685
}
689-
}
686+
}

0 commit comments

Comments
 (0)