diff --git a/CHANGELOG-13x.md b/CHANGELOG-13x.md index 0631fc9d..88da7e2f 100644 --- a/CHANGELOG-13x.md +++ b/CHANGELOG-13x.md @@ -2,7 +2,25 @@ All notable changes to this project will be documented in this file. -## [unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.2.0...master) +## [unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.3.0...master) + +## [13.3.0](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.2.0...13.3.0) + +- Refactor the creation of RabbitMQ Connection and + QueueAPI. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Added configuration object as single dependency for RabbitMQQueue in + constructor. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Fix method getExchangeType, not throwing an + exception. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Separating the api logic from the actual publishing to + RabbitMQ. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Added a reconnect method. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Fix the connection and channel not being fully lazy, when QueueAPI was + created. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Keep track of declared queue's within RabbitMQ. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Implemented the 'rest' option to the consumer [#530](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/530) +- Added ability to reconnect to RabbitMQ, by creating your + own `RabbitMQQueue:class` [#531](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/531) ## [13.2.0](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.1.0...13.2.0) @@ -16,7 +34,8 @@ All notable changes to this project will be documented in this file. ## [13.0.1 (2022-09-16)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.0.0...v13.0.1) -- Add $dispatchAfterCommit when running via Horizon [#484](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/484) +- Add $dispatchAfterCommit when running via + Horizon [#484](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/484) ## [13.0.0 (2022-09-15)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v12.0.1...v13.0.0) diff --git a/README.md b/README.md index 24c25862..8f0081f1 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,6 @@ RabbitMQ Queue driver for Laravel [![Latest Stable Version](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/v/stable?format=flat-square)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) [![Build Status](https://github.com/vyuldashev/laravel-queue-rabbitmq/workflows/Tests/badge.svg)](https://github.com/vyuldashev/laravel-queue-rabbitmq/actions) [![Total Downloads](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/downloads?format=flat-square)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) -[![StyleCI](https://styleci.io/repos/14976752/shield)](https://styleci.io/repos/14976752) [![License](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/license?format=flat-square)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) ## Support Policy @@ -24,8 +23,12 @@ composer require vladimir-yuldashev/laravel-queue-rabbitmq The package will automatically register itself. +### Configuration + Add connection to `config/queue.php`: +> This is the minimal config for the rabbitMQ connection/driver to work. + ```php 'connections' => [ // ... @@ -33,9 +36,6 @@ Add connection to `config/queue.php`: 'rabbitmq' => [ 'driver' => 'rabbitmq', - 'queue' => env('RABBITMQ_QUEUE', 'default'), - 'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class, - 'hosts' => [ [ 'host' => env('RABBITMQ_HOST', '127.0.0.1'), @@ -44,33 +44,17 @@ Add connection to `config/queue.php`: 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), ], + // ... ], - - 'options' => [ - 'ssl_options' => [ - 'cafile' => env('RABBITMQ_SSL_CAFILE', null), - 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), - 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), - 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), - 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), - ], - 'queue' => [ - 'job' => VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class, - ], - ], - - /* - * Set to "horizon" if you wish to use Laravel Horizon. - */ - 'worker' => env('RABBITMQ_WORKER', 'default'), - 'after_commit' => false, + + // ... ], // ... ], ``` -### Optional Config +### Optional Queue Config Optionally add queue options to the config of a connection. Every queue created for this connection, gets the properties. @@ -164,6 +148,31 @@ by adding extra options. ], ``` +### Horizon support + +Starting with 8.0, this package supports [Laravel Horizon](https://laravel.com/docs/horizon) out of the box. Firstly, +install Horizon and then set `RABBITMQ_WORKER` to `horizon`. + +Horizon is depending on events dispatched by the worker. +These events inform Horizon what was done with the message/job. + +This Library supports Horizon, but in the config you have to inform Laravel to use the QueueApi compatible with horizon. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + /* Set to "horizon" if you wish to use Laravel Horizon. */ + 'worker' => env('RABBITMQ_WORKER', 'default'), + ], + + // ... +], +``` + ### Use your own RabbitMQJob class Sometimes you have to work with messages published by another application. @@ -254,16 +263,263 @@ class RabbitMQJob extends BaseJob } ``` -## Laravel Usage +If you want to handle raw message, not in JSON format or without 'job' key in JSON, +you should add stub for `getName` method: -Once you completed the configuration you can use the Laravel Queue API. If you used other queue drivers you do not need to -change anything else. If you do not know how to use the Queue API, please refer to the official Laravel -documentation: http://laravel.com/docs/queues +```php +getRawBody(); + Log::info($anyMessage); + + $this->delete(); + } + + public function getName() + { + return ''; + } +} +``` + +### Use your own Connection + +You can extend the built-in `PhpAmqpLib\Connection\AMQPStreamConnection::class` +or `PhpAmqpLib\Connection\AMQPSLLConnection::class` and within the connection config, you can define your own class. +When you specify a `connection` key in the config, with your own class name, every connection will use your own class. + +An example for the config: -## Laravel Horizon Usage +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'connection' = > \App\Queue\Connection\MyRabbitMQConnection::class, + ], + + // ... +], +``` + +### Use your own Worker class + +If you want to use your own `RabbitMQQueue::class` this is possible by +extending `VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue`. +and inform laravel to use your class by setting `RABBITMQ_WORKER` to `\App\Queue\RabbitMQQueue::class`. + +> Note: Worker classes **must** extend `VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue` + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + /* Set to a class if you wish to use your own. */ + 'worker' => \App\Queue\RabbitMQQueue::class, + ], + + // ... +], +``` + +```php + Note: this is not best practice, it is an example. + +```php +reconnect(); + parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket); + } + } + + protected function publishBatch($jobs, $data = '', $queue = null): void + { + try { + parent::publishBatch($jobs, $data, $queue); + } catch (AMQPConnectionClosedException|AMQPChannelClosedException) { + $this->reconnect(); + parent::publishBatch($jobs, $data, $queue); + } + } + + protected function createChannel(): AMQPChannel + { + try { + return parent::createChannel(); + } catch (AMQPConnectionClosedException) { + $this->reconnect(); + return parent::createChannel(); + } + } +} +``` + +### Default Queue + +The connection does use a default queue with value 'default', when no queue is provided by laravel. +It is possible to change te default queue by adding an extra parameter in the connection config. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'queue' => env('RABBITMQ_QUEUE', 'default'), + ], + + // ... +], +``` + +### Heartbeat + +By default, your connection will be created with a heartbeat setting of `0`. +You can alter the heartbeat settings by changing the config. + +```php + +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'options' => [ + // ... + + 'heartbeat' => 10, + ], + ], + + // ... +], +``` + +### SSL Secure + +If you need a secure connection to rabbitMQ server(s), you will need to add these extra config options. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'secure' = > true, + 'options' => [ + // ... + + 'ssl_options' => [ + 'cafile' => env('RABBITMQ_SSL_CAFILE', null), + 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), + 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), + 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), + 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), + ], + ], + ], + + // ... +], +``` + +### Events after Database commits + +To instruct Laravel workers to dispatch events after all database commits are completed. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'after_commit' => true, + ], + + // ... +], +``` + +### Lazy Connection + +By default, your connection will be created as a lazy connection. +If for some reason you don't want the connection lazy you can turn it off by setting the following config. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'lazy' = > false, + ], + + // ... +], +``` + +### Octane support + +Starting with 13.3.0, this package supports [Laravel Octane](https://laravel.com/docs/octane) out of the box. +Firstly, install Octane and don't forget to warm 'rabbitmq' connection in the octane config. +> See: https://github.com/vyuldashev/laravel-queue-rabbitmq/issues/460#issuecomment-1469851667 + +## Laravel Usage + +Once you completed the configuration you can use the Laravel Queue API. If you used other queue drivers you do not +need to change anything else. If you do not know how to use the Queue API, please refer to the official Laravel +documentation: http://laravel.com/docs/queues ## Lumen Usage @@ -287,7 +543,7 @@ There are two ways of consuming messages. Setup RabbitMQ using `docker-compose`: ```bash -docker-compose up -d rabbitmq +docker compose up -d ``` To run the test suite you can use the following commands: @@ -304,7 +560,7 @@ composer test:unit ``` If you receive any errors from the style tests, you can automatically fix most, -if not all of the issues with the following command: +if not all the issues with the following command: ```bash composer fix:style diff --git a/composer.json b/composer.json index 5461314d..20cf5310 100644 --- a/composer.json +++ b/composer.json @@ -1,59 +1,59 @@ { - "name": "vladimir-yuldashev/laravel-queue-rabbitmq", - "description": "RabbitMQ driver for Laravel Queue. Supports Laravel Horizon.", - "license": "MIT", - "authors": [ - { - "name": "Vladimir Yuldashev", - "email": "misterio92@gmail.com" - } - ], - "require": { - "php": "^8.0", - "ext-json": "*", - "illuminate/queue": "^9.0|^10.0", - "php-amqplib/php-amqplib": "^3.0" - }, - "require-dev": { - "phpunit/phpunit": "^9.3", - "mockery/mockery": "^1.0", - "laravel/horizon": "^5.0", - "orchestra/testbench": "^7.0|^8.0", - "laravel/pint": "^1.2", - "laravel/framework": "^9.0|^10.0" - }, - "autoload": { - "psr-4": { - "VladimirYuldashev\\LaravelQueueRabbitMQ\\": "src/" - } - }, - "autoload-dev": { - "psr-4": { - "VladimirYuldashev\\LaravelQueueRabbitMQ\\Tests\\": "tests/" - } - }, - "extra": { - "branch-alias": { - "dev-master": "13.0-dev" - }, - "laravel": { - "providers": [ - "VladimirYuldashev\\LaravelQueueRabbitMQ\\LaravelQueueRabbitMQServiceProvider" - ] - } - }, - "suggest": { - "ext-pcntl": "Required to use all features of the queue consumer." - }, - "scripts": { - "test": [ - "@test:style", - "@test:unit" + "name": "vladimir-yuldashev/laravel-queue-rabbitmq", + "description": "RabbitMQ driver for Laravel Queue. Supports Laravel Horizon.", + "license": "MIT", + "authors": [ + { + "name": "Vladimir Yuldashev", + "email": "misterio92@gmail.com" + } ], - "test:style": "@php vendor/bin/php-cs-fixer fix --config=.php-cs-fixer.dist.php --allow-risky=yes --dry-run --diff --verbose", - "test:unit": "@php vendor/bin/phpunit", - "fix:style": "@php vendor/bin/php-cs-fixer fix --config=.php-cs-fixer.dist.php --allow-risky=yes --diff --verbose" - }, - "minimum-stability": "dev", - "prefer-stable": true + "require": { + "php": "^8.0", + "ext-json": "*", + "illuminate/queue": "^9.0|^10.0", + "php-amqplib/php-amqplib": "^v3.2" + }, + "require-dev": { + "phpunit/phpunit": "^9.3", + "mockery/mockery": "^1.0", + "laravel/horizon": "^5.0", + "orchestra/testbench": "^7.0|^8.0", + "laravel/pint": "^1.2", + "laravel/framework": "^9.0|^10.0" + }, + "autoload": { + "psr-4": { + "VladimirYuldashev\\LaravelQueueRabbitMQ\\": "src/" + } + }, + "autoload-dev": { + "psr-4": { + "VladimirYuldashev\\LaravelQueueRabbitMQ\\Tests\\": "tests/" + } + }, + "extra": { + "branch-alias": { + "dev-master": "13.0-dev" + }, + "laravel": { + "providers": [ + "VladimirYuldashev\\LaravelQueueRabbitMQ\\LaravelQueueRabbitMQServiceProvider" + ] + } + }, + "suggest": { + "ext-pcntl": "Required to use all features of the queue consumer." + }, + "scripts": { + "test": [ + "@test:style", + "@test:unit" + ], + "test:style": "@php vendor/bin/pint --test -v", + "test:unit": "@php vendor/bin/phpunit", + "fix:style": "@php vendor/bin/pint -v" + }, + "minimum-stability": "dev", + "prefer-stable": true } diff --git a/config/rabbitmq.php b/config/rabbitmq.php index d5390a5c..4c102ce8 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -9,7 +9,7 @@ 'driver' => 'rabbitmq', 'queue' => env('RABBITMQ_QUEUE', 'default'), - 'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class, + 'connection' => 'default', 'hosts' => [ [ @@ -22,16 +22,6 @@ ], 'options' => [ - 'ssl_options' => [ - 'cafile' => env('RABBITMQ_SSL_CAFILE', null), - 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), - 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), - 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), - 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), - ], - 'queue' => [ - 'job' => VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class, - ], ], /* diff --git a/src/Consumer.php b/src/Consumer.php index bd7f099d..ed3d8099 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -125,6 +125,10 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu if ($this->supportsAsyncSignals()) { $this->resetTimeoutHandler(); } + + if ($options->rest > 0) { + $this->sleep($options->rest); + } }, null, $arguments diff --git a/src/Contracts/RabbitMQQueueContract.php b/src/Contracts/RabbitMQQueueContract.php new file mode 100644 index 00000000..cf04a66e --- /dev/null +++ b/src/Contracts/RabbitMQQueueContract.php @@ -0,0 +1,14 @@ +size($queue); } @@ -46,8 +43,10 @@ public function push($job, $data = '', $queue = null) /** * {@inheritdoc} + * + * @throws BindingResolutionException */ - public function pushRaw($payload, $queue = null, array $options = []) + public function pushRaw($payload, $queue = null, array $options = []): int|string|null { $payload = (new JobPayload($payload))->prepare($this->lastPushed)->value; @@ -58,8 +57,10 @@ public function pushRaw($payload, $queue = null, array $options = []) /** * {@inheritdoc} + * + * @throws BindingResolutionException */ - public function later($delay, $job, $data = '', $queue = null) + public function later($delay, $job, $data = '', $queue = null): mixed { $payload = (new JobPayload($this->createPayload($job, $data)))->prepare($job)->value; @@ -80,16 +81,6 @@ public function pop($queue = null) }); } - /** - * {@inheritdoc} - */ - public function release($delay, $job, $data, $queue, $attempts = 0) - { - $this->lastPushed = $job; - - return parent::release($delay, $job, $data, $queue, $attempts); - } - /** * Fire the job deleted event. * diff --git a/src/Queue/Connection/ConfigFactory.php b/src/Queue/Connection/ConfigFactory.php new file mode 100644 index 00000000..c6e24699 --- /dev/null +++ b/src/Queue/Connection/ConfigFactory.php @@ -0,0 +1,93 @@ +setIsLazy(! in_array( + Arr::get($config, 'lazy') ?? true, + [false, 0, '0', 'false', 'no'], + true) + ); + + // Set the connection to unsecure by default + $connectionConfig->setIsSecure(in_array( + Arr::get($config, 'secure'), + [true, 1, '1', 'true', 'yes'], + true) + ); + + if ($connectionConfig->isSecure()) { + self::getSLLOptionsFromConfig($connectionConfig, $config); + } + + self::getHostFromConfig($connectionConfig, $config); + self::getHeartbeatFromConfig($connectionConfig, $config); + }); + } + + protected static function getHostFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $hostConfig = Arr::first(Arr::shuffle(Arr::get($config, self::CONFIG_HOSTS, [])), null, []); + + if ($location = Arr::get($hostConfig, 'host')) { + $connectionConfig->setHost($location); + } + if ($port = Arr::get($hostConfig, 'port')) { + $connectionConfig->setPort($port); + } + if ($vhost = Arr::get($hostConfig, 'vhost')) { + $connectionConfig->setVhost($vhost); + } + if ($user = Arr::get($hostConfig, 'user')) { + $connectionConfig->setUser($user); + } + if ($password = Arr::get($hostConfig, 'password')) { + $connectionConfig->setPassword($password); + } + } + + protected static function getSLLOptionsFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $sslConfig = Arr::get($config, self::CONFIG_OPTIONS.'.ssl_options', []); + + if ($caFile = Arr::get($sslConfig, 'cafile')) { + $connectionConfig->setSslCaCert($caFile); + } + if ($cert = Arr::get($sslConfig, 'local_cert')) { + $connectionConfig->setSslCert($cert); + } + if ($key = Arr::get($sslConfig, 'local_key')) { + $connectionConfig->setSslKey($key); + } + if ($verifyPeer = Arr::get($sslConfig, 'verify_peer')) { + $connectionConfig->setSslVerify($verifyPeer); + } + if ($passphrase = Arr::get($sslConfig, 'passphrase')) { + $connectionConfig->setSslPassPhrase($passphrase); + } + } + + protected static function getHeartbeatFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $heartbeat = Arr::get($config, self::CONFIG_OPTIONS.'.heartbeat'); + + if (is_numeric($heartbeat) && intval($heartbeat) > 0) { + $connectionConfig->setHeartbeat((int) $heartbeat); + } + } +} diff --git a/src/Queue/Connection/ConnectionFactory.php b/src/Queue/Connection/ConnectionFactory.php new file mode 100644 index 00000000..df19f223 --- /dev/null +++ b/src/Queue/Connection/ConnectionFactory.php @@ -0,0 +1,224 @@ +getIoType() === AMQPConnectionConfig::IO_TYPE_SOCKET) { + return self::createSocketConnection($connection, $config); + } + + return self::createStreamConnection($connection, $config); + } + + protected static function createSocketConnection($connection, AMQPConnectionConfig $config): AMQPSocketConnection + { + self::assertSocketConnection($connection, $config); + + return new $connection( + $config->getHost(), + $config->getPort(), + $config->getUser(), + $config->getPassword(), + $config->getVhost(), + $config->isInsist(), + $config->getLoginMethod(), + $config->getLoginResponse(), + $config->getLocale(), + $config->getReadTimeout(), + $config->isKeepalive(), + $config->getWriteTimeout(), + $config->getHeartbeat(), + $config->getChannelRPCTimeout(), + $config + ); + } + + protected static function createStreamConnection($connection, AMQPConnectionConfig $config): AMQPStreamConnection + { + self::assertStreamConnection($connection); + + if ($config->isSecure()) { + self::assertSSLConnection($connection); + + return new $connection( + $config->getHost(), + $config->getPort(), + $config->getUser(), + $config->getPassword(), + $config->getVhost(), + self::getSslOptions($config), + [ + 'insist' => $config->isInsist(), + 'login_method' => $config->getLoginMethod(), + 'login_response' => $config->getLoginResponse(), + 'locale' => $config->getLocale(), + 'connection_timeout' => $config->getConnectionTimeout(), + 'read_write_timeout' => self::getReadWriteTimeout($config), + 'keepalive' => $config->isKeepalive(), + 'heartbeat' => $config->getHeartbeat(), + ], + $config->getNetworkProtocol(), + $config + ); + } + + return new $connection( + $config->getHost(), + $config->getPort(), + $config->getUser(), + $config->getPassword(), + $config->getVhost(), + $config->isInsist(), + $config->getLoginMethod(), + $config->getLoginResponse(), + $config->getLocale(), + $config->getConnectionTimeout(), + self::getReadWriteTimeout($config), + $config->getStreamContext(), + $config->isKeepalive(), + $config->getHeartbeat(), + $config->getChannelRPCTimeout(), + $config->getNetworkProtocol(), + $config + ); + } + + protected static function getReadWriteTimeout(AMQPConnectionConfig $config): float + { + return min($config->getReadTimeout(), $config->getWriteTimeout()); + } + + protected static function getSslOptions(AMQPConnectionConfig $config): array + { + return array_filter([ + 'cafile' => $config->getSslCaCert(), + 'capath' => $config->getSslCaPath(), + 'local_cert' => $config->getSslCert(), + 'local_pk' => $config->getSslKey(), + 'verify_peer' => $config->getSslVerify(), + 'verify_peer_name' => $config->getSslVerifyName(), + 'passphrase' => $config->getSslPassPhrase(), + 'ciphers' => $config->getSslCiphers(), + 'security_level' => $config->getSslSecurityLevel(), + ], static function ($value) { + return null !== $value; + }); + } + + protected static function assertConnectionFromConfig(string $connection): void + { + if ($connection !== self::CONNECTION_TYPE_DEFAULT && ! is_subclass_of($connection, self::CONNECTION_TYPE_EXTENDED)) { + throw new AMQPLogicException(sprintf('The config property \'%s\' must contain \'%s\' or must extend: %s', self::CONFIG_CONNECTION, self::CONNECTION_TYPE_DEFAULT, class_basename(self::CONNECTION_TYPE_EXTENDED))); + } + } + + protected static function assertSocketConnection($connection, AMQPConnectionConfig $config): void + { + self::assertExtendedOf($connection, self::CONNECTION_SUB_TYPE_SOCKET); + + if ($config->isSecure()) { + throw new AMQPLogicException('The socket connection implementation does not support secure connections.'); + } + } + + protected static function assertStreamConnection($connection): void + { + self::assertExtendedOf($connection, self::CONNECTION_SUB_TYPE_STREAM); + } + + protected static function assertSSLConnection($connection): void + { + self::assertExtendedOf($connection, self::CONNECTION_SUB_TYPE_SSL); + } + + protected static function assertExtendedOf($connection, string $abstract): void + { + if (! is_subclass_of($connection, $abstract)) { + throw new AMQPLogicException(sprintf('The connection must extend: %s', class_basename($abstract))); + } + } + + /** + * @return mixed + * + * @throws Exception + * + * @deprecated This is the fallback method, update your config asap. (example: connection => 'default') + */ + protected static function _createLazyConnection($connection, array $config): AbstractConnection + { + return $connection::create_connection( + Arr::shuffle(Arr::get($config, ConfigFactory::CONFIG_HOSTS, [])), + Arr::add(Arr::get($config, 'options', []), 'heartbeat', 0) + ); + } +} diff --git a/src/Queue/Connectors/RabbitMQConnector.php b/src/Queue/Connectors/RabbitMQConnector.php index 9fc8615f..a0f79e32 100644 --- a/src/Queue/Connectors/RabbitMQConnector.php +++ b/src/Queue/Connectors/RabbitMQConnector.php @@ -8,20 +8,15 @@ use Illuminate\Queue\Connectors\ConnectorInterface; use Illuminate\Queue\Events\JobFailed; use Illuminate\Queue\Events\WorkerStopping; -use Illuminate\Support\Arr; -use InvalidArgumentException; -use PhpAmqpLib\Connection\AbstractConnection; -use PhpAmqpLib\Connection\AMQPLazyConnection; use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\Listeners\RabbitMQFailedEvent; use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue; +use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connection\ConnectionFactory; +use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\QueueFactory; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; class RabbitMQConnector implements ConnectorInterface { - /** - * @var Dispatcher - */ - private $dispatcher; + protected Dispatcher $dispatcher; public function __construct(Dispatcher $dispatcher) { @@ -37,19 +32,9 @@ public function __construct(Dispatcher $dispatcher) */ public function connect(array $config): Queue { - $connection = $this->createConnection(Arr::except($config, 'options.queue')); + $connection = ConnectionFactory::make($config); - $queue = $this->createQueue( - Arr::get($config, 'worker', 'default'), - $connection, - $config['queue'], - Arr::get($config, 'after_commit', false), - Arr::get($config, 'options.queue', []) - ); - - if (! $queue instanceof RabbitMQQueue) { - throw new InvalidArgumentException('Invalid worker.'); - } + $queue = QueueFactory::make($config)->setConnection($connection); if ($queue instanceof HorizonRabbitMQQueue) { $this->dispatcher->listen(JobFailed::class, RabbitMQFailedEvent::class); @@ -61,66 +46,4 @@ public function connect(array $config): Queue return $queue; } - - /** - * @throws Exception - */ - protected function createConnection(array $config): AbstractConnection - { - /** @var AbstractConnection $connection */ - $connection = Arr::get($config, 'connection', AMQPLazyConnection::class); - - // disable heartbeat when not configured, so long-running tasks will not fail - $config = Arr::add($config, 'options.heartbeat', 0); - - return $connection::create_connection( - Arr::shuffle(Arr::get($config, 'hosts', [])), - $this->filter(Arr::get($config, 'options', [])) - ); - } - - /** - * Create a queue for the worker. - * - * @return HorizonRabbitMQQueue|RabbitMQQueue|Queue - */ - protected function createQueue( - string $worker, - AbstractConnection $connection, - string $queue, - bool $dispatchAfterCommit, - array $options = [] - ) { - switch ($worker) { - case 'default': - return new RabbitMQQueue($connection, $queue, $dispatchAfterCommit, $options); - case 'horizon': - return new HorizonRabbitMQQueue($connection, $queue, $dispatchAfterCommit, $options); - default: - return new $worker($connection, $queue, $options); - } - } - - /** - * Recursively filter only null values. - */ - private function filter(array $array): array - { - foreach ($array as $index => &$value) { - if (is_array($value)) { - $value = $this->filter($value); - - continue; - } - - // If the value is null then remove it. - if ($value === null) { - unset($array[$index]); - - continue; - } - } - - return $array; - } } diff --git a/src/Queue/QueueConfig.php b/src/Queue/QueueConfig.php new file mode 100644 index 00000000..e7ec27c4 --- /dev/null +++ b/src/Queue/QueueConfig.php @@ -0,0 +1,278 @@ +queue; + } + + public function setQueue(string $queue): QueueConfig + { + $this->queue = $queue; + + return $this; + } + + /** + * Returns &true; as indication that jobs should be dispatched after all database transactions + * have been committed. + */ + public function isDispatchAfterCommit(): bool + { + return $this->dispatchAfterCommit; + } + + public function setDispatchAfterCommit($dispatchAfterCommit): QueueConfig + { + $this->dispatchAfterCommit = $this->toBoolean($dispatchAfterCommit); + + return $this; + } + + /** + * Get the Job::class to use when processing messages + */ + public function getAbstractJob(): string + { + return $this->abstractJob; + } + + public function setAbstractJob(string $abstract): QueueConfig + { + $this->abstractJob = $abstract; + + return $this; + } + + /** + * Returns &true;, if delayed messages should be prioritized. + * + * RabbitMQ queues work with the FIFO method. So when there are 10000 messages in the queue and + * the delayed message is put back to the queue (at the end) for further processing the delayed message won´t + * process before all 10000 messages are processed. The same is true for requeueing. + * + * This may not what you desire. + * When you want the message to get processed immediately after the delayed time expires or when requeueing, we can + * use prioritization. + * + * @see[https://www.rabbitmq.com/queues.html#basics] + */ + public function isPrioritizeDelayed(): bool + { + return $this->prioritizeDelayed; + } + + public function setPrioritizeDelayed($prioritizeDelayed): QueueConfig + { + $this->prioritizeDelayed = $this->toBoolean($prioritizeDelayed); + + return $this; + } + + /** + * Returns a integer with a default of '2' for when using prioritization on delayed messages. + * If priority queues are desired, we recommend using between 1 and 10. + * Using more priority layers, will consume more CPU resources and would affect runtimes. + * + * @see https://www.rabbitmq.com/priority.html + */ + public function getQueueMaxPriority(): int + { + return $this->queueMaxPriority; + } + + public function setQueueMaxPriority($queueMaxPriority): QueueConfig + { + if (is_numeric($queueMaxPriority) && intval($queueMaxPriority) > 1) { + $this->queueMaxPriority = (int) $queueMaxPriority; + } + + return $this; + } + + /** + * Get the exchange name, or empty string; as default value. + * + * The default exchange is an unnamed pre-declared direct exchange. Usually, an empty string + * is frequently used to indicate it. If you choose default exchange, your message will be delivered + * to a queue with the same name as the routing key. + * With a routing key that is the same as the queue name, every queue is immediately tied to the default exchange. + */ + public function getExchange(): string + { + return $this->exchange; + } + + public function setExchange(string $exchange): QueueConfig + { + $this->exchange = $exchange; + + return $this; + } + + /** + * Get the exchange type + * + * There are four basic RabbitMQ exchange types in RabbitMQ, each of which uses different parameters + * and bindings to route messages in various ways, These are: 'direct', 'topic', 'fanout', 'headers' + * + * The default type is set as 'direct' + */ + public function getExchangeType(): string + { + return $this->exchangeType; + } + + public function setExchangeType(string $exchangeType): QueueConfig + { + $this->exchangeType = $exchangeType; + + return $this; + } + + /** + * Get the routing key when using an exchange other than the direct exchange. + * The routing key is a message attribute taken into account by the exchange when deciding how to route a message. + * + * The default routing-key is the given destination: '%s'. + */ + public function getExchangeRoutingKey(): string + { + return $this->exchangeRoutingKey; + } + + public function setExchangeRoutingKey(string $exchangeRoutingKey): QueueConfig + { + $this->exchangeRoutingKey = $exchangeRoutingKey; + + return $this; + } + + /** + * Returns &true;, if failed messages should be rerouted. + */ + public function isRerouteFailed(): bool + { + return $this->rerouteFailed; + } + + public function setRerouteFailed($rerouteFailed): QueueConfig + { + $this->rerouteFailed = $this->toBoolean($rerouteFailed); + + return $this; + } + + /** + * Get the exchange name with messages are published against. + * The default exchange is empty, so messages will be published directly to a queue. + */ + public function getFailedExchange(): string + { + return $this->failedExchange; + } + + public function setFailedExchange(string $failedExchange): QueueConfig + { + $this->failedExchange = $failedExchange; + + return $this; + } + + /** + * Get the substitution string for failed messages + * The default routing-key is the given destination substituted by '%s.failed'. + */ + public function getFailedRoutingKey(): string + { + return $this->failedRoutingKey; + } + + public function setFailedRoutingKey(string $failedRoutingKey): QueueConfig + { + $this->failedRoutingKey = $failedRoutingKey; + + return $this; + } + + /** + * Returns &true;, if queue is marked or set as quorum queue. + */ + public function isQuorum(): bool + { + return $this->quorum; + } + + public function setQuorum($quorum): QueueConfig + { + $this->quorum = $this->toBoolean($quorum); + + return $this; + } + + /** + * Holds all unknown queue options provided in the connection config + */ + public function getOptions(): array + { + return $this->options; + } + + public function setOptions(array $options): QueueConfig + { + $this->options = $options; + + return $this; + } + + /** + * Filters $value to boolean value + * + * Returns: &true; + * For values: 1, '1', true, 'true', 'yes' + * + * Returns: &false; + * For values: 0, '0', false, 'false', '', null, [] , 'ok', 'no', 'no not a bool', 'yes a bool' + */ + protected function toBoolean($value): bool + { + return filter_var($value, FILTER_VALIDATE_BOOLEAN); + } +} diff --git a/src/Queue/QueueConfigFactory.php b/src/Queue/QueueConfigFactory.php new file mode 100644 index 00000000..87fc2fac --- /dev/null +++ b/src/Queue/QueueConfigFactory.php @@ -0,0 +1,74 @@ +setQueue($queue); + } + if (! empty($afterCommit = Arr::get($config, 'after_commit'))) { + $queueConfig->setDispatchAfterCommit($afterCommit); + } + + self::getOptionsFromConfig($queueConfig, $config); + }); + } + + protected static function getOptionsFromConfig(QueueConfig $queueConfig, array $config): void + { + $queueOptions = Arr::get($config, self::CONFIG_OPTIONS.'.queue', []) ?: []; + + if ($job = Arr::pull($queueOptions, 'job')) { + $queueConfig->setAbstractJob($job); + } + + // Feature: Prioritize delayed messages. + if ($prioritizeDelayed = Arr::pull($queueOptions, 'prioritize_delayed')) { + $queueConfig->setPrioritizeDelayed($prioritizeDelayed); + } + if ($maxPriority = Arr::pull($queueOptions, 'queue_max_priority')) { + $queueConfig->setQueueMaxPriority($maxPriority); + } + + // Feature: Working with Exchange and routing-keys + if ($exchange = Arr::pull($queueOptions, 'exchange')) { + $queueConfig->setExchange($exchange); + } + if ($exchangeType = Arr::pull($queueOptions, 'exchange_type')) { + $queueConfig->setExchangeType($exchangeType); + } + if ($exchangeRoutingKey = Arr::pull($queueOptions, 'exchange_routing_key')) { + $queueConfig->setExchangeRoutingKey($exchangeRoutingKey); + } + + // Feature: Reroute failed messages + if ($rerouteFailed = Arr::pull($queueOptions, 'reroute_failed')) { + $queueConfig->setRerouteFailed($rerouteFailed); + } + if ($failedExchange = Arr::pull($queueOptions, 'failed_exchange')) { + $queueConfig->setFailedExchange($failedExchange); + } + if ($failedRoutingKey = Arr::pull($queueOptions, 'failed_routing_key')) { + $queueConfig->setFailedRoutingKey($failedRoutingKey); + } + + // Feature: Mark queue as quorum + if ($quorum = Arr::pull($queueOptions, 'quorum')) { + $queueConfig->setQuorum($quorum); + } + + // All extra options not defined + $queueConfig->setOptions($queueOptions); + } +} diff --git a/src/Queue/QueueFactory.php b/src/Queue/QueueFactory.php new file mode 100644 index 00000000..75bde1ed --- /dev/null +++ b/src/Queue/QueueFactory.php @@ -0,0 +1,25 @@ +connection = $connection; - $this->channel = $connection->channel(); - $this->default = $default; - $this->options = $options; - $this->dispatchAfterCommit = $dispatchAfterCommit; + public function __construct(QueueConfig $config) + { + $this->config = $config; + $this->dispatchAfterCommit = $config->isDispatchAfterCommit(); } /** @@ -109,7 +84,7 @@ public function size($queue = null): int } // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->connection->channel(); + $channel = $this->createChannel(); [, $size] = $channel->queue_declare($queue, true); $channel->close(); @@ -139,7 +114,7 @@ function ($payload, $queue) { * * @throws AMQPProtocolChannelException */ - public function pushRaw($payload, $queue = null, array $options = []) + public function pushRaw($payload, $queue = null, array $options = []): int|string|null { [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); @@ -147,7 +122,7 @@ public function pushRaw($payload, $queue = null, array $options = []) [$message, $correlationId] = $this->createMessage($payload, $attempts); - $this->channel->basic_publish($message, $exchange, $destination, true, false); + $this->publishBasic($message, $exchange, $destination, true); return $correlationId; } @@ -157,7 +132,7 @@ public function pushRaw($payload, $queue = null, array $options = []) * * @throws AMQPProtocolChannelException */ - public function later($delay, $job, $data = '', $queue = null) + public function later($delay, $job, $data = '', $queue = null): mixed { return $this->enqueueUsing( $job, @@ -171,13 +146,9 @@ function ($payload, $queue, $delay) { } /** - * @param null $queue - * @param int $attempts - * @return mixed - * * @throws AMQPProtocolChannelException */ - public function laterRaw($delay, $payload, $queue = null, $attempts = 0) + public function laterRaw($delay, string $payload, $queue = null, int $attempts = 0): int|string|null { $ttl = $this->secondsUntil($delay) * 1000; @@ -199,8 +170,8 @@ public function laterRaw($delay, $payload, $queue = null, $attempts = 0) [$message, $correlationId] = $this->createMessage($payload, $attempts); - // Publish directly on the delayQueue, no need to publish trough an exchange. - $this->channel->basic_publish($message, null, $destination, true, false); + // Publish directly on the delayQueue, no need to publish through an exchange. + $this->publishBasic($message, null, $destination, true); return $correlationId; } @@ -212,20 +183,25 @@ public function laterRaw($delay, $payload, $queue = null, $attempts = 0) */ public function bulk($jobs, $data = '', $queue = null): void { - foreach ((array) $jobs as $job) { + $this->publishBatch($jobs, $queue, $data); + } + + /** + * @throws AMQPProtocolChannelException + */ + protected function publishBatch($jobs, $data = '', $queue = null): void + { + foreach ($jobs as $job) { $this->bulkRaw($this->createPayload($job, $queue, $data), $queue, ['job' => $job]); } - $this->channel->publish_batch(); + $this->batchPublish(); } /** - * @param null $queue - * @return mixed - * * @throws AMQPProtocolChannelException */ - public function bulkRaw(string $payload, $queue = null, array $options = []) + public function bulkRaw(string $payload, string $queue = null, array $options = []): int|string|null { [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); @@ -233,7 +209,7 @@ public function bulkRaw(string $payload, $queue = null, array $options = []) [$message, $correlationId] = $this->createMessage($payload, $attempts); - $this->channel->batch_basic_publish($message, $exchange, $destination); + $this->getChannel()->batch_basic_publish($message, $exchange, $destination); return $correlationId; } @@ -251,7 +227,7 @@ public function pop($queue = null) $job = $this->getJobClass(); /** @var AMQPMessage|null $message */ - if ($message = $this->channel->basic_get($queue)) { + if ($message = $this->getChannel()->basic_get($queue)) { return $this->currentJob = new $job( $this->container, $this, @@ -261,12 +237,12 @@ public function pop($queue = null) ); } } catch (AMQPProtocolChannelException $exception) { - // If there is not exchange or queue AMQP will throw exception with code 404 + // If there is no exchange or queue AMQP will throw exception with code 404 // We need to catch it and return null if ($exception->amqp_reply_code === 404) { // Because of the channel exception the channel was closed and removed. // We have to open a new channel. Because else the worker(s) are stuck in a loop, without processing. - $this->channel = $this->connection->channel(); + $this->getChannel(true); return null; } @@ -287,14 +263,23 @@ public function pop($queue = null) return null; } + /** + * @throws RuntimeException + */ public function getConnection(): AbstractConnection { + if (! $this->connection) { + throw new RuntimeException('Queue has no AMQPConnection set.'); + } + return $this->connection; } - public function getChannel(): AMQPChannel + public function setConnection(AbstractConnection $connection): RabbitMQQueue { - return $this->channel; + $this->connection = $connection; + + return $this; } /** @@ -305,7 +290,7 @@ public function getChannel(): AMQPChannel */ public function getJobClass(): string { - $job = Arr::get($this->options, 'job', RabbitMQJob::class); + $job = $this->getConfig()->getAbstractJob(); throw_if( ! is_a($job, RabbitMQJob::class, true), @@ -323,12 +308,12 @@ public function getJobClass(): string */ public function getQueue($queue = null): string { - return $queue ?: $this->default; + return $queue ?: $this->getConfig()->getQueue(); } /** * Checks if the given exchange already present/defined in RabbitMQ. - * Returns false when when the exchange is missing. + * Returns false when the exchange is missing. * * * @throws AMQPProtocolChannelException @@ -341,7 +326,7 @@ public function isExchangeExists(string $exchange): bool try { // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->connection->channel(); + $channel = $this->createChannel(); $channel->exchange_declare($exchange, '', true); $channel->close(); @@ -358,7 +343,7 @@ public function isExchangeExists(string $exchange): bool } /** - * Declare a exchange in rabbitMQ, when not already declared. + * Declare an exchange in rabbitMQ, when not already declared. */ public function declareExchange( string $name, @@ -371,7 +356,7 @@ public function declareExchange( return; } - $this->channel->exchange_declare( + $this->getChannel()->exchange_declare( $name, $type, false, @@ -384,7 +369,7 @@ public function declareExchange( } /** - * Delete a exchange from rabbitMQ, only when present in RabbitMQ. + * Delete an exchange from rabbitMQ, only when present in RabbitMQ. * * * @throws AMQPProtocolChannelException @@ -398,7 +383,7 @@ public function deleteExchange(string $name, bool $unused = false): void $idx = array_search($name, $this->exchanges); unset($this->exchanges[$idx]); - $this->channel->exchange_delete( + $this->getChannel()->exchange_delete( $name, $unused ); @@ -406,19 +391,27 @@ public function deleteExchange(string $name, bool $unused = false): void /** * Checks if the given queue already present/defined in RabbitMQ. - * Returns false when when the queue is missing. + * Returns false when the queue is missing. * * * @throws AMQPProtocolChannelException */ public function isQueueExists(string $name = null): bool { + $queueName = $this->getQueue($name); + + if ($this->isQueueDeclared($queueName)) { + return true; + } + try { // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->connection->channel(); - $channel->queue_declare($this->getQueue($name), true); + $channel = $this->createChannel(); + $channel->queue_declare($queueName, true); $channel->close(); + $this->queues[] = $queueName; + return true; } catch (AMQPProtocolChannelException $exception) { if ($exception->amqp_reply_code === 404) { @@ -442,7 +435,7 @@ public function declareQueue( return; } - $this->channel->queue_declare( + $this->getChannel()->queue_declare( $name, false, $durable, @@ -465,7 +458,10 @@ public function deleteQueue(string $name, bool $if_unused = false, bool $if_empt return; } - $this->channel->queue_delete($name, $if_unused, $if_empty); + $idx = array_search($name, $this->queues); + unset($this->queues[$idx]); + + $this->getChannel()->queue_delete($name, $if_unused, $if_empty); } /** @@ -481,7 +477,7 @@ public function bindQueue(string $queue, string $exchange, string $routingKey = return; } - $this->channel->queue_bind($queue, $exchange, $routingKey); + $this->getChannel()->queue_bind($queue, $exchange, $routingKey); } /** @@ -490,7 +486,7 @@ public function bindQueue(string $queue, string $exchange, string $routingKey = public function purge(string $queue = null): void { // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->connection->channel(); + $channel = $this->createChannel(); $channel->queue_purge($this->getQueue($queue)); $channel->close(); } @@ -500,7 +496,7 @@ public function purge(string $queue = null): void */ public function ack(RabbitMQJob $job): void { - $this->channel->basic_ack($job->getRabbitMQMessage()->getDeliveryTag()); + $this->getChannel()->basic_ack($job->getRabbitMQMessage()->getDeliveryTag()); } /** @@ -508,14 +504,11 @@ public function ack(RabbitMQJob $job): void */ public function reject(RabbitMQJob $job, bool $requeue = false): void { - $this->channel->basic_reject($job->getRabbitMQMessage()->getDeliveryTag(), $requeue); + $this->getChannel()->basic_reject($job->getRabbitMQMessage()->getDeliveryTag(), $requeue); } /** * Create a AMQP message. - * - * - * @throws JsonException */ protected function createMessage($payload, int $attempts = 0): array { @@ -524,12 +517,12 @@ protected function createMessage($payload, int $attempts = 0): array 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, ]; - $currentPayload = json_decode($payload, true, 512); + $currentPayload = json_decode($payload, true); if ($correlationId = $currentPayload['id'] ?? null) { $properties['correlation_id'] = $correlationId; } - if ($this->isPrioritizeDelayed()) { + if ($this->getConfig()->isPrioritizeDelayed()) { $properties['priority'] = $attempts; } @@ -584,13 +577,13 @@ protected function getRandomId(): string */ public function close(): void { - if ($this->currentJob && ! $this->currentJob->isDeletedOrReleased()) { + if (! $this->currentJob->isDeletedOrReleased()) { $this->reject($this->currentJob, true); } try { - $this->connection->close(); - } catch (ErrorException $exception) { + $this->getConnection()->close(); + } catch (ErrorException) { // Ignore the exception } } @@ -606,16 +599,16 @@ protected function getQueueArguments(string $destination): array // Messages with a priority which is higher than the queue's maximum, are treated as if they were // published with the maximum priority. // Quorum queues does not support priority. - if ($this->isPrioritizeDelayed() && ! $this->isQuorum()) { - $arguments['x-max-priority'] = $this->getQueueMaxPriority(); + if ($this->getConfig()->isPrioritizeDelayed() && ! $this->getConfig()->isQuorum()) { + $arguments['x-max-priority'] = $this->getConfig()->getQueueMaxPriority(); } - if ($this->isRerouteFailed()) { - $arguments['x-dead-letter-exchange'] = $this->getFailedExchange() ?? ''; + if ($this->getConfig()->isRerouteFailed()) { + $arguments['x-dead-letter-exchange'] = $this->getFailedExchange(); $arguments['x-dead-letter-routing-key'] = $this->getFailedRoutingKey($destination); } - if ($this->isQuorum()) { + if ($this->getConfig()->isQuorum()) { $arguments['x-queue-type'] = 'quorum'; } @@ -628,7 +621,7 @@ protected function getQueueArguments(string $destination): array protected function getDelayQueueArguments(string $destination, int $ttl): array { return [ - 'x-dead-letter-exchange' => $this->getExchange() ?? '', + 'x-dead-letter-exchange' => $this->getExchange(), 'x-dead-letter-routing-key' => $this->getRoutingKey($destination), 'x-message-ttl' => $ttl, 'x-expires' => $ttl * 2, @@ -636,31 +629,11 @@ protected function getDelayQueueArguments(string $destination, int $ttl): array } /** - * Returns &true;, if delayed messages should be prioritized. - */ - protected function isPrioritizeDelayed(): bool - { - return (bool) (Arr::get($this->options, 'prioritize_delayed') ?: false); - } - - /** - * Returns a integer with a default of '2' for when using prioritization on delayed messages. - * If priority queues are desired, we recommend using between 1 and 10. - * Using more priority layers, will consume more CPU resources and would affect runtimes. - * - * @see https://www.rabbitmq.com/priority.html - */ - protected function getQueueMaxPriority(): int - { - return (int) (Arr::get($this->options, 'queue_max_priority') ?: 2); - } - - /** - * Get the exchange name, or &null; as default value. + * Get the exchange name, or empty string; as default value. */ - protected function getExchange(string $exchange = null): ?string + protected function getExchange(?string $exchange = null): string { - return $exchange ?: Arr::get($this->options, 'exchange') ?: null; + return $exchange ?? $this->getConfig()->getExchange(); } /** @@ -669,7 +642,7 @@ protected function getExchange(string $exchange = null): ?string */ protected function getRoutingKey(string $destination): string { - return ltrim(sprintf(Arr::get($this->options, 'exchange_routing_key') ?: '%s', $destination), '.'); + return ltrim(sprintf($this->getConfig()->getExchangeRoutingKey(), $destination), '.'); } /** @@ -677,34 +650,17 @@ protected function getRoutingKey(string $destination): string */ protected function getExchangeType(?string $type = null): string { - return @constant(AMQPExchangeType::class.'::'.Str::upper($type ?: Arr::get( - $this->options, - 'exchange_type' - ) ?: 'direct')) ?: AMQPExchangeType::DIRECT; - } + $constant = AMQPExchangeType::class.'::'.Str::upper($type ?: $this->getConfig()->getExchangeType()); - /** - * Returns &true;, if failed messages should be rerouted. - */ - protected function isRerouteFailed(): bool - { - return (bool) (Arr::get($this->options, 'reroute_failed') ?: false); - } - - /** - * Returns &true;, if declared queue must be quorum queue. - */ - protected function isQuorum(): bool - { - return (bool) (Arr::get($this->options, 'quorum') ?: false); + return defined($constant) ? constant($constant) : AMQPExchangeType::DIRECT; } /** * Get the exchange for failed messages. */ - protected function getFailedExchange(string $exchange = null): ?string + protected function getFailedExchange(?string $exchange = null): string { - return $exchange ?: Arr::get($this->options, 'failed_exchange') ?: null; + return $exchange ?? $this->getConfig()->getFailedExchange(); } /** @@ -713,7 +669,7 @@ protected function getFailedExchange(string $exchange = null): ?string */ protected function getFailedRoutingKey(string $destination): string { - return ltrim(sprintf(Arr::get($this->options, 'failed_routing_key') ?: '%s.failed', $destination), '.'); + return ltrim(sprintf($this->getConfig()->getFailedRoutingKey(), $destination), '.'); } /** @@ -735,21 +691,16 @@ protected function isQueueDeclared(string $name): bool /** * Declare the destination when necessary. * - * @param string|null $exchangeType - * * @throws AMQPProtocolChannelException */ - protected function declareDestination( - string $destination, - ?string $exchange = null, - string $exchangeType = AMQPExchangeType::DIRECT - ): void { - // When a exchange is provided and no exchange is present in RabbitMQ, create an exchange. + protected function declareDestination(string $destination, ?string $exchange = null, string $exchangeType = AMQPExchangeType::DIRECT): void + { + // When an exchange is provided and no exchange is present in RabbitMQ, create an exchange. if ($exchange && ! $this->isExchangeExists($exchange)) { $this->declareExchange($exchange, $exchangeType); } - // When a exchange is provided, just return. + // When an exchange is provided, just return. if ($exchange) { return; } @@ -777,4 +728,44 @@ protected function publishProperties($queue, array $options = []): array return [$destination, $exchange, $exchangeType, $attempts]; } + + protected function getConfig(): QueueConfig + { + return $this->config; + } + + protected function publishBasic($msg, $exchange = '', $destination = '', $mandatory = false, $immediate = false, $ticket = null): void + { + $this->getChannel()->basic_publish($msg, $exchange, $destination, $mandatory, $immediate, $ticket); + } + + protected function batchPublish(): void + { + $this->getChannel()->publish_batch(); + } + + public function getChannel($forceNew = false): AMQPChannel + { + if (! $this->channel || $forceNew) { + $this->channel = $this->createChannel(); + } + + return $this->channel; + } + + protected function createChannel(): AMQPChannel + { + return $this->getConnection()->channel(); + } + + /** + * @throws Exception + */ + protected function reconnect(): void + { + // Reconnects using the original connection settings. + $this->getConnection()->reconnect(); + // Create a new main channel because all old channels are removed. + $this->getChannel(true); + } } diff --git a/tests/Feature/ConnectorTest.php b/tests/Feature/ConnectorTest.php index 499018e9..3ecede98 100644 --- a/tests/Feature/ConnectorTest.php +++ b/tests/Feature/ConnectorTest.php @@ -5,6 +5,7 @@ use Illuminate\Queue\QueueManager; use PhpAmqpLib\Connection\AMQPLazyConnection; use PhpAmqpLib\Connection\AMQPSSLConnection; +use PhpAmqpLib\Connection\AMQPStreamConnection; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; class ConnectorTest extends \VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase @@ -47,8 +48,52 @@ public function testLazyConnection(): void $this->assertInstanceOf(RabbitMQQueue::class, $connection); $this->assertInstanceOf(AMQPLazyConnection::class, $connection->getConnection()); + $this->assertFalse($connection->getConnection()->isConnected()); + $this->assertTrue($connection->getChannel()->is_open()); $this->assertTrue($connection->getConnection()->isConnected()); + } + + public function testLazyStreamConnection(): void + { + $this->app['config']->set('queue.connections.rabbitmq', [ + 'driver' => 'rabbitmq', + 'queue' => env('RABBITMQ_QUEUE', 'default'), + 'connection' => 'default', + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => env('RABBITMQ_SSL_CAFILE', null), + 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), + 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), + 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), + 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), + ], + ], + + 'worker' => env('RABBITMQ_WORKER', 'default'), + ]); + + /** @var QueueManager $queue */ + $queue = $this->app['queue']; + + /** @var RabbitMQQueue $connection */ + $connection = $queue->connection('rabbitmq'); + + $this->assertInstanceOf(RabbitMQQueue::class, $connection); + $this->assertInstanceOf(AMQPStreamConnection::class, $connection->getConnection()); + $this->assertFalse($connection->getConnection()->isConnected()); $this->assertTrue($connection->getChannel()->is_open()); + $this->assertTrue($connection->getConnection()->isConnected()); } public function testSslConnection(): void diff --git a/tests/Feature/QueueTest.php b/tests/Feature/QueueTest.php index 15b8acbb..5ecfb978 100644 --- a/tests/Feature/QueueTest.php +++ b/tests/Feature/QueueTest.php @@ -2,12 +2,42 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Feature; -use PhpAmqpLib\Connection\AMQPLazyConnection; +use PhpAmqpLib\Connection\AMQPStreamConnection; +use PhpAmqpLib\Exception\AMQPChannelClosedException; +use PhpAmqpLib\Exception\AMQPConnectionClosedException; +use PhpAmqpLib\Exception\AMQPProtocolChannelException; +use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestJob; class QueueTest extends TestCase { + public function setUp(): void + { + parent::setUp(); + + $this->withoutExceptionHandling([ + AMQPChannelClosedException::class, AMQPConnectionClosedException::class, + AMQPProtocolChannelException::class, + ]); + } + public function testConnection(): void { - $this->assertInstanceOf(AMQPLazyConnection::class, $this->connection()->getChannel()->getConnection()); + $this->assertInstanceOf(AMQPStreamConnection::class, $this->connection()->getChannel()->getConnection()); + } + + public function testWithoutReconnect(): void + { + $queue = $this->connection('rabbitmq'); + + $queue->push(new TestJob()); + sleep(1); + $this->assertSame(1, $queue->size()); + + // close connection + $queue->getConnection()->close(); + $this->assertFalse($queue->getConnection()->isConnected()); + + $this->expectException(AMQPChannelClosedException::class); + $queue->push(new TestJob()); } } diff --git a/tests/Functional/RabbitMQQueueTest.php b/tests/Functional/RabbitMQQueueTest.php index 1c03fe99..7a106a08 100644 --- a/tests/Functional/RabbitMQQueueTest.php +++ b/tests/Functional/RabbitMQQueueTest.php @@ -11,7 +11,6 @@ class RabbitMQQueueTest extends BaseTestCase { public function testConnection(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertInstanceOf(RabbitMQQueue::class, $queue); @@ -22,51 +21,57 @@ public function testConnection(): void $this->assertInstanceOf(RabbitMQQueue::class, $queue); } - public function testRerouteFailed(): void + public function testConfigRerouteFailed(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); - $this->assertFalse($this->callMethod($queue, 'isRerouteFailed')); + $this->assertFalse($this->callProperty($queue, 'config')->isRerouteFailed()); $queue = $this->connection('rabbitmq-with-options'); - $this->assertTrue($this->callMethod($queue, 'isRerouteFailed')); + $this->assertTrue($this->callProperty($queue, 'config')->isRerouteFailed()); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertFalse($this->callMethod($queue, 'isRerouteFailed')); + $this->assertFalse($this->callProperty($queue, 'config')->isRerouteFailed()); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertFalse($this->callProperty($queue, 'config')->isRerouteFailed()); } - public function testPrioritizeDelayed(): void + public function testConfigPrioritizeDelayed(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); - $this->assertFalse($this->callMethod($queue, 'isPrioritizeDelayed')); + $this->assertFalse($this->callProperty($queue, 'config')->isPrioritizeDelayed()); $queue = $this->connection('rabbitmq-with-options'); - $this->assertTrue($this->callMethod($queue, 'isPrioritizeDelayed')); + $this->assertTrue($this->callProperty($queue, 'config')->isPrioritizeDelayed()); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertFalse($this->callMethod($queue, 'isPrioritizeDelayed')); + $this->assertFalse($this->callProperty($queue, 'config')->isPrioritizeDelayed()); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertFalse($this->callProperty($queue, 'config')->isPrioritizeDelayed()); } public function testQueueMaxPriority(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); - $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); - $this->assertSame(2, $this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority()); + $this->assertSame(2, $this->callProperty($queue, 'config')->getQueueMaxPriority()); $queue = $this->connection('rabbitmq-with-options'); - $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); - $this->assertSame(20, $this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority()); + $this->assertSame(20, $this->callProperty($queue, 'config')->getQueueMaxPriority()); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); - $this->assertSame(2, $this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority()); + $this->assertSame(2, $this->callProperty($queue, 'config')->getQueueMaxPriority()); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority()); + $this->assertSame(2, $this->callProperty($queue, 'config')->getQueueMaxPriority()); } - public function testExchangeType(): void + public function testConfigExchangeType(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType', [''])); @@ -74,48 +79,78 @@ public function testExchangeType(): void $queue = $this->connection('rabbitmq-with-options'); $this->assertSame(AMQPExchangeType::TOPIC, $this->callMethod($queue, 'getExchangeType')); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType', ['direct'])); $queue = $this->connection('rabbitmq-with-options-empty'); $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); + + //testing an unkown type with a default + $this->callProperty($queue, 'config')->setExchangeType('unknown'); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); } public function testExchange(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); - $this->assertNull($this->callMethod($queue, 'getExchange', [''])); - $this->assertNull($this->callMethod($queue, 'getExchange')); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [''])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [null])); + $this->assertSame('', $this->callMethod($queue, 'getExchange')); $queue = $this->connection('rabbitmq-with-options'); - $this->assertNotNull($this->callMethod($queue, 'getExchange')); $this->assertSame('application-x', $this->callMethod($queue, 'getExchange')); + $this->assertSame('application-x', $this->callMethod($queue, 'getExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [''])); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertNull($this->callMethod($queue, 'getExchange')); + $this->assertSame('', $this->callMethod($queue, 'getExchange')); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [''])); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame('', $this->callMethod($queue, 'getExchange')); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [''])); } public function testFailedExchange(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); - $this->assertNull($this->callMethod($queue, 'getExchange', [''])); - $this->assertNull($this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [''])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [null])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange')); $queue = $this->connection('rabbitmq-with-options'); - $this->assertNotNull($this->callMethod($queue, 'getFailedExchange')); $this->assertSame('failed-exchange', $this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('failed-exchange', $this->callMethod($queue, 'getFailedExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [''])); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertNull($this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [''])); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [''])); } public function testRoutingKey(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['.test'])); $this->assertSame('', $this->callMethod($queue, 'getRoutingKey', [''])); $queue = $this->connection('rabbitmq-with-options'); @@ -123,13 +158,18 @@ public function testRoutingKey(): void $queue = $this->connection('rabbitmq-with-options-empty'); $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + $this->callProperty($queue, 'config')->setExchangeRoutingKey('.an.alternate.routing-key'); + $this->assertSame('an.alternate.routing-key', $this->callMethod($queue, 'getRoutingKey', ['test'])); } public function testFailedRoutingKey(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['.test'])); $this->assertSame('failed', $this->callMethod($queue, 'getFailedRoutingKey', [''])); $queue = $this->connection('rabbitmq-with-options'); @@ -137,27 +177,33 @@ public function testFailedRoutingKey(): void $queue = $this->connection('rabbitmq-with-options-empty'); $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + $this->callProperty($queue, 'config')->setFailedRoutingKey('.an.alternate.routing-key'); + $this->assertSame('an.alternate.routing-key', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); } - public function testQuorum(): void + public function testConfigQuorum(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); - $this->assertFalse($this->callMethod($queue, 'isQuorum')); + $this->assertFalse($this->callProperty($queue, 'config')->isQuorum()); $queue = $this->connection('rabbitmq-with-options'); - $this->assertFalse($this->callMethod($queue, 'isQuorum')); - - $queue = $this->connection('rabbitmq-with-quorum-options'); - $this->assertTrue($this->callMethod($queue, 'isQuorum')); + $this->assertFalse($this->callProperty($queue, 'config')->isQuorum()); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertFalse($this->callMethod($queue, 'isQuorum')); + $this->assertFalse($this->callProperty($queue, 'config')->isQuorum()); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertFalse($this->callProperty($queue, 'config')->isQuorum()); + + $queue = $this->connection('rabbitmq-with-quorum-options'); + $this->assertTrue($this->callProperty($queue, 'config')->isQuorum()); } public function testDeclareDeleteExchange(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $name = Str::random(); @@ -173,7 +219,6 @@ public function testDeclareDeleteExchange(): void public function testDeclareDeleteQueue(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $name = Str::random(); @@ -191,7 +236,6 @@ public function testQueueArguments(): void { $name = Str::random(); - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $actual = $this->callMethod($queue, 'getQueueArguments', [$name]); $expected = []; @@ -233,7 +277,6 @@ public function testDelayQueueArguments(): void $name = Str::random(); $ttl = 12000; - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $actual = $this->callMethod($queue, 'getDelayQueueArguments', [$name, $ttl]); $expected = [ diff --git a/tests/Functional/TestCase.php b/tests/Functional/TestCase.php index 01b3c069..8b843561 100644 --- a/tests/Functional/TestCase.php +++ b/tests/Functional/TestCase.php @@ -3,7 +3,7 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Functional; use Exception; -use PhpAmqpLib\Connection\AMQPLazyConnection; +use PhpAmqpLib\Channel\AMQPChannel; use ReflectionClass; use ReflectionException; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase as BaseTestCase; @@ -16,7 +16,7 @@ protected function getEnvironmentSetUp($app): void $app['config']->set('queue.connections.rabbitmq', [ 'driver' => 'rabbitmq', 'queue' => 'order', - 'connection' => AMQPLazyConnection::class, + 'connection' => 'default', 'hosts' => [ [ @@ -44,7 +44,7 @@ protected function getEnvironmentSetUp($app): void $app['config']->set('queue.connections.rabbitmq-with-options', [ 'driver' => 'rabbitmq', 'queue' => 'order', - 'connection' => AMQPLazyConnection::class, + 'connection' => 'default', 'hosts' => [ [ @@ -83,7 +83,7 @@ protected function getEnvironmentSetUp($app): void $app['config']->set('queue.connections.rabbitmq-with-options-empty', [ 'driver' => 'rabbitmq', 'queue' => 'order', - 'connection' => AMQPLazyConnection::class, + 'connection' => 'default', 'hosts' => [ [ @@ -119,11 +119,51 @@ protected function getEnvironmentSetUp($app): void 'worker' => 'default', + ]); + $app['config']->set('queue.connections.rabbitmq-with-options-null', [ + 'driver' => 'rabbitmq', + 'queue' => 'order', + 'connection' => 'default', + + 'hosts' => [ + [ + 'host' => null, + 'port' => null, + 'vhost' => null, + 'user' => null, + 'password' => null, + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => null, + 'passphrase' => null, + ], + + 'queue' => [ + 'prioritize_delayed' => null, + 'queue_max_priority' => null, + 'exchange' => null, + 'exchange_type' => null, + 'exchange_routing_key' => null, + 'reroute_failed' => null, + 'failed_exchange' => null, + 'failed_routing_key' => null, + 'quorum' => null, + ], + ], + + 'worker' => 'default', + ]); $app['config']->set('queue.connections.rabbitmq-with-quorum-options', [ 'driver' => 'rabbitmq', 'queue' => 'order', - 'connection' => AMQPLazyConnection::class, + 'connection' => 'default', 'hosts' => [ [ @@ -161,11 +201,9 @@ protected function getEnvironmentSetUp($app): void } /** - * @return mixed - * * @throws Exception */ - protected function callMethod($object, string $method, array $parameters = []) + protected function callMethod($object, string $method, array $parameters = []): mixed { try { $className = get_class($object); @@ -179,4 +217,54 @@ protected function callMethod($object, string $method, array $parameters = []) return $method->invokeArgs($object, $parameters); } + + /** + * @throws Exception + */ + protected function callProperty($object, string $property): mixed + { + try { + $className = get_class($object); + $reflection = new ReflectionClass($className); + } catch (ReflectionException $e) { + throw new Exception($e->getMessage()); + } + + $property = $reflection->getProperty($property); + $property->setAccessible(true); + + return $property->getValue($object); + } + + public function testConnectChannel(): void + { + $queue = $this->connection(); + $this->assertFalse($queue->getConnection()->isConnected()); + + /** @var AMQPChannel $channel */ + $channel = $this->callMethod($queue, 'getChannel'); + $this->assertTrue($queue->getConnection()->isConnected()); + $this->assertSame($channel, $this->callProperty($queue, 'channel')); + $this->assertTrue($channel->is_open()); + } + + public function testReconnect(): void + { + $queue = $this->connection(); + $this->assertFalse($queue->getConnection()->isConnected()); + + // connect + $channel = $this->callMethod($queue, 'getChannel'); + $this->assertTrue($queue->getConnection()->isConnected()); + $this->assertSame($channel, $this->callProperty($queue, 'channel')); + + // close + $queue->getConnection()->close(); + $this->assertFalse($queue->getConnection()->isConnected()); + + // reconnect + $this->callMethod($queue, 'reconnect'); + $this->assertTrue($queue->getConnection()->isConnected()); + $this->assertTrue($queue->getChannel()->is_open()); + } }