diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index afcf10e8..4aad6d20 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -13,13 +13,16 @@ jobs: strategy: fail-fast: true matrix: - php: ['7.3', '7.4', '8.0'] - stability: [prefer-lowest, prefer-stable] + php: ['8.1', '8.2', '8.3', '8.4'] + stability: ['prefer-lowest', 'prefer-stable'] + laravel: ['^10.0', '^11.0', '^12.0'] exclude: - - php: '8.0' - stability: 'prefer-lowest' + - php: '8.1' + laravel: '^11.0' + - php: '8.1' + laravel: '^12.0' - name: PHP ${{ matrix.php }} - ${{ matrix.stability }} + name: 'PHP ${{ matrix.php }} - Laravel: ${{matrix.laravel}} - ${{ matrix.stability }}' steps: - name: Checkout code @@ -32,22 +35,14 @@ jobs: extensions: dom, curl, libxml, mbstring, zip coverage: none - - name: Set up Docker - run: | - sudo rm /usr/local/bin/docker-compose - curl -L https://github.com/docker/compose/releases/download/1.24.1/docker-compose-`uname -s`-`uname -m` > docker-compose - chmod +x docker-compose - sudo mv docker-compose /usr/local/bin - - name: Start Docker container - run: docker-compose up -d rabbitmq + run: docker compose up -d rabbitmq - name: Install dependencies - run: composer update --${{ matrix.stability }} --prefer-dist --no-interaction --no-progress + run: composer update --with='laravel/framework:${{matrix.laravel}}' --${{ matrix.stability }} --prefer-dist --no-interaction --no-progress - - name: Run PHP CS Fixer - if: ${{ matrix.php != 8.0 }} - run: vendor/bin/php-cs-fixer fix --config=.php_cs.dist --allow-risky=yes --dry-run --diff --verbose + - name: Run Laravel Pint + run: ./vendor/bin/pint --test - name: Execute tests - run: vendor/bin/phpunit --verbose + run: sleep 10 && vendor/bin/phpunit diff --git a/.php_cs.dist b/.php_cs.dist deleted file mode 100644 index 930c161e..00000000 --- a/.php_cs.dist +++ /dev/null @@ -1,57 +0,0 @@ -in(['config', 'src', 'tests']); - -return PhpCsFixer\Config::create() - ->setUsingCache(false) - ->setFinder($finder) - ->setRules([ - 'psr0' => false, - '@PSR2' => true, - 'blank_line_after_namespace' => true, - 'braces' => true, - 'class_definition' => true, - 'concat_space' => ['spacing' => 'none'], - 'elseif' => true, - 'function_declaration' => true, - 'indentation_type' => true, - 'line_ending' => true, - 'lowercase_constants' => true, - 'lowercase_keywords' => true, - 'method_argument_space' => [ - 'ensure_fully_multiline' => true, - ], - 'no_break_comment' => true, - 'no_closing_tag' => true, - 'no_spaces_after_function_name' => true, - 'no_spaces_inside_parenthesis' => true, - 'no_trailing_whitespace' => true, - 'no_trailing_whitespace_in_comment' => true, - 'single_blank_line_at_eof' => true, - 'single_class_element_per_statement' => [ - 'elements' => ['property'], - ], - 'single_import_per_statement' => true, - 'single_line_after_imports' => true, - 'switch_case_semicolon_to_colon' => true, - 'switch_case_space' => true, - 'visibility_required' => true, - 'encoding' => true, - 'full_opening_tag' => true, - 'blank_line_before_return' => true, - 'no_trailing_comma_in_singleline_array' => true, - 'trailing_comma_in_multiline_array' => true, - 'array_indentation' => true, - 'binary_operator_spaces' => [ - 'operators' => [ - '=' => 'single_space', - ], - ], - 'fully_qualified_strict_types' => true, - 'void_return' => true, - 'cast_spaces' => [ - 'space' => 'single', - ], - 'not_operator_with_successor_space' => true, - ]); diff --git a/.styleci.yml b/.styleci.yml deleted file mode 100644 index fe42f6da..00000000 --- a/.styleci.yml +++ /dev/null @@ -1,2 +0,0 @@ -preset: laravel - diff --git a/CHANGELOG-11x.md b/CHANGELOG-11x.md index f5a83a38..cbbc691e 100644 --- a/CHANGELOG-11x.md +++ b/CHANGELOG-11x.md @@ -2,7 +2,18 @@ All notable changes to this project will be documented in this file. -## [Unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.2.0...master) +## [Unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.4.0...master) + +## [11.4.0 (2021-07-27)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.3.0...v11.4.0) + +- Randomize consumer tag [#432](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/432) + +## [11.3.0 (2021-07-06)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.2.0...v11.3.0) + +- Quorum queues support [#359](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/359) +- max-priority support [#422](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/422) +- Ability to specify exchange and exchange_type when using pushRaw() [#420](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/420) +- Remember exchanges once they have been verified [#407](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/407) ## [11.2.0 (2021-03-16)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.1.2...v11.2.0) diff --git a/CHANGELOG-12x.md b/CHANGELOG-12x.md new file mode 100644 index 00000000..3887c2d1 --- /dev/null +++ b/CHANGELOG-12x.md @@ -0,0 +1,14 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [Unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v12.0.1...master) + +## [12.0.1 (2022-04-06)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v12.0.0...v12.0.1) + +- Allow laravel to end workers with lost connection [#457](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/457) + +## [12.0.0 (2022-02-23)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.4.0...v12.0.0) + +- Laravel 9 support +- Minimum PHP version is set to 8.0 diff --git a/CHANGELOG-13x.md b/CHANGELOG-13x.md new file mode 100644 index 00000000..64e448ce --- /dev/null +++ b/CHANGELOG-13x.md @@ -0,0 +1,45 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.3.0...master) + +## [13.3.1](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.3.0...13.3.3) +- Fix a bug when no job / message is available on the queue initially [#543](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/543) + +## [13.3.0](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.2.0...13.3.0) + +- Refactor the creation of RabbitMQ Connection and + QueueAPI. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Added configuration object as single dependency for RabbitMQQueue in + constructor. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Fix method getExchangeType, not throwing an + exception. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Separating the api logic from the actual publishing to + RabbitMQ. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Added a reconnect method. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Fix the connection and channel not being fully lazy, when QueueAPI was + created. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Keep track of declared queue's within RabbitMQ. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Implemented the 'rest' option to the consumer [#530](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/530) +- Added ability to reconnect to RabbitMQ, by creating your + own `RabbitMQQueue:class` [#531](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/531) + +## [13.2.0](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.1.0...13.2.0) + +- Compatibility with Laravel 10 [#525](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/525) + +## [13.1.0 (2023-01-25)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.0.1...v13.1.0) + +- Fix delay parameter not being used [#502](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/502) +- Resolve Laravel 9 incompatabilities [#502](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/502) +- Fix Horizon invalid delay property [#502](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/502) + +## [13.0.1 (2022-09-16)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.0.0...v13.0.1) + +- Add $dispatchAfterCommit when running via + Horizon [#484](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/484) + +## [13.0.0 (2022-09-15)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v12.0.1...v13.0.0) + +- Dispatch a job after DB transaction commit [#468](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/468) diff --git a/CHANGELOG-14x.md b/CHANGELOG-14x.md new file mode 100644 index 00000000..c94b3dd2 --- /dev/null +++ b/CHANGELOG-14x.md @@ -0,0 +1,8 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.3.0...master) + +## [14.0.0] +- First release compatible with Laravel 11 diff --git a/README.md b/README.md index 8328df6f..30a10df9 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,17 @@ RabbitMQ Queue driver for Laravel ====================== [![Latest Stable Version](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/v/stable?format=flat-square)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) -[![Build Status](https://github.com/vyuldashev/laravel-queue-rabbitmq/workflows/Tests/badge.svg)](https://github.com/vyuldashev/laravel-queue-rabbitmq/actions) +[![Build Status](https://github.com/vyuldashev/laravel-queue-rabbitmq/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/vyuldashev/laravel-queue-rabbitmq/actions/workflows/tests.yml) [![Total Downloads](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/downloads?format=flat-square)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) -[![StyleCI](https://styleci.io/repos/14976752/shield)](https://styleci.io/repos/14976752) [![License](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/license?format=flat-square)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) ## Support Policy Only the latest version will get new features. Bug fixes will be provided using the following scheme: -| Package Version | Laravel Version | Bug Fixes Until | | -|-----------------|-----------------|---------------------|---------------------------------------------------------------------------------------------| -| 9 | 6 | October 5th, 2021 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/v9.0/README.md) | -| 10 | 6, 7 | October 5th, 2021 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/v10.0/README.md) | -| 11 | 8 | April 6th, 2021 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | +| Package Version | Laravel Version | Bug Fixes Until | | +|-----------------|-----------------|------------------|---------------------------------------------------------------------------------------------| +| 13 | 9 | August 8th, 2023 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | ## Installation @@ -26,8 +23,12 @@ composer require vladimir-yuldashev/laravel-queue-rabbitmq The package will automatically register itself. +### Configuration + Add connection to `config/queue.php`: +> This is the minimal config for the rabbitMQ connection/driver to work. + ```php 'connections' => [ // ... @@ -35,9 +36,6 @@ Add connection to `config/queue.php`: 'rabbitmq' => [ 'driver' => 'rabbitmq', - 'queue' => env('RABBITMQ_QUEUE', 'default'), - 'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class, - 'hosts' => [ [ 'host' => env('RABBITMQ_HOST', '127.0.0.1'), @@ -46,38 +44,23 @@ Add connection to `config/queue.php`: 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), ], + // ... ], - - 'options' => [ - 'ssl_options' => [ - 'cafile' => env('RABBITMQ_SSL_CAFILE', null), - 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), - 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), - 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), - 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), - ], - 'queue' => [ - 'job' => VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class, - ], - ], - - /* - * Set to "horizon" if you wish to use Laravel Horizon. - */ - 'worker' => env('RABBITMQ_WORKER', 'default'), - + + // ... ], // ... ], ``` -### Optional Config +### Optional Queue Config -Optionally add queue options to the config of a connection. -Every queue created for this connection, get's the properties. +Optionally add queue options to the config of a connection. +Every queue created for this connection, gets the properties. When you want to prioritize messages when they were delayed, then this is possible by adding extra options. + - When max-priority is omitted, the max priority is set with 2 when used. ```php @@ -101,13 +84,14 @@ When you want to prioritize messages when they were delayed, then this is possib ], ``` -When you want to publish messages against an exchange with routing-key's, then this is possible by adding extra options. +When you want to publish messages against an exchange with routing-keys, then this is possible by adding extra options. + - When the exchange is omitted, RabbitMQ will use the `amq.direct` exchange for the routing-key - When routing-key is omitted the routing-key by default is the `queue` name. - When using `%s` in the routing-key the queue_name will be substituted. -> Note: when using exchange with routing-key, u probably create your queues with bindings yourself. - +> Note: when using an exchange with routing-key, you probably create your queues with bindings yourself. + ```php 'connections' => [ // ... @@ -130,14 +114,18 @@ When you want to publish messages against an exchange with routing-key's, then t ], ``` -In Laravel failed jobs are stored into the database. But maybe you want to instruct some other process to also do something with the message. -When you want to instruct RabbitMQ to reroute failed messages to a exchange or a specific queue, then this is possible by adding extra options. +In Laravel failed jobs are stored into the database. But maybe you want to instruct some other process to also do +something with the message. +When you want to instruct RabbitMQ to reroute failed messages to a exchange or a specific queue, then this is possible +by adding extra options. + - When the exchange is omitted, RabbitMQ will use the `amq.direct` exchange for the routing-key - When routing-key is omitted, the routing-key by default the `queue` name is substituted with `'.failed'`. - When using `%s` in the routing-key the queue_name will be substituted. -> Note: When using failed_job exchange with routing-key, u probably need to create your exchange/queue with bindings yourself. - +> Note: When using failed_job exchange with routing-key, you probably need to create your exchange/queue with bindings +> yourself. + ```php 'connections' => [ // ... @@ -160,13 +148,40 @@ When you want to instruct RabbitMQ to reroute failed messages to a exchange or a ], ``` +### Horizon support + +Starting with 8.0, this package supports [Laravel Horizon](https://laravel.com/docs/horizon) out of the box. Firstly, +install Horizon and then set `RABBITMQ_WORKER` to `horizon`. + +Horizon is depending on events dispatched by the worker. +These events inform Horizon what was done with the message/job. + +This Library supports Horizon, but in the config you have to inform Laravel to use the QueueApi compatible with horizon. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + /* Set to "horizon" if you wish to use Laravel Horizon. */ + 'worker' => env('RABBITMQ_WORKER', 'default'), + ], + + // ... +], +``` + ### Use your own RabbitMQJob class + Sometimes you have to work with messages published by another application. Those messages probably won't respect Laravel's job payload schema. -The problem with these messages is that, Laravel workers won't be able to determine the actual job or class to execute. +The problem with these messages is that, Laravel workers won't be able to determine the actual job or class to execute. You can extend the build-in `RabbitMQJob::class` and within the queue connection config, you can define your own class. -When you specify an `job` key in the config, with your own class name, every message retrieved from the broker will get wrapped by your own class. +When you specify a `job` key in the config, with your own class name, every message retrieved from the broker will get +wrapped by your own class. An example for the config: @@ -215,6 +230,8 @@ class RabbitMQJob extends BaseJob $method = 'handle'; ($this->instance = $this->resolve($class))->{$method}($this, $payload); + + $this->delete(); } } @@ -246,13 +263,310 @@ class RabbitMQJob extends BaseJob } ``` -## Laravel Usage +If you want to handle raw message, not in JSON format or without 'job' key in JSON, +you should add stub for `getName` method: + +```php +getRawBody(); + Log::info($anyMessage); + + $this->delete(); + } + + public function getName() + { + return ''; + } +} +``` + +### Use your own Connection + +You can extend the built-in `PhpAmqpLib\Connection\AMQPStreamConnection::class` +or `PhpAmqpLib\Connection\AMQPSLLConnection::class` and within the connection config, you can define your own class. +When you specify a `connection` key in the config, with your own class name, every connection will use your own class. + +An example for the config: + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'connection' = > \App\Queue\Connection\MyRabbitMQConnection::class, + ], + + // ... +], +``` + +### Use your own Worker class + +If you want to use your own `RabbitMQQueue::class` this is possible by +extending `VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue`. +and inform laravel to use your class by setting `RABBITMQ_WORKER` to `\App\Queue\RabbitMQQueue::class`. + +> Note: Worker classes **must** extend `VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue` + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + /* Set to a class if you wish to use your own. */ + 'worker' => \App\Queue\RabbitMQQueue::class, + ], + + // ... +], +``` + +```php + Note: this is not best practice, it is an example. + +```php +reconnect(); + parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket); + } + } + + protected function publishBatch($jobs, $data = '', $queue = null): void + { + try { + parent::publishBatch($jobs, $data, $queue); + } catch (AMQPConnectionClosedException|AMQPChannelClosedException) { + $this->reconnect(); + parent::publishBatch($jobs, $data, $queue); + } + } + + protected function createChannel(): AMQPChannel + { + try { + return parent::createChannel(); + } catch (AMQPConnectionClosedException) { + $this->reconnect(); + return parent::createChannel(); + } + } +} +``` -Once you completed the configuration you can use Laravel Queue API. If you used other queue drivers you do not need to change anything else. If you do not know how to use Queue API, please refer to the official Laravel documentation: http://laravel.com/docs/queues +### Default Queue -## Laravel Horizon Usage +The connection does use a default queue with value 'default', when no queue is provided by laravel. +It is possible to change te default queue by adding an extra parameter in the connection config. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'queue' => env('RABBITMQ_QUEUE', 'default'), + ], + + // ... +], +``` + +### Heartbeat + +By default, your connection will be created with a heartbeat setting of `0`. +You can alter the heartbeat settings by changing the config. + +```php + +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'options' => [ + // ... + + 'heartbeat' => 10, + ], + ], + + // ... +], +``` + +### SSL Secure + +If you need a secure connection to rabbitMQ server(s), you will need to add these extra config options. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'secure' = > true, + 'options' => [ + // ... + + 'ssl_options' => [ + 'cafile' => env('RABBITMQ_SSL_CAFILE', null), + 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), + 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), + 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), + 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), + ], + ], + ], + + // ... +], +``` + +### Events after Database commits + +To instruct Laravel workers to dispatch events after all database commits are completed. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'after_commit' => true, + ], + + // ... +], +``` + +### Lazy Connection + +By default, your connection will be created as a lazy connection. +If for some reason you don't want the connection lazy you can turn it off by setting the following config. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'lazy' = > false, + ], + + // ... +], +``` + +### Network Protocol + +By default, the network protocol used for connection is tcp. +If for some reason you want to use another network protocol, you can add the extra value in your config options. +Available protocols : `tcp`, `ssl`, `tls` + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'network_protocol' => 'tcp', + ], + + // ... +], +``` + +### Network Timeouts + +For network timeouts configuration you can use option parameters. +All float values are in seconds and zero value can mean infinite timeout. +Example contains default values. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'options' => [ + // ... + + 'connection_timeout' => 3.0, + 'read_timeout' => 3.0, + 'write_timeout' => 3.0, + 'channel_rpc_timeout' => 0.0, + ], + ], + + // ... +], +``` + +### Octane support + +Starting with 13.3.0, this package supports [Laravel Octane](https://laravel.com/docs/octane) out of the box. +Firstly, install Octane and don't forget to warm 'rabbitmq' connection in the octane config. +> See: https://github.com/vyuldashev/laravel-queue-rabbitmq/issues/460#issuecomment-1469851667 + +## Laravel Usage -Starting with 8.0, this package supports [Laravel Horizon](http://horizon.laravel.com) out of the box. Firstly, install Horizon and then set `RABBITMQ_WORKER` to `horizon`. +Once you completed the configuration you can use the Laravel Queue API. If you used other queue drivers you do not +need to change anything else. If you do not know how to use the Queue API, please refer to the official Laravel +documentation: http://laravel.com/docs/queues ## Lumen Usage @@ -264,18 +578,18 @@ $app->register(VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServic ## Consuming Messages -There are two ways of consuming messages. +There are two ways of consuming messages. -1. `queue:work` command which is Laravel's built-in command. This command utilizes `basic_get`. +1. `queue:work` command which is Laravel's built-in command. This command utilizes `basic_get`. Use this if you want to consume multiple queues. -2. `rabbitmq:consume` command which is provided by this package. This command utilizes `basic_consume` and is more performant than `basic_get` by ~2x. +2. `rabbitmq:consume` command which is provided by this package. This command utilizes `basic_consume` and is more performant than `basic_get` by ~2x, but does not support multiple queues. ## Testing Setup RabbitMQ using `docker-compose`: ```bash -docker-compose up -d rabbitmq +docker compose up -d ``` To run the test suite you can use the following commands: @@ -292,7 +606,7 @@ composer test:unit ``` If you receive any errors from the style tests, you can automatically fix most, -if not all of the issues with the following command: +if not all the issues with the following command: ```bash composer fix:style @@ -300,4 +614,5 @@ composer fix:style ## Contribution -You can contribute to this package by discovering bugs and opening issues. Please, add to which version of package you create pull request or issue. (e.g. [5.2] Fatal error on delayed job) +You can contribute to this package by discovering bugs and opening issues. Please, add to which version of package you +create pull request or issue. (e.g. [5.2] Fatal error on delayed job) diff --git a/composer.json b/composer.json index d9ed64fc..caf8a0fc 100644 --- a/composer.json +++ b/composer.json @@ -1,58 +1,59 @@ { - "name": "vladimir-yuldashev/laravel-queue-rabbitmq", - "description": "RabbitMQ driver for Laravel Queue. Supports Laravel Horizon.", - "license": "MIT", - "authors": [ - { - "name": "Vladimir Yuldashev", - "email": "misterio92@gmail.com" - } - ], - "require": { - "php": "^7.3|^8.0", - "ext-json": "*", - "illuminate/queue": "^8.0", - "php-amqplib/php-amqplib": "^2.12|^3.0" - }, - "require-dev": { - "phpunit/phpunit": "^9.3", - "mockery/mockery": "^1.0", - "laravel/horizon": "^5.0", - "friendsofphp/php-cs-fixer": "^2.17", - "orchestra/testbench": "^6.0" - }, - "autoload": { - "psr-4": { - "VladimirYuldashev\\LaravelQueueRabbitMQ\\": "src/" - } - }, - "autoload-dev": { - "psr-4": { - "VladimirYuldashev\\LaravelQueueRabbitMQ\\Tests\\": "tests/" - } - }, - "extra": { - "branch-alias": { - "dev-master": "11.0-dev" - }, - "laravel": { - "providers": [ - "VladimirYuldashev\\LaravelQueueRabbitMQ\\LaravelQueueRabbitMQServiceProvider" - ] - } - }, - "suggest": { - "ext-pcntl": "Required to use all features of the queue consumer." - }, - "scripts": { - "test": [ - "@test:style", - "@test:unit" + "name": "vladimir-yuldashev/laravel-queue-rabbitmq", + "description": "RabbitMQ driver for Laravel Queue. Supports Laravel Horizon.", + "license": "MIT", + "authors": [ + { + "name": "Vladimir Yuldashev", + "email": "misterio92@gmail.com" + } ], - "test:style": "@php vendor/bin/php-cs-fixer fix --config=.php_cs.dist --allow-risky=yes --dry-run --diff --verbose", - "test:unit": "@php vendor/bin/phpunit", - "fix:style": "@php vendor/bin/php-cs-fixer fix --config=.php_cs.dist --allow-risky=yes --diff --verbose" - }, - "minimum-stability": "dev", - "prefer-stable": true + "require": { + "php": "^8.0", + "ext-json": "*", + "illuminate/queue": "^10.0|^11.0|^12.0", + "php-amqplib/php-amqplib": "^v3.6" + }, + "require-dev": { + "phpunit/phpunit": "^10.0|^11.0", + "mockery/mockery": "^1.0", + "laravel/horizon": "^5.0", + "orchestra/testbench": "^7.0|^8.0|^9.0|^10.0", + "laravel/pint": "^1.2", + "laravel/framework": "^9.0|^10.0|^11.0|^12.0" + }, + "autoload": { + "psr-4": { + "VladimirYuldashev\\LaravelQueueRabbitMQ\\": "src/" + } + }, + "autoload-dev": { + "psr-4": { + "VladimirYuldashev\\LaravelQueueRabbitMQ\\Tests\\": "tests/" + } + }, + "extra": { + "branch-alias": { + "dev-master": "13.0-dev" + }, + "laravel": { + "providers": [ + "VladimirYuldashev\\LaravelQueueRabbitMQ\\LaravelQueueRabbitMQServiceProvider" + ] + } + }, + "suggest": { + "ext-pcntl": "Required to use all features of the queue consumer." + }, + "scripts": { + "test": [ + "@test:style", + "@test:unit" + ], + "test:style": "@php vendor/bin/pint --test -v", + "test:unit": "@php vendor/bin/phpunit", + "fix:style": "@php vendor/bin/pint -v" + }, + "minimum-stability": "dev", + "prefer-stable": true } diff --git a/config/rabbitmq.php b/config/rabbitmq.php index d5390a5c..4c102ce8 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -9,7 +9,7 @@ 'driver' => 'rabbitmq', 'queue' => env('RABBITMQ_QUEUE', 'default'), - 'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class, + 'connection' => 'default', 'hosts' => [ [ @@ -22,16 +22,6 @@ ], 'options' => [ - 'ssl_options' => [ - 'cafile' => env('RABBITMQ_SSL_CAFILE', null), - 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), - 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), - 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), - 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), - ], - 'queue' => [ - 'job' => VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class, - ], ], /* diff --git a/docker-compose.yml b/docker-compose.yml index 6052f7f9..4ac32caa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,7 @@ version: '3.7' services: rabbitmq: - image: rabbitmq + image: rabbitmq:3.8 environment: RABBITMQ_DEFAULT_USER: guest RABBITMQ_DEFAULT_PASSWORD: guest @@ -17,10 +17,10 @@ services: - "./tests/files/rootCA.pem:/rootCA.pem:ro" - "./tests/files/rootCA.key:/rootCA.key:ro" ports: - - 15671:15671 - - 15672:15672 - - 5671:5671 - - 5672:5672 + - "15671:15671" + - "15672:15672" + - "5671:5671" + - "5672:5672" rabbitmq-management: image: rabbitmq:management diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 7e166ab8..d213fd63 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -1,27 +1,16 @@ - - - - src/ - - - - - - - - ./tests/ - - - - - - - - - + + + + ./tests/ + + + + + + + + + + diff --git a/pint.json b/pint.json new file mode 100644 index 00000000..05f4b41e --- /dev/null +++ b/pint.json @@ -0,0 +1,8 @@ +{ + "preset": "laravel", + "rules": { + "php_unit_method_casing": { + "case": "camel_case" + } + } +} diff --git a/src/Console/ConsumeCommand.php b/src/Console/ConsumeCommand.php index 100be54a..4072132a 100644 --- a/src/Console/ConsumeCommand.php +++ b/src/Console/ConsumeCommand.php @@ -11,7 +11,7 @@ class ConsumeCommand extends WorkCommand protected $signature = 'rabbitmq:consume {connection? : The name of the queue connection to work} {--name=default : The name of the consumer} - {--queue= : The names of the queues to work} + {--queue= : The name of the queue to work. Please notice that there is no support for multiple queues} {--once : Only process the next job on the queue} {--stop-when-empty : Stop when the queue is empty} {--delay=0 : The number of seconds to delay failed jobs (Deprecated)} @@ -21,10 +21,12 @@ class ConsumeCommand extends WorkCommand {--force : Force the worker to run even in maintenance mode} {--memory=128 : The memory limit in megabytes} {--sleep=3 : Number of seconds to sleep when no job is available} + {--rest=0 : Number of seconds to rest between jobs} {--timeout=60 : The number of seconds a child process can run} {--tries=1 : Number of times to attempt a job before logging it failed} - {--rest=0 : Number of seconds to rest between jobs} - + {--json : Output the queue worker information as JSON} + + {--max-priority=} {--consumer-tag} {--prefetch-size=0} {--prefetch-count=1000} @@ -40,6 +42,7 @@ public function handle(): void $consumer->setContainer($this->laravel); $consumer->setName($this->option('name')); $consumer->setConsumerTag($this->consumerTag()); + $consumer->setMaxPriority((int) $this->option('max-priority')); $consumer->setPrefetchSize((int) $this->option('prefetch-size')); $consumer->setPrefetchCount((int) $this->option('prefetch-count')); @@ -52,6 +55,12 @@ protected function consumerTag(): string return $consumerTag; } - return Str::slug(config('app.name', 'laravel'), '_').'_'.getmypid(); + $consumerTag = implode('_', [ + Str::slug(config('app.name', 'laravel')), + Str::slug($this->option('name')), + md5(serialize($this->options()).Str::random(16).getmypid()), + ]); + + return Str::substr($consumerTag, 0, 255); } } diff --git a/src/Console/ExchangeDeclareCommand.php b/src/Console/ExchangeDeclareCommand.php index 955da0a9..43aa9c5c 100644 --- a/src/Console/ExchangeDeclareCommand.php +++ b/src/Console/ExchangeDeclareCommand.php @@ -18,7 +18,6 @@ class ExchangeDeclareCommand extends Command protected $description = 'Declare exchange'; /** - * @param RabbitMQConnector $connector * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/ExchangeDeleteCommand.php b/src/Console/ExchangeDeleteCommand.php index af4473a2..d5a8d8d4 100644 --- a/src/Console/ExchangeDeleteCommand.php +++ b/src/Console/ExchangeDeleteCommand.php @@ -16,7 +16,6 @@ class ExchangeDeleteCommand extends Command protected $description = 'Delete exchange'; /** - * @param RabbitMQConnector $connector * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/QueueBindCommand.php b/src/Console/QueueBindCommand.php index 686a3430..ccbbd706 100644 --- a/src/Console/QueueBindCommand.php +++ b/src/Console/QueueBindCommand.php @@ -17,7 +17,6 @@ class QueueBindCommand extends Command protected $description = 'Bind queue to exchange'; /** - * @param RabbitMQConnector $connector * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/QueueDeclareCommand.php b/src/Console/QueueDeclareCommand.php index 0c89f329..54d6ea32 100644 --- a/src/Console/QueueDeclareCommand.php +++ b/src/Console/QueueDeclareCommand.php @@ -11,13 +11,14 @@ class QueueDeclareCommand extends Command protected $signature = 'rabbitmq:queue-declare {name : The name of the queue to declare} {connection=rabbitmq : The name of the queue connection to use} + {--max-priority} {--durable=1} - {--auto-delete=0}'; + {--auto-delete=0} + {--quorum=0}'; protected $description = 'Declare queue'; /** - * @param RabbitMQConnector $connector * @throws Exception */ public function handle(RabbitMQConnector $connector): void @@ -32,10 +33,22 @@ public function handle(RabbitMQConnector $connector): void return; } + $arguments = []; + + $maxPriority = (int) $this->option('max-priority'); + if ($maxPriority) { + $arguments['x-max-priority'] = $maxPriority; + } + + if ($this->option('quorum')) { + $arguments['x-queue-type'] = 'quorum'; + } + $queue->declareQueue( $this->argument('name'), (bool) $this->option('durable'), - (bool) $this->option('auto-delete') + (bool) $this->option('auto-delete'), + $arguments ); $this->info('Queue declared successfully.'); diff --git a/src/Console/QueueDeleteCommand.php b/src/Console/QueueDeleteCommand.php index a1e331e2..b2586ecd 100644 --- a/src/Console/QueueDeleteCommand.php +++ b/src/Console/QueueDeleteCommand.php @@ -17,7 +17,6 @@ class QueueDeleteCommand extends Command protected $description = 'Delete queue'; /** - * @param RabbitMQConnector $connector * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/QueuePurgeCommand.php b/src/Console/QueuePurgeCommand.php index 8b03604a..49be839b 100644 --- a/src/Console/QueuePurgeCommand.php +++ b/src/Console/QueuePurgeCommand.php @@ -19,7 +19,6 @@ class QueuePurgeCommand extends Command protected $description = 'Purge all messages in queue'; /** - * @param RabbitMQConnector $connector * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Consumer.php b/src/Consumer.php index 3690fcb4..ed3d8099 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -23,6 +23,9 @@ class Consumer extends Worker /** @var int */ protected $prefetchSize; + /** @var int */ + protected $maxPriority; + /** @var int */ protected $prefetchCount; @@ -42,6 +45,11 @@ public function setConsumerTag(string $value): void $this->consumerTag = $value; } + public function setMaxPriority(int $value): void + { + $this->maxPriority = $value; + } + public function setPrefetchSize(int $value): void { $this->prefetchSize = $value; @@ -55,10 +63,10 @@ public function setPrefetchCount(int $value): void /** * Listen to the given queue in a loop. * - * @param string $connectionName - * @param string $queue - * @param WorkerOptions $options + * @param string $connectionName + * @param string $queue * @return int + * * @throws Throwable */ public function daemon($connectionName, $queue, WorkerOptions $options) @@ -79,10 +87,14 @@ public function daemon($connectionName, $queue, WorkerOptions $options) $this->channel->basic_qos( $this->prefetchSize, $this->prefetchCount, - null + false ); $jobClass = $connection->getJobClass(); + $arguments = []; + if ($this->maxPriority) { + $arguments['priority'] = ['I', $this->maxPriority]; + } $this->channel->basic_consume( $queue, @@ -113,7 +125,13 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu if ($this->supportsAsyncSignals()) { $this->resetTimeoutHandler(); } - } + + if ($options->rest > 0) { + $this->sleep($options->rest); + } + }, + null, + $arguments ); while ($this->channel->is_consuming()) { @@ -122,6 +140,7 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu // make sure we do not need to kill this worker process off completely. if (! $this->daemonShouldRun($options, $connectionName, $queue)) { $this->pauseWorker($options, $lastRestart); + continue; } @@ -131,8 +150,8 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu } catch (AMQPRuntimeException $exception) { $this->exceptions->report($exception); - $this->kill(1); - } catch (Exception | Throwable $exception) { + $this->kill(self::EXIT_ERROR, $options); + } catch (Exception|Throwable $exception) { $this->exceptions->report($exception); $this->stopWorkerIfLostConnection($exception); @@ -155,7 +174,7 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu ); if (! is_null($status)) { - return $this->stop($status); + return $this->stop($status, $options); } $this->currentJob = null; @@ -165,10 +184,8 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu /** * Determine if the daemon should process on this iteration. * - * @param WorkerOptions $options - * @param string $connectionName - * @param string $queue - * @return bool + * @param string $connectionName + * @param string $queue */ protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue): bool { @@ -179,14 +196,15 @@ protected function daemonShouldRun(WorkerOptions $options, $connectionName, $que * Stop listening and bail out of the script. * * @param int $status + * @param WorkerOptions|null $options * @return int */ - public function stop($status = 0): int + public function stop($status = 0, $options = null) { // Tell the server you are going to stop consuming. // It will finish up the last message and not send you any more. $this->channel->basic_cancel($this->consumerTag, false, true); - return parent::stop($status); + return parent::stop($status, $options); } } diff --git a/src/Contracts/RabbitMQQueueContract.php b/src/Contracts/RabbitMQQueueContract.php new file mode 100644 index 00000000..cf04a66e --- /dev/null +++ b/src/Contracts/RabbitMQQueueContract.php @@ -0,0 +1,14 @@ +size($queue); } @@ -46,10 +43,12 @@ public function push($job, $data = '', $queue = null) /** * {@inheritdoc} + * + * @throws BindingResolutionException */ - public function pushRaw($payload, $queue = null, array $options = []) + public function pushRaw($payload, $queue = null, array $options = []): int|string|null { - $payload = (new JobPayload($payload))->prepare($this->lastPushed)->value; + $payload = (new JobPayload($payload))->prepare($this->lastPushed ?? null)->value; return tap(parent::pushRaw($payload, $queue, $options), function () use ($queue, $payload): void { $this->event($this->getQueue($queue), new JobPushed($payload)); @@ -58,12 +57,14 @@ public function pushRaw($payload, $queue = null, array $options = []) /** * {@inheritdoc} + * + * @throws BindingResolutionException */ - public function later($delay, $job, $data = '', $queue = null) + public function later($delay, $job, $data = '', $queue = null): mixed { $payload = (new JobPayload($this->createPayload($job, $data)))->prepare($job)->value; - return tap(parent::pushRaw($payload, $queue, ['delay' => $this->secondsUntil($delay)]), function () use ($payload, $queue): void { + return tap(parent::laterRaw($delay, $payload, $queue), function () use ($payload, $queue): void { $this->event($this->getQueue($queue), new JobPushed($payload)); }); } @@ -80,22 +81,12 @@ public function pop($queue = null) }); } - /** - * {@inheritdoc} - */ - public function release($delay, $job, $data, $queue, $attempts = 0) - { - $this->lastPushed = $job; - - return parent::release($delay, $job, $data, $queue, $attempts); - } - /** * Fire the job deleted event. * - * @param string $queue - * @param RabbitMQJob $job - * @return void + * @param string $queue + * @param RabbitMQJob $job + * * @throws BindingResolutionException */ public function deleteReserved($queue, $job): void @@ -106,9 +97,9 @@ public function deleteReserved($queue, $job): void /** * Fire the given event if a dispatcher is bound. * - * @param string $queue - * @param mixed $event - * @return void + * @param string $queue + * @param mixed $event + * * @throws BindingResolutionException */ protected function event($queue, $event): void diff --git a/src/LaravelQueueRabbitMQServiceProvider.php b/src/LaravelQueueRabbitMQServiceProvider.php index 3be0fd81..ee46d6cd 100644 --- a/src/LaravelQueueRabbitMQServiceProvider.php +++ b/src/LaravelQueueRabbitMQServiceProvider.php @@ -12,8 +12,6 @@ class LaravelQueueRabbitMQServiceProvider extends ServiceProvider { /** * Register the service provider. - * - * @return void */ public function register(): void { @@ -60,8 +58,6 @@ public function register(): void /** * Register the application's event listeners. - * - * @return void */ public function boot(): void { diff --git a/src/Queue/Connection/ConfigFactory.php b/src/Queue/Connection/ConfigFactory.php new file mode 100644 index 00000000..06c8080d --- /dev/null +++ b/src/Queue/Connection/ConfigFactory.php @@ -0,0 +1,126 @@ +setIsLazy(! in_array( + Arr::get($config, 'lazy') ?? true, + [false, 0, '0', 'false', 'no'], + true) + ); + + // Set the connection to unsecure by default + $connectionConfig->setIsSecure(in_array( + Arr::get($config, 'secure'), + [true, 1, '1', 'true', 'yes'], + true) + ); + + if ($connectionConfig->isSecure()) { + self::getSLLOptionsFromConfig($connectionConfig, $config); + } + + self::getHostFromConfig($connectionConfig, $config); + self::getHeartbeatFromConfig($connectionConfig, $config); + self::getNetworkProtocolFromConfig($connectionConfig, $config); + self::getTimeoutsFromConfig($connectionConfig, $config); + }); + } + + protected static function getHostFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $hostConfig = Arr::first(Arr::shuffle(Arr::get($config, self::CONFIG_HOSTS, [])), null, []); + + if ($location = Arr::get($hostConfig, 'host')) { + $connectionConfig->setHost($location); + } + if ($port = Arr::get($hostConfig, 'port')) { + $connectionConfig->setPort($port); + } + if ($vhost = Arr::get($hostConfig, 'vhost')) { + $connectionConfig->setVhost($vhost); + } + if ($user = Arr::get($hostConfig, 'user')) { + $connectionConfig->setUser($user); + } + if ($password = Arr::get($hostConfig, 'password')) { + $connectionConfig->setPassword($password); + } + } + + protected static function getSLLOptionsFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $sslConfig = Arr::get($config, self::CONFIG_OPTIONS.'.ssl_options', []); + + if ($caFile = Arr::get($sslConfig, 'cafile')) { + $connectionConfig->setSslCaCert($caFile); + } + if ($cert = Arr::get($sslConfig, 'local_cert')) { + $connectionConfig->setSslCert($cert); + } + if ($key = Arr::get($sslConfig, 'local_key')) { + $connectionConfig->setSslKey($key); + } + if (Arr::has($sslConfig, 'verify_peer')) { + $verifyPeer = Arr::get($sslConfig, 'verify_peer'); + $connectionConfig->setSslVerify($verifyPeer); + } + if ($passphrase = Arr::get($sslConfig, 'passphrase')) { + $connectionConfig->setSslPassPhrase($passphrase); + } + } + + protected static function getHeartbeatFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $heartbeat = Arr::get($config, self::CONFIG_OPTIONS.'.heartbeat'); + + if (is_numeric($heartbeat) && intval($heartbeat) > 0) { + $connectionConfig->setHeartbeat((int) $heartbeat); + } + } + + protected static function getNetworkProtocolFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + if ($networkProtocol = Arr::get($config, 'network_protocol')) { + $connectionConfig->setNetworkProtocol($networkProtocol); + } + } + + protected static function getTimeoutsFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $connectionTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.connection_timeout'); + if (is_numeric($connectionTimeout) && floatval($connectionTimeout) >= 0) { + $connectionConfig->setConnectionTimeout((float) $connectionTimeout); + } + + $readTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.read_timeout'); + if (is_numeric($readTimeout) && floatval($readTimeout) >= 0) { + $connectionConfig->setReadTimeout((float) $readTimeout); + } + + $writeTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.write_timeout'); + if (is_numeric($writeTimeout) && floatval($writeTimeout) >= 0) { + $connectionConfig->setWriteTimeout((float) $writeTimeout); + } + + $chanelRpcTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.channel_rpc_timeout'); + if (is_numeric($chanelRpcTimeout) && floatval($chanelRpcTimeout) >= 0) { + $connectionConfig->setChannelRPCTimeout((float) $chanelRpcTimeout); + } + } +} diff --git a/src/Queue/Connection/ConnectionFactory.php b/src/Queue/Connection/ConnectionFactory.php new file mode 100644 index 00000000..f531e761 --- /dev/null +++ b/src/Queue/Connection/ConnectionFactory.php @@ -0,0 +1,223 @@ +getIoType() === AMQPConnectionConfig::IO_TYPE_SOCKET) { + return self::createSocketConnection($connection, $config); + } + + return self::createStreamConnection($connection, $config); + } + + protected static function createSocketConnection($connection, AMQPConnectionConfig $config): AMQPSocketConnection + { + self::assertSocketConnection($connection, $config); + + return new $connection( + $config->getHost(), + $config->getPort(), + $config->getUser(), + $config->getPassword(), + $config->getVhost(), + $config->isInsist(), + $config->getLoginMethod(), + $config->getLoginResponse(), + $config->getLocale(), + $config->getReadTimeout(), + $config->isKeepalive(), + $config->getWriteTimeout(), + $config->getHeartbeat(), + $config->getChannelRPCTimeout(), + $config + ); + } + + protected static function createStreamConnection($connection, AMQPConnectionConfig $config): AMQPStreamConnection + { + self::assertStreamConnection($connection); + + if ($config->isSecure()) { + self::assertSSLConnection($connection); + + return new $connection( + $config->getHost(), + $config->getPort(), + $config->getUser(), + $config->getPassword(), + $config->getVhost(), + self::getSslOptions($config), + [ + 'insist' => $config->isInsist(), + 'login_method' => $config->getLoginMethod(), + 'login_response' => $config->getLoginResponse(), + 'locale' => $config->getLocale(), + 'connection_timeout' => $config->getConnectionTimeout(), + 'read_write_timeout' => self::getReadWriteTimeout($config), + 'keepalive' => $config->isKeepalive(), + 'heartbeat' => $config->getHeartbeat(), + ], + $config + ); + } + + return new $connection( + $config->getHost(), + $config->getPort(), + $config->getUser(), + $config->getPassword(), + $config->getVhost(), + $config->isInsist(), + $config->getLoginMethod(), + $config->getLoginResponse(), + $config->getLocale(), + $config->getConnectionTimeout(), + self::getReadWriteTimeout($config), + $config->getStreamContext(), + $config->isKeepalive(), + $config->getHeartbeat(), + $config->getChannelRPCTimeout(), + $config->getNetworkProtocol(), + $config + ); + } + + protected static function getReadWriteTimeout(AMQPConnectionConfig $config): float + { + return min($config->getReadTimeout(), $config->getWriteTimeout()); + } + + protected static function getSslOptions(AMQPConnectionConfig $config): array + { + return array_filter([ + 'cafile' => $config->getSslCaCert(), + 'capath' => $config->getSslCaPath(), + 'local_cert' => $config->getSslCert(), + 'local_pk' => $config->getSslKey(), + 'verify_peer' => $config->getSslVerify(), + 'verify_peer_name' => $config->getSslVerifyName(), + 'passphrase' => $config->getSslPassPhrase(), + 'ciphers' => $config->getSslCiphers(), + 'security_level' => $config->getSslSecurityLevel(), + ], static function ($value) { + return $value !== null; + }); + } + + protected static function assertConnectionFromConfig(string $connection): void + { + if ($connection !== self::CONNECTION_TYPE_DEFAULT && ! is_subclass_of($connection, self::CONNECTION_TYPE_EXTENDED)) { + throw new AMQPLogicException(sprintf('The config property \'%s\' must contain \'%s\' or must extend: %s', self::CONFIG_CONNECTION, self::CONNECTION_TYPE_DEFAULT, class_basename(self::CONNECTION_TYPE_EXTENDED))); + } + } + + protected static function assertSocketConnection($connection, AMQPConnectionConfig $config): void + { + self::assertExtendedOf($connection, self::CONNECTION_SUB_TYPE_SOCKET); + + if ($config->isSecure()) { + throw new AMQPLogicException('The socket connection implementation does not support secure connections.'); + } + } + + protected static function assertStreamConnection($connection): void + { + self::assertExtendedOf($connection, self::CONNECTION_SUB_TYPE_STREAM); + } + + protected static function assertSSLConnection($connection): void + { + self::assertExtendedOf($connection, self::CONNECTION_SUB_TYPE_SSL); + } + + protected static function assertExtendedOf($connection, string $parent): void + { + if (! is_subclass_of($connection, $parent) && $connection !== $parent) { + throw new AMQPLogicException(sprintf('The connection must extend: %s', class_basename($parent))); + } + } + + /** + * @return mixed + * + * @throws Exception + * + * @deprecated This is the fallback method, update your config asap. (example: connection => 'default') + */ + protected static function _createLazyConnection($connection, array $config): AbstractConnection + { + return $connection::create_connection( + Arr::shuffle(Arr::get($config, ConfigFactory::CONFIG_HOSTS, [])), + Arr::add(Arr::get($config, 'options', []), 'heartbeat', 0) + ); + } +} diff --git a/src/Queue/Connectors/RabbitMQConnector.php b/src/Queue/Connectors/RabbitMQConnector.php index e39f7b9a..a0f79e32 100644 --- a/src/Queue/Connectors/RabbitMQConnector.php +++ b/src/Queue/Connectors/RabbitMQConnector.php @@ -8,20 +8,15 @@ use Illuminate\Queue\Connectors\ConnectorInterface; use Illuminate\Queue\Events\JobFailed; use Illuminate\Queue\Events\WorkerStopping; -use Illuminate\Support\Arr; -use InvalidArgumentException; -use PhpAmqpLib\Connection\AbstractConnection; -use PhpAmqpLib\Connection\AMQPLazyConnection; use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\Listeners\RabbitMQFailedEvent; use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue; +use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connection\ConnectionFactory; +use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\QueueFactory; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; class RabbitMQConnector implements ConnectorInterface { - /** - * @var Dispatcher - */ - private $dispatcher; + protected Dispatcher $dispatcher; public function __construct(Dispatcher $dispatcher) { @@ -31,25 +26,15 @@ public function __construct(Dispatcher $dispatcher) /** * Establish a queue connection. * - * @param array $config - * * @return RabbitMQQueue + * * @throws Exception */ public function connect(array $config): Queue { - $connection = $this->createConnection(Arr::except($config, 'options.queue')); + $connection = ConnectionFactory::make($config); - $queue = $this->createQueue( - Arr::get($config, 'worker', 'default'), - $connection, - $config['queue'], - Arr::get($config, 'options.queue', []) - ); - - if (! $queue instanceof RabbitMQQueue) { - throw new InvalidArgumentException('Invalid worker.'); - } + $queue = QueueFactory::make($config)->setConnection($connection); if ($queue instanceof HorizonRabbitMQQueue) { $this->dispatcher->listen(JobFailed::class, RabbitMQFailedEvent::class); @@ -61,68 +46,4 @@ public function connect(array $config): Queue return $queue; } - - /** - * @param array $config - * @return AbstractConnection - * @throws Exception - */ - protected function createConnection(array $config): AbstractConnection - { - /** @var AbstractConnection $connection */ - $connection = Arr::get($config, 'connection', AMQPLazyConnection::class); - - // disable heartbeat when not configured, so long-running tasks will not fail - $config = Arr::add($config, 'options.heartbeat', 0); - - return $connection::create_connection( - Arr::shuffle(Arr::get($config, 'hosts', [])), - $this->filter(Arr::get($config, 'options', [])) - ); - } - - /** - * Create a queue for the worker. - * - * @param string $worker - * @param AbstractConnection $connection - * @param string $queue - * @param array $options - * @return HorizonRabbitMQQueue|RabbitMQQueue|Queue - */ - protected function createQueue(string $worker, AbstractConnection $connection, string $queue, array $options = []) - { - switch ($worker) { - case 'default': - return new RabbitMQQueue($connection, $queue, $options); - case 'horizon': - return new HorizonRabbitMQQueue($connection, $queue, $options); - default: - return new $worker($connection, $queue, $options); - } - } - - /** - * Recursively filter only null values. - * - * @param array $array - * @return array - */ - private function filter(array $array): array - { - foreach ($array as $index => &$value) { - if (is_array($value)) { - $value = $this->filter($value); - continue; - } - - // If the value is null then remove it. - if ($value === null) { - unset($array[$index]); - continue; - } - } - - return $array; - } } diff --git a/src/Queue/Jobs/RabbitMQJob.php b/src/Queue/Jobs/RabbitMQJob.php index 70e9e7f5..abcdfab4 100644 --- a/src/Queue/Jobs/RabbitMQJob.php +++ b/src/Queue/Jobs/RabbitMQJob.php @@ -118,7 +118,8 @@ public function delete(): void /** * Release the job back into the queue. * - * @param int $delay + * @param int $delay + * * @throws AMQPProtocolChannelException */ public function release($delay = 0): void @@ -136,8 +137,6 @@ public function release($delay = 0): void /** * Get the underlying RabbitMQ connection. - * - * @return RabbitMQQueue */ public function getRabbitMQ(): RabbitMQQueue { @@ -146,8 +145,6 @@ public function getRabbitMQ(): RabbitMQQueue /** * Get the underlying RabbitMQ message. - * - * @return AMQPMessage */ public function getRabbitMQMessage(): AMQPMessage { @@ -156,8 +153,6 @@ public function getRabbitMQMessage(): AMQPMessage /** * Get the headers from the rabbitMQ message. - * - * @return array|null */ protected function getRabbitMQMessageHeaders(): ?array { diff --git a/src/Queue/QueueConfig.php b/src/Queue/QueueConfig.php new file mode 100644 index 00000000..e7ec27c4 --- /dev/null +++ b/src/Queue/QueueConfig.php @@ -0,0 +1,278 @@ +queue; + } + + public function setQueue(string $queue): QueueConfig + { + $this->queue = $queue; + + return $this; + } + + /** + * Returns &true; as indication that jobs should be dispatched after all database transactions + * have been committed. + */ + public function isDispatchAfterCommit(): bool + { + return $this->dispatchAfterCommit; + } + + public function setDispatchAfterCommit($dispatchAfterCommit): QueueConfig + { + $this->dispatchAfterCommit = $this->toBoolean($dispatchAfterCommit); + + return $this; + } + + /** + * Get the Job::class to use when processing messages + */ + public function getAbstractJob(): string + { + return $this->abstractJob; + } + + public function setAbstractJob(string $abstract): QueueConfig + { + $this->abstractJob = $abstract; + + return $this; + } + + /** + * Returns &true;, if delayed messages should be prioritized. + * + * RabbitMQ queues work with the FIFO method. So when there are 10000 messages in the queue and + * the delayed message is put back to the queue (at the end) for further processing the delayed message won´t + * process before all 10000 messages are processed. The same is true for requeueing. + * + * This may not what you desire. + * When you want the message to get processed immediately after the delayed time expires or when requeueing, we can + * use prioritization. + * + * @see[https://www.rabbitmq.com/queues.html#basics] + */ + public function isPrioritizeDelayed(): bool + { + return $this->prioritizeDelayed; + } + + public function setPrioritizeDelayed($prioritizeDelayed): QueueConfig + { + $this->prioritizeDelayed = $this->toBoolean($prioritizeDelayed); + + return $this; + } + + /** + * Returns a integer with a default of '2' for when using prioritization on delayed messages. + * If priority queues are desired, we recommend using between 1 and 10. + * Using more priority layers, will consume more CPU resources and would affect runtimes. + * + * @see https://www.rabbitmq.com/priority.html + */ + public function getQueueMaxPriority(): int + { + return $this->queueMaxPriority; + } + + public function setQueueMaxPriority($queueMaxPriority): QueueConfig + { + if (is_numeric($queueMaxPriority) && intval($queueMaxPriority) > 1) { + $this->queueMaxPriority = (int) $queueMaxPriority; + } + + return $this; + } + + /** + * Get the exchange name, or empty string; as default value. + * + * The default exchange is an unnamed pre-declared direct exchange. Usually, an empty string + * is frequently used to indicate it. If you choose default exchange, your message will be delivered + * to a queue with the same name as the routing key. + * With a routing key that is the same as the queue name, every queue is immediately tied to the default exchange. + */ + public function getExchange(): string + { + return $this->exchange; + } + + public function setExchange(string $exchange): QueueConfig + { + $this->exchange = $exchange; + + return $this; + } + + /** + * Get the exchange type + * + * There are four basic RabbitMQ exchange types in RabbitMQ, each of which uses different parameters + * and bindings to route messages in various ways, These are: 'direct', 'topic', 'fanout', 'headers' + * + * The default type is set as 'direct' + */ + public function getExchangeType(): string + { + return $this->exchangeType; + } + + public function setExchangeType(string $exchangeType): QueueConfig + { + $this->exchangeType = $exchangeType; + + return $this; + } + + /** + * Get the routing key when using an exchange other than the direct exchange. + * The routing key is a message attribute taken into account by the exchange when deciding how to route a message. + * + * The default routing-key is the given destination: '%s'. + */ + public function getExchangeRoutingKey(): string + { + return $this->exchangeRoutingKey; + } + + public function setExchangeRoutingKey(string $exchangeRoutingKey): QueueConfig + { + $this->exchangeRoutingKey = $exchangeRoutingKey; + + return $this; + } + + /** + * Returns &true;, if failed messages should be rerouted. + */ + public function isRerouteFailed(): bool + { + return $this->rerouteFailed; + } + + public function setRerouteFailed($rerouteFailed): QueueConfig + { + $this->rerouteFailed = $this->toBoolean($rerouteFailed); + + return $this; + } + + /** + * Get the exchange name with messages are published against. + * The default exchange is empty, so messages will be published directly to a queue. + */ + public function getFailedExchange(): string + { + return $this->failedExchange; + } + + public function setFailedExchange(string $failedExchange): QueueConfig + { + $this->failedExchange = $failedExchange; + + return $this; + } + + /** + * Get the substitution string for failed messages + * The default routing-key is the given destination substituted by '%s.failed'. + */ + public function getFailedRoutingKey(): string + { + return $this->failedRoutingKey; + } + + public function setFailedRoutingKey(string $failedRoutingKey): QueueConfig + { + $this->failedRoutingKey = $failedRoutingKey; + + return $this; + } + + /** + * Returns &true;, if queue is marked or set as quorum queue. + */ + public function isQuorum(): bool + { + return $this->quorum; + } + + public function setQuorum($quorum): QueueConfig + { + $this->quorum = $this->toBoolean($quorum); + + return $this; + } + + /** + * Holds all unknown queue options provided in the connection config + */ + public function getOptions(): array + { + return $this->options; + } + + public function setOptions(array $options): QueueConfig + { + $this->options = $options; + + return $this; + } + + /** + * Filters $value to boolean value + * + * Returns: &true; + * For values: 1, '1', true, 'true', 'yes' + * + * Returns: &false; + * For values: 0, '0', false, 'false', '', null, [] , 'ok', 'no', 'no not a bool', 'yes a bool' + */ + protected function toBoolean($value): bool + { + return filter_var($value, FILTER_VALIDATE_BOOLEAN); + } +} diff --git a/src/Queue/QueueConfigFactory.php b/src/Queue/QueueConfigFactory.php new file mode 100644 index 00000000..6f2befc5 --- /dev/null +++ b/src/Queue/QueueConfigFactory.php @@ -0,0 +1,74 @@ +setQueue($queue); + } + if (! empty($afterCommit = Arr::get($config, 'after_commit'))) { + $queueConfig->setDispatchAfterCommit($afterCommit); + } + + self::getOptionsFromConfig($queueConfig, $config); + }); + } + + protected static function getOptionsFromConfig(QueueConfig $queueConfig, array $config): void + { + $queueOptions = Arr::get($config, self::CONFIG_OPTIONS.'.queue', []) ?: []; + + if ($job = Arr::pull($queueOptions, 'job')) { + $queueConfig->setAbstractJob($job); + } + + // Feature: Prioritize delayed messages. + if ($prioritizeDelayed = Arr::pull($queueOptions, 'prioritize_delayed')) { + $queueConfig->setPrioritizeDelayed($prioritizeDelayed); + } + if ($maxPriority = Arr::pull($queueOptions, 'queue_max_priority')) { + $queueConfig->setQueueMaxPriority($maxPriority); + } + + // Feature: Working with Exchange and routing-keys + if ($exchange = Arr::pull($queueOptions, 'exchange')) { + $queueConfig->setExchange($exchange); + } + if ($exchangeType = Arr::pull($queueOptions, 'exchange_type')) { + $queueConfig->setExchangeType($exchangeType); + } + if ($exchangeRoutingKey = Arr::pull($queueOptions, 'exchange_routing_key')) { + $queueConfig->setExchangeRoutingKey($exchangeRoutingKey); + } + + // Feature: Reroute failed messages + if ($rerouteFailed = Arr::pull($queueOptions, 'reroute_failed')) { + $queueConfig->setRerouteFailed($rerouteFailed); + } + if ($failedExchange = Arr::pull($queueOptions, 'failed_exchange')) { + $queueConfig->setFailedExchange($failedExchange); + } + if ($failedRoutingKey = Arr::pull($queueOptions, 'failed_routing_key')) { + $queueConfig->setFailedRoutingKey($failedRoutingKey); + } + + // Feature: Mark queue as quorum + if ($quorum = Arr::pull($queueOptions, 'quorum')) { + $queueConfig->setQuorum($quorum); + } + + // All extra options not defined + $queueConfig->setOptions($queueOptions); + } +} diff --git a/src/Queue/QueueFactory.php b/src/Queue/QueueFactory.php new file mode 100644 index 00000000..75bde1ed --- /dev/null +++ b/src/Queue/QueueFactory.php @@ -0,0 +1,25 @@ +connection = $connection; - $this->channel = $connection->channel(); - $this->default = $default; - $this->options = $options; + */ + public function __construct(QueueConfig $config) + { + $this->config = $config; + $this->dispatchAfterCommit = $config->isDispatchAfterCommit(); } /** @@ -108,7 +87,7 @@ public function size($queue = null): int } // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->connection->channel(); + $channel = $this->createChannel(); [, $size] = $channel->queue_declare($queue, true); $channel->close(); @@ -122,7 +101,15 @@ public function size($queue = null): int */ public function push($job, $data = '', $queue = null) { - return $this->pushRaw($this->createPayload($job, $queue, $data), $queue, []); + return $this->enqueueUsing( + $job, + $this->createPayload($job, $this->getQueue($queue), $data), + $queue, + null, + function ($payload, $queue) { + return $this->pushRaw($payload, $queue); + } + ); } /** @@ -130,7 +117,7 @@ public function push($job, $data = '', $queue = null) * * @throws AMQPProtocolChannelException */ - public function pushRaw($payload, $queue = null, array $options = []) + public function pushRaw($payload, $queue = null, array $options = []): int|string|null { [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); @@ -138,7 +125,7 @@ public function pushRaw($payload, $queue = null, array $options = []) [$message, $correlationId] = $this->createMessage($payload, $attempts); - $this->channel->basic_publish($message, $exchange, $destination, true, false); + $this->publishBasic($message, $exchange, $destination, true); return $correlationId; } @@ -148,40 +135,46 @@ public function pushRaw($payload, $queue = null, array $options = []) * * @throws AMQPProtocolChannelException */ - public function later($delay, $job, $data = '', $queue = null) + public function later($delay, $job, $data = '', $queue = null): mixed { - return $this->laterRaw( + return $this->enqueueUsing( + $job, + $this->createPayload($job, $this->getQueue($queue), $data), + $queue, $delay, - $this->createPayload($job, $queue, $data), - $queue + function ($payload, $queue, $delay) { + return $this->laterRaw($delay, $payload, $queue); + } ); } /** - * @param $delay - * @param $payload - * @param null $queue - * @param int $attempts - * @return mixed * @throws AMQPProtocolChannelException */ - public function laterRaw($delay, $payload, $queue = null, $attempts = 0) + public function laterRaw($delay, string $payload, $queue = null, int $attempts = 0): int|string|null { $ttl = $this->secondsUntil($delay) * 1000; + // default options + $options = ['delay' => $delay, 'attempts' => $attempts]; + // When no ttl just publish a new message to the exchange or queue if ($ttl <= 0) { - return $this->pushRaw($payload, $queue, ['delay' => $delay, 'attempts' => $attempts]); + return $this->pushRaw($payload, $queue, $options); } + // Create a main queue to handle delayed messages + [$mainDestination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); + $this->declareDestination($mainDestination, $exchange, $exchangeType); + $destination = $this->getQueue($queue).'.delay.'.$ttl; $this->declareQueue($destination, true, false, $this->getDelayQueueArguments($this->getQueue($queue), $ttl)); [$message, $correlationId] = $this->createMessage($payload, $attempts); - // Publish directly on the delayQueue, no need to publish trough an exchange. - $this->channel->basic_publish($message, null, $destination, true, false); + // Publish directly on the delayQueue, no need to publish through an exchange. + $this->publishBasic($message, null, $destination, true); return $correlationId; } @@ -193,21 +186,25 @@ public function laterRaw($delay, $payload, $queue = null, $attempts = 0) */ public function bulk($jobs, $data = '', $queue = null): void { - foreach ((array) $jobs as $job) { + $this->publishBatch($jobs, $data, $queue); + } + + /** + * @throws AMQPProtocolChannelException + */ + protected function publishBatch($jobs, $data = '', $queue = null): void + { + foreach ($jobs as $job) { $this->bulkRaw($this->createPayload($job, $queue, $data), $queue, ['job' => $job]); } - $this->channel->publish_batch(); + $this->batchPublish(); } /** - * @param string $payload - * @param null $queue - * @param array $options - * @return mixed * @throws AMQPProtocolChannelException */ - public function bulkRaw(string $payload, $queue = null, array $options = []) + public function bulkRaw(string $payload, ?string $queue = null, array $options = []): int|string|null { [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); @@ -215,7 +212,7 @@ public function bulkRaw(string $payload, $queue = null, array $options = []) [$message, $correlationId] = $this->createMessage($payload, $attempts); - $this->channel->batch_basic_publish($message, $exchange, $destination); + $this->getChannel()->batch_basic_publish($message, $exchange, $destination); return $correlationId; } @@ -233,7 +230,7 @@ public function pop($queue = null) $job = $this->getJobClass(); /** @var AMQPMessage|null $message */ - if ($message = $this->channel->basic_get($queue)) { + if ($message = $this->getChannel()->basic_get($queue)) { return $this->currentJob = new $job( $this->container, $this, @@ -243,47 +240,60 @@ public function pop($queue = null) ); } } catch (AMQPProtocolChannelException $exception) { - // If there is not exchange or queue AMQP will throw exception with code 404 + // If there is no exchange or queue AMQP will throw exception with code 404 // We need to catch it and return null if ($exception->amqp_reply_code === 404) { // Because of the channel exception the channel was closed and removed. // We have to open a new channel. Because else the worker(s) are stuck in a loop, without processing. - $this->channel = $this->connection->channel(); + $this->getChannel(true); return null; } throw $exception; + } catch (AMQPChannelClosedException|AMQPConnectionClosedException $exception) { + // Queue::pop used by worker to receive new job + // Thrown exception is checked by Illuminate\Database\DetectsLostConnections::causedByLostConnection + // Is has to contain one of the several phrases in exception message in order to restart worker + // Otherwise worker continues to work with broken connection + throw new AMQPRuntimeException( + 'Lost connection: '.$exception->getMessage(), + $exception->getCode(), + $exception + ); } return null; } /** - * @return AbstractConnection + * @throws RuntimeException */ public function getConnection(): AbstractConnection { + if (! $this->connection) { + throw new RuntimeException('Queue has no AMQPConnection set.'); + } + return $this->connection; } - /** - * @return AMQPChannel - */ - public function getChannel(): AMQPChannel + public function setConnection(AbstractConnection $connection): RabbitMQQueue { - return $this->channel; + $this->connection = $connection; + + return $this; } /** * Job class to use. * - * @return string + * * @throws Throwable */ public function getJobClass(): string { - $job = Arr::get($this->options, 'job', RabbitMQJob::class); + $job = $this->getConfig()->getAbstractJob(); throw_if( ! is_a($job, RabbitMQJob::class, true), @@ -296,31 +306,33 @@ public function getJobClass(): string /** * Gets a queue/destination, by default the queue option set on the connection. - * - * @param null $queue - * @return string */ public function getQueue($queue = null): string { - return $queue ?: $this->default; + return $queue ?: $this->getConfig()->getQueue(); } /** * Checks if the given exchange already present/defined in RabbitMQ. - * Returns false when when the exchange is missing. + * Returns false when the exchange is missing. + * * - * @param string $exchange - * @return bool * @throws AMQPProtocolChannelException */ public function isExchangeExists(string $exchange): bool { + if ($this->isExchangeDeclared($exchange)) { + return true; + } + try { // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->connection->channel(); + $channel = $this->createChannel(); $channel->exchange_declare($exchange, '', true); $channel->close(); + $this->exchanges[] = $exchange; + return true; } catch (AMQPProtocolChannelException $exception) { if ($exception->amqp_reply_code === 404) { @@ -332,14 +344,7 @@ public function isExchangeExists(string $exchange): bool } /** - * Declare a exchange in rabbitMQ, when not already declared. - * - * @param string $name - * @param string $type - * @param bool $durable - * @param bool $autoDelete - * @param array $arguments - * @return void + * Declare an exchange in rabbitMQ, when not already declared. */ public function declareExchange( string $name, @@ -352,7 +357,7 @@ public function declareExchange( return; } - $this->channel->exchange_declare( + $this->getChannel()->exchange_declare( $name, $type, false, @@ -365,11 +370,9 @@ public function declareExchange( } /** - * Delete a exchange from rabbitMQ, only when present in RabbitMQ. + * Delete an exchange from rabbitMQ, only when present in RabbitMQ. + * * - * @param string $name - * @param bool $unused - * @return void * @throws AMQPProtocolChannelException */ public function deleteExchange(string $name, bool $unused = false): void @@ -378,7 +381,10 @@ public function deleteExchange(string $name, bool $unused = false): void return; } - $this->channel->exchange_delete( + $idx = array_search($name, $this->exchanges); + unset($this->exchanges[$idx]); + + $this->getChannel()->exchange_delete( $name, $unused ); @@ -386,20 +392,27 @@ public function deleteExchange(string $name, bool $unused = false): void /** * Checks if the given queue already present/defined in RabbitMQ. - * Returns false when when the queue is missing. + * Returns false when the queue is missing. + * * - * @param string|null $name - * @return bool * @throws AMQPProtocolChannelException */ - public function isQueueExists(string $name = null): bool + public function isQueueExists(?string $name = null): bool { + $queueName = $this->getQueue($name); + + if ($this->isQueueDeclared($queueName)) { + return true; + } + try { // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->connection->channel(); - $channel->queue_declare($this->getQueue($name), true); + $channel = $this->createChannel(); + $channel->queue_declare($queueName, true); $channel->close(); + $this->queues[] = $queueName; + return true; } catch (AMQPProtocolChannelException $exception) { if ($exception->amqp_reply_code === 404) { @@ -412,12 +425,6 @@ public function isQueueExists(string $name = null): bool /** * Declare a queue in rabbitMQ, when not already declared. - * - * @param string $name - * @param bool $durable - * @param bool $autoDelete - * @param array $arguments - * @return void */ public function declareQueue( string $name, @@ -429,7 +436,7 @@ public function declareQueue( return; } - $this->channel->queue_declare( + $this->getChannel()->queue_declare( $name, false, $durable, @@ -443,10 +450,7 @@ public function declareQueue( /** * Delete a queue from rabbitMQ, only when present in RabbitMQ. * - * @param string $name - * @param bool $if_unused - * @param bool $if_empty - * @return void + * * @throws AMQPProtocolChannelException */ public function deleteQueue(string $name, bool $if_unused = false, bool $if_empty = false): void @@ -455,16 +459,14 @@ public function deleteQueue(string $name, bool $if_unused = false, bool $if_empt return; } - $this->channel->queue_delete($name, $if_unused, $if_empty); + $idx = array_search($name, $this->queues); + unset($this->queues[$idx]); + + $this->getChannel()->queue_delete($name, $if_unused, $if_empty); } /** * Bind a queue to an exchange. - * - * @param string $queue - * @param string $exchange - * @param string $routingKey - * @return void */ public function bindQueue(string $queue, string $exchange, string $routingKey = ''): void { @@ -476,54 +478,38 @@ public function bindQueue(string $queue, string $exchange, string $routingKey = return; } - $this->channel->queue_bind($queue, $exchange, $routingKey); + $this->getChannel()->queue_bind($queue, $exchange, $routingKey); } /** * Purge the queue of messages. - * - * @param string|null $queue - * @return void */ - public function purge(string $queue = null): void + public function purge(?string $queue = null): void { // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->connection->channel(); + $channel = $this->createChannel(); $channel->queue_purge($this->getQueue($queue)); $channel->close(); } /** * Acknowledge the message. - * - * @param RabbitMQJob $job - * @return void */ public function ack(RabbitMQJob $job): void { - $this->channel->basic_ack($job->getRabbitMQMessage()->getDeliveryTag()); + $this->getChannel()->basic_ack($job->getRabbitMQMessage()->getDeliveryTag()); } /** * Reject the message. - * - * @param RabbitMQJob $job - * @param bool $requeue - * - * @return void */ public function reject(RabbitMQJob $job, bool $requeue = false): void { - $this->channel->basic_reject($job->getRabbitMQMessage()->getDeliveryTag(), $requeue); + $this->getChannel()->basic_reject($job->getRabbitMQMessage()->getDeliveryTag(), $requeue); } /** * Create a AMQP message. - * - * @param $payload - * @param int $attempts - * @return array - * @throws JsonException */ protected function createMessage($payload, int $attempts = 0): array { @@ -532,14 +518,27 @@ protected function createMessage($payload, int $attempts = 0): array 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, ]; - if ($correlationId = json_decode($payload, true, 512)['id'] ?? null) { + $currentPayload = json_decode($payload, true); + if ($correlationId = $currentPayload['id'] ?? null) { $properties['correlation_id'] = $correlationId; } - if ($this->isPrioritizeDelayed()) { + if ($this->getConfig()->isPrioritizeDelayed()) { $properties['priority'] = $attempts; } + if (isset($currentPayload['data']['command'])) { + // If the command data is encrypted, decrypt it first before attempting to unserialize + if (is_subclass_of($currentPayload['data']['commandName'], ShouldBeEncrypted::class)) { + $currentPayload['data']['command'] = Crypt::decrypt($currentPayload['data']['command']); + } + + $commandData = unserialize($currentPayload['data']['command']); + if (property_exists($commandData, 'priority')) { + $properties['priority'] = $commandData->priority; + } + } + $message = new AMQPMessage($payload, $properties); $message->set('application_headers', new AMQPTable([ @@ -557,10 +556,9 @@ protected function createMessage($payload, int $attempts = 0): array /** * Create a payload array from the given job and data. * - * @param string|object $job - * @param string $queue - * @param mixed $data - * @return array + * @param string|object $job + * @param string $queue + * @param mixed $data */ protected function createPayloadArray($job, $queue, $data = ''): array { @@ -571,8 +569,6 @@ protected function createPayloadArray($job, $queue, $data = ''): array /** * Get a random ID string. - * - * @return string */ protected function getRandomId(): string { @@ -582,27 +578,24 @@ protected function getRandomId(): string /** * Close the connection to RabbitMQ. * - * @return void + * * @throws Exception */ public function close(): void { - if ($this->currentJob && ! $this->currentJob->isDeletedOrReleased()) { + if (isset($this->currentJob) && ! $this->currentJob->isDeletedOrReleased()) { $this->reject($this->currentJob, true); } try { - $this->connection->close(); - } catch (ErrorException $exception) { + $this->getConnection()->close(); + } catch (ErrorException) { // Ignore the exception } } /** * Get the Queue arguments. - * - * @param string $destination - * @return array */ protected function getQueueArguments(string $destination): array { @@ -611,29 +604,30 @@ protected function getQueueArguments(string $destination): array // Messages without a priority property are treated as if their priority were 0. // Messages with a priority which is higher than the queue's maximum, are treated as if they were // published with the maximum priority. - if ($this->isPrioritizeDelayed()) { - $arguments['x-max-priority'] = $this->getQueueMaxPriority(); + // Quorum queues does not support priority. + if ($this->getConfig()->isPrioritizeDelayed() && ! $this->getConfig()->isQuorum()) { + $arguments['x-max-priority'] = $this->getConfig()->getQueueMaxPriority(); } - if ($this->isRerouteFailed()) { - $arguments['x-dead-letter-exchange'] = $this->getFailedExchange() ?? ''; + if ($this->getConfig()->isRerouteFailed()) { + $arguments['x-dead-letter-exchange'] = $this->getFailedExchange(); $arguments['x-dead-letter-routing-key'] = $this->getFailedRoutingKey($destination); } + if ($this->getConfig()->isQuorum()) { + $arguments['x-queue-type'] = 'quorum'; + } + return $arguments; } /** * Get the Delay queue arguments. - * - * @param string $destination - * @param int $ttl - * @return array */ protected function getDelayQueueArguments(string $destination, int $ttl): array { return [ - 'x-dead-letter-exchange' => $this->getExchange() ?? '', + 'x-dead-letter-exchange' => $this->getExchange(), 'x-dead-letter-routing-key' => $this->getRoutingKey($destination), 'x-message-ttl' => $ttl, 'x-expires' => $ttl * 2, @@ -641,103 +635,51 @@ protected function getDelayQueueArguments(string $destination, int $ttl): array } /** - * Returns &true;, if delayed messages should be prioritized. - * - * @return bool - */ - protected function isPrioritizeDelayed(): bool - { - return (bool) (Arr::get($this->options, 'prioritize_delayed') ?: false); - } - - /** - * Returns a integer with a default of '2' for when using prioritization on delayed messages. - * If priority queues are desired, we recommend using between 1 and 10. - * Using more priority layers, will consume more CPU resources and would affect runtimes. - * - * @see https://www.rabbitmq.com/priority.html - * @return int - */ - protected function getQueueMaxPriority(): int - { - return (int) (Arr::get($this->options, 'queue_max_priority') ?: 2); - } - - /** - * Get the exchange name, or &null; as default value. - * - * @param string|null $exchange - * @return string|null + * Get the exchange name, or empty string; as default value. */ - protected function getExchange(string $exchange = null): ?string + protected function getExchange(?string $exchange = null): string { - return $exchange ?: Arr::get($this->options, 'exchange') ?: null; + return $exchange ?? $this->getConfig()->getExchange(); } /** * Get the routing-key for when you use exchanges * The default routing-key is the given destination. - * - * @param string $destination - * @return string */ protected function getRoutingKey(string $destination): string { - return ltrim(sprintf(Arr::get($this->options, 'exchange_routing_key') ?: '%s', $destination), '.'); + return ltrim(sprintf($this->getConfig()->getExchangeRoutingKey(), $destination), '.'); } /** * Get the exchangeType, or AMQPExchangeType::DIRECT as default. - * - * @param string|null $type - * @return string */ protected function getExchangeType(?string $type = null): string { - return @constant(AMQPExchangeType::class.'::'.Str::upper($type ?: Arr::get( - $this->options, - 'exchange_type' - ) ?: 'direct')) ?: AMQPExchangeType::DIRECT; - } + $constant = AMQPExchangeType::class.'::'.Str::upper($type ?: $this->getConfig()->getExchangeType()); - /** - * Returns &true;, if failed messages should be rerouted. - * - * @return bool - */ - protected function isRerouteFailed(): bool - { - return (bool) (Arr::get($this->options, 'reroute_failed') ?: false); + return defined($constant) ? constant($constant) : AMQPExchangeType::DIRECT; } /** * Get the exchange for failed messages. - * - * @param string|null $exchange - * @return string|null */ - protected function getFailedExchange(string $exchange = null): ?string + protected function getFailedExchange(?string $exchange = null): string { - return $exchange ?: Arr::get($this->options, 'failed_exchange') ?: null; + return $exchange ?? $this->getConfig()->getFailedExchange(); } /** * Get the routing-key for failed messages * The default routing-key is the given destination substituted by '.failed'. - * - * @param string $destination - * @return string */ protected function getFailedRoutingKey(string $destination): string { - return ltrim(sprintf(Arr::get($this->options, 'failed_routing_key') ?: '%s.failed', $destination), '.'); + return ltrim(sprintf($this->getConfig()->getFailedRoutingKey(), $destination), '.'); } /** * Checks if the exchange was already declared. - * - * @param string $name - * @return bool */ protected function isExchangeDeclared(string $name): bool { @@ -746,9 +688,6 @@ protected function isExchangeDeclared(string $name): bool /** * Checks if the queue was already declared. - * - * @param string $name - * @return bool */ protected function isQueueDeclared(string $name): bool { @@ -758,23 +697,16 @@ protected function isQueueDeclared(string $name): bool /** * Declare the destination when necessary. * - * @param string $destination - * @param string|null $exchange - * @param string|null $exchangeType - * @return void * @throws AMQPProtocolChannelException */ - protected function declareDestination( - string $destination, - ?string $exchange = null, - string $exchangeType = AMQPExchangeType::DIRECT - ): void { - // When a exchange is provided and no exchange is present in RabbitMQ, create an exchange. + protected function declareDestination(string $destination, ?string $exchange = null, string $exchangeType = AMQPExchangeType::DIRECT): void + { + // When an exchange is provided and no exchange is present in RabbitMQ, create an exchange. if ($exchange && ! $this->isExchangeExists($exchange)) { $this->declareExchange($exchange, $exchangeType); } - // When a exchange is provided, just return. + // When an exchange is provided, just return. if ($exchange) { return; } @@ -790,10 +722,6 @@ protected function declareDestination( /** * Determine all publish properties. - * - * @param $queue - * @param array $options - * @return array */ protected function publishProperties($queue, array $options = []): array { @@ -801,9 +729,54 @@ protected function publishProperties($queue, array $options = []): array $attempts = Arr::get($options, 'attempts') ?: 0; $destination = $this->getRoutingKey($queue); - $exchange = $this->getExchange(); - $exchangeType = $this->getExchangeType(); + $exchange = $this->getExchange(Arr::get($options, 'exchange')); + $exchangeType = $this->getExchangeType(Arr::get($options, 'exchange_type')); return [$destination, $exchange, $exchangeType, $attempts]; } + + protected function getConfig(): QueueConfig + { + return $this->config; + } + + /** + * @throws AMQPChannelClosedException + * @throws AMQPConnectionClosedException + * @throws AMQPConnectionBlockedException + */ + protected function publishBasic($msg, $exchange = '', $destination = '', $mandatory = false, $immediate = false, $ticket = null): void + { + $this->getChannel()->basic_publish($msg, $exchange, $destination, $mandatory, $immediate, $ticket); + } + + protected function batchPublish(): void + { + $this->getChannel()->publish_batch(); + } + + public function getChannel($forceNew = false): AMQPChannel + { + if (! $this->channel || $forceNew) { + $this->channel = $this->createChannel(); + } + + return $this->channel; + } + + protected function createChannel(): AMQPChannel + { + return $this->getConnection()->channel(); + } + + /** + * @throws Exception + */ + protected function reconnect(): void + { + // Reconnects using the original connection settings. + $this->getConnection()->reconnect(); + // Create a new main channel because all old channels are removed. + $this->getChannel(true); + } } diff --git a/tests/Feature/ConnectorTest.php b/tests/Feature/ConnectorTest.php index 2c58874a..91660ad2 100644 --- a/tests/Feature/ConnectorTest.php +++ b/tests/Feature/ConnectorTest.php @@ -3,9 +3,12 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Feature; use Illuminate\Queue\QueueManager; +use PhpAmqpLib\Connection\AMQPConnectionConfig; use PhpAmqpLib\Connection\AMQPLazyConnection; use PhpAmqpLib\Connection\AMQPSSLConnection; +use PhpAmqpLib\Connection\AMQPStreamConnection; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; +use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestSSLConnection; class ConnectorTest extends \VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase { @@ -47,12 +50,58 @@ public function testLazyConnection(): void $this->assertInstanceOf(RabbitMQQueue::class, $connection); $this->assertInstanceOf(AMQPLazyConnection::class, $connection->getConnection()); + $this->assertFalse($connection->getConnection()->isConnected()); + $this->assertTrue($connection->getChannel()->is_open()); $this->assertTrue($connection->getConnection()->isConnected()); + } + + public function testLazyStreamConnection(): void + { + $this->app['config']->set('queue.connections.rabbitmq', [ + 'driver' => 'rabbitmq', + 'queue' => env('RABBITMQ_QUEUE', 'default'), + 'connection' => 'default', + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => env('RABBITMQ_SSL_CAFILE', null), + 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), + 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), + 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), + 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), + ], + ], + + 'worker' => env('RABBITMQ_WORKER', 'default'), + ]); + + /** @var QueueManager $queue */ + $queue = $this->app['queue']; + + /** @var RabbitMQQueue $connection */ + $connection = $queue->connection('rabbitmq'); + + $this->assertInstanceOf(RabbitMQQueue::class, $connection); + $this->assertInstanceOf(AMQPStreamConnection::class, $connection->getConnection()); + $this->assertFalse($connection->getConnection()->isConnected()); $this->assertTrue($connection->getChannel()->is_open()); + $this->assertTrue($connection->getConnection()->isConnected()); } public function testSslConnection(): void { + $this->markTestSkipped(); + $this->app['config']->set('queue.connections.rabbitmq', [ 'driver' => 'rabbitmq', 'queue' => env('RABBITMQ_QUEUE', 'default'), @@ -91,4 +140,48 @@ public function testSslConnection(): void $this->assertTrue($connection->getConnection()->isConnected()); $this->assertTrue($connection->getChannel()->is_open()); } + + // Test to validate ssl connection params + public function testNoVerificationSslConnection(): void + { + $this->app['config']->set('queue.connections.rabbitmq', [ + 'driver' => 'rabbitmq', + 'queue' => env('RABBITMQ_QUEUE', 'default'), + 'connection' => TestSSLConnection::class, + 'secure' => true, + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT_SSL'), + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => getenv('RABBITMQ_SSL_CAFILE'), + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => false, + 'passphrase' => null, + ], + ], + + 'worker' => env('RABBITMQ_WORKER', 'default'), + ]); + + /** @var QueueManager $queue */ + $queue = $this->app['queue']; + + /** @var RabbitMQQueue $connection */ + $connection = $queue->connection('rabbitmq'); + $this->assertInstanceOf(RabbitMQQueue::class, $connection); + $this->assertInstanceOf(AMQPSSLConnection::class, $connection->getConnection()); + /** @var AMQPConnectionConfig */ + $config = $connection->getConnection()->getConfig(); + $this->assertFalse($config->getSslVerify()); + } } diff --git a/tests/Feature/QueueTest.php b/tests/Feature/QueueTest.php index 15b8acbb..ee324c9a 100644 --- a/tests/Feature/QueueTest.php +++ b/tests/Feature/QueueTest.php @@ -2,12 +2,42 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Feature; -use PhpAmqpLib\Connection\AMQPLazyConnection; +use PhpAmqpLib\Connection\AMQPStreamConnection; +use PhpAmqpLib\Exception\AMQPChannelClosedException; +use PhpAmqpLib\Exception\AMQPConnectionClosedException; +use PhpAmqpLib\Exception\AMQPProtocolChannelException; +use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestJob; class QueueTest extends TestCase { + protected function setUp(): void + { + parent::setUp(); + + $this->withoutExceptionHandling([ + AMQPChannelClosedException::class, AMQPConnectionClosedException::class, + AMQPProtocolChannelException::class, + ]); + } + public function testConnection(): void { - $this->assertInstanceOf(AMQPLazyConnection::class, $this->connection()->getChannel()->getConnection()); + $this->assertInstanceOf(AMQPStreamConnection::class, $this->connection()->getChannel()->getConnection()); + } + + public function testWithoutReconnect(): void + { + $queue = $this->connection('rabbitmq'); + + $queue->push(new TestJob); + sleep(1); + $this->assertSame(1, $queue->size()); + + // close connection + $queue->getConnection()->close(); + $this->assertFalse($queue->getConnection()->isConnected()); + + $this->expectException(AMQPChannelClosedException::class); + $queue->push(new TestJob); } } diff --git a/tests/Feature/SslQueueTest.php b/tests/Feature/SslQueueTest.php index 54969a9c..02181c96 100644 --- a/tests/Feature/SslQueueTest.php +++ b/tests/Feature/SslQueueTest.php @@ -4,11 +4,13 @@ use PhpAmqpLib\Connection\AMQPSSLConnection; -/** - * @group functional - */ class SslQueueTest extends TestCase { + protected function setUp(): void + { + $this->markTestSkipped(); + } + protected function getEnvironmentSetUp($app): void { $app['config']->set('queue.default', 'rabbitmq'); diff --git a/tests/Feature/TestCase.php b/tests/Feature/TestCase.php index 0a387ef6..5afda85d 100644 --- a/tests/Feature/TestCase.php +++ b/tests/Feature/TestCase.php @@ -2,11 +2,13 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Feature; +use Illuminate\Database\DatabaseTransactionsManager; use Illuminate\Support\Facades\Queue; use Illuminate\Support\Str; use PhpAmqpLib\Exception\AMQPProtocolChannelException; use RuntimeException; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob; +use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestEncryptedJob; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestJob; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase as BaseTestCase; @@ -15,7 +17,7 @@ abstract class TestCase extends BaseTestCase /** * @throws AMQPProtocolChannelException */ - public function setUp(): void + protected function setUp(): void { parent::setUp(); @@ -68,7 +70,7 @@ public function testPushRaw(): void public function testPush(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); sleep(1); @@ -93,6 +95,33 @@ public function testPush(): void $this->assertSame(0, Queue::size()); } + public function testPushAfterCommit(): void + { + $transaction = new DatabaseTransactionsManager; + + $this->app->singleton('db.transactions', function ($app) use ($transaction) { + $transaction->begin('FakeDBConnection', 1); + + return $transaction; + }); + + TestJob::dispatch()->afterCommit(); + + sleep(1); + $this->assertSame(0, Queue::size()); + $this->assertNull(Queue::pop()); + + $transaction->commit('FakeDBConnection', 1, 0); + + sleep(1); + + $this->assertSame(1, Queue::size()); + $this->assertNotNull($job = Queue::pop()); + + $job->delete(); + $this->assertSame(0, Queue::size()); + } + public function testLaterRaw(): void { $payload = Str::random(); @@ -125,7 +154,7 @@ public function testLaterRaw(): void public function testLater(): void { - Queue::later(3, new TestJob()); + Queue::later(3, new TestJob); sleep(1); @@ -166,6 +195,103 @@ public function testBulk(): void $this->assertSame($count, Queue::size()); } + public function testPushEncrypted(): void + { + Queue::push(new TestEncryptedJob); + + sleep(1); + + $this->assertSame(1, Queue::size()); + $this->assertNotNull($job = Queue::pop()); + $this->assertSame(1, $job->attempts()); + $this->assertInstanceOf(RabbitMQJob::class, $job); + $this->assertSame(TestEncryptedJob::class, $job->resolveName()); + $this->assertNotNull($job->getJobId()); + + $payload = $job->payload(); + + $this->assertSame(TestEncryptedJob::class, $payload['displayName']); + $this->assertSame('Illuminate\Queue\CallQueuedHandler@call', $payload['job']); + $this->assertNull($payload['maxTries']); + $this->assertNull($payload['backoff']); + $this->assertNull($payload['timeout']); + $this->assertNull($payload['retryUntil']); + $this->assertSame($job->getJobId(), $payload['id']); + + $job->delete(); + $this->assertSame(0, Queue::size()); + } + + public function testPushEncryptedAfterCommit(): void + { + $transaction = new DatabaseTransactionsManager; + + $this->app->singleton('db.transactions', function ($app) use ($transaction) { + $transaction->begin('FakeDBConnection', 1); + + return $transaction; + }); + + TestEncryptedJob::dispatch()->afterCommit(); + + sleep(1); + $this->assertSame(0, Queue::size()); + $this->assertNull(Queue::pop()); + + $transaction->commit('FakeDBConnection', 1, 0); + + sleep(1); + + $this->assertSame(1, Queue::size()); + $this->assertNotNull($job = Queue::pop()); + + $job->delete(); + $this->assertSame(0, Queue::size()); + } + + public function testEncryptedLater(): void + { + Queue::later(3, new TestEncryptedJob); + + sleep(1); + + $this->assertSame(0, Queue::size()); + $this->assertNull(Queue::pop()); + + sleep(3); + + $this->assertSame(1, Queue::size()); + $this->assertNotNull($job = Queue::pop()); + + $this->assertInstanceOf(RabbitMQJob::class, $job); + + $body = json_decode($job->getRawBody(), true); + + $this->assertSame(TestEncryptedJob::class, $body['displayName']); + $this->assertSame('Illuminate\Queue\CallQueuedHandler@call', $body['job']); + $this->assertSame(TestEncryptedJob::class, $body['data']['commandName']); + $this->assertNotNull($job->getJobId()); + + $job->delete(); + $this->assertSame(0, Queue::size()); + } + + public function testEncryptedBulk(): void + { + $count = 100; + $jobs = []; + + for ($i = 0; $i < $count; $i++) { + $jobs[$i] = new TestEncryptedJob($i); + } + + Queue::bulk($jobs); + + sleep(1); + + $this->assertSame($count, Queue::size()); + } + public function testReleaseRaw(): void { Queue::pushRaw($payload = Str::random()); @@ -194,7 +320,7 @@ public function testReleaseRaw(): void public function testRelease(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); sleep(1); @@ -251,7 +377,7 @@ public function testReleaseWithDelayRaw(): void public function testReleaseInThePast(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); $job = Queue::pop(); $job->release(-3); @@ -266,7 +392,7 @@ public function testReleaseInThePast(): void public function testReleaseAndReleaseWithDelayAttempts(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); sleep(1); @@ -293,7 +419,7 @@ public function testReleaseAndReleaseWithDelayAttempts(): void public function testDelete(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); $job = Queue::pop(); @@ -307,7 +433,7 @@ public function testDelete(): void public function testFailed(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); $job = Queue::pop(); diff --git a/tests/Functional/RabbitMQQueueTest.php b/tests/Functional/RabbitMQQueueTest.php index 2c8484f7..1c5d94fb 100644 --- a/tests/Functional/RabbitMQQueueTest.php +++ b/tests/Functional/RabbitMQQueueTest.php @@ -11,7 +11,6 @@ class RabbitMQQueueTest extends BaseTestCase { public function testConnection(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertInstanceOf(RabbitMQQueue::class, $queue); @@ -22,51 +21,57 @@ public function testConnection(): void $this->assertInstanceOf(RabbitMQQueue::class, $queue); } - public function testRerouteFailed(): void + public function testConfigRerouteFailed(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); - $this->assertFalse($this->callMethod($queue, 'isRerouteFailed')); + $this->assertFalse($this->callProperty($queue, 'config')->isRerouteFailed()); $queue = $this->connection('rabbitmq-with-options'); - $this->assertTrue($this->callMethod($queue, 'isRerouteFailed')); + $this->assertTrue($this->callProperty($queue, 'config')->isRerouteFailed()); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertFalse($this->callMethod($queue, 'isRerouteFailed')); + $this->assertFalse($this->callProperty($queue, 'config')->isRerouteFailed()); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertFalse($this->callProperty($queue, 'config')->isRerouteFailed()); } - public function testPrioritizeDelayed(): void + public function testConfigPrioritizeDelayed(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); - $this->assertFalse($this->callMethod($queue, 'isPrioritizeDelayed')); + $this->assertFalse($this->callProperty($queue, 'config')->isPrioritizeDelayed()); $queue = $this->connection('rabbitmq-with-options'); - $this->assertTrue($this->callMethod($queue, 'isPrioritizeDelayed')); + $this->assertTrue($this->callProperty($queue, 'config')->isPrioritizeDelayed()); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertFalse($this->callMethod($queue, 'isPrioritizeDelayed')); + $this->assertFalse($this->callProperty($queue, 'config')->isPrioritizeDelayed()); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertFalse($this->callProperty($queue, 'config')->isPrioritizeDelayed()); } public function testQueueMaxPriority(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); - $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); - $this->assertSame(2, $this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority()); + $this->assertSame(2, $this->callProperty($queue, 'config')->getQueueMaxPriority()); $queue = $this->connection('rabbitmq-with-options'); - $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); - $this->assertSame(20, $this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority()); + $this->assertSame(20, $this->callProperty($queue, 'config')->getQueueMaxPriority()); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); - $this->assertSame(2, $this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority()); + $this->assertSame(2, $this->callProperty($queue, 'config')->getQueueMaxPriority()); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority()); + $this->assertSame(2, $this->callProperty($queue, 'config')->getQueueMaxPriority()); } - public function testExchangeType(): void + public function testConfigExchangeType(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType', [''])); @@ -74,48 +79,78 @@ public function testExchangeType(): void $queue = $this->connection('rabbitmq-with-options'); $this->assertSame(AMQPExchangeType::TOPIC, $this->callMethod($queue, 'getExchangeType')); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType', ['direct'])); $queue = $this->connection('rabbitmq-with-options-empty'); $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); + + // testing an unkown type with a default + $this->callProperty($queue, 'config')->setExchangeType('unknown'); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); } public function testExchange(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); - $this->assertNull($this->callMethod($queue, 'getExchange', [''])); - $this->assertNull($this->callMethod($queue, 'getExchange')); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [''])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [null])); + $this->assertSame('', $this->callMethod($queue, 'getExchange')); $queue = $this->connection('rabbitmq-with-options'); - $this->assertNotNull($this->callMethod($queue, 'getExchange')); $this->assertSame('application-x', $this->callMethod($queue, 'getExchange')); + $this->assertSame('application-x', $this->callMethod($queue, 'getExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [''])); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertNull($this->callMethod($queue, 'getExchange')); + $this->assertSame('', $this->callMethod($queue, 'getExchange')); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [''])); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame('', $this->callMethod($queue, 'getExchange')); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [''])); } public function testFailedExchange(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); - $this->assertNull($this->callMethod($queue, 'getExchange', [''])); - $this->assertNull($this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [''])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [null])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange')); $queue = $this->connection('rabbitmq-with-options'); - $this->assertNotNull($this->callMethod($queue, 'getFailedExchange')); $this->assertSame('failed-exchange', $this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('failed-exchange', $this->callMethod($queue, 'getFailedExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [''])); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertNull($this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [''])); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [''])); } public function testRoutingKey(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['.test'])); $this->assertSame('', $this->callMethod($queue, 'getRoutingKey', [''])); $queue = $this->connection('rabbitmq-with-options'); @@ -123,13 +158,18 @@ public function testRoutingKey(): void $queue = $this->connection('rabbitmq-with-options-empty'); $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + $this->callProperty($queue, 'config')->setExchangeRoutingKey('.an.alternate.routing-key'); + $this->assertSame('an.alternate.routing-key', $this->callMethod($queue, 'getRoutingKey', ['test'])); } public function testFailedRoutingKey(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['.test'])); $this->assertSame('failed', $this->callMethod($queue, 'getFailedRoutingKey', [''])); $queue = $this->connection('rabbitmq-with-options'); @@ -137,11 +177,33 @@ public function testFailedRoutingKey(): void $queue = $this->connection('rabbitmq-with-options-empty'); $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + $this->callProperty($queue, 'config')->setFailedRoutingKey('.an.alternate.routing-key'); + $this->assertSame('an.alternate.routing-key', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + } + + public function testConfigQuorum(): void + { + $queue = $this->connection(); + $this->assertFalse($this->callProperty($queue, 'config')->isQuorum()); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertFalse($this->callProperty($queue, 'config')->isQuorum()); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertFalse($this->callProperty($queue, 'config')->isQuorum()); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertFalse($this->callProperty($queue, 'config')->isQuorum()); + + $queue = $this->connection('rabbitmq-with-quorum-options'); + $this->assertTrue($this->callProperty($queue, 'config')->isQuorum()); } public function testDeclareDeleteExchange(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $name = Str::random(); @@ -157,7 +219,6 @@ public function testDeclareDeleteExchange(): void public function testDeclareDeleteQueue(): void { - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $name = Str::random(); @@ -175,7 +236,6 @@ public function testQueueArguments(): void { $name = Str::random(); - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $actual = $this->callMethod($queue, 'getQueueArguments', [$name]); $expected = []; @@ -193,6 +253,17 @@ public function testQueueArguments(): void $this->assertEquals(array_keys($expected), array_keys($actual)); $this->assertEquals(array_values($expected), array_values($actual)); + $queue = $this->connection('rabbitmq-with-quorum-options'); + $actual = $this->callMethod($queue, 'getQueueArguments', [$name]); + $expected = [ + 'x-dead-letter-exchange' => 'failed-exchange', + 'x-dead-letter-routing-key' => sprintf('application-x.%s.failed', $name), + 'x-queue-type' => 'quorum', + ]; + + $this->assertEquals(array_keys($expected), array_keys($actual)); + $this->assertEquals(array_values($expected), array_values($actual)); + $queue = $this->connection('rabbitmq-with-options-empty'); $actual = $this->callMethod($queue, 'getQueueArguments', [$name]); $expected = []; @@ -206,7 +277,6 @@ public function testDelayQueueArguments(): void $name = Str::random(); $ttl = 12000; - /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $actual = $this->callMethod($queue, 'getDelayQueueArguments', [$name, $ttl]); $expected = [ diff --git a/tests/Functional/TestCase.php b/tests/Functional/TestCase.php index b7f5b48b..8b843561 100644 --- a/tests/Functional/TestCase.php +++ b/tests/Functional/TestCase.php @@ -2,7 +2,10 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Functional; -use PhpAmqpLib\Connection\AMQPLazyConnection; +use Exception; +use PhpAmqpLib\Channel\AMQPChannel; +use ReflectionClass; +use ReflectionException; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase as BaseTestCase; abstract class TestCase extends BaseTestCase @@ -13,7 +16,7 @@ protected function getEnvironmentSetUp($app): void $app['config']->set('queue.connections.rabbitmq', [ 'driver' => 'rabbitmq', 'queue' => 'order', - 'connection' => AMQPLazyConnection::class, + 'connection' => 'default', 'hosts' => [ [ @@ -41,7 +44,7 @@ protected function getEnvironmentSetUp($app): void $app['config']->set('queue.connections.rabbitmq-with-options', [ 'driver' => 'rabbitmq', 'queue' => 'order', - 'connection' => AMQPLazyConnection::class, + 'connection' => 'default', 'hosts' => [ [ @@ -80,7 +83,7 @@ protected function getEnvironmentSetUp($app): void $app['config']->set('queue.connections.rabbitmq-with-options-empty', [ 'driver' => 'rabbitmq', 'queue' => 'order', - 'connection' => AMQPLazyConnection::class, + 'connection' => 'default', 'hosts' => [ [ @@ -110,6 +113,85 @@ protected function getEnvironmentSetUp($app): void 'reroute_failed' => '', 'failed_exchange' => '', 'failed_routing_key' => '', + 'quorum' => '', + ], + ], + + 'worker' => 'default', + + ]); + $app['config']->set('queue.connections.rabbitmq-with-options-null', [ + 'driver' => 'rabbitmq', + 'queue' => 'order', + 'connection' => 'default', + + 'hosts' => [ + [ + 'host' => null, + 'port' => null, + 'vhost' => null, + 'user' => null, + 'password' => null, + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => null, + 'passphrase' => null, + ], + + 'queue' => [ + 'prioritize_delayed' => null, + 'queue_max_priority' => null, + 'exchange' => null, + 'exchange_type' => null, + 'exchange_routing_key' => null, + 'reroute_failed' => null, + 'failed_exchange' => null, + 'failed_routing_key' => null, + 'quorum' => null, + ], + ], + + 'worker' => 'default', + + ]); + $app['config']->set('queue.connections.rabbitmq-with-quorum-options', [ + 'driver' => 'rabbitmq', + 'queue' => 'order', + 'connection' => 'default', + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'vhost' => '/', + 'user' => 'guest', + 'password' => 'guest', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => true, + 'passphrase' => null, + ], + + 'queue' => [ + 'exchange' => 'application-x', + 'exchange_type' => 'topic', + 'exchange_routing_key' => 'process.%s', + 'reroute_failed' => true, + 'failed_exchange' => 'failed-exchange', + 'failed_routing_key' => 'application-x.%s.failed', + 'quorum' => true, ], ], @@ -119,19 +201,15 @@ protected function getEnvironmentSetUp($app): void } /** - * @param $object - * @param string $method - * @param array $parameters - * @return mixed - * @throws \Exception + * @throws Exception */ - protected function callMethod($object, string $method, array $parameters = []) + protected function callMethod($object, string $method, array $parameters = []): mixed { try { $className = get_class($object); - $reflection = new \ReflectionClass($className); - } catch (\ReflectionException $e) { - throw new \Exception($e->getMessage()); + $reflection = new ReflectionClass($className); + } catch (ReflectionException $e) { + throw new Exception($e->getMessage()); } $method = $reflection->getMethod($method); @@ -139,4 +217,54 @@ protected function callMethod($object, string $method, array $parameters = []) return $method->invokeArgs($object, $parameters); } + + /** + * @throws Exception + */ + protected function callProperty($object, string $property): mixed + { + try { + $className = get_class($object); + $reflection = new ReflectionClass($className); + } catch (ReflectionException $e) { + throw new Exception($e->getMessage()); + } + + $property = $reflection->getProperty($property); + $property->setAccessible(true); + + return $property->getValue($object); + } + + public function testConnectChannel(): void + { + $queue = $this->connection(); + $this->assertFalse($queue->getConnection()->isConnected()); + + /** @var AMQPChannel $channel */ + $channel = $this->callMethod($queue, 'getChannel'); + $this->assertTrue($queue->getConnection()->isConnected()); + $this->assertSame($channel, $this->callProperty($queue, 'channel')); + $this->assertTrue($channel->is_open()); + } + + public function testReconnect(): void + { + $queue = $this->connection(); + $this->assertFalse($queue->getConnection()->isConnected()); + + // connect + $channel = $this->callMethod($queue, 'getChannel'); + $this->assertTrue($queue->getConnection()->isConnected()); + $this->assertSame($channel, $this->callProperty($queue, 'channel')); + + // close + $queue->getConnection()->close(); + $this->assertFalse($queue->getConnection()->isConnected()); + + // reconnect + $this->callMethod($queue, 'reconnect'); + $this->assertTrue($queue->getConnection()->isConnected()); + $this->assertTrue($queue->getChannel()->is_open()); + } } diff --git a/tests/Mocks/TestEncryptedJob.php b/tests/Mocks/TestEncryptedJob.php new file mode 100644 index 00000000..1c0e1762 --- /dev/null +++ b/tests/Mocks/TestEncryptedJob.php @@ -0,0 +1,25 @@ +i = $i; + } + + public function handle(): void + { + // + } +} diff --git a/tests/Mocks/TestJob.php b/tests/Mocks/TestJob.php index 0f290fd4..f97c7e12 100644 --- a/tests/Mocks/TestJob.php +++ b/tests/Mocks/TestJob.php @@ -2,10 +2,14 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks; +use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; +use Illuminate\Foundation\Bus\Dispatchable; class TestJob implements ShouldQueue { + use Dispatchable, Queueable; + public $i; public function __construct($i = 0) diff --git a/tests/Mocks/TestSSLConnection.php b/tests/Mocks/TestSSLConnection.php new file mode 100644 index 00000000..c1586475 --- /dev/null +++ b/tests/Mocks/TestSSLConnection.php @@ -0,0 +1,14 @@ +config; + } +} diff --git a/tests/TestCase.php b/tests/TestCase.php index 7d50fa67..ec49a1ac 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -40,7 +40,7 @@ protected function getEnvironmentSetUp($app): void 'cafile' => null, 'local_cert' => null, 'local_key' => null, - 'verify_peer' => true, + 'verify_peer' => false, 'passphrase' => null, ], ], @@ -50,7 +50,7 @@ protected function getEnvironmentSetUp($app): void ]); } - protected function connection(string $name = null): RabbitMQQueue + protected function connection(?string $name = null): RabbitMQQueue { return Queue::connection($name); }