diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 702bfe9b..4aad6d20 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -13,10 +13,16 @@ jobs: strategy: fail-fast: true matrix: - php: ['8.0', '8.1'] - stability: [prefer-lowest, prefer-stable] + php: ['8.1', '8.2', '8.3', '8.4'] + stability: ['prefer-lowest', 'prefer-stable'] + laravel: ['^10.0', '^11.0', '^12.0'] + exclude: + - php: '8.1' + laravel: '^11.0' + - php: '8.1' + laravel: '^12.0' - name: PHP ${{ matrix.php }} - ${{ matrix.stability }} + name: 'PHP ${{ matrix.php }} - Laravel: ${{matrix.laravel}} - ${{ matrix.stability }}' steps: - name: Checkout code @@ -29,21 +35,14 @@ jobs: extensions: dom, curl, libxml, mbstring, zip coverage: none - - name: Set up Docker - run: | - sudo rm /usr/local/bin/docker-compose - curl -L https://github.com/docker/compose/releases/download/1.24.1/docker-compose-`uname -s`-`uname -m` > docker-compose - chmod +x docker-compose - sudo mv docker-compose /usr/local/bin - - name: Start Docker container - run: docker-compose up -d rabbitmq + run: docker compose up -d rabbitmq - name: Install dependencies - run: composer update --${{ matrix.stability }} --prefer-dist --no-interaction --no-progress + run: composer update --with='laravel/framework:${{matrix.laravel}}' --${{ matrix.stability }} --prefer-dist --no-interaction --no-progress - name: Run Laravel Pint run: ./vendor/bin/pint --test - name: Execute tests - run: sleep 10 && vendor/bin/phpunit --verbose + run: sleep 10 && vendor/bin/phpunit diff --git a/.gitignore b/.gitignore index 00dea824..a2dcf885 100644 --- a/.gitignore +++ b/.gitignore @@ -4,5 +4,5 @@ composer.lock .phpstorm.meta.php phpunit.xml -.phpunit.result.cache +.phpunit.* .php_cs.cache diff --git a/CHANGELOG-13x.md b/CHANGELOG-13x.md index 0f9e7678..64e448ce 100644 --- a/CHANGELOG-13x.md +++ b/CHANGELOG-13x.md @@ -2,7 +2,32 @@ All notable changes to this project will be documented in this file. -## [Unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.1.0...master) +## [unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.3.0...master) + +## [13.3.1](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.3.0...13.3.3) +- Fix a bug when no job / message is available on the queue initially [#543](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/543) + +## [13.3.0](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.2.0...13.3.0) + +- Refactor the creation of RabbitMQ Connection and + QueueAPI. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Added configuration object as single dependency for RabbitMQQueue in + constructor. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Fix method getExchangeType, not throwing an + exception. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Separating the api logic from the actual publishing to + RabbitMQ. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Added a reconnect method. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Fix the connection and channel not being fully lazy, when QueueAPI was + created. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Keep track of declared queue's within RabbitMQ. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Implemented the 'rest' option to the consumer [#530](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/530) +- Added ability to reconnect to RabbitMQ, by creating your + own `RabbitMQQueue:class` [#531](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/531) + +## [13.2.0](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.1.0...13.2.0) + +- Compatibility with Laravel 10 [#525](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/525) ## [13.1.0 (2023-01-25)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.0.1...v13.1.0) @@ -12,7 +37,8 @@ All notable changes to this project will be documented in this file. ## [13.0.1 (2022-09-16)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.0.0...v13.0.1) -- Add $dispatchAfterCommit when running via Horizon [#484](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/484) +- Add $dispatchAfterCommit when running via + Horizon [#484](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/484) ## [13.0.0 (2022-09-15)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v12.0.1...v13.0.0) diff --git a/CHANGELOG-14x.md b/CHANGELOG-14x.md new file mode 100644 index 00000000..c94b3dd2 --- /dev/null +++ b/CHANGELOG-14x.md @@ -0,0 +1,8 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.3.0...master) + +## [14.0.0] +- First release compatible with Laravel 11 diff --git a/README.md b/README.md index 24c25862..30a10df9 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,8 @@ RabbitMQ Queue driver for Laravel ====================== [![Latest Stable Version](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/v/stable?format=flat-square)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) -[![Build Status](https://github.com/vyuldashev/laravel-queue-rabbitmq/workflows/Tests/badge.svg)](https://github.com/vyuldashev/laravel-queue-rabbitmq/actions) +[![Build Status](https://github.com/vyuldashev/laravel-queue-rabbitmq/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/vyuldashev/laravel-queue-rabbitmq/actions/workflows/tests.yml) [![Total Downloads](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/downloads?format=flat-square)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) -[![StyleCI](https://styleci.io/repos/14976752/shield)](https://styleci.io/repos/14976752) [![License](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/license?format=flat-square)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) ## Support Policy @@ -24,8 +23,12 @@ composer require vladimir-yuldashev/laravel-queue-rabbitmq The package will automatically register itself. +### Configuration + Add connection to `config/queue.php`: +> This is the minimal config for the rabbitMQ connection/driver to work. + ```php 'connections' => [ // ... @@ -33,9 +36,6 @@ Add connection to `config/queue.php`: 'rabbitmq' => [ 'driver' => 'rabbitmq', - 'queue' => env('RABBITMQ_QUEUE', 'default'), - 'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class, - 'hosts' => [ [ 'host' => env('RABBITMQ_HOST', '127.0.0.1'), @@ -44,33 +44,17 @@ Add connection to `config/queue.php`: 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), ], + // ... ], - - 'options' => [ - 'ssl_options' => [ - 'cafile' => env('RABBITMQ_SSL_CAFILE', null), - 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), - 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), - 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), - 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), - ], - 'queue' => [ - 'job' => VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class, - ], - ], - - /* - * Set to "horizon" if you wish to use Laravel Horizon. - */ - 'worker' => env('RABBITMQ_WORKER', 'default'), - 'after_commit' => false, + + // ... ], // ... ], ``` -### Optional Config +### Optional Queue Config Optionally add queue options to the config of a connection. Every queue created for this connection, gets the properties. @@ -164,6 +148,31 @@ by adding extra options. ], ``` +### Horizon support + +Starting with 8.0, this package supports [Laravel Horizon](https://laravel.com/docs/horizon) out of the box. Firstly, +install Horizon and then set `RABBITMQ_WORKER` to `horizon`. + +Horizon is depending on events dispatched by the worker. +These events inform Horizon what was done with the message/job. + +This Library supports Horizon, but in the config you have to inform Laravel to use the QueueApi compatible with horizon. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + /* Set to "horizon" if you wish to use Laravel Horizon. */ + 'worker' => env('RABBITMQ_WORKER', 'default'), + ], + + // ... +], +``` + ### Use your own RabbitMQJob class Sometimes you have to work with messages published by another application. @@ -254,16 +263,310 @@ class RabbitMQJob extends BaseJob } ``` -## Laravel Usage +If you want to handle raw message, not in JSON format or without 'job' key in JSON, +you should add stub for `getName` method: -Once you completed the configuration you can use the Laravel Queue API. If you used other queue drivers you do not need to -change anything else. If you do not know how to use the Queue API, please refer to the official Laravel -documentation: http://laravel.com/docs/queues +```php +getRawBody(); + Log::info($anyMessage); + + $this->delete(); + } + + public function getName() + { + return ''; + } +} +``` + +### Use your own Connection + +You can extend the built-in `PhpAmqpLib\Connection\AMQPStreamConnection::class` +or `PhpAmqpLib\Connection\AMQPSLLConnection::class` and within the connection config, you can define your own class. +When you specify a `connection` key in the config, with your own class name, every connection will use your own class. + +An example for the config: + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'connection' = > \App\Queue\Connection\MyRabbitMQConnection::class, + ], + + // ... +], +``` + +### Use your own Worker class + +If you want to use your own `RabbitMQQueue::class` this is possible by +extending `VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue`. +and inform laravel to use your class by setting `RABBITMQ_WORKER` to `\App\Queue\RabbitMQQueue::class`. + +> Note: Worker classes **must** extend `VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue` + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + /* Set to a class if you wish to use your own. */ + 'worker' => \App\Queue\RabbitMQQueue::class, + ], + + // ... +], +``` + +```php + Note: this is not best practice, it is an example. + +```php +reconnect(); + parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket); + } + } + + protected function publishBatch($jobs, $data = '', $queue = null): void + { + try { + parent::publishBatch($jobs, $data, $queue); + } catch (AMQPConnectionClosedException|AMQPChannelClosedException) { + $this->reconnect(); + parent::publishBatch($jobs, $data, $queue); + } + } + + protected function createChannel(): AMQPChannel + { + try { + return parent::createChannel(); + } catch (AMQPConnectionClosedException) { + $this->reconnect(); + return parent::createChannel(); + } + } +} +``` + +### Default Queue + +The connection does use a default queue with value 'default', when no queue is provided by laravel. +It is possible to change te default queue by adding an extra parameter in the connection config. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'queue' => env('RABBITMQ_QUEUE', 'default'), + ], + + // ... +], +``` + +### Heartbeat + +By default, your connection will be created with a heartbeat setting of `0`. +You can alter the heartbeat settings by changing the config. + +```php + +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'options' => [ + // ... + + 'heartbeat' => 10, + ], + ], + + // ... +], +``` + +### SSL Secure + +If you need a secure connection to rabbitMQ server(s), you will need to add these extra config options. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'secure' = > true, + 'options' => [ + // ... + + 'ssl_options' => [ + 'cafile' => env('RABBITMQ_SSL_CAFILE', null), + 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), + 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), + 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), + 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), + ], + ], + ], + + // ... +], +``` + +### Events after Database commits + +To instruct Laravel workers to dispatch events after all database commits are completed. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'after_commit' => true, + ], + + // ... +], +``` + +### Lazy Connection -## Laravel Horizon Usage +By default, your connection will be created as a lazy connection. +If for some reason you don't want the connection lazy you can turn it off by setting the following config. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'lazy' = > false, + ], -Starting with 8.0, this package supports [Laravel Horizon](http://horizon.laravel.com) out of the box. Firstly, install -Horizon and then set `RABBITMQ_WORKER` to `horizon`. + // ... +], +``` + +### Network Protocol + +By default, the network protocol used for connection is tcp. +If for some reason you want to use another network protocol, you can add the extra value in your config options. +Available protocols : `tcp`, `ssl`, `tls` + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'network_protocol' => 'tcp', + ], + + // ... +], +``` + +### Network Timeouts + +For network timeouts configuration you can use option parameters. +All float values are in seconds and zero value can mean infinite timeout. +Example contains default values. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'options' => [ + // ... + + 'connection_timeout' => 3.0, + 'read_timeout' => 3.0, + 'write_timeout' => 3.0, + 'channel_rpc_timeout' => 0.0, + ], + ], + + // ... +], +``` + +### Octane support + +Starting with 13.3.0, this package supports [Laravel Octane](https://laravel.com/docs/octane) out of the box. +Firstly, install Octane and don't forget to warm 'rabbitmq' connection in the octane config. +> See: https://github.com/vyuldashev/laravel-queue-rabbitmq/issues/460#issuecomment-1469851667 + +## Laravel Usage + +Once you completed the configuration you can use the Laravel Queue API. If you used other queue drivers you do not +need to change anything else. If you do not know how to use the Queue API, please refer to the official Laravel +documentation: http://laravel.com/docs/queues ## Lumen Usage @@ -277,17 +580,16 @@ $app->register(VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServic There are two ways of consuming messages. -1. `queue:work` command which is Laravel's built-in command. This command utilizes `basic_get`. +1. `queue:work` command which is Laravel's built-in command. This command utilizes `basic_get`. Use this if you want to consume multiple queues. -2. `rabbitmq:consume` command which is provided by this package. This command utilizes `basic_consume` and is more - performant than `basic_get` by ~2x. +2. `rabbitmq:consume` command which is provided by this package. This command utilizes `basic_consume` and is more performant than `basic_get` by ~2x, but does not support multiple queues. ## Testing Setup RabbitMQ using `docker-compose`: ```bash -docker-compose up -d rabbitmq +docker compose up -d ``` To run the test suite you can use the following commands: @@ -304,7 +606,7 @@ composer test:unit ``` If you receive any errors from the style tests, you can automatically fix most, -if not all of the issues with the following command: +if not all the issues with the following command: ```bash composer fix:style diff --git a/composer.json b/composer.json index 15bf6e85..caf8a0fc 100644 --- a/composer.json +++ b/composer.json @@ -1,58 +1,59 @@ { - "name": "vladimir-yuldashev/laravel-queue-rabbitmq", - "description": "RabbitMQ driver for Laravel Queue. Supports Laravel Horizon.", - "license": "MIT", - "authors": [ - { - "name": "Vladimir Yuldashev", - "email": "misterio92@gmail.com" - } - ], - "require": { - "php": "^8.0", - "ext-json": "*", - "illuminate/queue": "^9.0", - "php-amqplib/php-amqplib": "^3.0" - }, - "require-dev": { - "phpunit/phpunit": "^9.3", - "mockery/mockery": "^1.0", - "laravel/horizon": "^5.0", - "orchestra/testbench": "^7.0", - "laravel/pint": "^1.2" - }, - "autoload": { - "psr-4": { - "VladimirYuldashev\\LaravelQueueRabbitMQ\\": "src/" - } - }, - "autoload-dev": { - "psr-4": { - "VladimirYuldashev\\LaravelQueueRabbitMQ\\Tests\\": "tests/" - } - }, - "extra": { - "branch-alias": { - "dev-master": "13.0-dev" - }, - "laravel": { - "providers": [ - "VladimirYuldashev\\LaravelQueueRabbitMQ\\LaravelQueueRabbitMQServiceProvider" - ] - } - }, - "suggest": { - "ext-pcntl": "Required to use all features of the queue consumer." - }, - "scripts": { - "test": [ - "@test:style", - "@test:unit" + "name": "vladimir-yuldashev/laravel-queue-rabbitmq", + "description": "RabbitMQ driver for Laravel Queue. Supports Laravel Horizon.", + "license": "MIT", + "authors": [ + { + "name": "Vladimir Yuldashev", + "email": "misterio92@gmail.com" + } ], - "test:style": "@php vendor/bin/php-cs-fixer fix --config=.php-cs-fixer.dist.php --allow-risky=yes --dry-run --diff --verbose", - "test:unit": "@php vendor/bin/phpunit", - "fix:style": "@php vendor/bin/php-cs-fixer fix --config=.php-cs-fixer.dist.php --allow-risky=yes --diff --verbose" - }, - "minimum-stability": "dev", - "prefer-stable": true + "require": { + "php": "^8.0", + "ext-json": "*", + "illuminate/queue": "^10.0|^11.0|^12.0", + "php-amqplib/php-amqplib": "^v3.6" + }, + "require-dev": { + "phpunit/phpunit": "^10.0|^11.0", + "mockery/mockery": "^1.0", + "laravel/horizon": "^5.0", + "orchestra/testbench": "^7.0|^8.0|^9.0|^10.0", + "laravel/pint": "^1.2", + "laravel/framework": "^9.0|^10.0|^11.0|^12.0" + }, + "autoload": { + "psr-4": { + "VladimirYuldashev\\LaravelQueueRabbitMQ\\": "src/" + } + }, + "autoload-dev": { + "psr-4": { + "VladimirYuldashev\\LaravelQueueRabbitMQ\\Tests\\": "tests/" + } + }, + "extra": { + "branch-alias": { + "dev-master": "13.0-dev" + }, + "laravel": { + "providers": [ + "VladimirYuldashev\\LaravelQueueRabbitMQ\\LaravelQueueRabbitMQServiceProvider" + ] + } + }, + "suggest": { + "ext-pcntl": "Required to use all features of the queue consumer." + }, + "scripts": { + "test": [ + "@test:style", + "@test:unit" + ], + "test:style": "@php vendor/bin/pint --test -v", + "test:unit": "@php vendor/bin/phpunit", + "fix:style": "@php vendor/bin/pint -v" + }, + "minimum-stability": "dev", + "prefer-stable": true } diff --git a/config/rabbitmq.php b/config/rabbitmq.php index d5390a5c..4c102ce8 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -9,7 +9,7 @@ 'driver' => 'rabbitmq', 'queue' => env('RABBITMQ_QUEUE', 'default'), - 'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class, + 'connection' => 'default', 'hosts' => [ [ @@ -22,16 +22,6 @@ ], 'options' => [ - 'ssl_options' => [ - 'cafile' => env('RABBITMQ_SSL_CAFILE', null), - 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), - 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), - 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), - 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), - ], - 'queue' => [ - 'job' => VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class, - ], ], /* diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 7e166ab8..d213fd63 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -1,27 +1,16 @@ - - - - src/ - - - - - - - - ./tests/ - - - - - - - - - + + + + ./tests/ + + + + + + + + + + diff --git a/pint.json b/pint.json new file mode 100644 index 00000000..05f4b41e --- /dev/null +++ b/pint.json @@ -0,0 +1,8 @@ +{ + "preset": "laravel", + "rules": { + "php_unit_method_casing": { + "case": "camel_case" + } + } +} diff --git a/src/Console/ConsumeCommand.php b/src/Console/ConsumeCommand.php index 355c2e4b..4072132a 100644 --- a/src/Console/ConsumeCommand.php +++ b/src/Console/ConsumeCommand.php @@ -11,7 +11,7 @@ class ConsumeCommand extends WorkCommand protected $signature = 'rabbitmq:consume {connection? : The name of the queue connection to work} {--name=default : The name of the consumer} - {--queue= : The names of the queues to work} + {--queue= : The name of the queue to work. Please notice that there is no support for multiple queues} {--once : Only process the next job on the queue} {--stop-when-empty : Stop when the queue is empty} {--delay=0 : The number of seconds to delay failed jobs (Deprecated)} @@ -21,9 +21,10 @@ class ConsumeCommand extends WorkCommand {--force : Force the worker to run even in maintenance mode} {--memory=128 : The memory limit in megabytes} {--sleep=3 : Number of seconds to sleep when no job is available} + {--rest=0 : Number of seconds to rest between jobs} {--timeout=60 : The number of seconds a child process can run} {--tries=1 : Number of times to attempt a job before logging it failed} - {--rest=0 : Number of seconds to rest between jobs} + {--json : Output the queue worker information as JSON} {--max-priority=} {--consumer-tag} diff --git a/src/Console/ExchangeDeclareCommand.php b/src/Console/ExchangeDeclareCommand.php index 99f0b753..43aa9c5c 100644 --- a/src/Console/ExchangeDeclareCommand.php +++ b/src/Console/ExchangeDeclareCommand.php @@ -18,8 +18,6 @@ class ExchangeDeclareCommand extends Command protected $description = 'Declare exchange'; /** - * @param RabbitMQConnector $connector - * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/ExchangeDeleteCommand.php b/src/Console/ExchangeDeleteCommand.php index 904f38ba..d5a8d8d4 100644 --- a/src/Console/ExchangeDeleteCommand.php +++ b/src/Console/ExchangeDeleteCommand.php @@ -16,8 +16,6 @@ class ExchangeDeleteCommand extends Command protected $description = 'Delete exchange'; /** - * @param RabbitMQConnector $connector - * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/QueueBindCommand.php b/src/Console/QueueBindCommand.php index 40799460..ccbbd706 100644 --- a/src/Console/QueueBindCommand.php +++ b/src/Console/QueueBindCommand.php @@ -17,8 +17,6 @@ class QueueBindCommand extends Command protected $description = 'Bind queue to exchange'; /** - * @param RabbitMQConnector $connector - * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/QueueDeclareCommand.php b/src/Console/QueueDeclareCommand.php index d43f49ce..54d6ea32 100644 --- a/src/Console/QueueDeclareCommand.php +++ b/src/Console/QueueDeclareCommand.php @@ -19,8 +19,6 @@ class QueueDeclareCommand extends Command protected $description = 'Declare queue'; /** - * @param RabbitMQConnector $connector - * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/QueueDeleteCommand.php b/src/Console/QueueDeleteCommand.php index 8bca7c20..b2586ecd 100644 --- a/src/Console/QueueDeleteCommand.php +++ b/src/Console/QueueDeleteCommand.php @@ -17,8 +17,6 @@ class QueueDeleteCommand extends Command protected $description = 'Delete queue'; /** - * @param RabbitMQConnector $connector - * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/QueuePurgeCommand.php b/src/Console/QueuePurgeCommand.php index 95b765ea..49be839b 100644 --- a/src/Console/QueuePurgeCommand.php +++ b/src/Console/QueuePurgeCommand.php @@ -19,8 +19,6 @@ class QueuePurgeCommand extends Command protected $description = 'Purge all messages in queue'; /** - * @param RabbitMQConnector $connector - * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Consumer.php b/src/Consumer.php index d27b96c4..ed3d8099 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -65,7 +65,6 @@ public function setPrefetchCount(int $value): void * * @param string $connectionName * @param string $queue - * @param WorkerOptions $options * @return int * * @throws Throwable @@ -126,6 +125,10 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu if ($this->supportsAsyncSignals()) { $this->resetTimeoutHandler(); } + + if ($options->rest > 0) { + $this->sleep($options->rest); + } }, null, $arguments @@ -181,10 +184,8 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu /** * Determine if the daemon should process on this iteration. * - * @param WorkerOptions $options * @param string $connectionName * @param string $queue - * @return bool */ protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue): bool { diff --git a/src/Contracts/RabbitMQQueueContract.php b/src/Contracts/RabbitMQQueueContract.php new file mode 100644 index 00000000..cf04a66e --- /dev/null +++ b/src/Contracts/RabbitMQQueueContract.php @@ -0,0 +1,14 @@ +size($queue); } @@ -47,10 +43,12 @@ public function push($job, $data = '', $queue = null) /** * {@inheritdoc} + * + * @throws BindingResolutionException */ - public function pushRaw($payload, $queue = null, array $options = []) + public function pushRaw($payload, $queue = null, array $options = []): int|string|null { - $payload = (new JobPayload($payload))->prepare($this->lastPushed)->value; + $payload = (new JobPayload($payload))->prepare($this->lastPushed ?? null)->value; return tap(parent::pushRaw($payload, $queue, $options), function () use ($queue, $payload): void { $this->event($this->getQueue($queue), new JobPushed($payload)); @@ -59,8 +57,10 @@ public function pushRaw($payload, $queue = null, array $options = []) /** * {@inheritdoc} + * + * @throws BindingResolutionException */ - public function later($delay, $job, $data = '', $queue = null) + public function later($delay, $job, $data = '', $queue = null): mixed { $payload = (new JobPayload($this->createPayload($job, $data)))->prepare($job)->value; @@ -81,22 +81,11 @@ public function pop($queue = null) }); } - /** - * {@inheritdoc} - */ - public function release($delay, $job, $data, $queue, $attempts = 0) - { - $this->lastPushed = $job; - - return parent::release($delay, $job, $data, $queue, $attempts); - } - /** * Fire the job deleted event. * * @param string $queue * @param RabbitMQJob $job - * @return void * * @throws BindingResolutionException */ @@ -110,7 +99,6 @@ public function deleteReserved($queue, $job): void * * @param string $queue * @param mixed $event - * @return void * * @throws BindingResolutionException */ diff --git a/src/LaravelQueueRabbitMQServiceProvider.php b/src/LaravelQueueRabbitMQServiceProvider.php index 3be0fd81..ee46d6cd 100644 --- a/src/LaravelQueueRabbitMQServiceProvider.php +++ b/src/LaravelQueueRabbitMQServiceProvider.php @@ -12,8 +12,6 @@ class LaravelQueueRabbitMQServiceProvider extends ServiceProvider { /** * Register the service provider. - * - * @return void */ public function register(): void { @@ -60,8 +58,6 @@ public function register(): void /** * Register the application's event listeners. - * - * @return void */ public function boot(): void { diff --git a/src/Queue/Connection/ConfigFactory.php b/src/Queue/Connection/ConfigFactory.php new file mode 100644 index 00000000..06c8080d --- /dev/null +++ b/src/Queue/Connection/ConfigFactory.php @@ -0,0 +1,126 @@ +setIsLazy(! in_array( + Arr::get($config, 'lazy') ?? true, + [false, 0, '0', 'false', 'no'], + true) + ); + + // Set the connection to unsecure by default + $connectionConfig->setIsSecure(in_array( + Arr::get($config, 'secure'), + [true, 1, '1', 'true', 'yes'], + true) + ); + + if ($connectionConfig->isSecure()) { + self::getSLLOptionsFromConfig($connectionConfig, $config); + } + + self::getHostFromConfig($connectionConfig, $config); + self::getHeartbeatFromConfig($connectionConfig, $config); + self::getNetworkProtocolFromConfig($connectionConfig, $config); + self::getTimeoutsFromConfig($connectionConfig, $config); + }); + } + + protected static function getHostFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $hostConfig = Arr::first(Arr::shuffle(Arr::get($config, self::CONFIG_HOSTS, [])), null, []); + + if ($location = Arr::get($hostConfig, 'host')) { + $connectionConfig->setHost($location); + } + if ($port = Arr::get($hostConfig, 'port')) { + $connectionConfig->setPort($port); + } + if ($vhost = Arr::get($hostConfig, 'vhost')) { + $connectionConfig->setVhost($vhost); + } + if ($user = Arr::get($hostConfig, 'user')) { + $connectionConfig->setUser($user); + } + if ($password = Arr::get($hostConfig, 'password')) { + $connectionConfig->setPassword($password); + } + } + + protected static function getSLLOptionsFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $sslConfig = Arr::get($config, self::CONFIG_OPTIONS.'.ssl_options', []); + + if ($caFile = Arr::get($sslConfig, 'cafile')) { + $connectionConfig->setSslCaCert($caFile); + } + if ($cert = Arr::get($sslConfig, 'local_cert')) { + $connectionConfig->setSslCert($cert); + } + if ($key = Arr::get($sslConfig, 'local_key')) { + $connectionConfig->setSslKey($key); + } + if (Arr::has($sslConfig, 'verify_peer')) { + $verifyPeer = Arr::get($sslConfig, 'verify_peer'); + $connectionConfig->setSslVerify($verifyPeer); + } + if ($passphrase = Arr::get($sslConfig, 'passphrase')) { + $connectionConfig->setSslPassPhrase($passphrase); + } + } + + protected static function getHeartbeatFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $heartbeat = Arr::get($config, self::CONFIG_OPTIONS.'.heartbeat'); + + if (is_numeric($heartbeat) && intval($heartbeat) > 0) { + $connectionConfig->setHeartbeat((int) $heartbeat); + } + } + + protected static function getNetworkProtocolFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + if ($networkProtocol = Arr::get($config, 'network_protocol')) { + $connectionConfig->setNetworkProtocol($networkProtocol); + } + } + + protected static function getTimeoutsFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $connectionTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.connection_timeout'); + if (is_numeric($connectionTimeout) && floatval($connectionTimeout) >= 0) { + $connectionConfig->setConnectionTimeout((float) $connectionTimeout); + } + + $readTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.read_timeout'); + if (is_numeric($readTimeout) && floatval($readTimeout) >= 0) { + $connectionConfig->setReadTimeout((float) $readTimeout); + } + + $writeTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.write_timeout'); + if (is_numeric($writeTimeout) && floatval($writeTimeout) >= 0) { + $connectionConfig->setWriteTimeout((float) $writeTimeout); + } + + $chanelRpcTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.channel_rpc_timeout'); + if (is_numeric($chanelRpcTimeout) && floatval($chanelRpcTimeout) >= 0) { + $connectionConfig->setChannelRPCTimeout((float) $chanelRpcTimeout); + } + } +} diff --git a/src/Queue/Connection/ConnectionFactory.php b/src/Queue/Connection/ConnectionFactory.php new file mode 100644 index 00000000..f531e761 --- /dev/null +++ b/src/Queue/Connection/ConnectionFactory.php @@ -0,0 +1,223 @@ +getIoType() === AMQPConnectionConfig::IO_TYPE_SOCKET) { + return self::createSocketConnection($connection, $config); + } + + return self::createStreamConnection($connection, $config); + } + + protected static function createSocketConnection($connection, AMQPConnectionConfig $config): AMQPSocketConnection + { + self::assertSocketConnection($connection, $config); + + return new $connection( + $config->getHost(), + $config->getPort(), + $config->getUser(), + $config->getPassword(), + $config->getVhost(), + $config->isInsist(), + $config->getLoginMethod(), + $config->getLoginResponse(), + $config->getLocale(), + $config->getReadTimeout(), + $config->isKeepalive(), + $config->getWriteTimeout(), + $config->getHeartbeat(), + $config->getChannelRPCTimeout(), + $config + ); + } + + protected static function createStreamConnection($connection, AMQPConnectionConfig $config): AMQPStreamConnection + { + self::assertStreamConnection($connection); + + if ($config->isSecure()) { + self::assertSSLConnection($connection); + + return new $connection( + $config->getHost(), + $config->getPort(), + $config->getUser(), + $config->getPassword(), + $config->getVhost(), + self::getSslOptions($config), + [ + 'insist' => $config->isInsist(), + 'login_method' => $config->getLoginMethod(), + 'login_response' => $config->getLoginResponse(), + 'locale' => $config->getLocale(), + 'connection_timeout' => $config->getConnectionTimeout(), + 'read_write_timeout' => self::getReadWriteTimeout($config), + 'keepalive' => $config->isKeepalive(), + 'heartbeat' => $config->getHeartbeat(), + ], + $config + ); + } + + return new $connection( + $config->getHost(), + $config->getPort(), + $config->getUser(), + $config->getPassword(), + $config->getVhost(), + $config->isInsist(), + $config->getLoginMethod(), + $config->getLoginResponse(), + $config->getLocale(), + $config->getConnectionTimeout(), + self::getReadWriteTimeout($config), + $config->getStreamContext(), + $config->isKeepalive(), + $config->getHeartbeat(), + $config->getChannelRPCTimeout(), + $config->getNetworkProtocol(), + $config + ); + } + + protected static function getReadWriteTimeout(AMQPConnectionConfig $config): float + { + return min($config->getReadTimeout(), $config->getWriteTimeout()); + } + + protected static function getSslOptions(AMQPConnectionConfig $config): array + { + return array_filter([ + 'cafile' => $config->getSslCaCert(), + 'capath' => $config->getSslCaPath(), + 'local_cert' => $config->getSslCert(), + 'local_pk' => $config->getSslKey(), + 'verify_peer' => $config->getSslVerify(), + 'verify_peer_name' => $config->getSslVerifyName(), + 'passphrase' => $config->getSslPassPhrase(), + 'ciphers' => $config->getSslCiphers(), + 'security_level' => $config->getSslSecurityLevel(), + ], static function ($value) { + return $value !== null; + }); + } + + protected static function assertConnectionFromConfig(string $connection): void + { + if ($connection !== self::CONNECTION_TYPE_DEFAULT && ! is_subclass_of($connection, self::CONNECTION_TYPE_EXTENDED)) { + throw new AMQPLogicException(sprintf('The config property \'%s\' must contain \'%s\' or must extend: %s', self::CONFIG_CONNECTION, self::CONNECTION_TYPE_DEFAULT, class_basename(self::CONNECTION_TYPE_EXTENDED))); + } + } + + protected static function assertSocketConnection($connection, AMQPConnectionConfig $config): void + { + self::assertExtendedOf($connection, self::CONNECTION_SUB_TYPE_SOCKET); + + if ($config->isSecure()) { + throw new AMQPLogicException('The socket connection implementation does not support secure connections.'); + } + } + + protected static function assertStreamConnection($connection): void + { + self::assertExtendedOf($connection, self::CONNECTION_SUB_TYPE_STREAM); + } + + protected static function assertSSLConnection($connection): void + { + self::assertExtendedOf($connection, self::CONNECTION_SUB_TYPE_SSL); + } + + protected static function assertExtendedOf($connection, string $parent): void + { + if (! is_subclass_of($connection, $parent) && $connection !== $parent) { + throw new AMQPLogicException(sprintf('The connection must extend: %s', class_basename($parent))); + } + } + + /** + * @return mixed + * + * @throws Exception + * + * @deprecated This is the fallback method, update your config asap. (example: connection => 'default') + */ + protected static function _createLazyConnection($connection, array $config): AbstractConnection + { + return $connection::create_connection( + Arr::shuffle(Arr::get($config, ConfigFactory::CONFIG_HOSTS, [])), + Arr::add(Arr::get($config, 'options', []), 'heartbeat', 0) + ); + } +} diff --git a/src/Queue/Connectors/RabbitMQConnector.php b/src/Queue/Connectors/RabbitMQConnector.php index 903e1d3a..a0f79e32 100644 --- a/src/Queue/Connectors/RabbitMQConnector.php +++ b/src/Queue/Connectors/RabbitMQConnector.php @@ -8,20 +8,15 @@ use Illuminate\Queue\Connectors\ConnectorInterface; use Illuminate\Queue\Events\JobFailed; use Illuminate\Queue\Events\WorkerStopping; -use Illuminate\Support\Arr; -use InvalidArgumentException; -use PhpAmqpLib\Connection\AbstractConnection; -use PhpAmqpLib\Connection\AMQPLazyConnection; use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\Listeners\RabbitMQFailedEvent; use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue; +use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connection\ConnectionFactory; +use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\QueueFactory; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; class RabbitMQConnector implements ConnectorInterface { - /** - * @var Dispatcher - */ - private $dispatcher; + protected Dispatcher $dispatcher; public function __construct(Dispatcher $dispatcher) { @@ -31,26 +26,15 @@ public function __construct(Dispatcher $dispatcher) /** * Establish a queue connection. * - * @param array $config * @return RabbitMQQueue * * @throws Exception */ public function connect(array $config): Queue { - $connection = $this->createConnection(Arr::except($config, 'options.queue')); + $connection = ConnectionFactory::make($config); - $queue = $this->createQueue( - Arr::get($config, 'worker', 'default'), - $connection, - $config['queue'], - Arr::get($config, 'after_commit', false), - Arr::get($config, 'options.queue', []) - ); - - if (! $queue instanceof RabbitMQQueue) { - throw new InvalidArgumentException('Invalid worker.'); - } + $queue = QueueFactory::make($config)->setConnection($connection); if ($queue instanceof HorizonRabbitMQQueue) { $this->dispatcher->listen(JobFailed::class, RabbitMQFailedEvent::class); @@ -62,77 +46,4 @@ public function connect(array $config): Queue return $queue; } - - /** - * @param array $config - * @return AbstractConnection - * - * @throws Exception - */ - protected function createConnection(array $config): AbstractConnection - { - /** @var AbstractConnection $connection */ - $connection = Arr::get($config, 'connection', AMQPLazyConnection::class); - - // disable heartbeat when not configured, so long-running tasks will not fail - $config = Arr::add($config, 'options.heartbeat', 0); - - return $connection::create_connection( - Arr::shuffle(Arr::get($config, 'hosts', [])), - $this->filter(Arr::get($config, 'options', [])) - ); - } - - /** - * Create a queue for the worker. - * - * @param string $worker - * @param AbstractConnection $connection - * @param string $queue - * @param bool $dispatchAfterCommit - * @param array $options - * @return HorizonRabbitMQQueue|RabbitMQQueue|Queue - */ - protected function createQueue( - string $worker, - AbstractConnection $connection, - string $queue, - bool $dispatchAfterCommit, - array $options = [] - ) { - switch ($worker) { - case 'default': - return new RabbitMQQueue($connection, $queue, $dispatchAfterCommit, $options); - case 'horizon': - return new HorizonRabbitMQQueue($connection, $queue, $dispatchAfterCommit, $options); - default: - return new $worker($connection, $queue, $options); - } - } - - /** - * Recursively filter only null values. - * - * @param array $array - * @return array - */ - private function filter(array $array): array - { - foreach ($array as $index => &$value) { - if (is_array($value)) { - $value = $this->filter($value); - - continue; - } - - // If the value is null then remove it. - if ($value === null) { - unset($array[$index]); - - continue; - } - } - - return $array; - } } diff --git a/src/Queue/Jobs/RabbitMQJob.php b/src/Queue/Jobs/RabbitMQJob.php index 4176d80d..abcdfab4 100644 --- a/src/Queue/Jobs/RabbitMQJob.php +++ b/src/Queue/Jobs/RabbitMQJob.php @@ -137,8 +137,6 @@ public function release($delay = 0): void /** * Get the underlying RabbitMQ connection. - * - * @return RabbitMQQueue */ public function getRabbitMQ(): RabbitMQQueue { @@ -147,8 +145,6 @@ public function getRabbitMQ(): RabbitMQQueue /** * Get the underlying RabbitMQ message. - * - * @return AMQPMessage */ public function getRabbitMQMessage(): AMQPMessage { @@ -157,8 +153,6 @@ public function getRabbitMQMessage(): AMQPMessage /** * Get the headers from the rabbitMQ message. - * - * @return array|null */ protected function getRabbitMQMessageHeaders(): ?array { diff --git a/src/Queue/QueueConfig.php b/src/Queue/QueueConfig.php new file mode 100644 index 00000000..e7ec27c4 --- /dev/null +++ b/src/Queue/QueueConfig.php @@ -0,0 +1,278 @@ +queue; + } + + public function setQueue(string $queue): QueueConfig + { + $this->queue = $queue; + + return $this; + } + + /** + * Returns &true; as indication that jobs should be dispatched after all database transactions + * have been committed. + */ + public function isDispatchAfterCommit(): bool + { + return $this->dispatchAfterCommit; + } + + public function setDispatchAfterCommit($dispatchAfterCommit): QueueConfig + { + $this->dispatchAfterCommit = $this->toBoolean($dispatchAfterCommit); + + return $this; + } + + /** + * Get the Job::class to use when processing messages + */ + public function getAbstractJob(): string + { + return $this->abstractJob; + } + + public function setAbstractJob(string $abstract): QueueConfig + { + $this->abstractJob = $abstract; + + return $this; + } + + /** + * Returns &true;, if delayed messages should be prioritized. + * + * RabbitMQ queues work with the FIFO method. So when there are 10000 messages in the queue and + * the delayed message is put back to the queue (at the end) for further processing the delayed message won´t + * process before all 10000 messages are processed. The same is true for requeueing. + * + * This may not what you desire. + * When you want the message to get processed immediately after the delayed time expires or when requeueing, we can + * use prioritization. + * + * @see[https://www.rabbitmq.com/queues.html#basics] + */ + public function isPrioritizeDelayed(): bool + { + return $this->prioritizeDelayed; + } + + public function setPrioritizeDelayed($prioritizeDelayed): QueueConfig + { + $this->prioritizeDelayed = $this->toBoolean($prioritizeDelayed); + + return $this; + } + + /** + * Returns a integer with a default of '2' for when using prioritization on delayed messages. + * If priority queues are desired, we recommend using between 1 and 10. + * Using more priority layers, will consume more CPU resources and would affect runtimes. + * + * @see https://www.rabbitmq.com/priority.html + */ + public function getQueueMaxPriority(): int + { + return $this->queueMaxPriority; + } + + public function setQueueMaxPriority($queueMaxPriority): QueueConfig + { + if (is_numeric($queueMaxPriority) && intval($queueMaxPriority) > 1) { + $this->queueMaxPriority = (int) $queueMaxPriority; + } + + return $this; + } + + /** + * Get the exchange name, or empty string; as default value. + * + * The default exchange is an unnamed pre-declared direct exchange. Usually, an empty string + * is frequently used to indicate it. If you choose default exchange, your message will be delivered + * to a queue with the same name as the routing key. + * With a routing key that is the same as the queue name, every queue is immediately tied to the default exchange. + */ + public function getExchange(): string + { + return $this->exchange; + } + + public function setExchange(string $exchange): QueueConfig + { + $this->exchange = $exchange; + + return $this; + } + + /** + * Get the exchange type + * + * There are four basic RabbitMQ exchange types in RabbitMQ, each of which uses different parameters + * and bindings to route messages in various ways, These are: 'direct', 'topic', 'fanout', 'headers' + * + * The default type is set as 'direct' + */ + public function getExchangeType(): string + { + return $this->exchangeType; + } + + public function setExchangeType(string $exchangeType): QueueConfig + { + $this->exchangeType = $exchangeType; + + return $this; + } + + /** + * Get the routing key when using an exchange other than the direct exchange. + * The routing key is a message attribute taken into account by the exchange when deciding how to route a message. + * + * The default routing-key is the given destination: '%s'. + */ + public function getExchangeRoutingKey(): string + { + return $this->exchangeRoutingKey; + } + + public function setExchangeRoutingKey(string $exchangeRoutingKey): QueueConfig + { + $this->exchangeRoutingKey = $exchangeRoutingKey; + + return $this; + } + + /** + * Returns &true;, if failed messages should be rerouted. + */ + public function isRerouteFailed(): bool + { + return $this->rerouteFailed; + } + + public function setRerouteFailed($rerouteFailed): QueueConfig + { + $this->rerouteFailed = $this->toBoolean($rerouteFailed); + + return $this; + } + + /** + * Get the exchange name with messages are published against. + * The default exchange is empty, so messages will be published directly to a queue. + */ + public function getFailedExchange(): string + { + return $this->failedExchange; + } + + public function setFailedExchange(string $failedExchange): QueueConfig + { + $this->failedExchange = $failedExchange; + + return $this; + } + + /** + * Get the substitution string for failed messages + * The default routing-key is the given destination substituted by '%s.failed'. + */ + public function getFailedRoutingKey(): string + { + return $this->failedRoutingKey; + } + + public function setFailedRoutingKey(string $failedRoutingKey): QueueConfig + { + $this->failedRoutingKey = $failedRoutingKey; + + return $this; + } + + /** + * Returns &true;, if queue is marked or set as quorum queue. + */ + public function isQuorum(): bool + { + return $this->quorum; + } + + public function setQuorum($quorum): QueueConfig + { + $this->quorum = $this->toBoolean($quorum); + + return $this; + } + + /** + * Holds all unknown queue options provided in the connection config + */ + public function getOptions(): array + { + return $this->options; + } + + public function setOptions(array $options): QueueConfig + { + $this->options = $options; + + return $this; + } + + /** + * Filters $value to boolean value + * + * Returns: &true; + * For values: 1, '1', true, 'true', 'yes' + * + * Returns: &false; + * For values: 0, '0', false, 'false', '', null, [] , 'ok', 'no', 'no not a bool', 'yes a bool' + */ + protected function toBoolean($value): bool + { + return filter_var($value, FILTER_VALIDATE_BOOLEAN); + } +} diff --git a/src/Queue/QueueConfigFactory.php b/src/Queue/QueueConfigFactory.php new file mode 100644 index 00000000..6f2befc5 --- /dev/null +++ b/src/Queue/QueueConfigFactory.php @@ -0,0 +1,74 @@ +setQueue($queue); + } + if (! empty($afterCommit = Arr::get($config, 'after_commit'))) { + $queueConfig->setDispatchAfterCommit($afterCommit); + } + + self::getOptionsFromConfig($queueConfig, $config); + }); + } + + protected static function getOptionsFromConfig(QueueConfig $queueConfig, array $config): void + { + $queueOptions = Arr::get($config, self::CONFIG_OPTIONS.'.queue', []) ?: []; + + if ($job = Arr::pull($queueOptions, 'job')) { + $queueConfig->setAbstractJob($job); + } + + // Feature: Prioritize delayed messages. + if ($prioritizeDelayed = Arr::pull($queueOptions, 'prioritize_delayed')) { + $queueConfig->setPrioritizeDelayed($prioritizeDelayed); + } + if ($maxPriority = Arr::pull($queueOptions, 'queue_max_priority')) { + $queueConfig->setQueueMaxPriority($maxPriority); + } + + // Feature: Working with Exchange and routing-keys + if ($exchange = Arr::pull($queueOptions, 'exchange')) { + $queueConfig->setExchange($exchange); + } + if ($exchangeType = Arr::pull($queueOptions, 'exchange_type')) { + $queueConfig->setExchangeType($exchangeType); + } + if ($exchangeRoutingKey = Arr::pull($queueOptions, 'exchange_routing_key')) { + $queueConfig->setExchangeRoutingKey($exchangeRoutingKey); + } + + // Feature: Reroute failed messages + if ($rerouteFailed = Arr::pull($queueOptions, 'reroute_failed')) { + $queueConfig->setRerouteFailed($rerouteFailed); + } + if ($failedExchange = Arr::pull($queueOptions, 'failed_exchange')) { + $queueConfig->setFailedExchange($failedExchange); + } + if ($failedRoutingKey = Arr::pull($queueOptions, 'failed_routing_key')) { + $queueConfig->setFailedRoutingKey($failedRoutingKey); + } + + // Feature: Mark queue as quorum + if ($quorum = Arr::pull($queueOptions, 'quorum')) { + $queueConfig->setQuorum($quorum); + } + + // All extra options not defined + $queueConfig->setOptions($queueOptions); + } +} diff --git a/src/Queue/QueueFactory.php b/src/Queue/QueueFactory.php new file mode 100644 index 00000000..75bde1ed --- /dev/null +++ b/src/Queue/QueueFactory.php @@ -0,0 +1,25 @@ +connection = $connection; - $this->channel = $connection->channel(); - $this->default = $default; - $this->options = $options; - $this->dispatchAfterCommit = $dispatchAfterCommit; + */ + public function __construct(QueueConfig $config) + { + $this->rabbitMQConfig = $config; + $this->dispatchAfterCommit = $config->isDispatchAfterCommit(); } /** @@ -114,7 +87,7 @@ public function size($queue = null): int } // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->connection->channel(); + $channel = $this->createChannel(); [, $size] = $channel->queue_declare($queue, true); $channel->close(); @@ -144,7 +117,7 @@ function ($payload, $queue) { * * @throws AMQPProtocolChannelException */ - public function pushRaw($payload, $queue = null, array $options = []) + public function pushRaw($payload, $queue = null, array $options = []): int|string|null { [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); @@ -152,7 +125,7 @@ public function pushRaw($payload, $queue = null, array $options = []) [$message, $correlationId] = $this->createMessage($payload, $attempts); - $this->channel->basic_publish($message, $exchange, $destination, true, false); + $this->publishBasic($message, $exchange, $destination, true); return $correlationId; } @@ -162,7 +135,7 @@ public function pushRaw($payload, $queue = null, array $options = []) * * @throws AMQPProtocolChannelException */ - public function later($delay, $job, $data = '', $queue = null) + public function later($delay, $job, $data = '', $queue = null): mixed { return $this->enqueueUsing( $job, @@ -176,15 +149,9 @@ function ($payload, $queue, $delay) { } /** - * @param $delay - * @param $payload - * @param null $queue - * @param int $attempts - * @return mixed - * * @throws AMQPProtocolChannelException */ - public function laterRaw($delay, $payload, $queue = null, $attempts = 0) + public function laterRaw($delay, string $payload, $queue = null, int $attempts = 0): int|string|null { $ttl = $this->secondsUntil($delay) * 1000; @@ -206,8 +173,8 @@ public function laterRaw($delay, $payload, $queue = null, $attempts = 0) [$message, $correlationId] = $this->createMessage($payload, $attempts); - // Publish directly on the delayQueue, no need to publish trough an exchange. - $this->channel->basic_publish($message, null, $destination, true, false); + // Publish directly on the delayQueue, no need to publish through an exchange. + $this->publishBasic($message, null, $destination, true); return $correlationId; } @@ -219,22 +186,25 @@ public function laterRaw($delay, $payload, $queue = null, $attempts = 0) */ public function bulk($jobs, $data = '', $queue = null): void { - foreach ((array) $jobs as $job) { + $this->publishBatch($jobs, $data, $queue); + } + + /** + * @throws AMQPProtocolChannelException + */ + protected function publishBatch($jobs, $data = '', $queue = null): void + { + foreach ($jobs as $job) { $this->bulkRaw($this->createPayload($job, $queue, $data), $queue, ['job' => $job]); } - $this->channel->publish_batch(); + $this->batchPublish(); } /** - * @param string $payload - * @param null $queue - * @param array $options - * @return mixed - * * @throws AMQPProtocolChannelException */ - public function bulkRaw(string $payload, $queue = null, array $options = []) + public function bulkRaw(string $payload, ?string $queue = null, array $options = []): int|string|null { [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); @@ -242,7 +212,7 @@ public function bulkRaw(string $payload, $queue = null, array $options = []) [$message, $correlationId] = $this->createMessage($payload, $attempts); - $this->channel->batch_basic_publish($message, $exchange, $destination); + $this->getChannel()->batch_basic_publish($message, $exchange, $destination); return $correlationId; } @@ -260,7 +230,7 @@ public function pop($queue = null) $job = $this->getJobClass(); /** @var AMQPMessage|null $message */ - if ($message = $this->channel->basic_get($queue)) { + if ($message = $this->getChannel()->basic_get($queue)) { return $this->currentJob = new $job( $this->container, $this, @@ -270,12 +240,12 @@ public function pop($queue = null) ); } } catch (AMQPProtocolChannelException $exception) { - // If there is not exchange or queue AMQP will throw exception with code 404 + // If there is no exchange or queue AMQP will throw exception with code 404 // We need to catch it and return null if ($exception->amqp_reply_code === 404) { // Because of the channel exception the channel was closed and removed. // We have to open a new channel. Because else the worker(s) are stuck in a loop, without processing. - $this->channel = $this->connection->channel(); + $this->getChannel(true); return null; } @@ -297,31 +267,33 @@ public function pop($queue = null) } /** - * @return AbstractConnection + * @throws RuntimeException */ public function getConnection(): AbstractConnection { + if (! $this->connection) { + throw new RuntimeException('Queue has no AMQPConnection set.'); + } + return $this->connection; } - /** - * @return AMQPChannel - */ - public function getChannel(): AMQPChannel + public function setConnection(AbstractConnection $connection): RabbitMQQueue { - return $this->channel; + $this->connection = $connection; + + return $this; } /** * Job class to use. * - * @return string * * @throws Throwable */ public function getJobClass(): string { - $job = Arr::get($this->options, 'job', RabbitMQJob::class); + $job = $this->getRabbitMQConfig()->getAbstractJob(); throw_if( ! is_a($job, RabbitMQJob::class, true), @@ -334,21 +306,16 @@ public function getJobClass(): string /** * Gets a queue/destination, by default the queue option set on the connection. - * - * @param null $queue - * @return string */ public function getQueue($queue = null): string { - return $queue ?: $this->default; + return $queue ?: $this->getRabbitMQConfig()->getQueue(); } /** * Checks if the given exchange already present/defined in RabbitMQ. - * Returns false when when the exchange is missing. + * Returns false when the exchange is missing. * - * @param string $exchange - * @return bool * * @throws AMQPProtocolChannelException */ @@ -360,7 +327,7 @@ public function isExchangeExists(string $exchange): bool try { // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->connection->channel(); + $channel = $this->createChannel(); $channel->exchange_declare($exchange, '', true); $channel->close(); @@ -377,14 +344,7 @@ public function isExchangeExists(string $exchange): bool } /** - * Declare a exchange in rabbitMQ, when not already declared. - * - * @param string $name - * @param string $type - * @param bool $durable - * @param bool $autoDelete - * @param array $arguments - * @return void + * Declare an exchange in rabbitMQ, when not already declared. */ public function declareExchange( string $name, @@ -397,7 +357,7 @@ public function declareExchange( return; } - $this->channel->exchange_declare( + $this->getChannel()->exchange_declare( $name, $type, false, @@ -410,11 +370,8 @@ public function declareExchange( } /** - * Delete a exchange from rabbitMQ, only when present in RabbitMQ. + * Delete an exchange from rabbitMQ, only when present in RabbitMQ. * - * @param string $name - * @param bool $unused - * @return void * * @throws AMQPProtocolChannelException */ @@ -427,7 +384,7 @@ public function deleteExchange(string $name, bool $unused = false): void $idx = array_search($name, $this->exchanges); unset($this->exchanges[$idx]); - $this->channel->exchange_delete( + $this->getChannel()->exchange_delete( $name, $unused ); @@ -435,21 +392,27 @@ public function deleteExchange(string $name, bool $unused = false): void /** * Checks if the given queue already present/defined in RabbitMQ. - * Returns false when when the queue is missing. + * Returns false when the queue is missing. * - * @param string|null $name - * @return bool * * @throws AMQPProtocolChannelException */ - public function isQueueExists(string $name = null): bool + public function isQueueExists(?string $name = null): bool { + $queueName = $this->getQueue($name); + + if ($this->isQueueDeclared($queueName)) { + return true; + } + try { // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->connection->channel(); - $channel->queue_declare($this->getQueue($name), true); + $channel = $this->createChannel(); + $channel->queue_declare($queueName, true); $channel->close(); + $this->queues[] = $queueName; + return true; } catch (AMQPProtocolChannelException $exception) { if ($exception->amqp_reply_code === 404) { @@ -462,12 +425,6 @@ public function isQueueExists(string $name = null): bool /** * Declare a queue in rabbitMQ, when not already declared. - * - * @param string $name - * @param bool $durable - * @param bool $autoDelete - * @param array $arguments - * @return void */ public function declareQueue( string $name, @@ -479,7 +436,7 @@ public function declareQueue( return; } - $this->channel->queue_declare( + $this->getChannel()->queue_declare( $name, false, $durable, @@ -493,10 +450,6 @@ public function declareQueue( /** * Delete a queue from rabbitMQ, only when present in RabbitMQ. * - * @param string $name - * @param bool $if_unused - * @param bool $if_empty - * @return void * * @throws AMQPProtocolChannelException */ @@ -506,16 +459,14 @@ public function deleteQueue(string $name, bool $if_unused = false, bool $if_empt return; } - $this->channel->queue_delete($name, $if_unused, $if_empty); + $idx = array_search($name, $this->queues); + unset($this->queues[$idx]); + + $this->getChannel()->queue_delete($name, $if_unused, $if_empty); } /** * Bind a queue to an exchange. - * - * @param string $queue - * @param string $exchange - * @param string $routingKey - * @return void */ public function bindQueue(string $queue, string $exchange, string $routingKey = ''): void { @@ -527,54 +478,38 @@ public function bindQueue(string $queue, string $exchange, string $routingKey = return; } - $this->channel->queue_bind($queue, $exchange, $routingKey); + $this->getChannel()->queue_bind($queue, $exchange, $routingKey); } /** * Purge the queue of messages. - * - * @param string|null $queue - * @return void */ - public function purge(string $queue = null): void + public function purge(?string $queue = null): void { // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->connection->channel(); + $channel = $this->createChannel(); $channel->queue_purge($this->getQueue($queue)); $channel->close(); } /** * Acknowledge the message. - * - * @param RabbitMQJob $job - * @return void */ public function ack(RabbitMQJob $job): void { - $this->channel->basic_ack($job->getRabbitMQMessage()->getDeliveryTag()); + $this->getChannel()->basic_ack($job->getRabbitMQMessage()->getDeliveryTag()); } /** * Reject the message. - * - * @param RabbitMQJob $job - * @param bool $requeue - * @return void */ public function reject(RabbitMQJob $job, bool $requeue = false): void { - $this->channel->basic_reject($job->getRabbitMQMessage()->getDeliveryTag(), $requeue); + $this->getChannel()->basic_reject($job->getRabbitMQMessage()->getDeliveryTag(), $requeue); } /** * Create a AMQP message. - * - * @param $payload - * @param int $attempts - * @return array - * - * @throws JsonException */ protected function createMessage($payload, int $attempts = 0): array { @@ -583,16 +518,21 @@ protected function createMessage($payload, int $attempts = 0): array 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, ]; - $currentPayload = json_decode($payload, true, 512); + $currentPayload = json_decode($payload, true); if ($correlationId = $currentPayload['id'] ?? null) { $properties['correlation_id'] = $correlationId; } - if ($this->isPrioritizeDelayed()) { + if ($this->getRabbitMQConfig()->isPrioritizeDelayed()) { $properties['priority'] = $attempts; } if (isset($currentPayload['data']['command'])) { + // If the command data is encrypted, decrypt it first before attempting to unserialize + if (is_subclass_of($currentPayload['data']['commandName'], ShouldBeEncrypted::class)) { + $currentPayload['data']['command'] = Crypt::decrypt($currentPayload['data']['command']); + } + $commandData = unserialize($currentPayload['data']['command']); if (property_exists($commandData, 'priority')) { $properties['priority'] = $commandData->priority; @@ -619,7 +559,6 @@ protected function createMessage($payload, int $attempts = 0): array * @param string|object $job * @param string $queue * @param mixed $data - * @return array */ protected function createPayloadArray($job, $queue, $data = ''): array { @@ -630,8 +569,6 @@ protected function createPayloadArray($job, $queue, $data = ''): array /** * Get a random ID string. - * - * @return string */ protected function getRandomId(): string { @@ -641,28 +578,24 @@ protected function getRandomId(): string /** * Close the connection to RabbitMQ. * - * @return void * * @throws Exception */ public function close(): void { - if ($this->currentJob && ! $this->currentJob->isDeletedOrReleased()) { + if (isset($this->currentJob) && ! $this->currentJob->isDeletedOrReleased()) { $this->reject($this->currentJob, true); } try { - $this->connection->close(); - } catch (ErrorException $exception) { + $this->getConnection()->close(); + } catch (ErrorException) { // Ignore the exception } } /** * Get the Queue arguments. - * - * @param string $destination - * @return array */ protected function getQueueArguments(string $destination): array { @@ -672,16 +605,16 @@ protected function getQueueArguments(string $destination): array // Messages with a priority which is higher than the queue's maximum, are treated as if they were // published with the maximum priority. // Quorum queues does not support priority. - if ($this->isPrioritizeDelayed() && ! $this->isQuorum()) { - $arguments['x-max-priority'] = $this->getQueueMaxPriority(); + if ($this->getRabbitMQConfig()->isPrioritizeDelayed() && ! $this->getRabbitMQConfig()->isQuorum()) { + $arguments['x-max-priority'] = $this->getRabbitMQConfig()->getQueueMaxPriority(); } - if ($this->isRerouteFailed()) { - $arguments['x-dead-letter-exchange'] = $this->getFailedExchange() ?? ''; + if ($this->getRabbitMQConfig()->isRerouteFailed()) { + $arguments['x-dead-letter-exchange'] = $this->getFailedExchange(); $arguments['x-dead-letter-routing-key'] = $this->getFailedRoutingKey($destination); } - if ($this->isQuorum()) { + if ($this->getRabbitMQConfig()->isQuorum()) { $arguments['x-queue-type'] = 'quorum'; } @@ -690,15 +623,11 @@ protected function getQueueArguments(string $destination): array /** * Get the Delay queue arguments. - * - * @param string $destination - * @param int $ttl - * @return array */ protected function getDelayQueueArguments(string $destination, int $ttl): array { return [ - 'x-dead-letter-exchange' => $this->getExchange() ?? '', + 'x-dead-letter-exchange' => $this->getExchange(), 'x-dead-letter-routing-key' => $this->getRoutingKey($destination), 'x-message-ttl' => $ttl, 'x-expires' => $ttl * 2, @@ -706,114 +635,51 @@ protected function getDelayQueueArguments(string $destination, int $ttl): array } /** - * Returns &true;, if delayed messages should be prioritized. - * - * @return bool + * Get the exchange name, or empty string; as default value. */ - protected function isPrioritizeDelayed(): bool + protected function getExchange(?string $exchange = null): string { - return (bool) (Arr::get($this->options, 'prioritize_delayed') ?: false); - } - - /** - * Returns a integer with a default of '2' for when using prioritization on delayed messages. - * If priority queues are desired, we recommend using between 1 and 10. - * Using more priority layers, will consume more CPU resources and would affect runtimes. - * - * @see https://www.rabbitmq.com/priority.html - * - * @return int - */ - protected function getQueueMaxPriority(): int - { - return (int) (Arr::get($this->options, 'queue_max_priority') ?: 2); - } - - /** - * Get the exchange name, or &null; as default value. - * - * @param string|null $exchange - * @return string|null - */ - protected function getExchange(string $exchange = null): ?string - { - return $exchange ?: Arr::get($this->options, 'exchange') ?: null; + return $exchange ?? $this->getRabbitMQConfig()->getExchange(); } /** * Get the routing-key for when you use exchanges * The default routing-key is the given destination. - * - * @param string $destination - * @return string */ protected function getRoutingKey(string $destination): string { - return ltrim(sprintf(Arr::get($this->options, 'exchange_routing_key') ?: '%s', $destination), '.'); + return ltrim(sprintf($this->getRabbitMQConfig()->getExchangeRoutingKey(), $destination), '.'); } /** * Get the exchangeType, or AMQPExchangeType::DIRECT as default. - * - * @param string|null $type - * @return string */ protected function getExchangeType(?string $type = null): string { - return @constant(AMQPExchangeType::class.'::'.Str::upper($type ?: Arr::get( - $this->options, - 'exchange_type' - ) ?: 'direct')) ?: AMQPExchangeType::DIRECT; - } - - /** - * Returns &true;, if failed messages should be rerouted. - * - * @return bool - */ - protected function isRerouteFailed(): bool - { - return (bool) (Arr::get($this->options, 'reroute_failed') ?: false); - } + $constant = AMQPExchangeType::class.'::'.Str::upper($type ?: $this->getRabbitMQConfig()->getExchangeType()); - /** - * Returns &true;, if declared queue must be quorum queue. - * - * @return bool - */ - protected function isQuorum(): bool - { - return (bool) (Arr::get($this->options, 'quorum') ?: false); + return defined($constant) ? constant($constant) : AMQPExchangeType::DIRECT; } /** * Get the exchange for failed messages. - * - * @param string|null $exchange - * @return string|null */ - protected function getFailedExchange(string $exchange = null): ?string + protected function getFailedExchange(?string $exchange = null): string { - return $exchange ?: Arr::get($this->options, 'failed_exchange') ?: null; + return $exchange ?? $this->getRabbitMQConfig()->getFailedExchange(); } /** * Get the routing-key for failed messages * The default routing-key is the given destination substituted by '.failed'. - * - * @param string $destination - * @return string */ protected function getFailedRoutingKey(string $destination): string { - return ltrim(sprintf(Arr::get($this->options, 'failed_routing_key') ?: '%s.failed', $destination), '.'); + return ltrim(sprintf($this->getRabbitMQConfig()->getFailedRoutingKey(), $destination), '.'); } /** * Checks if the exchange was already declared. - * - * @param string $name - * @return bool */ protected function isExchangeDeclared(string $name): bool { @@ -822,9 +688,6 @@ protected function isExchangeDeclared(string $name): bool /** * Checks if the queue was already declared. - * - * @param string $name - * @return bool */ protected function isQueueDeclared(string $name): bool { @@ -834,24 +697,16 @@ protected function isQueueDeclared(string $name): bool /** * Declare the destination when necessary. * - * @param string $destination - * @param string|null $exchange - * @param string|null $exchangeType - * @return void - * * @throws AMQPProtocolChannelException */ - protected function declareDestination( - string $destination, - ?string $exchange = null, - string $exchangeType = AMQPExchangeType::DIRECT - ): void { - // When a exchange is provided and no exchange is present in RabbitMQ, create an exchange. + protected function declareDestination(string $destination, ?string $exchange = null, string $exchangeType = AMQPExchangeType::DIRECT): void + { + // When an exchange is provided and no exchange is present in RabbitMQ, create an exchange. if ($exchange && ! $this->isExchangeExists($exchange)) { $this->declareExchange($exchange, $exchangeType); } - // When a exchange is provided, just return. + // When an exchange is provided, just return. if ($exchange) { return; } @@ -867,10 +722,6 @@ protected function declareDestination( /** * Determine all publish properties. - * - * @param $queue - * @param array $options - * @return array */ protected function publishProperties($queue, array $options = []): array { @@ -883,4 +734,49 @@ protected function publishProperties($queue, array $options = []): array return [$destination, $exchange, $exchangeType, $attempts]; } + + protected function getRabbitMQConfig(): QueueConfig + { + return $this->rabbitMQConfig; + } + + /** + * @throws AMQPChannelClosedException + * @throws AMQPConnectionClosedException + * @throws AMQPConnectionBlockedException + */ + protected function publishBasic($msg, $exchange = '', $destination = '', $mandatory = false, $immediate = false, $ticket = null): void + { + $this->getChannel()->basic_publish($msg, $exchange, $destination, $mandatory, $immediate, $ticket); + } + + protected function batchPublish(): void + { + $this->getChannel()->publish_batch(); + } + + public function getChannel($forceNew = false): AMQPChannel + { + if (! $this->channel || $forceNew) { + $this->channel = $this->createChannel(); + } + + return $this->channel; + } + + protected function createChannel(): AMQPChannel + { + return $this->getConnection()->channel(); + } + + /** + * @throws Exception + */ + protected function reconnect(): void + { + // Reconnects using the original connection settings. + $this->getConnection()->reconnect(); + // Create a new main channel because all old channels are removed. + $this->getChannel(true); + } } diff --git a/tests/Feature/ConnectorTest.php b/tests/Feature/ConnectorTest.php index 499018e9..91660ad2 100644 --- a/tests/Feature/ConnectorTest.php +++ b/tests/Feature/ConnectorTest.php @@ -3,9 +3,12 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Feature; use Illuminate\Queue\QueueManager; +use PhpAmqpLib\Connection\AMQPConnectionConfig; use PhpAmqpLib\Connection\AMQPLazyConnection; use PhpAmqpLib\Connection\AMQPSSLConnection; +use PhpAmqpLib\Connection\AMQPStreamConnection; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; +use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestSSLConnection; class ConnectorTest extends \VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase { @@ -47,8 +50,52 @@ public function testLazyConnection(): void $this->assertInstanceOf(RabbitMQQueue::class, $connection); $this->assertInstanceOf(AMQPLazyConnection::class, $connection->getConnection()); + $this->assertFalse($connection->getConnection()->isConnected()); + $this->assertTrue($connection->getChannel()->is_open()); $this->assertTrue($connection->getConnection()->isConnected()); + } + + public function testLazyStreamConnection(): void + { + $this->app['config']->set('queue.connections.rabbitmq', [ + 'driver' => 'rabbitmq', + 'queue' => env('RABBITMQ_QUEUE', 'default'), + 'connection' => 'default', + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => env('RABBITMQ_SSL_CAFILE', null), + 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), + 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), + 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), + 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), + ], + ], + + 'worker' => env('RABBITMQ_WORKER', 'default'), + ]); + + /** @var QueueManager $queue */ + $queue = $this->app['queue']; + + /** @var RabbitMQQueue $connection */ + $connection = $queue->connection('rabbitmq'); + + $this->assertInstanceOf(RabbitMQQueue::class, $connection); + $this->assertInstanceOf(AMQPStreamConnection::class, $connection->getConnection()); + $this->assertFalse($connection->getConnection()->isConnected()); $this->assertTrue($connection->getChannel()->is_open()); + $this->assertTrue($connection->getConnection()->isConnected()); } public function testSslConnection(): void @@ -93,4 +140,48 @@ public function testSslConnection(): void $this->assertTrue($connection->getConnection()->isConnected()); $this->assertTrue($connection->getChannel()->is_open()); } + + // Test to validate ssl connection params + public function testNoVerificationSslConnection(): void + { + $this->app['config']->set('queue.connections.rabbitmq', [ + 'driver' => 'rabbitmq', + 'queue' => env('RABBITMQ_QUEUE', 'default'), + 'connection' => TestSSLConnection::class, + 'secure' => true, + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT_SSL'), + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => getenv('RABBITMQ_SSL_CAFILE'), + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => false, + 'passphrase' => null, + ], + ], + + 'worker' => env('RABBITMQ_WORKER', 'default'), + ]); + + /** @var QueueManager $queue */ + $queue = $this->app['queue']; + + /** @var RabbitMQQueue $connection */ + $connection = $queue->connection('rabbitmq'); + $this->assertInstanceOf(RabbitMQQueue::class, $connection); + $this->assertInstanceOf(AMQPSSLConnection::class, $connection->getConnection()); + /** @var AMQPConnectionConfig */ + $config = $connection->getConnection()->getConfig(); + $this->assertFalse($config->getSslVerify()); + } } diff --git a/tests/Feature/QueueTest.php b/tests/Feature/QueueTest.php index 15b8acbb..ee324c9a 100644 --- a/tests/Feature/QueueTest.php +++ b/tests/Feature/QueueTest.php @@ -2,12 +2,42 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Feature; -use PhpAmqpLib\Connection\AMQPLazyConnection; +use PhpAmqpLib\Connection\AMQPStreamConnection; +use PhpAmqpLib\Exception\AMQPChannelClosedException; +use PhpAmqpLib\Exception\AMQPConnectionClosedException; +use PhpAmqpLib\Exception\AMQPProtocolChannelException; +use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestJob; class QueueTest extends TestCase { + protected function setUp(): void + { + parent::setUp(); + + $this->withoutExceptionHandling([ + AMQPChannelClosedException::class, AMQPConnectionClosedException::class, + AMQPProtocolChannelException::class, + ]); + } + public function testConnection(): void { - $this->assertInstanceOf(AMQPLazyConnection::class, $this->connection()->getChannel()->getConnection()); + $this->assertInstanceOf(AMQPStreamConnection::class, $this->connection()->getChannel()->getConnection()); + } + + public function testWithoutReconnect(): void + { + $queue = $this->connection('rabbitmq'); + + $queue->push(new TestJob); + sleep(1); + $this->assertSame(1, $queue->size()); + + // close connection + $queue->getConnection()->close(); + $this->assertFalse($queue->getConnection()->isConnected()); + + $this->expectException(AMQPChannelClosedException::class); + $queue->push(new TestJob); } } diff --git a/tests/Feature/SslQueueTest.php b/tests/Feature/SslQueueTest.php index 7233ee09..11c93f5a 100644 --- a/tests/Feature/SslQueueTest.php +++ b/tests/Feature/SslQueueTest.php @@ -4,13 +4,14 @@ use PhpAmqpLib\Connection\AMQPSSLConnection; -/** - * @group functional - */ class SslQueueTest extends TestCase { - public function setUp(): void + protected bool $interactsWithConnection = false; + + protected function setUp(): void { + parent::setUp(); + $this->markTestSkipped(); } diff --git a/tests/Feature/TestCase.php b/tests/Feature/TestCase.php index 299e7024..f8a9cb05 100644 --- a/tests/Feature/TestCase.php +++ b/tests/Feature/TestCase.php @@ -8,20 +8,28 @@ use PhpAmqpLib\Exception\AMQPProtocolChannelException; use RuntimeException; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob; +use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestEncryptedJob; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestJob; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase as BaseTestCase; abstract class TestCase extends BaseTestCase { + /** + * Set to false for skipped tests. + */ + protected bool $interactsWithConnection = true; + /** * @throws AMQPProtocolChannelException */ - public function setUp(): void + protected function setUp(): void { parent::setUp(); - if ($this->connection()->isQueueExists()) { - $this->connection()->purge(); + if ($this->interactsWithConnection) { + if ($this->connection()->isQueueExists()) { + $this->connection()->purge(); + } } } @@ -30,11 +38,13 @@ public function setUp(): void */ protected function tearDown(): void { - if ($this->connection()->isQueueExists()) { - $this->connection()->purge(); - } + if ($this->interactsWithConnection) { + if ($this->connection()->isQueueExists()) { + $this->connection()->purge(); + } - self::assertSame(0, Queue::size()); + self::assertSame(0, Queue::size()); + } parent::tearDown(); } @@ -69,7 +79,7 @@ public function testPushRaw(): void public function testPush(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); sleep(1); @@ -110,7 +120,7 @@ public function testPushAfterCommit(): void $this->assertSame(0, Queue::size()); $this->assertNull(Queue::pop()); - $transaction->commit('FakeDBConnection'); + $transaction->commit('FakeDBConnection', 1, 0); sleep(1); @@ -153,7 +163,7 @@ public function testLaterRaw(): void public function testLater(): void { - Queue::later(3, new TestJob()); + Queue::later(3, new TestJob); sleep(1); @@ -194,6 +204,103 @@ public function testBulk(): void $this->assertSame($count, Queue::size()); } + public function testPushEncrypted(): void + { + Queue::push(new TestEncryptedJob); + + sleep(1); + + $this->assertSame(1, Queue::size()); + $this->assertNotNull($job = Queue::pop()); + $this->assertSame(1, $job->attempts()); + $this->assertInstanceOf(RabbitMQJob::class, $job); + $this->assertSame(TestEncryptedJob::class, $job->resolveName()); + $this->assertNotNull($job->getJobId()); + + $payload = $job->payload(); + + $this->assertSame(TestEncryptedJob::class, $payload['displayName']); + $this->assertSame('Illuminate\Queue\CallQueuedHandler@call', $payload['job']); + $this->assertNull($payload['maxTries']); + $this->assertNull($payload['backoff']); + $this->assertNull($payload['timeout']); + $this->assertNull($payload['retryUntil']); + $this->assertSame($job->getJobId(), $payload['id']); + + $job->delete(); + $this->assertSame(0, Queue::size()); + } + + public function testPushEncryptedAfterCommit(): void + { + $transaction = new DatabaseTransactionsManager; + + $this->app->singleton('db.transactions', function ($app) use ($transaction) { + $transaction->begin('FakeDBConnection', 1); + + return $transaction; + }); + + TestEncryptedJob::dispatch()->afterCommit(); + + sleep(1); + $this->assertSame(0, Queue::size()); + $this->assertNull(Queue::pop()); + + $transaction->commit('FakeDBConnection', 1, 0); + + sleep(1); + + $this->assertSame(1, Queue::size()); + $this->assertNotNull($job = Queue::pop()); + + $job->delete(); + $this->assertSame(0, Queue::size()); + } + + public function testEncryptedLater(): void + { + Queue::later(3, new TestEncryptedJob); + + sleep(1); + + $this->assertSame(0, Queue::size()); + $this->assertNull(Queue::pop()); + + sleep(3); + + $this->assertSame(1, Queue::size()); + $this->assertNotNull($job = Queue::pop()); + + $this->assertInstanceOf(RabbitMQJob::class, $job); + + $body = json_decode($job->getRawBody(), true); + + $this->assertSame(TestEncryptedJob::class, $body['displayName']); + $this->assertSame('Illuminate\Queue\CallQueuedHandler@call', $body['job']); + $this->assertSame(TestEncryptedJob::class, $body['data']['commandName']); + $this->assertNotNull($job->getJobId()); + + $job->delete(); + $this->assertSame(0, Queue::size()); + } + + public function testEncryptedBulk(): void + { + $count = 100; + $jobs = []; + + for ($i = 0; $i < $count; $i++) { + $jobs[$i] = new TestEncryptedJob($i); + } + + Queue::bulk($jobs); + + sleep(1); + + $this->assertSame($count, Queue::size()); + } + public function testReleaseRaw(): void { Queue::pushRaw($payload = Str::random()); @@ -222,7 +329,7 @@ public function testReleaseRaw(): void public function testRelease(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); sleep(1); @@ -279,7 +386,7 @@ public function testReleaseWithDelayRaw(): void public function testReleaseInThePast(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); $job = Queue::pop(); $job->release(-3); @@ -294,7 +401,7 @@ public function testReleaseInThePast(): void public function testReleaseAndReleaseWithDelayAttempts(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); sleep(1); @@ -321,7 +428,7 @@ public function testReleaseAndReleaseWithDelayAttempts(): void public function testDelete(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); $job = Queue::pop(); @@ -335,7 +442,7 @@ public function testDelete(): void public function testFailed(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); $job = Queue::pop(); diff --git a/tests/Functional/RabbitMQQueueTest.php b/tests/Functional/RabbitMQQueueTest.php index 1c03fe99..f3ca4005 100644 --- a/tests/Functional/RabbitMQQueueTest.php +++ b/tests/Functional/RabbitMQQueueTest.php @@ -11,7 +11,6 @@ class RabbitMQQueueTest extends BaseTestCase { public function testConnection(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertInstanceOf(RabbitMQQueue::class, $queue); @@ -22,51 +21,57 @@ public function testConnection(): void $this->assertInstanceOf(RabbitMQQueue::class, $queue); } - public function testRerouteFailed(): void + public function testConfigRerouteFailed(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); - $this->assertFalse($this->callMethod($queue, 'isRerouteFailed')); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed()); $queue = $this->connection('rabbitmq-with-options'); - $this->assertTrue($this->callMethod($queue, 'isRerouteFailed')); + $this->assertTrue($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed()); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertFalse($this->callMethod($queue, 'isRerouteFailed')); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed()); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed()); } - public function testPrioritizeDelayed(): void + public function testConfigPrioritizeDelayed(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); - $this->assertFalse($this->callMethod($queue, 'isPrioritizeDelayed')); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed()); $queue = $this->connection('rabbitmq-with-options'); - $this->assertTrue($this->callMethod($queue, 'isPrioritizeDelayed')); + $this->assertTrue($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed()); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertFalse($this->callMethod($queue, 'isPrioritizeDelayed')); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed()); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed()); } public function testQueueMaxPriority(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); - $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); - $this->assertSame(2, $this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + $this->assertSame(2, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); $queue = $this->connection('rabbitmq-with-options'); - $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); - $this->assertSame(20, $this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + $this->assertSame(20, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); - $this->assertSame(2, $this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + $this->assertSame(2, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + $this->assertSame(2, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); } - public function testExchangeType(): void + public function testConfigExchangeType(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType', [''])); @@ -74,48 +79,78 @@ public function testExchangeType(): void $queue = $this->connection('rabbitmq-with-options'); $this->assertSame(AMQPExchangeType::TOPIC, $this->callMethod($queue, 'getExchangeType')); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType', ['direct'])); $queue = $this->connection('rabbitmq-with-options-empty'); $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); + + // testing an unkown type with a default + $this->callProperty($queue, 'rabbitMQConfig')->setExchangeType('unknown'); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); } public function testExchange(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); - $this->assertNull($this->callMethod($queue, 'getExchange', [''])); - $this->assertNull($this->callMethod($queue, 'getExchange')); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [''])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [null])); + $this->assertSame('', $this->callMethod($queue, 'getExchange')); $queue = $this->connection('rabbitmq-with-options'); - $this->assertNotNull($this->callMethod($queue, 'getExchange')); $this->assertSame('application-x', $this->callMethod($queue, 'getExchange')); + $this->assertSame('application-x', $this->callMethod($queue, 'getExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [''])); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertNull($this->callMethod($queue, 'getExchange')); + $this->assertSame('', $this->callMethod($queue, 'getExchange')); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [''])); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame('', $this->callMethod($queue, 'getExchange')); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [''])); } public function testFailedExchange(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); - $this->assertNull($this->callMethod($queue, 'getExchange', [''])); - $this->assertNull($this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [''])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [null])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange')); $queue = $this->connection('rabbitmq-with-options'); - $this->assertNotNull($this->callMethod($queue, 'getFailedExchange')); $this->assertSame('failed-exchange', $this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('failed-exchange', $this->callMethod($queue, 'getFailedExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [''])); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertNull($this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [''])); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [''])); } public function testRoutingKey(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['.test'])); $this->assertSame('', $this->callMethod($queue, 'getRoutingKey', [''])); $queue = $this->connection('rabbitmq-with-options'); @@ -123,13 +158,18 @@ public function testRoutingKey(): void $queue = $this->connection('rabbitmq-with-options-empty'); $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + $this->callProperty($queue, 'rabbitMQConfig')->setExchangeRoutingKey('.an.alternate.routing-key'); + $this->assertSame('an.alternate.routing-key', $this->callMethod($queue, 'getRoutingKey', ['test'])); } public function testFailedRoutingKey(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['.test'])); $this->assertSame('failed', $this->callMethod($queue, 'getFailedRoutingKey', [''])); $queue = $this->connection('rabbitmq-with-options'); @@ -137,27 +177,33 @@ public function testFailedRoutingKey(): void $queue = $this->connection('rabbitmq-with-options-empty'); $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + $this->callProperty($queue, 'rabbitMQConfig')->setFailedRoutingKey('.an.alternate.routing-key'); + $this->assertSame('an.alternate.routing-key', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); } - public function testQuorum(): void + public function testConfigQuorum(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); - $this->assertFalse($this->callMethod($queue, 'isQuorum')); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum()); $queue = $this->connection('rabbitmq-with-options'); - $this->assertFalse($this->callMethod($queue, 'isQuorum')); - - $queue = $this->connection('rabbitmq-with-quorum-options'); - $this->assertTrue($this->callMethod($queue, 'isQuorum')); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum()); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertFalse($this->callMethod($queue, 'isQuorum')); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum()); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum()); + + $queue = $this->connection('rabbitmq-with-quorum-options'); + $this->assertTrue($this->callProperty($queue, 'rabbitMQConfig')->isQuorum()); } public function testDeclareDeleteExchange(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $name = Str::random(); @@ -173,7 +219,6 @@ public function testDeclareDeleteExchange(): void public function testDeclareDeleteQueue(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $name = Str::random(); @@ -191,7 +236,6 @@ public function testQueueArguments(): void { $name = Str::random(); - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $actual = $this->callMethod($queue, 'getQueueArguments', [$name]); $expected = []; @@ -233,7 +277,6 @@ public function testDelayQueueArguments(): void $name = Str::random(); $ttl = 12000; - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $actual = $this->callMethod($queue, 'getDelayQueueArguments', [$name, $ttl]); $expected = [ diff --git a/tests/Functional/TestCase.php b/tests/Functional/TestCase.php index 40e3e116..8b843561 100644 --- a/tests/Functional/TestCase.php +++ b/tests/Functional/TestCase.php @@ -3,7 +3,7 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Functional; use Exception; -use PhpAmqpLib\Connection\AMQPLazyConnection; +use PhpAmqpLib\Channel\AMQPChannel; use ReflectionClass; use ReflectionException; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase as BaseTestCase; @@ -16,7 +16,7 @@ protected function getEnvironmentSetUp($app): void $app['config']->set('queue.connections.rabbitmq', [ 'driver' => 'rabbitmq', 'queue' => 'order', - 'connection' => AMQPLazyConnection::class, + 'connection' => 'default', 'hosts' => [ [ @@ -44,7 +44,7 @@ protected function getEnvironmentSetUp($app): void $app['config']->set('queue.connections.rabbitmq-with-options', [ 'driver' => 'rabbitmq', 'queue' => 'order', - 'connection' => AMQPLazyConnection::class, + 'connection' => 'default', 'hosts' => [ [ @@ -83,7 +83,7 @@ protected function getEnvironmentSetUp($app): void $app['config']->set('queue.connections.rabbitmq-with-options-empty', [ 'driver' => 'rabbitmq', 'queue' => 'order', - 'connection' => AMQPLazyConnection::class, + 'connection' => 'default', 'hosts' => [ [ @@ -119,11 +119,51 @@ protected function getEnvironmentSetUp($app): void 'worker' => 'default', + ]); + $app['config']->set('queue.connections.rabbitmq-with-options-null', [ + 'driver' => 'rabbitmq', + 'queue' => 'order', + 'connection' => 'default', + + 'hosts' => [ + [ + 'host' => null, + 'port' => null, + 'vhost' => null, + 'user' => null, + 'password' => null, + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => null, + 'passphrase' => null, + ], + + 'queue' => [ + 'prioritize_delayed' => null, + 'queue_max_priority' => null, + 'exchange' => null, + 'exchange_type' => null, + 'exchange_routing_key' => null, + 'reroute_failed' => null, + 'failed_exchange' => null, + 'failed_routing_key' => null, + 'quorum' => null, + ], + ], + + 'worker' => 'default', + ]); $app['config']->set('queue.connections.rabbitmq-with-quorum-options', [ 'driver' => 'rabbitmq', 'queue' => 'order', - 'connection' => AMQPLazyConnection::class, + 'connection' => 'default', 'hosts' => [ [ @@ -161,14 +201,9 @@ protected function getEnvironmentSetUp($app): void } /** - * @param $object - * @param string $method - * @param array $parameters - * @return mixed - * * @throws Exception */ - protected function callMethod($object, string $method, array $parameters = []) + protected function callMethod($object, string $method, array $parameters = []): mixed { try { $className = get_class($object); @@ -182,4 +217,54 @@ protected function callMethod($object, string $method, array $parameters = []) return $method->invokeArgs($object, $parameters); } + + /** + * @throws Exception + */ + protected function callProperty($object, string $property): mixed + { + try { + $className = get_class($object); + $reflection = new ReflectionClass($className); + } catch (ReflectionException $e) { + throw new Exception($e->getMessage()); + } + + $property = $reflection->getProperty($property); + $property->setAccessible(true); + + return $property->getValue($object); + } + + public function testConnectChannel(): void + { + $queue = $this->connection(); + $this->assertFalse($queue->getConnection()->isConnected()); + + /** @var AMQPChannel $channel */ + $channel = $this->callMethod($queue, 'getChannel'); + $this->assertTrue($queue->getConnection()->isConnected()); + $this->assertSame($channel, $this->callProperty($queue, 'channel')); + $this->assertTrue($channel->is_open()); + } + + public function testReconnect(): void + { + $queue = $this->connection(); + $this->assertFalse($queue->getConnection()->isConnected()); + + // connect + $channel = $this->callMethod($queue, 'getChannel'); + $this->assertTrue($queue->getConnection()->isConnected()); + $this->assertSame($channel, $this->callProperty($queue, 'channel')); + + // close + $queue->getConnection()->close(); + $this->assertFalse($queue->getConnection()->isConnected()); + + // reconnect + $this->callMethod($queue, 'reconnect'); + $this->assertTrue($queue->getConnection()->isConnected()); + $this->assertTrue($queue->getChannel()->is_open()); + } } diff --git a/tests/Mocks/TestEncryptedJob.php b/tests/Mocks/TestEncryptedJob.php new file mode 100644 index 00000000..1c0e1762 --- /dev/null +++ b/tests/Mocks/TestEncryptedJob.php @@ -0,0 +1,25 @@ +i = $i; + } + + public function handle(): void + { + // + } +} diff --git a/tests/Mocks/TestSSLConnection.php b/tests/Mocks/TestSSLConnection.php new file mode 100644 index 00000000..c1586475 --- /dev/null +++ b/tests/Mocks/TestSSLConnection.php @@ -0,0 +1,14 @@ +config; + } +} diff --git a/tests/TestCase.php b/tests/TestCase.php index 7d50fa67..ec49a1ac 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -40,7 +40,7 @@ protected function getEnvironmentSetUp($app): void 'cafile' => null, 'local_cert' => null, 'local_key' => null, - 'verify_peer' => true, + 'verify_peer' => false, 'passphrase' => null, ], ], @@ -50,7 +50,7 @@ protected function getEnvironmentSetUp($app): void ]); } - protected function connection(string $name = null): RabbitMQQueue + protected function connection(?string $name = null): RabbitMQQueue { return Queue::connection($name); }