Skip to content

Commit cca5684

Browse files
author
Devon Weller
committed
Reopen outbound connection on write failure
1 parent 7c9f0c9 commit cca5684

File tree

2 files changed

+78
-63
lines changed

2 files changed

+78
-63
lines changed

src/Queue/Connectors/RabbitMQConnector.php

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -38,42 +38,45 @@ public function connect(array $config): Queue
3838
throw new \LogicException('The factory_class option is missing though it is required.');
3939
}
4040

41-
$factoryClass = $config['factory_class'];
42-
if (false === class_exists($factoryClass) || false === (new \ReflectionClass($factoryClass))->implementsInterface(InteropAmqpConnectionFactory::class)) {
43-
throw new \LogicException(sprintf('The factory_class option has to be valid class that implements "%s"', InteropAmqpConnectionFactory::class));
44-
}
41+
// for reconnecting...
42+
$build_context_fn = function() use ($config) {
43+
$factoryClass = $config['factory_class'];
44+
if (false === class_exists($factoryClass) || false === (new \ReflectionClass($factoryClass))->implementsInterface(InteropAmqpConnectionFactory::class)) {
45+
throw new \LogicException(sprintf('The factory_class option has to be valid class that implements "%s"', InteropAmqpConnectionFactory::class));
46+
}
4547

46-
/** @var AmqpConnectionFactory $factory */
47-
$factory = new $factoryClass([
48-
'dsn' => $config['dsn'],
49-
'host' => $config['host'],
50-
'port' => $config['port'],
51-
'user' => $config['login'],
52-
'pass' => $config['password'],
53-
'vhost' => $config['vhost'],
54-
'ssl_on' => $config['ssl_params']['ssl_on'],
55-
'ssl_verify' => $config['ssl_params']['verify_peer'],
56-
'ssl_cacert' => $config['ssl_params']['cafile'],
57-
'ssl_cert' => $config['ssl_params']['local_cert'],
58-
'ssl_key' => $config['ssl_params']['local_key'],
59-
'ssl_passphrase' => $config['ssl_params']['passphrase'],
60-
'receive_method' => isset($config['receive']) ? $config['receive']['method'] : 'basic_get',
61-
'heartbeat' => isset($config['timeouts']) ? $config['timeouts']['heartbeat'] : 0,
62-
'read_timeout' => isset($config['timeouts']) ? $config['timeouts']['read'] : 3,
63-
'write_timeout' => isset($config['timeouts']) ? $config['timeouts']['write'] : 3,
64-
]);
48+
/** @var AmqpConnectionFactory $factory */
49+
$factory = new $factoryClass([
50+
'dsn' => $config['dsn'],
51+
'host' => $config['host'],
52+
'port' => $config['port'],
53+
'user' => $config['login'],
54+
'pass' => $config['password'],
55+
'vhost' => $config['vhost'],
56+
'ssl_on' => $config['ssl_params']['ssl_on'],
57+
'ssl_verify' => $config['ssl_params']['verify_peer'],
58+
'ssl_cacert' => $config['ssl_params']['cafile'],
59+
'ssl_cert' => $config['ssl_params']['local_cert'],
60+
'ssl_key' => $config['ssl_params']['local_key'],
61+
'ssl_passphrase' => $config['ssl_params']['passphrase'],
62+
'receive_method' => isset($config['receive']) ? $config['receive']['method'] : 'basic_get',
63+
'heartbeat' => isset($config['timeouts']) ? $config['timeouts']['heartbeat'] : 0,
64+
'read_timeout' => isset($config['timeouts']) ? $config['timeouts']['read'] : 3,
65+
'write_timeout' => isset($config['timeouts']) ? $config['timeouts']['write'] : 3,
66+
]);
6567

66-
if ($factory instanceof DelayStrategyAware) {
67-
$factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
68-
}
68+
if ($factory instanceof DelayStrategyAware) {
69+
$factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
70+
}
6971

70-
/** @var AmqpContext $context */
71-
$context = $factory->createContext();
72+
return $factory->createContext();
73+
};
74+
$context = $build_context_fn();
7275

7376
$this->dispatcher->listen(WorkerStopping::class, function () use ($context) {
7477
$context->close();
7578
});
7679

77-
return new RabbitMQQueue($context, $config);
80+
return new RabbitMQQueue($context, $config, $build_context_fn);
7881
}
7982
}

src/Queue/RabbitMQQueue.php

Lines changed: 46 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@ class RabbitMQQueue extends Queue implements QueueContract
3030
/**
3131
* @var AmqpContext
3232
*/
33+
protected $build_context_fn;
3334
private $context;
3435
private $correlationId;
3536

36-
public function __construct(AmqpContext $context, array $config)
37+
public function __construct(AmqpContext $context, array $config, callable $build_context_fn)
3738
{
3839
$this->context = $context;
40+
$this->build_context_fn = $build_context_fn;
3941

4042
$this->queueOptions = $config['options']['queue'];
4143
$this->queueOptions['arguments'] = isset($this->queueOptions['arguments']) ?
@@ -50,6 +52,13 @@ public function __construct(AmqpContext $context, array $config)
5052
$this->sleepOnError = $config['sleep_on_error'] ?? 5;
5153
}
5254

55+
public function reconnect()
56+
{
57+
$this->context = call_user_func($this->build_context_fn);
58+
$this->correlationId = null;
59+
$this->declarationsCache = [];
60+
}
61+
5362
/** @inheritdoc */
5463
public function size($queueName = null): int
5564
{
@@ -68,40 +77,43 @@ public function push($job, $data = '', $queue = null)
6877
/** @inheritdoc */
6978
public function pushRaw($payload, $queueName = null, array $options = [])
7079
{
71-
try {
72-
/**
73-
* @var AmqpTopic $topic
74-
* @var AmqpQueue $queue
75-
*/
76-
list($queue, $topic) = $this->declareEverythingOnce($queueName);
77-
78-
$message = $this->context->createMessage($payload);
79-
$message->setRoutingKey($queue->getQueueName());
80-
$message->setCorrelationId($this->getCorrelationId());
81-
$message->setContentType('application/json');
82-
$message->setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT);
83-
84-
if (isset($options['attempts'])) {
85-
$message->setProperty(RabbitMQJob::ATTEMPT_COUNT_HEADERS_KEY, $options['attempts']);
86-
}
87-
88-
// set priority
89-
if (isset($options['priority']) && $options['priority'] > 0) {
90-
$message->setPriority($options['priority']);
80+
for ($attempt = 1; $attempt <= 2; $attempt++) {
81+
try {
82+
/**
83+
* @var AmqpTopic $topic
84+
* @var AmqpQueue $queue
85+
*/
86+
list($queue, $topic) = $this->declareEverythingOnce($queueName);
87+
88+
$message = $this->context->createMessage($payload);
89+
$message->setRoutingKey($queue->getQueueName());
90+
$message->setCorrelationId($this->getCorrelationId());
91+
$message->setContentType('application/json');
92+
$message->setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT);
93+
94+
if (isset($options['attempts'])) {
95+
$message->setProperty(RabbitMQJob::ATTEMPT_COUNT_HEADERS_KEY, $options['attempts']);
96+
}
97+
98+
$producer = $this->context->createProducer();
99+
if (isset($options['delay']) && $options['delay'] > 0) {
100+
$producer->setDeliveryDelay($options['delay'] * 1000);
101+
}
102+
103+
$producer->send($topic, $message);
104+
105+
return $message->getCorrelationId();
106+
} catch (\Exception $exception) {
107+
// on first failure, try re-closing and opening the queue connection
108+
if ($attempt == 1) {
109+
$this->reconnect();
110+
continue;
111+
}
112+
113+
$this->reportConnectionError('pushRaw', $exception);
114+
115+
return null;
91116
}
92-
93-
$producer = $this->context->createProducer();
94-
if (isset($options['delay']) && $options['delay'] > 0) {
95-
$producer->setDeliveryDelay($options['delay'] * 1000);
96-
}
97-
98-
$producer->send($topic, $message);
99-
100-
return $message->getCorrelationId();
101-
} catch (\Exception $exception) {
102-
$this->reportConnectionError('pushRaw', $exception);
103-
104-
return null;
105117
}
106118
}
107119

0 commit comments

Comments
 (0)