diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 00000000..6537ca46 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,15 @@ +root = true + +[*] +charset = utf-8 +end_of_line = lf +insert_final_newline = true +indent_style = space +indent_size = 4 +trim_trailing_whitespace = true + +[*.md] +trim_trailing_whitespace = false + +[*.{yml,yaml}] +indent_size = 2 diff --git a/.gitattributes b/.gitattributes index ff1a3cb7..f9ab78ed 100644 --- a/.gitattributes +++ b/.gitattributes @@ -6,9 +6,8 @@ .gitattributes export-ignore .gitignore export-ignore .styleci.yml export-ignore -.travis.yml export-ignore .php_cs.dist export-ignore -CHANGELOG.md export-ignore +CHANGELOG-* export-ignore CODE_OF_CONDUCT.md export-ignore CONTRIBUTING.md export-ignore phpunit.xml.dist export-ignore diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index dea6573b..d7f499d0 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -7,16 +7,30 @@ assignees: vyuldashev --- -- Laravel Version: #.#.# -- Package Version: #.#.# -- RabbitMQ Version: #.#.# +- Laravel/Lumen version: #.#.# +- RabbitMQ version: #.#.# +- Package version: #.#.# **Describe the bug** + A clear and concise description of what the bug is. **Steps To Reproduce** +What steps needed, to reproduce this bug. + +**Current behavior** + +- What happens with the worker? +- Is the message retried or put back into the queue? +- Is the message acknowledged or rejected? +- Is the message unacked? +- Is the message gone? + **Expected behavior** +What is the expected behavior + **Additional context** -Add any other context about the problem here. + +Add any other context about the problem or describe the use-case. diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 00000000..4aad6d20 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,48 @@ +name: Tests + +on: + push: + pull_request: + schedule: + - cron: '0 0 * * *' + +jobs: + tests: + runs-on: ubuntu-latest + + strategy: + fail-fast: true + matrix: + php: ['8.1', '8.2', '8.3', '8.4'] + stability: ['prefer-lowest', 'prefer-stable'] + laravel: ['^10.0', '^11.0', '^12.0'] + exclude: + - php: '8.1' + laravel: '^11.0' + - php: '8.1' + laravel: '^12.0' + + name: 'PHP ${{ matrix.php }} - Laravel: ${{matrix.laravel}} - ${{ matrix.stability }}' + + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php }} + extensions: dom, curl, libxml, mbstring, zip + coverage: none + + - name: Start Docker container + run: docker compose up -d rabbitmq + + - name: Install dependencies + run: composer update --with='laravel/framework:${{matrix.laravel}}' --${{ matrix.stability }} --prefer-dist --no-interaction --no-progress + + - name: Run Laravel Pint + run: ./vendor/bin/pint --test + + - name: Execute tests + run: sleep 10 && vendor/bin/phpunit diff --git a/.gitignore b/.gitignore index 00dea824..a2dcf885 100644 --- a/.gitignore +++ b/.gitignore @@ -4,5 +4,5 @@ composer.lock .phpstorm.meta.php phpunit.xml -.phpunit.result.cache +.phpunit.* .php_cs.cache diff --git a/.php_cs.dist b/.php_cs.dist deleted file mode 100644 index 930c161e..00000000 --- a/.php_cs.dist +++ /dev/null @@ -1,57 +0,0 @@ -in(['config', 'src', 'tests']); - -return PhpCsFixer\Config::create() - ->setUsingCache(false) - ->setFinder($finder) - ->setRules([ - 'psr0' => false, - '@PSR2' => true, - 'blank_line_after_namespace' => true, - 'braces' => true, - 'class_definition' => true, - 'concat_space' => ['spacing' => 'none'], - 'elseif' => true, - 'function_declaration' => true, - 'indentation_type' => true, - 'line_ending' => true, - 'lowercase_constants' => true, - 'lowercase_keywords' => true, - 'method_argument_space' => [ - 'ensure_fully_multiline' => true, - ], - 'no_break_comment' => true, - 'no_closing_tag' => true, - 'no_spaces_after_function_name' => true, - 'no_spaces_inside_parenthesis' => true, - 'no_trailing_whitespace' => true, - 'no_trailing_whitespace_in_comment' => true, - 'single_blank_line_at_eof' => true, - 'single_class_element_per_statement' => [ - 'elements' => ['property'], - ], - 'single_import_per_statement' => true, - 'single_line_after_imports' => true, - 'switch_case_semicolon_to_colon' => true, - 'switch_case_space' => true, - 'visibility_required' => true, - 'encoding' => true, - 'full_opening_tag' => true, - 'blank_line_before_return' => true, - 'no_trailing_comma_in_singleline_array' => true, - 'trailing_comma_in_multiline_array' => true, - 'array_indentation' => true, - 'binary_operator_spaces' => [ - 'operators' => [ - '=' => 'single_space', - ], - ], - 'fully_qualified_strict_types' => true, - 'void_return' => true, - 'cast_spaces' => [ - 'space' => 'single', - ], - 'not_operator_with_successor_space' => true, - ]); diff --git a/.styleci.yml b/.styleci.yml deleted file mode 100644 index bdff1b9f..00000000 --- a/.styleci.yml +++ /dev/null @@ -1,4 +0,0 @@ -preset: laravel - -disabled: - - simplified_null_return diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 960328da..00000000 --- a/.travis.yml +++ /dev/null @@ -1,27 +0,0 @@ -language: php - -php: - - 7.2 - - 7.3 - -services: - - docker - -cache: - directories: - - $HOME/.composer/cache - -before_install: - - sudo rm /usr/local/bin/docker-compose - - curl -L https://github.com/docker/compose/releases/download/1.24.1/docker-compose-`uname -s`-`uname -m` > docker-compose - - chmod +x docker-compose - - sudo mv docker-compose /usr/local/bin - -before_script: - - docker-compose up -d rabbitmq - - travis_retry composer self-update - - travis_retry composer update --no-progress --no-interaction --prefer-dist - - sleep 10 - -script: - - composer test diff --git a/CHANGELOG-10x.md b/CHANGELOG-10x.md new file mode 100644 index 00000000..a396c6d6 --- /dev/null +++ b/CHANGELOG-10x.md @@ -0,0 +1,72 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [Unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v10.2.3...v10.0) + +## [10.2.3 (2021-02-12)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v10.2.2...v10.2.3) + +- Fix Worker is getting killed by timeout when no more jobs available [#404](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/404) + +## [10.2.2 (2020-07-18)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v10.2.1...v10.2.2) + +- Fix: Make Artisan commands visible not just in console [#348](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/348) +- Fix: Disable heartbeat when not configured [#346](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/346) + +## [10.2.1 (2020-05-11)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v10.2.0...v10.2.1) + +- Fix: When a job fails it tries to reject it twice [#322](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/322) + +## [10.2.0 (2020-04-24)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v10.1.3...v10.2.0) + +Huge thanks to [adm-bone](https://github.com/adm-bome) for this release. + +- Added support for Laravel 7.0 [#319](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/319) +- Added `rabbitmq:exchange-delete` artisan command [#317](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/317) +- Added `rabbitmq:queue-delete` artisan command [#317](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/317) +- Failed jobs can be rerouted to another exchange [#317](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/317) +- Exchange type is configurable [#317](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/317) +- Job attempts are fixed [#304](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/304) +- Added prioritization for failed jobs [#304](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/304) +- Fixed: if delay is not set when releasing a job, job will be lost [#304](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/304) +- Fix loosing messages when forced to close connection [#311](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/311) +- Fixed unacked message when class not found [#314](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/314) + +## [10.1.3 (2020-01-12)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v10.1.2...v10.1.3) + +- Fix 100% CPU usage of `rabbitmq:consume` command by adding sleep to consumer when no messages are got from the queue. +- Fix `stop-on-empty` flag for `rabbitmq:consume` command. + +## [10.1.2 (2019-12-24)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v10.1.1...v10.1.2) + +- Fix `rabbitmq:queue-bind` command. [#294](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/294) + +## [10.1.1 (2019-12-18)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v10.1.0...v10.1.1) + +- Fix `rabbitmq:exchange-declare` command. [#293](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/293) + +## [10.1.0 (2019-12-16)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v10.0.2...v10.1.0) + +- Add `rabbitmq:consume` command which uses `basic_consume` instead of `basic_get` used by `queue:work`. [#289](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/289) +- Heartbeat disabled globally +- Shuffle hosts before connecting to get better load balancing + +## [10.0.2 (2019-12-13)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v10.0.1...v10.0.2) + +- Finally fix [#235](https://github.com/vyuldashev/laravel-queue-rabbitmq/issues/235) + +## [10.0.1 (2019-12-13)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v10.0.0...v10.0.1) + +- Add missing container instance and connectionName to RabbitMQJob + +## [10.0.0 (2019-12-12)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v9.0...v10.0.0) + +- Switch from enqueue to [php-amqplib](https://github.com/php-amqplib/php-amqplib) +- Fix [#235](https://github.com/vyuldashev/laravel-queue-rabbitmq/issues/235) +- Add support for multiple hosts +- Added `exchange:declare` artisan command +- Added `queue:bind` artisan command +- Added `queue:declare` artisan command +- Added `queue:purge` artisan command +- Bulk push messages using `batch_basic_publish` +- No more “sleeps”. Exception will be thrown on lost connection or if any other exception occurs and process manager should be configured properly to manage such situations. diff --git a/CHANGELOG-11x.md b/CHANGELOG-11x.md new file mode 100644 index 00000000..cbbc691e --- /dev/null +++ b/CHANGELOG-11x.md @@ -0,0 +1,47 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [Unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.4.0...master) + +## [11.4.0 (2021-07-27)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.3.0...v11.4.0) + +- Randomize consumer tag [#432](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/432) + +## [11.3.0 (2021-07-06)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.2.0...v11.3.0) + +- Quorum queues support [#359](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/359) +- max-priority support [#422](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/422) +- Ability to specify exchange and exchange_type when using pushRaw() [#420](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/420) +- Remember exchanges once they have been verified [#407](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/407) + +## [11.2.0 (2021-03-16)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.1.2...v11.2.0) + +- PHP 8 support +- Fix missing rest option in `php artisan rabbitmq:consume` command [#416](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/416) + +## [11.1.2 (2021-03-07)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.1.1...v11.1.2) + +- Update Consumer to stop when stopIfNecessary() returns exit code [#409](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/409) + +## [11.1.1 (2020-12-07)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.1.0...v11.1.1) + +- Fix worker is stopped by timeout when no new jobs available [#352](https://github.com/vyuldashev/laravel-queue-rabbitmq/issues/352) + +## [11.1.0 (2020-12-05)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.0.2...v11.1.0) + +- Custom job class [#370](https://github.com/vyuldashev/laravel-queue-rabbitmq/issues/370) + +## [11.0.2 (2020-09-20)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.0.1...v11.0.2) + +- Add missing options to rabbitmq:consume command [#363](https://github.com/vyuldashev/laravel-queue-rabbitmq/issues/363) + +## [11.0.1 (2020-09-19)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.0.0...v11.0.1) + +- Fix rabbitmq:consume name option does not exist [#363](https://github.com/vyuldashev/laravel-queue-rabbitmq/issues/363) +- Fix Class 'Laravel\Horizon\JobId' not found [#362](https://github.com/vyuldashev/laravel-queue-rabbitmq/issues/362) + +## [11.0.0 (2020-09-09)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v10.2.2...v11.0.0) + +- Laravel 8 support +- Minimum PHP version is set to 7.3 diff --git a/CHANGELOG-12x.md b/CHANGELOG-12x.md new file mode 100644 index 00000000..3887c2d1 --- /dev/null +++ b/CHANGELOG-12x.md @@ -0,0 +1,14 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [Unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v12.0.1...master) + +## [12.0.1 (2022-04-06)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v12.0.0...v12.0.1) + +- Allow laravel to end workers with lost connection [#457](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/457) + +## [12.0.0 (2022-02-23)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.4.0...v12.0.0) + +- Laravel 9 support +- Minimum PHP version is set to 8.0 diff --git a/CHANGELOG-13x.md b/CHANGELOG-13x.md new file mode 100644 index 00000000..64e448ce --- /dev/null +++ b/CHANGELOG-13x.md @@ -0,0 +1,45 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.3.0...master) + +## [13.3.1](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.3.0...13.3.3) +- Fix a bug when no job / message is available on the queue initially [#543](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/543) + +## [13.3.0](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.2.0...13.3.0) + +- Refactor the creation of RabbitMQ Connection and + QueueAPI. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Added configuration object as single dependency for RabbitMQQueue in + constructor. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Fix method getExchangeType, not throwing an + exception. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Separating the api logic from the actual publishing to + RabbitMQ. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Added a reconnect method. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Fix the connection and channel not being fully lazy, when QueueAPI was + created. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Keep track of declared queue's within RabbitMQ. [#528](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/528) +- Implemented the 'rest' option to the consumer [#530](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/530) +- Added ability to reconnect to RabbitMQ, by creating your + own `RabbitMQQueue:class` [#531](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/531) + +## [13.2.0](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.1.0...13.2.0) + +- Compatibility with Laravel 10 [#525](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/525) + +## [13.1.0 (2023-01-25)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.0.1...v13.1.0) + +- Fix delay parameter not being used [#502](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/502) +- Resolve Laravel 9 incompatabilities [#502](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/502) +- Fix Horizon invalid delay property [#502](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/502) + +## [13.0.1 (2022-09-16)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.0.0...v13.0.1) + +- Add $dispatchAfterCommit when running via + Horizon [#484](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/484) + +## [13.0.0 (2022-09-15)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v12.0.1...v13.0.0) + +- Dispatch a job after DB transaction commit [#468](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/468) diff --git a/CHANGELOG-14x.md b/CHANGELOG-14x.md new file mode 100644 index 00000000..c94b3dd2 --- /dev/null +++ b/CHANGELOG-14x.md @@ -0,0 +1,8 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.3.0...master) + +## [14.0.0] +- First release compatible with Laravel 11 diff --git a/CHANGELOG.md b/CHANGELOG.md deleted file mode 100644 index 37858754..00000000 --- a/CHANGELOG.md +++ /dev/null @@ -1,15 +0,0 @@ -# Changelog - -All notable changes to this project will be documented in this file. - -## [10.0.0 (2019-xx-xx)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v9.0...master) - -- Switch from enqueue to [php-amqplib](https://github.com/php-amqplib/php-amqplib) -- Fix #235 -- Add support for multiple hosts -- Added `exchange:declare` artisan command -- Added `queue:bind` artisan command -- Added `queue:declare` artisan command -- Added `queue:purge` artisan command -- Bulk push messages using `batch_basic_publish` -- No more “sleeps”. Exception will be thrown on lost connection or if any other exception occurs and process manager should be configured properly to manage such situations. diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 00000000..07fbf64c --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) Vladimir Yuldashev + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/README.md b/README.md index 1a197721..30a10df9 100644 --- a/README.md +++ b/README.md @@ -1,23 +1,17 @@ RabbitMQ Queue driver for Laravel ====================== [![Latest Stable Version](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/v/stable?format=flat-square)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) -[![Build Status](https://img.shields.io/travis/vyuldashev/laravel-queue-rabbitmq.svg?style=flat-square)](https://travis-ci.org/vyuldashev/laravel-queue-rabbitmq) +[![Build Status](https://github.com/vyuldashev/laravel-queue-rabbitmq/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/vyuldashev/laravel-queue-rabbitmq/actions/workflows/tests.yml) [![Total Downloads](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/downloads?format=flat-square)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) -[![StyleCI](https://styleci.io/repos/14976752/shield)](https://styleci.io/repos/14976752) [![License](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/license?format=flat-square)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) ## Support Policy Only the latest version will get new features. Bug fixes will be provided using the following scheme: -| Package Version | Laravel Version | Bug Fixes Until | -|-----------------|-----------------|---------------------| -| 6.0 | 5.5 | August 30th, 2019 | -| 7.0 | 5.6 | August 7th, 2018 | -| 7.1 | 5.7 | March 4th, 2019 | -| 7.2 | 5.8 | August 26th, 2019 | -| 9 | 6 | September 3rd, 2021 | -| 10 | 6 | September 3rd, 2021 | +| Package Version | Laravel Version | Bug Fixes Until | | +|-----------------|-----------------|------------------|---------------------------------------------------------------------------------------------| +| 13 | 9 | August 8th, 2023 | [Documentation](https://github.com/vyuldashev/laravel-queue-rabbitmq/blob/master/README.md) | ## Installation @@ -27,9 +21,13 @@ You can install this package via composer using this command: composer require vladimir-yuldashev/laravel-queue-rabbitmq ``` -The package will automatically register itself using Laravel auto-discovery. +The package will automatically register itself. -Setup connection in `config/queue.php` +### Configuration + +Add connection to `config/queue.php`: + +> This is the minimal config for the rabbitMQ connection/driver to work. ```php 'connections' => [ @@ -38,9 +36,6 @@ Setup connection in `config/queue.php` 'rabbitmq' => [ 'driver' => 'rabbitmq', - 'queue' => env('RABBITMQ_QUEUE', 'default'), - 'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class, - 'hosts' => [ [ 'host' => env('RABBITMQ_HOST', '127.0.0.1'), @@ -49,36 +44,529 @@ Setup connection in `config/queue.php` 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), ], + // ... ], - - 'options' => [ - 'ssl_options' => [ - 'cafile' => env('RABBITMQ_SSL_CAFILE', null), - 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), - 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), - 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), - 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), - ], - ], - - /* - * Set to "horizon" if you wish to use Laravel Horizon. - */ + + // ... + ], + + // ... +], +``` + +### Optional Queue Config + +Optionally add queue options to the config of a connection. +Every queue created for this connection, gets the properties. + +When you want to prioritize messages when they were delayed, then this is possible by adding extra options. + +- When max-priority is omitted, the max priority is set with 2 when used. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'options' => [ + 'queue' => [ + // ... + + 'prioritize_delayed' => false, + 'queue_max_priority' => 10, + ], + ], + ], + + // ... +], +``` + +When you want to publish messages against an exchange with routing-keys, then this is possible by adding extra options. + +- When the exchange is omitted, RabbitMQ will use the `amq.direct` exchange for the routing-key +- When routing-key is omitted the routing-key by default is the `queue` name. +- When using `%s` in the routing-key the queue_name will be substituted. + +> Note: when using an exchange with routing-key, you probably create your queues with bindings yourself. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'options' => [ + 'queue' => [ + // ... + + 'exchange' => 'application-x', + 'exchange_type' => 'topic', + 'exchange_routing_key' => '', + ], + ], + ], + + // ... +], +``` + +In Laravel failed jobs are stored into the database. But maybe you want to instruct some other process to also do +something with the message. +When you want to instruct RabbitMQ to reroute failed messages to a exchange or a specific queue, then this is possible +by adding extra options. + +- When the exchange is omitted, RabbitMQ will use the `amq.direct` exchange for the routing-key +- When routing-key is omitted, the routing-key by default the `queue` name is substituted with `'.failed'`. +- When using `%s` in the routing-key the queue_name will be substituted. + +> Note: When using failed_job exchange with routing-key, you probably need to create your exchange/queue with bindings +> yourself. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'options' => [ + 'queue' => [ + // ... + + 'reroute_failed' => true, + 'failed_exchange' => 'failed-exchange', + 'failed_routing_key' => 'application-x.%s', + ], + ], + ], + + // ... +], +``` + +### Horizon support + +Starting with 8.0, this package supports [Laravel Horizon](https://laravel.com/docs/horizon) out of the box. Firstly, +install Horizon and then set `RABBITMQ_WORKER` to `horizon`. + +Horizon is depending on events dispatched by the worker. +These events inform Horizon what was done with the message/job. + +This Library supports Horizon, but in the config you have to inform Laravel to use the QueueApi compatible with horizon. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + /* Set to "horizon" if you wish to use Laravel Horizon. */ 'worker' => env('RABBITMQ_WORKER', 'default'), - ], // ... ], ``` -## Laravel Usage +### Use your own RabbitMQJob class + +Sometimes you have to work with messages published by another application. +Those messages probably won't respect Laravel's job payload schema. +The problem with these messages is that, Laravel workers won't be able to determine the actual job or class to execute. + +You can extend the build-in `RabbitMQJob::class` and within the queue connection config, you can define your own class. +When you specify a `job` key in the config, with your own class name, every message retrieved from the broker will get +wrapped by your own class. + +An example for the config: + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'options' => [ + 'queue' => [ + // ... + + 'job' => \App\Queue\Jobs\RabbitMQJob::class, + ], + ], + ], + + // ... +], +``` + +An example of your own job class: + +```php +payload(); + + $class = WhatheverClassNameToExecute::class; + $method = 'handle'; + + ($this->instance = $this->resolve($class))->{$method}($this, $payload); + + $this->delete(); + } +} + +``` + +Or maybe you want to add extra properties to the payload: + +```php + 'WhatheverFullyQualifiedClassNameToExecute@handle', + 'data' => json_decode($this->getRawBody(), true) + ]; + } +} +``` + +If you want to handle raw message, not in JSON format or without 'job' key in JSON, +you should add stub for `getName` method: + +```php +getRawBody(); + Log::info($anyMessage); + + $this->delete(); + } + + public function getName() + { + return ''; + } +} +``` + +### Use your own Connection + +You can extend the built-in `PhpAmqpLib\Connection\AMQPStreamConnection::class` +or `PhpAmqpLib\Connection\AMQPSLLConnection::class` and within the connection config, you can define your own class. +When you specify a `connection` key in the config, with your own class name, every connection will use your own class. + +An example for the config: + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'connection' = > \App\Queue\Connection\MyRabbitMQConnection::class, + ], + + // ... +], +``` + +### Use your own Worker class + +If you want to use your own `RabbitMQQueue::class` this is possible by +extending `VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue`. +and inform laravel to use your class by setting `RABBITMQ_WORKER` to `\App\Queue\RabbitMQQueue::class`. + +> Note: Worker classes **must** extend `VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue` + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + /* Set to a class if you wish to use your own. */ + 'worker' => \App\Queue\RabbitMQQueue::class, + ], + + // ... +], +``` + +```php + Note: this is not best practice, it is an example. + +```php +reconnect(); + parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket); + } + } + + protected function publishBatch($jobs, $data = '', $queue = null): void + { + try { + parent::publishBatch($jobs, $data, $queue); + } catch (AMQPConnectionClosedException|AMQPChannelClosedException) { + $this->reconnect(); + parent::publishBatch($jobs, $data, $queue); + } + } + + protected function createChannel(): AMQPChannel + { + try { + return parent::createChannel(); + } catch (AMQPConnectionClosedException) { + $this->reconnect(); + return parent::createChannel(); + } + } +} +``` + +### Default Queue + +The connection does use a default queue with value 'default', when no queue is provided by laravel. +It is possible to change te default queue by adding an extra parameter in the connection config. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'queue' => env('RABBITMQ_QUEUE', 'default'), + ], + + // ... +], +``` + +### Heartbeat + +By default, your connection will be created with a heartbeat setting of `0`. +You can alter the heartbeat settings by changing the config. + +```php + +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'options' => [ + // ... + + 'heartbeat' => 10, + ], + ], + + // ... +], +``` + +### SSL Secure + +If you need a secure connection to rabbitMQ server(s), you will need to add these extra config options. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'secure' = > true, + 'options' => [ + // ... + + 'ssl_options' => [ + 'cafile' => env('RABBITMQ_SSL_CAFILE', null), + 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), + 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), + 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), + 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), + ], + ], + ], + + // ... +], +``` + +### Events after Database commits + +To instruct Laravel workers to dispatch events after all database commits are completed. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'after_commit' => true, + ], + + // ... +], +``` + +### Lazy Connection + +By default, your connection will be created as a lazy connection. +If for some reason you don't want the connection lazy you can turn it off by setting the following config. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'lazy' = > false, + ], + + // ... +], +``` + +### Network Protocol + +By default, the network protocol used for connection is tcp. +If for some reason you want to use another network protocol, you can add the extra value in your config options. +Available protocols : `tcp`, `ssl`, `tls` + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'network_protocol' => 'tcp', + ], + + // ... +], +``` + +### Network Timeouts + +For network timeouts configuration you can use option parameters. +All float values are in seconds and zero value can mean infinite timeout. +Example contains default values. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'options' => [ + // ... + + 'connection_timeout' => 3.0, + 'read_timeout' => 3.0, + 'write_timeout' => 3.0, + 'channel_rpc_timeout' => 0.0, + ], + ], + + // ... +], +``` + +### Octane support + +Starting with 13.3.0, this package supports [Laravel Octane](https://laravel.com/docs/octane) out of the box. +Firstly, install Octane and don't forget to warm 'rabbitmq' connection in the octane config. +> See: https://github.com/vyuldashev/laravel-queue-rabbitmq/issues/460#issuecomment-1469851667 + +## Laravel Usage + +Once you completed the configuration you can use the Laravel Queue API. If you used other queue drivers you do not +need to change anything else. If you do not know how to use the Queue API, please refer to the official Laravel +documentation: http://laravel.com/docs/queues ## Lumen Usage @@ -88,12 +576,20 @@ For Lumen usage the service provider should be registered manually as follow in $app->register(VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class); ``` +## Consuming Messages + +There are two ways of consuming messages. + +1. `queue:work` command which is Laravel's built-in command. This command utilizes `basic_get`. Use this if you want to consume multiple queues. + +2. `rabbitmq:consume` command which is provided by this package. This command utilizes `basic_consume` and is more performant than `basic_get` by ~2x, but does not support multiple queues. + ## Testing Setup RabbitMQ using `docker-compose`: ```bash -docker-compose up -d rabbitmq +docker compose up -d ``` To run the test suite you can use the following commands: @@ -110,7 +606,7 @@ composer test:unit ``` If you receive any errors from the style tests, you can automatically fix most, -if not all of the issues with the following command: +if not all the issues with the following command: ```bash composer fix:style @@ -118,4 +614,5 @@ composer fix:style ## Contribution -You can contribute to this package by discovering bugs and opening issues. Please, add to which version of package you create pull request or issue. (e.g. [5.2] Fatal error on delayed job) +You can contribute to this package by discovering bugs and opening issues. Please, add to which version of package you +create pull request or issue. (e.g. [5.2] Fatal error on delayed job) diff --git a/composer.json b/composer.json index 779064c3..caf8a0fc 100644 --- a/composer.json +++ b/composer.json @@ -1,55 +1,59 @@ { - "name": "vladimir-yuldashev/laravel-queue-rabbitmq", - "description": "RabbitMQ driver for Laravel Queue. Supports Laravel Horizon.", - "license": "MIT", - "authors": [ - { - "name": "Vladimir Yuldashev", - "email": "misterio92@gmail.com" - } - ], - "require": { - "php": "^7.2", - "ext-json": "*", - "illuminate/queue": "^6.0", - "php-amqplib/php-amqplib": "^2.11" - }, - "require-dev": { - "phpunit/phpunit": "^8.4", - "mockery/mockery": "^1.0", - "laravel/horizon": "^3.0", - "friendsofphp/php-cs-fixer": "^2.16", - "orchestra/testbench": "^4.3" - }, - "autoload": { - "psr-4": { - "VladimirYuldashev\\LaravelQueueRabbitMQ\\": "src/" - } - }, - "autoload-dev": { - "psr-4": { - "VladimirYuldashev\\LaravelQueueRabbitMQ\\Tests\\": "tests/" - } - }, - "extra": { - "branch-alias": { - "dev-master": "10.0-dev" - }, - "laravel": { - "providers": [ - "VladimirYuldashev\\LaravelQueueRabbitMQ\\LaravelQueueRabbitMQServiceProvider" - ] - } - }, - "scripts": { - "test": [ - "@test:style", - "@test:unit" + "name": "vladimir-yuldashev/laravel-queue-rabbitmq", + "description": "RabbitMQ driver for Laravel Queue. Supports Laravel Horizon.", + "license": "MIT", + "authors": [ + { + "name": "Vladimir Yuldashev", + "email": "misterio92@gmail.com" + } ], - "test:style": "@php vendor/bin/php-cs-fixer fix --config=.php_cs.dist --allow-risky=yes --dry-run --diff --verbose", - "test:unit": "@php vendor/bin/phpunit", - "fix:style": "@php vendor/bin/php-cs-fixer fix --config=.php_cs.dist --allow-risky=yes --diff --verbose" - }, - "minimum-stability": "dev", - "prefer-stable": true + "require": { + "php": "^8.0", + "ext-json": "*", + "illuminate/queue": "^10.0|^11.0|^12.0", + "php-amqplib/php-amqplib": "^v3.6" + }, + "require-dev": { + "phpunit/phpunit": "^10.0|^11.0", + "mockery/mockery": "^1.0", + "laravel/horizon": "^5.0", + "orchestra/testbench": "^7.0|^8.0|^9.0|^10.0", + "laravel/pint": "^1.2", + "laravel/framework": "^9.0|^10.0|^11.0|^12.0" + }, + "autoload": { + "psr-4": { + "VladimirYuldashev\\LaravelQueueRabbitMQ\\": "src/" + } + }, + "autoload-dev": { + "psr-4": { + "VladimirYuldashev\\LaravelQueueRabbitMQ\\Tests\\": "tests/" + } + }, + "extra": { + "branch-alias": { + "dev-master": "13.0-dev" + }, + "laravel": { + "providers": [ + "VladimirYuldashev\\LaravelQueueRabbitMQ\\LaravelQueueRabbitMQServiceProvider" + ] + } + }, + "suggest": { + "ext-pcntl": "Required to use all features of the queue consumer." + }, + "scripts": { + "test": [ + "@test:style", + "@test:unit" + ], + "test:style": "@php vendor/bin/pint --test -v", + "test:unit": "@php vendor/bin/phpunit", + "fix:style": "@php vendor/bin/pint -v" + }, + "minimum-stability": "dev", + "prefer-stable": true } diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 765d73b8..4c102ce8 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -9,7 +9,7 @@ 'driver' => 'rabbitmq', 'queue' => env('RABBITMQ_QUEUE', 'default'), - 'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class, + 'connection' => 'default', 'hosts' => [ [ @@ -22,13 +22,6 @@ ], 'options' => [ - 'ssl_options' => [ - 'cafile' => env('RABBITMQ_SSL_CAFILE', null), - 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), - 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), - 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), - 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), - ], ], /* diff --git a/docker-compose.yml b/docker-compose.yml index 6052f7f9..4ac32caa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,7 @@ version: '3.7' services: rabbitmq: - image: rabbitmq + image: rabbitmq:3.8 environment: RABBITMQ_DEFAULT_USER: guest RABBITMQ_DEFAULT_PASSWORD: guest @@ -17,10 +17,10 @@ services: - "./tests/files/rootCA.pem:/rootCA.pem:ro" - "./tests/files/rootCA.key:/rootCA.key:ro" ports: - - 15671:15671 - - 15672:15672 - - 5671:5671 - - 5672:5672 + - "15671:15671" + - "15672:15672" + - "5671:5671" + - "5672:5672" rabbitmq-management: image: rabbitmq:management diff --git a/phpunit.xml.dist b/phpunit.xml.dist index d31909dc..d213fd63 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -1,34 +1,16 @@ - - - - ./tests/ - - - - - - - - - - - src/ - - - - - + + + + ./tests/ + + + + + + + + + + diff --git a/pint.json b/pint.json new file mode 100644 index 00000000..05f4b41e --- /dev/null +++ b/pint.json @@ -0,0 +1,8 @@ +{ + "preset": "laravel", + "rules": { + "php_unit_method_casing": { + "case": "camel_case" + } + } +} diff --git a/src/Console/ConsumeCommand.php b/src/Console/ConsumeCommand.php new file mode 100644 index 00000000..4072132a --- /dev/null +++ b/src/Console/ConsumeCommand.php @@ -0,0 +1,66 @@ +worker; + + $consumer->setContainer($this->laravel); + $consumer->setName($this->option('name')); + $consumer->setConsumerTag($this->consumerTag()); + $consumer->setMaxPriority((int) $this->option('max-priority')); + $consumer->setPrefetchSize((int) $this->option('prefetch-size')); + $consumer->setPrefetchCount((int) $this->option('prefetch-count')); + + parent::handle(); + } + + protected function consumerTag(): string + { + if ($consumerTag = $this->option('consumer-tag')) { + return $consumerTag; + } + + $consumerTag = implode('_', [ + Str::slug(config('app.name', 'laravel')), + Str::slug($this->option('name')), + md5(serialize($this->options()).Str::random(16).getmypid()), + ]); + + return Str::substr($consumerTag, 0, 255); + } +} diff --git a/src/Console/ExchangeDeclareCommand.php b/src/Console/ExchangeDeclareCommand.php index 48cafaf1..43aa9c5c 100644 --- a/src/Console/ExchangeDeclareCommand.php +++ b/src/Console/ExchangeDeclareCommand.php @@ -18,7 +18,6 @@ class ExchangeDeclareCommand extends Command protected $description = 'Declare exchange'; /** - * @param RabbitMQConnector $connector * @throws Exception */ public function handle(RabbitMQConnector $connector): void @@ -35,11 +34,11 @@ public function handle(RabbitMQConnector $connector): void $queue->declareExchange( $this->argument('name'), - $this->argument('type'), + $this->option('type'), (bool) $this->option('durable'), (bool) $this->option('auto-delete') ); - $this->warn('Exchange declared successfully.'); + $this->info('Exchange declared successfully.'); } } diff --git a/src/Console/ExchangeDeleteCommand.php b/src/Console/ExchangeDeleteCommand.php new file mode 100644 index 00000000..d5a8d8d4 --- /dev/null +++ b/src/Console/ExchangeDeleteCommand.php @@ -0,0 +1,40 @@ +laravel['config']->get('queue.connections.'.$this->argument('connection')); + + $queue = $connector->connect($config); + + if (! $queue->isExchangeExists($this->argument('name'))) { + $this->warn('Exchange does not exist.'); + + return; + } + + $queue->deleteExchange( + $this->argument('name'), + (bool) $this->option('unused') + ); + + $this->info('Exchange deleted successfully.'); + } +} diff --git a/src/Console/QueueBindCommand.php b/src/Console/QueueBindCommand.php index 47c4ea3a..ccbbd706 100644 --- a/src/Console/QueueBindCommand.php +++ b/src/Console/QueueBindCommand.php @@ -12,12 +12,11 @@ class QueueBindCommand extends Command {queue} {exchange} {connection=rabbitmq : The name of the queue connection to use} - {--routing-key}'; + {--routing-key= : Bind queue to exchange via routing key}'; protected $description = 'Bind queue to exchange'; /** - * @param RabbitMQConnector $connector * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/QueueDeclareCommand.php b/src/Console/QueueDeclareCommand.php index 0c89f329..54d6ea32 100644 --- a/src/Console/QueueDeclareCommand.php +++ b/src/Console/QueueDeclareCommand.php @@ -11,13 +11,14 @@ class QueueDeclareCommand extends Command protected $signature = 'rabbitmq:queue-declare {name : The name of the queue to declare} {connection=rabbitmq : The name of the queue connection to use} + {--max-priority} {--durable=1} - {--auto-delete=0}'; + {--auto-delete=0} + {--quorum=0}'; protected $description = 'Declare queue'; /** - * @param RabbitMQConnector $connector * @throws Exception */ public function handle(RabbitMQConnector $connector): void @@ -32,10 +33,22 @@ public function handle(RabbitMQConnector $connector): void return; } + $arguments = []; + + $maxPriority = (int) $this->option('max-priority'); + if ($maxPriority) { + $arguments['x-max-priority'] = $maxPriority; + } + + if ($this->option('quorum')) { + $arguments['x-queue-type'] = 'quorum'; + } + $queue->declareQueue( $this->argument('name'), (bool) $this->option('durable'), - (bool) $this->option('auto-delete') + (bool) $this->option('auto-delete'), + $arguments ); $this->info('Queue declared successfully.'); diff --git a/src/Console/QueueDeleteCommand.php b/src/Console/QueueDeleteCommand.php new file mode 100644 index 00000000..b2586ecd --- /dev/null +++ b/src/Console/QueueDeleteCommand.php @@ -0,0 +1,42 @@ +laravel['config']->get('queue.connections.'.$this->argument('connection')); + + $queue = $connector->connect($config); + + if (! $queue->isQueueExists($this->argument('name'))) { + $this->warn('Queue does not exist.'); + + return; + } + + $queue->deleteQueue( + $this->argument('name'), + (bool) $this->option('unused'), + (bool) $this->option('empty') + ); + + $this->info('Queue deleted successfully.'); + } +} diff --git a/src/Console/QueuePurgeCommand.php b/src/Console/QueuePurgeCommand.php index 8b03604a..49be839b 100644 --- a/src/Console/QueuePurgeCommand.php +++ b/src/Console/QueuePurgeCommand.php @@ -19,7 +19,6 @@ class QueuePurgeCommand extends Command protected $description = 'Purge all messages in queue'; /** - * @param RabbitMQConnector $connector * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Consumer.php b/src/Consumer.php new file mode 100644 index 00000000..ed3d8099 --- /dev/null +++ b/src/Consumer.php @@ -0,0 +1,210 @@ +container = $value; + } + + public function setConsumerTag(string $value): void + { + $this->consumerTag = $value; + } + + public function setMaxPriority(int $value): void + { + $this->maxPriority = $value; + } + + public function setPrefetchSize(int $value): void + { + $this->prefetchSize = $value; + } + + public function setPrefetchCount(int $value): void + { + $this->prefetchCount = $value; + } + + /** + * Listen to the given queue in a loop. + * + * @param string $connectionName + * @param string $queue + * @return int + * + * @throws Throwable + */ + public function daemon($connectionName, $queue, WorkerOptions $options) + { + if ($this->supportsAsyncSignals()) { + $this->listenForSignals(); + } + + $lastRestart = $this->getTimestampOfLastQueueRestart(); + + [$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0]; + + /** @var RabbitMQQueue $connection */ + $connection = $this->manager->connection($connectionName); + + $this->channel = $connection->getChannel(); + + $this->channel->basic_qos( + $this->prefetchSize, + $this->prefetchCount, + false + ); + + $jobClass = $connection->getJobClass(); + $arguments = []; + if ($this->maxPriority) { + $arguments['priority'] = ['I', $this->maxPriority]; + } + + $this->channel->basic_consume( + $queue, + $this->consumerTag, + false, + false, + false, + false, + function (AMQPMessage $message) use ($connection, $options, $connectionName, $queue, $jobClass, &$jobsProcessed): void { + $job = new $jobClass( + $this->container, + $connection, + $message, + $connectionName, + $queue + ); + + $this->currentJob = $job; + + if ($this->supportsAsyncSignals()) { + $this->registerTimeoutHandler($job, $options); + } + + $jobsProcessed++; + + $this->runJob($job, $connectionName, $options); + + if ($this->supportsAsyncSignals()) { + $this->resetTimeoutHandler(); + } + + if ($options->rest > 0) { + $this->sleep($options->rest); + } + }, + null, + $arguments + ); + + while ($this->channel->is_consuming()) { + // Before reserving any jobs, we will make sure this queue is not paused and + // if it is we will just pause this worker for a given amount of time and + // make sure we do not need to kill this worker process off completely. + if (! $this->daemonShouldRun($options, $connectionName, $queue)) { + $this->pauseWorker($options, $lastRestart); + + continue; + } + + // If the daemon should run (not in maintenance mode, etc.), then we can wait for a job. + try { + $this->channel->wait(null, true, (int) $options->timeout); + } catch (AMQPRuntimeException $exception) { + $this->exceptions->report($exception); + + $this->kill(self::EXIT_ERROR, $options); + } catch (Exception|Throwable $exception) { + $this->exceptions->report($exception); + + $this->stopWorkerIfLostConnection($exception); + } + + // If no job is got off the queue, we will need to sleep the worker. + if ($this->currentJob === null) { + $this->sleep($options->sleep); + } + + // Finally, we will check to see if we have exceeded our memory limits or if + // the queue should restart based on other indications. If so, we'll stop + // this worker and let whatever is "monitoring" it restart the process. + $status = $this->stopIfNecessary( + $options, + $lastRestart, + $startTime, + $jobsProcessed, + $this->currentJob + ); + + if (! is_null($status)) { + return $this->stop($status, $options); + } + + $this->currentJob = null; + } + } + + /** + * Determine if the daemon should process on this iteration. + * + * @param string $connectionName + * @param string $queue + */ + protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue): bool + { + return ! ((($this->isDownForMaintenance)() && ! $options->force) || $this->paused); + } + + /** + * Stop listening and bail out of the script. + * + * @param int $status + * @param WorkerOptions|null $options + * @return int + */ + public function stop($status = 0, $options = null) + { + // Tell the server you are going to stop consuming. + // It will finish up the last message and not send you any more. + $this->channel->basic_cancel($this->consumerTag, false, true); + + return parent::stop($status, $options); + } +} diff --git a/src/Contracts/RabbitMQQueueContract.php b/src/Contracts/RabbitMQQueueContract.php new file mode 100644 index 00000000..cf04a66e --- /dev/null +++ b/src/Contracts/RabbitMQQueueContract.php @@ -0,0 +1,14 @@ +size($queue); } @@ -46,10 +43,12 @@ public function push($job, $data = '', $queue = null) /** * {@inheritdoc} + * + * @throws BindingResolutionException */ - public function pushRaw($payload, $queue = null, array $options = []) + public function pushRaw($payload, $queue = null, array $options = []): int|string|null { - $payload = (new JobPayload($payload))->prepare($this->lastPushed)->value; + $payload = (new JobPayload($payload))->prepare($this->lastPushed ?? null)->value; return tap(parent::pushRaw($payload, $queue, $options), function () use ($queue, $payload): void { $this->event($this->getQueue($queue), new JobPushed($payload)); @@ -58,12 +57,14 @@ public function pushRaw($payload, $queue = null, array $options = []) /** * {@inheritdoc} + * + * @throws BindingResolutionException */ - public function later($delay, $job, $data = '', $queue = null) + public function later($delay, $job, $data = '', $queue = null): mixed { $payload = (new JobPayload($this->createPayload($job, $data)))->prepare($job)->value; - return tap(parent::pushRaw($payload, $queue, ['delay' => $this->secondsUntil($delay)]), function () use ($payload, $queue): void { + return tap(parent::laterRaw($delay, $payload, $queue), function () use ($payload, $queue): void { $this->event($this->getQueue($queue), new JobPushed($payload)); }); } @@ -74,28 +75,18 @@ public function later($delay, $job, $data = '', $queue = null) public function pop($queue = null) { return tap(parent::pop($queue), function ($result) use ($queue): void { - if ($result instanceof RabbitMQJob) { + if (is_a($result, RabbitMQJob::class, true)) { $this->event($this->getQueue($queue), new JobReserved($result->getRawBody())); } }); } - /** - * {@inheritdoc} - */ - public function release($delay, $job, $data, $queue, $attempts = 0) - { - $this->lastPushed = $job; - - return parent::release($delay, $job, $data, $queue, $attempts); - } - /** * Fire the job deleted event. * - * @param string $queue - * @param RabbitMQJob $job - * @return void + * @param string $queue + * @param RabbitMQJob $job + * * @throws BindingResolutionException */ public function deleteReserved($queue, $job): void @@ -106,9 +97,9 @@ public function deleteReserved($queue, $job): void /** * Fire the given event if a dispatcher is bound. * - * @param string $queue - * @param mixed $event - * @return void + * @param string $queue + * @param mixed $event + * * @throws BindingResolutionException */ protected function event($queue, $event): void @@ -125,6 +116,6 @@ protected function event($queue, $event): void */ protected function getRandomId(): string { - return JobId::generate(); + return Str::uuid(); } } diff --git a/src/LaravelQueueRabbitMQServiceProvider.php b/src/LaravelQueueRabbitMQServiceProvider.php index c5c66b4b..ee46d6cd 100644 --- a/src/LaravelQueueRabbitMQServiceProvider.php +++ b/src/LaravelQueueRabbitMQServiceProvider.php @@ -2,16 +2,16 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ; +use Illuminate\Contracts\Debug\ExceptionHandler; use Illuminate\Queue\QueueManager; use Illuminate\Support\ServiceProvider; +use VladimirYuldashev\LaravelQueueRabbitMQ\Console\ConsumeCommand; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors\RabbitMQConnector; class LaravelQueueRabbitMQServiceProvider extends ServiceProvider { /** * Register the service provider. - * - * @return void */ public function register(): void { @@ -21,19 +21,43 @@ public function register(): void ); if ($this->app->runningInConsole()) { + $this->app->singleton('rabbitmq.consumer', function () { + $isDownForMaintenance = function () { + return $this->app->isDownForMaintenance(); + }; + + return new Consumer( + $this->app['queue'], + $this->app['events'], + $this->app[ExceptionHandler::class], + $isDownForMaintenance + ); + }); + + $this->app->singleton(ConsumeCommand::class, static function ($app) { + return new ConsumeCommand( + $app['rabbitmq.consumer'], + $app['cache.store'] + ); + }); + $this->commands([ - Console\ExchangeDeclareCommand::class, - Console\QueueBindCommand::class, - Console\QueueDeclareCommand::class, - Console\QueuePurgeCommand::class, + Console\ConsumeCommand::class, ]); } + + $this->commands([ + Console\ExchangeDeclareCommand::class, + Console\ExchangeDeleteCommand::class, + Console\QueueBindCommand::class, + Console\QueueDeclareCommand::class, + Console\QueueDeleteCommand::class, + Console\QueuePurgeCommand::class, + ]); } /** * Register the application's event listeners. - * - * @return void */ public function boot(): void { diff --git a/src/Queue/Connection/ConfigFactory.php b/src/Queue/Connection/ConfigFactory.php new file mode 100644 index 00000000..06c8080d --- /dev/null +++ b/src/Queue/Connection/ConfigFactory.php @@ -0,0 +1,126 @@ +setIsLazy(! in_array( + Arr::get($config, 'lazy') ?? true, + [false, 0, '0', 'false', 'no'], + true) + ); + + // Set the connection to unsecure by default + $connectionConfig->setIsSecure(in_array( + Arr::get($config, 'secure'), + [true, 1, '1', 'true', 'yes'], + true) + ); + + if ($connectionConfig->isSecure()) { + self::getSLLOptionsFromConfig($connectionConfig, $config); + } + + self::getHostFromConfig($connectionConfig, $config); + self::getHeartbeatFromConfig($connectionConfig, $config); + self::getNetworkProtocolFromConfig($connectionConfig, $config); + self::getTimeoutsFromConfig($connectionConfig, $config); + }); + } + + protected static function getHostFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $hostConfig = Arr::first(Arr::shuffle(Arr::get($config, self::CONFIG_HOSTS, [])), null, []); + + if ($location = Arr::get($hostConfig, 'host')) { + $connectionConfig->setHost($location); + } + if ($port = Arr::get($hostConfig, 'port')) { + $connectionConfig->setPort($port); + } + if ($vhost = Arr::get($hostConfig, 'vhost')) { + $connectionConfig->setVhost($vhost); + } + if ($user = Arr::get($hostConfig, 'user')) { + $connectionConfig->setUser($user); + } + if ($password = Arr::get($hostConfig, 'password')) { + $connectionConfig->setPassword($password); + } + } + + protected static function getSLLOptionsFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $sslConfig = Arr::get($config, self::CONFIG_OPTIONS.'.ssl_options', []); + + if ($caFile = Arr::get($sslConfig, 'cafile')) { + $connectionConfig->setSslCaCert($caFile); + } + if ($cert = Arr::get($sslConfig, 'local_cert')) { + $connectionConfig->setSslCert($cert); + } + if ($key = Arr::get($sslConfig, 'local_key')) { + $connectionConfig->setSslKey($key); + } + if (Arr::has($sslConfig, 'verify_peer')) { + $verifyPeer = Arr::get($sslConfig, 'verify_peer'); + $connectionConfig->setSslVerify($verifyPeer); + } + if ($passphrase = Arr::get($sslConfig, 'passphrase')) { + $connectionConfig->setSslPassPhrase($passphrase); + } + } + + protected static function getHeartbeatFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $heartbeat = Arr::get($config, self::CONFIG_OPTIONS.'.heartbeat'); + + if (is_numeric($heartbeat) && intval($heartbeat) > 0) { + $connectionConfig->setHeartbeat((int) $heartbeat); + } + } + + protected static function getNetworkProtocolFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + if ($networkProtocol = Arr::get($config, 'network_protocol')) { + $connectionConfig->setNetworkProtocol($networkProtocol); + } + } + + protected static function getTimeoutsFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $connectionTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.connection_timeout'); + if (is_numeric($connectionTimeout) && floatval($connectionTimeout) >= 0) { + $connectionConfig->setConnectionTimeout((float) $connectionTimeout); + } + + $readTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.read_timeout'); + if (is_numeric($readTimeout) && floatval($readTimeout) >= 0) { + $connectionConfig->setReadTimeout((float) $readTimeout); + } + + $writeTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.write_timeout'); + if (is_numeric($writeTimeout) && floatval($writeTimeout) >= 0) { + $connectionConfig->setWriteTimeout((float) $writeTimeout); + } + + $chanelRpcTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.channel_rpc_timeout'); + if (is_numeric($chanelRpcTimeout) && floatval($chanelRpcTimeout) >= 0) { + $connectionConfig->setChannelRPCTimeout((float) $chanelRpcTimeout); + } + } +} diff --git a/src/Queue/Connection/ConnectionFactory.php b/src/Queue/Connection/ConnectionFactory.php new file mode 100644 index 00000000..f531e761 --- /dev/null +++ b/src/Queue/Connection/ConnectionFactory.php @@ -0,0 +1,223 @@ +getIoType() === AMQPConnectionConfig::IO_TYPE_SOCKET) { + return self::createSocketConnection($connection, $config); + } + + return self::createStreamConnection($connection, $config); + } + + protected static function createSocketConnection($connection, AMQPConnectionConfig $config): AMQPSocketConnection + { + self::assertSocketConnection($connection, $config); + + return new $connection( + $config->getHost(), + $config->getPort(), + $config->getUser(), + $config->getPassword(), + $config->getVhost(), + $config->isInsist(), + $config->getLoginMethod(), + $config->getLoginResponse(), + $config->getLocale(), + $config->getReadTimeout(), + $config->isKeepalive(), + $config->getWriteTimeout(), + $config->getHeartbeat(), + $config->getChannelRPCTimeout(), + $config + ); + } + + protected static function createStreamConnection($connection, AMQPConnectionConfig $config): AMQPStreamConnection + { + self::assertStreamConnection($connection); + + if ($config->isSecure()) { + self::assertSSLConnection($connection); + + return new $connection( + $config->getHost(), + $config->getPort(), + $config->getUser(), + $config->getPassword(), + $config->getVhost(), + self::getSslOptions($config), + [ + 'insist' => $config->isInsist(), + 'login_method' => $config->getLoginMethod(), + 'login_response' => $config->getLoginResponse(), + 'locale' => $config->getLocale(), + 'connection_timeout' => $config->getConnectionTimeout(), + 'read_write_timeout' => self::getReadWriteTimeout($config), + 'keepalive' => $config->isKeepalive(), + 'heartbeat' => $config->getHeartbeat(), + ], + $config + ); + } + + return new $connection( + $config->getHost(), + $config->getPort(), + $config->getUser(), + $config->getPassword(), + $config->getVhost(), + $config->isInsist(), + $config->getLoginMethod(), + $config->getLoginResponse(), + $config->getLocale(), + $config->getConnectionTimeout(), + self::getReadWriteTimeout($config), + $config->getStreamContext(), + $config->isKeepalive(), + $config->getHeartbeat(), + $config->getChannelRPCTimeout(), + $config->getNetworkProtocol(), + $config + ); + } + + protected static function getReadWriteTimeout(AMQPConnectionConfig $config): float + { + return min($config->getReadTimeout(), $config->getWriteTimeout()); + } + + protected static function getSslOptions(AMQPConnectionConfig $config): array + { + return array_filter([ + 'cafile' => $config->getSslCaCert(), + 'capath' => $config->getSslCaPath(), + 'local_cert' => $config->getSslCert(), + 'local_pk' => $config->getSslKey(), + 'verify_peer' => $config->getSslVerify(), + 'verify_peer_name' => $config->getSslVerifyName(), + 'passphrase' => $config->getSslPassPhrase(), + 'ciphers' => $config->getSslCiphers(), + 'security_level' => $config->getSslSecurityLevel(), + ], static function ($value) { + return $value !== null; + }); + } + + protected static function assertConnectionFromConfig(string $connection): void + { + if ($connection !== self::CONNECTION_TYPE_DEFAULT && ! is_subclass_of($connection, self::CONNECTION_TYPE_EXTENDED)) { + throw new AMQPLogicException(sprintf('The config property \'%s\' must contain \'%s\' or must extend: %s', self::CONFIG_CONNECTION, self::CONNECTION_TYPE_DEFAULT, class_basename(self::CONNECTION_TYPE_EXTENDED))); + } + } + + protected static function assertSocketConnection($connection, AMQPConnectionConfig $config): void + { + self::assertExtendedOf($connection, self::CONNECTION_SUB_TYPE_SOCKET); + + if ($config->isSecure()) { + throw new AMQPLogicException('The socket connection implementation does not support secure connections.'); + } + } + + protected static function assertStreamConnection($connection): void + { + self::assertExtendedOf($connection, self::CONNECTION_SUB_TYPE_STREAM); + } + + protected static function assertSSLConnection($connection): void + { + self::assertExtendedOf($connection, self::CONNECTION_SUB_TYPE_SSL); + } + + protected static function assertExtendedOf($connection, string $parent): void + { + if (! is_subclass_of($connection, $parent) && $connection !== $parent) { + throw new AMQPLogicException(sprintf('The connection must extend: %s', class_basename($parent))); + } + } + + /** + * @return mixed + * + * @throws Exception + * + * @deprecated This is the fallback method, update your config asap. (example: connection => 'default') + */ + protected static function _createLazyConnection($connection, array $config): AbstractConnection + { + return $connection::create_connection( + Arr::shuffle(Arr::get($config, ConfigFactory::CONFIG_HOSTS, [])), + Arr::add(Arr::get($config, 'options', []), 'heartbeat', 0) + ); + } +} diff --git a/src/Queue/Connectors/RabbitMQConnector.php b/src/Queue/Connectors/RabbitMQConnector.php index 79b25e0a..a0f79e32 100644 --- a/src/Queue/Connectors/RabbitMQConnector.php +++ b/src/Queue/Connectors/RabbitMQConnector.php @@ -8,20 +8,15 @@ use Illuminate\Queue\Connectors\ConnectorInterface; use Illuminate\Queue\Events\JobFailed; use Illuminate\Queue\Events\WorkerStopping; -use Illuminate\Support\Arr; -use InvalidArgumentException; -use PhpAmqpLib\Connection\AbstractConnection; -use PhpAmqpLib\Connection\AMQPLazyConnection; use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\Listeners\RabbitMQFailedEvent; use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue; +use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connection\ConnectionFactory; +use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\QueueFactory; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; class RabbitMQConnector implements ConnectorInterface { - /** - * @var Dispatcher - */ - private $dispatcher; + protected Dispatcher $dispatcher; public function __construct(Dispatcher $dispatcher) { @@ -31,24 +26,15 @@ public function __construct(Dispatcher $dispatcher) /** * Establish a queue connection. * - * @param array $config - * * @return RabbitMQQueue + * * @throws Exception */ public function connect(array $config): Queue { - $connection = $this->createConnection($config); - - $queue = $this->createQueue( - Arr::get($config, 'worker', 'default'), - $connection, - $config['queue'] - ); + $connection = ConnectionFactory::make($config); - if (! $queue instanceof RabbitMQQueue) { - throw new InvalidArgumentException('Invalid worker.'); - } + $queue = QueueFactory::make($config)->setConnection($connection); if ($queue instanceof HorizonRabbitMQQueue) { $this->dispatcher->listen(JobFailed::class, RabbitMQFailedEvent::class); @@ -60,50 +46,4 @@ public function connect(array $config): Queue return $queue; } - - /** - * @param array $config - * @return AbstractConnection - * @throws Exception - */ - protected function createConnection(array $config): AbstractConnection - { - /** @var AbstractConnection $connection */ - $connection = Arr::get($config, 'connection', AMQPLazyConnection::class); - - return $connection::create_connection( - Arr::get($config, 'hosts', []), - $this->filter(Arr::get($config, 'options', [])) - ); - } - - protected function createQueue(string $worker, AbstractConnection $connection, string $queue) - { - switch ($worker) { - case 'default': - return new RabbitMQQueue($connection, $queue); - case 'horizon': - return new HorizonRabbitMQQueue($connection, $queue); - default: - return new $worker($connection, $queue); - } - } - - private function filter(array $array): array - { - foreach ($array as $index => &$value) { - if (is_array($value)) { - $value = $this->filter($value); - continue; - } - - // If the value is null then remove it. - if ($value === null) { - unset($array[$index]); - continue; - } - } - - return $array; - } } diff --git a/src/Queue/Jobs/RabbitMQJob.php b/src/Queue/Jobs/RabbitMQJob.php index d851d211..abcdfab4 100644 --- a/src/Queue/Jobs/RabbitMQJob.php +++ b/src/Queue/Jobs/RabbitMQJob.php @@ -2,10 +2,12 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs; +use Illuminate\Container\Container; use Illuminate\Contracts\Container\BindingResolutionException; use Illuminate\Contracts\Queue\Job as JobContract; use Illuminate\Queue\Jobs\Job; use Illuminate\Support\Arr; +use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue; @@ -35,12 +37,16 @@ class RabbitMQJob extends Job implements JobContract protected $decoded; public function __construct( + Container $container, RabbitMQQueue $rabbitmq, AMQPMessage $message, + string $connectionName, string $queue ) { + $this->container = $container; $this->rabbitmq = $rabbitmq; $this->message = $message; + $this->connectionName = $connectionName; $this->queue = $queue; $this->decoded = $this->payload(); } @@ -50,7 +56,7 @@ public function __construct( */ public function getJobId() { - return json_decode($this->message->getBody(), true)['id'] ?? null; + return $this->decoded['id'] ?? null; } /** @@ -66,19 +72,26 @@ public function getRawBody(): string */ public function attempts(): int { - /** @var AMQPTable|null $headers */ - $headers = Arr::get($this->message->get_properties(), 'application_headers'); - - if (! $headers) { - return 0; + if (! $data = $this->getRabbitMQMessageHeaders()) { + return 1; } - $data = $headers->getNativeData(); - $laravelAttempts = (int) Arr::get($data, 'laravel.attempts', 0); - $xDeathCount = (int) Arr::get($headers->getNativeData(), 'x-death.0.count', 0); - return $laravelAttempts + $xDeathCount; + return $laravelAttempts + 1; + } + + /** + * {@inheritdoc} + */ + public function markAsFailed(): void + { + parent::markAsFailed(); + + // We must tel rabbitMQ this Job is failed + // The message must be rejected when the Job marked as failed, in case rabbitMQ wants to do some extra magic. + // like: Death lettering the message to an other exchange/routing-key. + $this->rabbitmq->reject($this); } /** @@ -90,7 +103,11 @@ public function delete(): void { parent::delete(); - $this->rabbitmq->ack($this); + // When delete is called and the Job was not failed, the message must be acknowledged. + // This is because this is a controlled call by a developer. So the message was handled correct. + if (! $this->failed) { + $this->rabbitmq->ack($this); + } // required for Laravel Horizon if ($this->rabbitmq instanceof HorizonRabbitMQQueue) { @@ -99,27 +116,27 @@ public function delete(): void } /** - * {@inheritdoc} + * Release the job back into the queue. + * + * @param int $delay + * + * @throws AMQPProtocolChannelException */ public function release($delay = 0): void { - parent::release($delay); + parent::release(); - if ($delay > 0) { - $this->rabbitmq->ack($this); - - $this->rabbitmq->laterRaw($delay, $this->message->body, $this->queue, $this->attempts()); - - return; - } + // Always create a new message when this Job is released + $this->rabbitmq->laterRaw($delay, $this->message->getBody(), $this->queue, $this->attempts()); - $this->rabbitmq->reject($this); + // Releasing a Job means the message was failed to process. + // Because this Job message is always recreated and pushed as new message, this Job message is correctly handled. + // We must tell rabbitMQ this job message can be removed by acknowledging the message. + $this->rabbitmq->ack($this); } /** * Get the underlying RabbitMQ connection. - * - * @return RabbitMQQueue */ public function getRabbitMQ(): RabbitMQQueue { @@ -128,11 +145,22 @@ public function getRabbitMQ(): RabbitMQQueue /** * Get the underlying RabbitMQ message. - * - * @return AMQPMessage */ public function getRabbitMQMessage(): AMQPMessage { return $this->message; } + + /** + * Get the headers from the rabbitMQ message. + */ + protected function getRabbitMQMessageHeaders(): ?array + { + /** @var AMQPTable|null $headers */ + if (! $headers = Arr::get($this->message->get_properties(), 'application_headers')) { + return null; + } + + return $headers->getNativeData(); + } } diff --git a/src/Queue/QueueConfig.php b/src/Queue/QueueConfig.php new file mode 100644 index 00000000..e7ec27c4 --- /dev/null +++ b/src/Queue/QueueConfig.php @@ -0,0 +1,278 @@ +queue; + } + + public function setQueue(string $queue): QueueConfig + { + $this->queue = $queue; + + return $this; + } + + /** + * Returns &true; as indication that jobs should be dispatched after all database transactions + * have been committed. + */ + public function isDispatchAfterCommit(): bool + { + return $this->dispatchAfterCommit; + } + + public function setDispatchAfterCommit($dispatchAfterCommit): QueueConfig + { + $this->dispatchAfterCommit = $this->toBoolean($dispatchAfterCommit); + + return $this; + } + + /** + * Get the Job::class to use when processing messages + */ + public function getAbstractJob(): string + { + return $this->abstractJob; + } + + public function setAbstractJob(string $abstract): QueueConfig + { + $this->abstractJob = $abstract; + + return $this; + } + + /** + * Returns &true;, if delayed messages should be prioritized. + * + * RabbitMQ queues work with the FIFO method. So when there are 10000 messages in the queue and + * the delayed message is put back to the queue (at the end) for further processing the delayed message won´t + * process before all 10000 messages are processed. The same is true for requeueing. + * + * This may not what you desire. + * When you want the message to get processed immediately after the delayed time expires or when requeueing, we can + * use prioritization. + * + * @see[https://www.rabbitmq.com/queues.html#basics] + */ + public function isPrioritizeDelayed(): bool + { + return $this->prioritizeDelayed; + } + + public function setPrioritizeDelayed($prioritizeDelayed): QueueConfig + { + $this->prioritizeDelayed = $this->toBoolean($prioritizeDelayed); + + return $this; + } + + /** + * Returns a integer with a default of '2' for when using prioritization on delayed messages. + * If priority queues are desired, we recommend using between 1 and 10. + * Using more priority layers, will consume more CPU resources and would affect runtimes. + * + * @see https://www.rabbitmq.com/priority.html + */ + public function getQueueMaxPriority(): int + { + return $this->queueMaxPriority; + } + + public function setQueueMaxPriority($queueMaxPriority): QueueConfig + { + if (is_numeric($queueMaxPriority) && intval($queueMaxPriority) > 1) { + $this->queueMaxPriority = (int) $queueMaxPriority; + } + + return $this; + } + + /** + * Get the exchange name, or empty string; as default value. + * + * The default exchange is an unnamed pre-declared direct exchange. Usually, an empty string + * is frequently used to indicate it. If you choose default exchange, your message will be delivered + * to a queue with the same name as the routing key. + * With a routing key that is the same as the queue name, every queue is immediately tied to the default exchange. + */ + public function getExchange(): string + { + return $this->exchange; + } + + public function setExchange(string $exchange): QueueConfig + { + $this->exchange = $exchange; + + return $this; + } + + /** + * Get the exchange type + * + * There are four basic RabbitMQ exchange types in RabbitMQ, each of which uses different parameters + * and bindings to route messages in various ways, These are: 'direct', 'topic', 'fanout', 'headers' + * + * The default type is set as 'direct' + */ + public function getExchangeType(): string + { + return $this->exchangeType; + } + + public function setExchangeType(string $exchangeType): QueueConfig + { + $this->exchangeType = $exchangeType; + + return $this; + } + + /** + * Get the routing key when using an exchange other than the direct exchange. + * The routing key is a message attribute taken into account by the exchange when deciding how to route a message. + * + * The default routing-key is the given destination: '%s'. + */ + public function getExchangeRoutingKey(): string + { + return $this->exchangeRoutingKey; + } + + public function setExchangeRoutingKey(string $exchangeRoutingKey): QueueConfig + { + $this->exchangeRoutingKey = $exchangeRoutingKey; + + return $this; + } + + /** + * Returns &true;, if failed messages should be rerouted. + */ + public function isRerouteFailed(): bool + { + return $this->rerouteFailed; + } + + public function setRerouteFailed($rerouteFailed): QueueConfig + { + $this->rerouteFailed = $this->toBoolean($rerouteFailed); + + return $this; + } + + /** + * Get the exchange name with messages are published against. + * The default exchange is empty, so messages will be published directly to a queue. + */ + public function getFailedExchange(): string + { + return $this->failedExchange; + } + + public function setFailedExchange(string $failedExchange): QueueConfig + { + $this->failedExchange = $failedExchange; + + return $this; + } + + /** + * Get the substitution string for failed messages + * The default routing-key is the given destination substituted by '%s.failed'. + */ + public function getFailedRoutingKey(): string + { + return $this->failedRoutingKey; + } + + public function setFailedRoutingKey(string $failedRoutingKey): QueueConfig + { + $this->failedRoutingKey = $failedRoutingKey; + + return $this; + } + + /** + * Returns &true;, if queue is marked or set as quorum queue. + */ + public function isQuorum(): bool + { + return $this->quorum; + } + + public function setQuorum($quorum): QueueConfig + { + $this->quorum = $this->toBoolean($quorum); + + return $this; + } + + /** + * Holds all unknown queue options provided in the connection config + */ + public function getOptions(): array + { + return $this->options; + } + + public function setOptions(array $options): QueueConfig + { + $this->options = $options; + + return $this; + } + + /** + * Filters $value to boolean value + * + * Returns: &true; + * For values: 1, '1', true, 'true', 'yes' + * + * Returns: &false; + * For values: 0, '0', false, 'false', '', null, [] , 'ok', 'no', 'no not a bool', 'yes a bool' + */ + protected function toBoolean($value): bool + { + return filter_var($value, FILTER_VALIDATE_BOOLEAN); + } +} diff --git a/src/Queue/QueueConfigFactory.php b/src/Queue/QueueConfigFactory.php new file mode 100644 index 00000000..6f2befc5 --- /dev/null +++ b/src/Queue/QueueConfigFactory.php @@ -0,0 +1,74 @@ +setQueue($queue); + } + if (! empty($afterCommit = Arr::get($config, 'after_commit'))) { + $queueConfig->setDispatchAfterCommit($afterCommit); + } + + self::getOptionsFromConfig($queueConfig, $config); + }); + } + + protected static function getOptionsFromConfig(QueueConfig $queueConfig, array $config): void + { + $queueOptions = Arr::get($config, self::CONFIG_OPTIONS.'.queue', []) ?: []; + + if ($job = Arr::pull($queueOptions, 'job')) { + $queueConfig->setAbstractJob($job); + } + + // Feature: Prioritize delayed messages. + if ($prioritizeDelayed = Arr::pull($queueOptions, 'prioritize_delayed')) { + $queueConfig->setPrioritizeDelayed($prioritizeDelayed); + } + if ($maxPriority = Arr::pull($queueOptions, 'queue_max_priority')) { + $queueConfig->setQueueMaxPriority($maxPriority); + } + + // Feature: Working with Exchange and routing-keys + if ($exchange = Arr::pull($queueOptions, 'exchange')) { + $queueConfig->setExchange($exchange); + } + if ($exchangeType = Arr::pull($queueOptions, 'exchange_type')) { + $queueConfig->setExchangeType($exchangeType); + } + if ($exchangeRoutingKey = Arr::pull($queueOptions, 'exchange_routing_key')) { + $queueConfig->setExchangeRoutingKey($exchangeRoutingKey); + } + + // Feature: Reroute failed messages + if ($rerouteFailed = Arr::pull($queueOptions, 'reroute_failed')) { + $queueConfig->setRerouteFailed($rerouteFailed); + } + if ($failedExchange = Arr::pull($queueOptions, 'failed_exchange')) { + $queueConfig->setFailedExchange($failedExchange); + } + if ($failedRoutingKey = Arr::pull($queueOptions, 'failed_routing_key')) { + $queueConfig->setFailedRoutingKey($failedRoutingKey); + } + + // Feature: Mark queue as quorum + if ($quorum = Arr::pull($queueOptions, 'quorum')) { + $queueConfig->setQuorum($quorum); + } + + // All extra options not defined + $queueConfig->setOptions($queueOptions); + } +} diff --git a/src/Queue/QueueFactory.php b/src/Queue/QueueFactory.php new file mode 100644 index 00000000..75bde1ed --- /dev/null +++ b/src/Queue/QueueFactory.php @@ -0,0 +1,25 @@ +connection = $connection; - $this->channel = $connection->channel(); - $this->default = $default; + /** + * Current job being processed. + */ + protected ?RabbitMQJob $currentJob = null; + + /** + * Holds the Configuration + */ + protected QueueConfig $rabbitMQConfig; + + /** + * RabbitMQQueue constructor. + */ + public function __construct(QueueConfig $config) + { + $this->rabbitMQConfig = $config; + $this->dispatchAfterCommit = $config->isDispatchAfterCommit(); } /** @@ -83,7 +87,7 @@ public function size($queue = null): int } // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->connection->channel(); + $channel = $this->createChannel(); [, $size] = $channel->queue_declare($queue, true); $channel->close(); @@ -92,155 +96,243 @@ public function size($queue = null): int /** * {@inheritdoc} + * + * @throws AMQPProtocolChannelException */ public function push($job, $data = '', $queue = null) { - return $this->pushRaw($this->createPayload($job, $queue, $data), $queue, []); + return $this->enqueueUsing( + $job, + $this->createPayload($job, $this->getQueue($queue), $data), + $queue, + null, + function ($payload, $queue) { + return $this->pushRaw($payload, $queue); + } + ); } /** * {@inheritdoc} + * + * @throws AMQPProtocolChannelException */ - public function pushRaw($payload, $queue = null, array $options = []) + public function pushRaw($payload, $queue = null, array $options = []): int|string|null { - $queue = $this->getQueue($queue); + [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); - $this->declareExchange($queue); - $this->declareQueue($queue, true, false, [ - 'x-dead-letter-exchange' => $queue, - 'x-dead-letter-routing-key' => $queue, - ]); - $this->bindQueue($queue, $queue, $queue); + $this->declareDestination($destination, $exchange, $exchangeType); - [$message, $correlationId] = $this->createMessage($payload); + [$message, $correlationId] = $this->createMessage($payload, $attempts); - $this->channel->basic_publish($message, $queue, $queue, true, false); + $this->publishBasic($message, $exchange, $destination, true); return $correlationId; } /** * {@inheritdoc} + * + * @throws AMQPProtocolChannelException */ - public function later($delay, $job, $data = '', $queue = null) + public function later($delay, $job, $data = '', $queue = null): mixed { - return $this->laterRaw( + return $this->enqueueUsing( + $job, + $this->createPayload($job, $this->getQueue($queue), $data), + $queue, $delay, - $this->createPayload($job, $queue, $data), - $queue + function ($payload, $queue, $delay) { + return $this->laterRaw($delay, $payload, $queue); + } ); } - public function laterRaw($delay, $payload, $queue = null, $attempts = 0) + /** + * @throws AMQPProtocolChannelException + */ + public function laterRaw($delay, string $payload, $queue = null, int $attempts = 0): int|string|null { $ttl = $this->secondsUntil($delay) * 1000; - if ($ttl < 0) { - return $this->pushRaw($payload, $queue, []); + // default options + $options = ['delay' => $delay, 'attempts' => $attempts]; + + // When no ttl just publish a new message to the exchange or queue + if ($ttl <= 0) { + return $this->pushRaw($payload, $queue, $options); } - $destinationQueue = $this->getQueue($queue); - $delayedQueue = $this->getQueue($queue).'.delay.'.$ttl; + // Create a main queue to handle delayed messages + [$mainDestination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); + $this->declareDestination($mainDestination, $exchange, $exchangeType); - $this->declareExchange($destinationQueue); - $this->declareQueue($destinationQueue, true, false, [ - 'x-dead-letter-exchange' => $destinationQueue, - 'x-dead-letter-routing-key' => $destinationQueue, - ]); - $this->declareQueue($delayedQueue, true, false, [ - 'x-dead-letter-exchange' => $destinationQueue, - 'x-dead-letter-routing-key' => $destinationQueue, - 'x-message-ttl' => $ttl, - ]); - $this->bindQueue($destinationQueue, $destinationQueue, $destinationQueue); + $destination = $this->getQueue($queue).'.delay.'.$ttl; + + $this->declareQueue($destination, true, false, $this->getDelayQueueArguments($this->getQueue($queue), $ttl)); [$message, $correlationId] = $this->createMessage($payload, $attempts); - $this->channel->basic_publish($message, null, $delayedQueue, true, false); + // Publish directly on the delayQueue, no need to publish through an exchange. + $this->publishBasic($message, null, $destination, true); return $correlationId; } /** * {@inheritdoc} + * + * @throws AMQPProtocolChannelException */ public function bulk($jobs, $data = '', $queue = null): void { - $queue = $this->getQueue($queue); + $this->publishBatch($jobs, $data, $queue); + } - foreach ((array) $jobs as $job) { - [$message] = $this->createMessage( - $this->createPayload($job, $queue, $data) - ); + /** + * @throws AMQPProtocolChannelException + */ + protected function publishBatch($jobs, $data = '', $queue = null): void + { + foreach ($jobs as $job) { + $this->bulkRaw($this->createPayload($job, $queue, $data), $queue, ['job' => $job]); + } - $this->declareExchange($queue); - $this->declareQueue($queue, true, false, [ - 'x-dead-letter-exchange' => $queue, - 'x-dead-letter-routing-key' => $queue, - ]); - $this->bindQueue($queue, $queue, $queue); + $this->batchPublish(); + } - $this->channel->batch_basic_publish($message, $queue, $queue); - } + /** + * @throws AMQPProtocolChannelException + */ + public function bulkRaw(string $payload, ?string $queue = null, array $options = []): int|string|null + { + [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); - $this->channel->publish_batch(); + $this->declareDestination($destination, $exchange, $exchangeType); + + [$message, $correlationId] = $this->createMessage($payload, $attempts); + + $this->getChannel()->batch_basic_publish($message, $exchange, $destination); + + return $correlationId; } /** * {@inheritdoc} * - * @throws Exception + * @throws Throwable */ public function pop($queue = null) { try { $queue = $this->getQueue($queue); + $job = $this->getJobClass(); + /** @var AMQPMessage|null $message */ - if ($message = $this->channel->basic_get($queue)) { - return new RabbitMQJob($this, $message, $queue); + if ($message = $this->getChannel()->basic_get($queue)) { + return $this->currentJob = new $job( + $this->container, + $this, + $message, + $this->connectionName, + $queue + ); } } catch (AMQPProtocolChannelException $exception) { - // if there is not exchange or queue AMQP will throw exception with code 404 - // we need to catch it and return null + // If there is no exchange or queue AMQP will throw exception with code 404 + // We need to catch it and return null if ($exception->amqp_reply_code === 404) { + // Because of the channel exception the channel was closed and removed. + // We have to open a new channel. Because else the worker(s) are stuck in a loop, without processing. + $this->getChannel(true); + return null; } throw $exception; + } catch (AMQPChannelClosedException|AMQPConnectionClosedException $exception) { + // Queue::pop used by worker to receive new job + // Thrown exception is checked by Illuminate\Database\DetectsLostConnections::causedByLostConnection + // Is has to contain one of the several phrases in exception message in order to restart worker + // Otherwise worker continues to work with broken connection + throw new AMQPRuntimeException( + 'Lost connection: '.$exception->getMessage(), + $exception->getCode(), + $exception + ); } return null; } + /** + * @throws RuntimeException + */ public function getConnection(): AbstractConnection { + if (! $this->connection) { + throw new RuntimeException('Queue has no AMQPConnection set.'); + } + return $this->connection; } - public function getChannel(): AMQPChannel + public function setConnection(AbstractConnection $connection): RabbitMQQueue { - return $this->channel; + $this->connection = $connection; + + return $this; } - public function getQueue($queue = null) + /** + * Job class to use. + * + * + * @throws Throwable + */ + public function getJobClass(): string { - return $queue ?: $this->default; + $job = $this->getRabbitMQConfig()->getAbstractJob(); + + throw_if( + ! is_a($job, RabbitMQJob::class, true), + Exception::class, + sprintf('Class %s must extend: %s', $job, RabbitMQJob::class) + ); + + return $job; } /** - * @param string $exchange - * @return bool + * Gets a queue/destination, by default the queue option set on the connection. + */ + public function getQueue($queue = null): string + { + return $queue ?: $this->getRabbitMQConfig()->getQueue(); + } + + /** + * Checks if the given exchange already present/defined in RabbitMQ. + * Returns false when the exchange is missing. + * + * * @throws AMQPProtocolChannelException */ public function isExchangeExists(string $exchange): bool { + if ($this->isExchangeDeclared($exchange)) { + return true; + } + try { // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->connection->channel(); + $channel = $this->createChannel(); $channel->exchange_declare($exchange, '', true); $channel->close(); + $this->exchanges[] = $exchange; + return true; } catch (AMQPProtocolChannelException $exception) { if ($exception->amqp_reply_code === 404) { @@ -251,42 +343,76 @@ public function isExchangeExists(string $exchange): bool } } + /** + * Declare an exchange in rabbitMQ, when not already declared. + */ public function declareExchange( string $name, string $type = AMQPExchangeType::DIRECT, bool $durable = true, - bool $autoDelete = false + bool $autoDelete = false, + array $arguments = [] ): void { - if (in_array($name, $this->exchanges, true)) { + if ($this->isExchangeDeclared($name)) { return; } - $this->channel->exchange_declare( + $this->getChannel()->exchange_declare( $name, $type, false, $durable, $autoDelete, false, - true + true, + new AMQPTable($arguments) + ); + } + + /** + * Delete an exchange from rabbitMQ, only when present in RabbitMQ. + * + * + * @throws AMQPProtocolChannelException + */ + public function deleteExchange(string $name, bool $unused = false): void + { + if (! $this->isExchangeExists($name)) { + return; + } + + $idx = array_search($name, $this->exchanges); + unset($this->exchanges[$idx]); + + $this->getChannel()->exchange_delete( + $name, + $unused ); } /** - * @param string $name - * @return bool + * Checks if the given queue already present/defined in RabbitMQ. + * Returns false when the queue is missing. + * + * * @throws AMQPProtocolChannelException */ public function isQueueExists(?string $name = null): bool { - try { - $name = $this->getQueue($name); + $queueName = $this->getQueue($name); + if ($this->isQueueDeclared($queueName)) { + return true; + } + + try { // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->connection->channel(); - $channel->queue_declare($name, true); + $channel = $this->createChannel(); + $channel->queue_declare($queueName, true); $channel->close(); + $this->queues[] = $queueName; + return true; } catch (AMQPProtocolChannelException $exception) { if ($exception->amqp_reply_code === 404) { @@ -297,13 +423,20 @@ public function isQueueExists(?string $name = null): bool } } - public function declareQueue(string $name, bool $durable = true, bool $autoDelete = false, array $arguments = []): void - { - if (in_array($name, $this->queues, true)) { + /** + * Declare a queue in rabbitMQ, when not already declared. + */ + public function declareQueue( + string $name, + bool $durable = true, + bool $autoDelete = false, + array $arguments = [] + ): void { + if ($this->isQueueDeclared($name)) { return; } - $this->channel->queue_declare( + $this->getChannel()->queue_declare( $name, false, $durable, @@ -314,6 +447,27 @@ public function declareQueue(string $name, bool $durable = true, bool $autoDelet ); } + /** + * Delete a queue from rabbitMQ, only when present in RabbitMQ. + * + * + * @throws AMQPProtocolChannelException + */ + public function deleteQueue(string $name, bool $if_unused = false, bool $if_empty = false): void + { + if (! $this->isQueueExists($name)) { + return; + } + + $idx = array_search($name, $this->queues); + unset($this->queues[$idx]); + + $this->getChannel()->queue_delete($name, $if_unused, $if_empty); + } + + /** + * Bind a queue to an exchange. + */ public function bindQueue(string $queue, string $exchange, string $routingKey = ''): void { if (in_array( @@ -324,35 +478,39 @@ public function bindQueue(string $queue, string $exchange, string $routingKey = return; } - $this->channel->queue_bind($queue, $exchange, $routingKey); + $this->getChannel()->queue_bind($queue, $exchange, $routingKey); } - public function purge($queue = null): void + /** + * Purge the queue of messages. + */ + public function purge(?string $queue = null): void { // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->connection->channel(); + $channel = $this->createChannel(); $channel->queue_purge($this->getQueue($queue)); $channel->close(); } + /** + * Acknowledge the message. + */ public function ack(RabbitMQJob $job): void { - $this->channel->basic_ack($job->getRabbitMQMessage()->getDeliveryTag()); + $this->getChannel()->basic_ack($job->getRabbitMQMessage()->getDeliveryTag()); } + /** + * Reject the message. + */ public function reject(RabbitMQJob $job, bool $requeue = false): void { - $this->channel->basic_reject($job->getRabbitMQMessage()->getDeliveryTag(), $requeue); + $this->getChannel()->basic_reject($job->getRabbitMQMessage()->getDeliveryTag(), $requeue); } /** - * @throws Exception + * Create a AMQP message. */ - public function close(): void - { - $this->connection->close(); - } - protected function createMessage($payload, int $attempts = 0): array { $properties = [ @@ -360,10 +518,27 @@ protected function createMessage($payload, int $attempts = 0): array 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, ]; - if ($correlationId = json_decode($payload, true)['id'] ?? null) { + $currentPayload = json_decode($payload, true); + if ($correlationId = $currentPayload['id'] ?? null) { $properties['correlation_id'] = $correlationId; } + if ($this->getRabbitMQConfig()->isPrioritizeDelayed()) { + $properties['priority'] = $attempts; + } + + if (isset($currentPayload['data']['command'])) { + // If the command data is encrypted, decrypt it first before attempting to unserialize + if (is_subclass_of($currentPayload['data']['commandName'], ShouldBeEncrypted::class)) { + $currentPayload['data']['command'] = Crypt::decrypt($currentPayload['data']['command']); + } + + $commandData = unserialize($currentPayload['data']['command']); + if (property_exists($commandData, 'priority')) { + $properties['priority'] = $commandData->priority; + } + } + $message = new AMQPMessage($payload, $properties); $message->set('application_headers', new AMQPTable([ @@ -378,7 +553,14 @@ protected function createMessage($payload, int $attempts = 0): array ]; } - protected function createPayloadArray($job, $queue, $data = '') + /** + * Create a payload array from the given job and data. + * + * @param string|object $job + * @param string $queue + * @param mixed $data + */ + protected function createPayloadArray($job, $queue, $data = ''): array { return array_merge(parent::createPayloadArray($job, $queue, $data), [ 'id' => $this->getRandomId(), @@ -387,11 +569,214 @@ protected function createPayloadArray($job, $queue, $data = '') /** * Get a random ID string. - * - * @return string */ protected function getRandomId(): string { - return Str::random(32); + return Str::uuid(); + } + + /** + * Close the connection to RabbitMQ. + * + * + * @throws Exception + */ + public function close(): void + { + if (isset($this->currentJob) && ! $this->currentJob->isDeletedOrReleased()) { + $this->reject($this->currentJob, true); + } + + try { + $this->getConnection()->close(); + } catch (ErrorException) { + // Ignore the exception + } + } + + /** + * Get the Queue arguments. + */ + protected function getQueueArguments(string $destination): array + { + $arguments = []; + + // Messages without a priority property are treated as if their priority were 0. + // Messages with a priority which is higher than the queue's maximum, are treated as if they were + // published with the maximum priority. + // Quorum queues does not support priority. + if ($this->getRabbitMQConfig()->isPrioritizeDelayed() && ! $this->getRabbitMQConfig()->isQuorum()) { + $arguments['x-max-priority'] = $this->getRabbitMQConfig()->getQueueMaxPriority(); + } + + if ($this->getRabbitMQConfig()->isRerouteFailed()) { + $arguments['x-dead-letter-exchange'] = $this->getFailedExchange(); + $arguments['x-dead-letter-routing-key'] = $this->getFailedRoutingKey($destination); + } + + if ($this->getRabbitMQConfig()->isQuorum()) { + $arguments['x-queue-type'] = 'quorum'; + } + + return $arguments; + } + + /** + * Get the Delay queue arguments. + */ + protected function getDelayQueueArguments(string $destination, int $ttl): array + { + return [ + 'x-dead-letter-exchange' => $this->getExchange(), + 'x-dead-letter-routing-key' => $this->getRoutingKey($destination), + 'x-message-ttl' => $ttl, + 'x-expires' => $ttl * 2, + ]; + } + + /** + * Get the exchange name, or empty string; as default value. + */ + protected function getExchange(?string $exchange = null): string + { + return $exchange ?? $this->getRabbitMQConfig()->getExchange(); + } + + /** + * Get the routing-key for when you use exchanges + * The default routing-key is the given destination. + */ + protected function getRoutingKey(string $destination): string + { + return ltrim(sprintf($this->getRabbitMQConfig()->getExchangeRoutingKey(), $destination), '.'); + } + + /** + * Get the exchangeType, or AMQPExchangeType::DIRECT as default. + */ + protected function getExchangeType(?string $type = null): string + { + $constant = AMQPExchangeType::class.'::'.Str::upper($type ?: $this->getRabbitMQConfig()->getExchangeType()); + + return defined($constant) ? constant($constant) : AMQPExchangeType::DIRECT; + } + + /** + * Get the exchange for failed messages. + */ + protected function getFailedExchange(?string $exchange = null): string + { + return $exchange ?? $this->getRabbitMQConfig()->getFailedExchange(); + } + + /** + * Get the routing-key for failed messages + * The default routing-key is the given destination substituted by '.failed'. + */ + protected function getFailedRoutingKey(string $destination): string + { + return ltrim(sprintf($this->getRabbitMQConfig()->getFailedRoutingKey(), $destination), '.'); + } + + /** + * Checks if the exchange was already declared. + */ + protected function isExchangeDeclared(string $name): bool + { + return in_array($name, $this->exchanges, true); + } + + /** + * Checks if the queue was already declared. + */ + protected function isQueueDeclared(string $name): bool + { + return in_array($name, $this->queues, true); + } + + /** + * Declare the destination when necessary. + * + * @throws AMQPProtocolChannelException + */ + protected function declareDestination(string $destination, ?string $exchange = null, string $exchangeType = AMQPExchangeType::DIRECT): void + { + // When an exchange is provided and no exchange is present in RabbitMQ, create an exchange. + if ($exchange && ! $this->isExchangeExists($exchange)) { + $this->declareExchange($exchange, $exchangeType); + } + + // When an exchange is provided, just return. + if ($exchange) { + return; + } + + // When the queue already exists, just return. + if ($this->isQueueExists($destination)) { + return; + } + + // Create a queue for amq.direct publishing. + $this->declareQueue($destination, true, false, $this->getQueueArguments($destination)); + } + + /** + * Determine all publish properties. + */ + protected function publishProperties($queue, array $options = []): array + { + $queue = $this->getQueue($queue); + $attempts = Arr::get($options, 'attempts') ?: 0; + + $destination = $this->getRoutingKey($queue); + $exchange = $this->getExchange(Arr::get($options, 'exchange')); + $exchangeType = $this->getExchangeType(Arr::get($options, 'exchange_type')); + + return [$destination, $exchange, $exchangeType, $attempts]; + } + + protected function getRabbitMQConfig(): QueueConfig + { + return $this->rabbitMQConfig; + } + + /** + * @throws AMQPChannelClosedException + * @throws AMQPConnectionClosedException + * @throws AMQPConnectionBlockedException + */ + protected function publishBasic($msg, $exchange = '', $destination = '', $mandatory = false, $immediate = false, $ticket = null): void + { + $this->getChannel()->basic_publish($msg, $exchange, $destination, $mandatory, $immediate, $ticket); + } + + protected function batchPublish(): void + { + $this->getChannel()->publish_batch(); + } + + public function getChannel($forceNew = false): AMQPChannel + { + if (! $this->channel || $forceNew) { + $this->channel = $this->createChannel(); + } + + return $this->channel; + } + + protected function createChannel(): AMQPChannel + { + return $this->getConnection()->channel(); + } + + /** + * @throws Exception + */ + protected function reconnect(): void + { + // Reconnects using the original connection settings. + $this->getConnection()->reconnect(); + // Create a new main channel because all old channels are removed. + $this->getChannel(true); } } diff --git a/tests/Feature/ConnectorTest.php b/tests/Feature/ConnectorTest.php index 2c58874a..91660ad2 100644 --- a/tests/Feature/ConnectorTest.php +++ b/tests/Feature/ConnectorTest.php @@ -3,9 +3,12 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Feature; use Illuminate\Queue\QueueManager; +use PhpAmqpLib\Connection\AMQPConnectionConfig; use PhpAmqpLib\Connection\AMQPLazyConnection; use PhpAmqpLib\Connection\AMQPSSLConnection; +use PhpAmqpLib\Connection\AMQPStreamConnection; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; +use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestSSLConnection; class ConnectorTest extends \VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase { @@ -47,12 +50,58 @@ public function testLazyConnection(): void $this->assertInstanceOf(RabbitMQQueue::class, $connection); $this->assertInstanceOf(AMQPLazyConnection::class, $connection->getConnection()); + $this->assertFalse($connection->getConnection()->isConnected()); + $this->assertTrue($connection->getChannel()->is_open()); $this->assertTrue($connection->getConnection()->isConnected()); + } + + public function testLazyStreamConnection(): void + { + $this->app['config']->set('queue.connections.rabbitmq', [ + 'driver' => 'rabbitmq', + 'queue' => env('RABBITMQ_QUEUE', 'default'), + 'connection' => 'default', + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => env('RABBITMQ_SSL_CAFILE', null), + 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), + 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), + 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), + 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), + ], + ], + + 'worker' => env('RABBITMQ_WORKER', 'default'), + ]); + + /** @var QueueManager $queue */ + $queue = $this->app['queue']; + + /** @var RabbitMQQueue $connection */ + $connection = $queue->connection('rabbitmq'); + + $this->assertInstanceOf(RabbitMQQueue::class, $connection); + $this->assertInstanceOf(AMQPStreamConnection::class, $connection->getConnection()); + $this->assertFalse($connection->getConnection()->isConnected()); $this->assertTrue($connection->getChannel()->is_open()); + $this->assertTrue($connection->getConnection()->isConnected()); } public function testSslConnection(): void { + $this->markTestSkipped(); + $this->app['config']->set('queue.connections.rabbitmq', [ 'driver' => 'rabbitmq', 'queue' => env('RABBITMQ_QUEUE', 'default'), @@ -91,4 +140,48 @@ public function testSslConnection(): void $this->assertTrue($connection->getConnection()->isConnected()); $this->assertTrue($connection->getChannel()->is_open()); } + + // Test to validate ssl connection params + public function testNoVerificationSslConnection(): void + { + $this->app['config']->set('queue.connections.rabbitmq', [ + 'driver' => 'rabbitmq', + 'queue' => env('RABBITMQ_QUEUE', 'default'), + 'connection' => TestSSLConnection::class, + 'secure' => true, + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT_SSL'), + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => getenv('RABBITMQ_SSL_CAFILE'), + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => false, + 'passphrase' => null, + ], + ], + + 'worker' => env('RABBITMQ_WORKER', 'default'), + ]); + + /** @var QueueManager $queue */ + $queue = $this->app['queue']; + + /** @var RabbitMQQueue $connection */ + $connection = $queue->connection('rabbitmq'); + $this->assertInstanceOf(RabbitMQQueue::class, $connection); + $this->assertInstanceOf(AMQPSSLConnection::class, $connection->getConnection()); + /** @var AMQPConnectionConfig */ + $config = $connection->getConnection()->getConfig(); + $this->assertFalse($config->getSslVerify()); + } } diff --git a/tests/Feature/QueueTest.php b/tests/Feature/QueueTest.php index 15b8acbb..ee324c9a 100644 --- a/tests/Feature/QueueTest.php +++ b/tests/Feature/QueueTest.php @@ -2,12 +2,42 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Feature; -use PhpAmqpLib\Connection\AMQPLazyConnection; +use PhpAmqpLib\Connection\AMQPStreamConnection; +use PhpAmqpLib\Exception\AMQPChannelClosedException; +use PhpAmqpLib\Exception\AMQPConnectionClosedException; +use PhpAmqpLib\Exception\AMQPProtocolChannelException; +use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestJob; class QueueTest extends TestCase { + protected function setUp(): void + { + parent::setUp(); + + $this->withoutExceptionHandling([ + AMQPChannelClosedException::class, AMQPConnectionClosedException::class, + AMQPProtocolChannelException::class, + ]); + } + public function testConnection(): void { - $this->assertInstanceOf(AMQPLazyConnection::class, $this->connection()->getChannel()->getConnection()); + $this->assertInstanceOf(AMQPStreamConnection::class, $this->connection()->getChannel()->getConnection()); + } + + public function testWithoutReconnect(): void + { + $queue = $this->connection('rabbitmq'); + + $queue->push(new TestJob); + sleep(1); + $this->assertSame(1, $queue->size()); + + // close connection + $queue->getConnection()->close(); + $this->assertFalse($queue->getConnection()->isConnected()); + + $this->expectException(AMQPChannelClosedException::class); + $queue->push(new TestJob); } } diff --git a/tests/Feature/SslQueueTest.php b/tests/Feature/SslQueueTest.php index 54969a9c..11c93f5a 100644 --- a/tests/Feature/SslQueueTest.php +++ b/tests/Feature/SslQueueTest.php @@ -4,11 +4,17 @@ use PhpAmqpLib\Connection\AMQPSSLConnection; -/** - * @group functional - */ class SslQueueTest extends TestCase { + protected bool $interactsWithConnection = false; + + protected function setUp(): void + { + parent::setUp(); + + $this->markTestSkipped(); + } + protected function getEnvironmentSetUp($app): void { $app['config']->set('queue.default', 'rabbitmq'); diff --git a/tests/Feature/TestCase.php b/tests/Feature/TestCase.php index 60f39b4c..f8a9cb05 100644 --- a/tests/Feature/TestCase.php +++ b/tests/Feature/TestCase.php @@ -2,15 +2,23 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Feature; +use Illuminate\Database\DatabaseTransactionsManager; use Illuminate\Support\Facades\Queue; use Illuminate\Support\Str; use PhpAmqpLib\Exception\AMQPProtocolChannelException; +use RuntimeException; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob; +use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestEncryptedJob; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestJob; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase as BaseTestCase; abstract class TestCase extends BaseTestCase { + /** + * Set to false for skipped tests. + */ + protected bool $interactsWithConnection = true; + /** * @throws AMQPProtocolChannelException */ @@ -18,8 +26,10 @@ protected function setUp(): void { parent::setUp(); - if ($this->connection()->isQueueExists()) { - $this->connection()->purge(); + if ($this->interactsWithConnection) { + if ($this->connection()->isQueueExists()) { + $this->connection()->purge(); + } } } @@ -28,11 +38,13 @@ protected function setUp(): void */ protected function tearDown(): void { - if ($this->connection()->isQueueExists()) { - $this->connection()->purge(); - } + if ($this->interactsWithConnection) { + if ($this->connection()->isQueueExists()) { + $this->connection()->purge(); + } - $this->assertSame(0, Queue::size()); + self::assertSame(0, Queue::size()); + } parent::tearDown(); } @@ -55,26 +67,25 @@ public function testPushRaw(): void $this->assertSame(1, Queue::size()); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(0, $job->attempts()); + $this->assertSame(1, $job->attempts()); $this->assertInstanceOf(RabbitMQJob::class, $job); $this->assertSame($payload, $job->getRawBody()); $this->assertNull($job->getJobId()); $job->delete(); - $this->assertSame(0, Queue::size()); } public function testPush(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); sleep(1); $this->assertSame(1, Queue::size()); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(0, $job->attempts()); + $this->assertSame(1, $job->attempts()); $this->assertInstanceOf(RabbitMQJob::class, $job); $this->assertSame(TestJob::class, $job->resolveName()); $this->assertNotNull($job->getJobId()); @@ -84,10 +95,40 @@ public function testPush(): void $this->assertSame(TestJob::class, $payload['displayName']); $this->assertSame('Illuminate\Queue\CallQueuedHandler@call', $payload['job']); $this->assertNull($payload['maxTries']); - $this->assertNull($payload['delay']); + $this->assertNull($payload['backoff']); $this->assertNull($payload['timeout']); - $this->assertNull($payload['timeoutAt']); + $this->assertNull($payload['retryUntil']); $this->assertSame($job->getJobId(), $payload['id']); + + $job->delete(); + $this->assertSame(0, Queue::size()); + } + + public function testPushAfterCommit(): void + { + $transaction = new DatabaseTransactionsManager; + + $this->app->singleton('db.transactions', function ($app) use ($transaction) { + $transaction->begin('FakeDBConnection', 1); + + return $transaction; + }); + + TestJob::dispatch()->afterCommit(); + + sleep(1); + $this->assertSame(0, Queue::size()); + $this->assertNull(Queue::pop()); + + $transaction->commit('FakeDBConnection', 1, 0); + + sleep(1); + + $this->assertSame(1, Queue::size()); + $this->assertNotNull($job = Queue::pop()); + + $job->delete(); + $this->assertSame(0, Queue::size()); } public function testLaterRaw(): void @@ -117,13 +158,12 @@ public function testLaterRaw(): void $this->assertSame($data, $body['data']); $job->delete(); - $this->assertSame(0, Queue::size()); } public function testLater(): void { - Queue::later(3, new TestJob()); + Queue::later(3, new TestJob); sleep(1); @@ -145,7 +185,6 @@ public function testLater(): void $this->assertNotNull($job->getJobId()); $job->delete(); - $this->assertSame(0, Queue::size()); } @@ -165,6 +204,103 @@ public function testBulk(): void $this->assertSame($count, Queue::size()); } + public function testPushEncrypted(): void + { + Queue::push(new TestEncryptedJob); + + sleep(1); + + $this->assertSame(1, Queue::size()); + $this->assertNotNull($job = Queue::pop()); + $this->assertSame(1, $job->attempts()); + $this->assertInstanceOf(RabbitMQJob::class, $job); + $this->assertSame(TestEncryptedJob::class, $job->resolveName()); + $this->assertNotNull($job->getJobId()); + + $payload = $job->payload(); + + $this->assertSame(TestEncryptedJob::class, $payload['displayName']); + $this->assertSame('Illuminate\Queue\CallQueuedHandler@call', $payload['job']); + $this->assertNull($payload['maxTries']); + $this->assertNull($payload['backoff']); + $this->assertNull($payload['timeout']); + $this->assertNull($payload['retryUntil']); + $this->assertSame($job->getJobId(), $payload['id']); + + $job->delete(); + $this->assertSame(0, Queue::size()); + } + + public function testPushEncryptedAfterCommit(): void + { + $transaction = new DatabaseTransactionsManager; + + $this->app->singleton('db.transactions', function ($app) use ($transaction) { + $transaction->begin('FakeDBConnection', 1); + + return $transaction; + }); + + TestEncryptedJob::dispatch()->afterCommit(); + + sleep(1); + $this->assertSame(0, Queue::size()); + $this->assertNull(Queue::pop()); + + $transaction->commit('FakeDBConnection', 1, 0); + + sleep(1); + + $this->assertSame(1, Queue::size()); + $this->assertNotNull($job = Queue::pop()); + + $job->delete(); + $this->assertSame(0, Queue::size()); + } + + public function testEncryptedLater(): void + { + Queue::later(3, new TestEncryptedJob); + + sleep(1); + + $this->assertSame(0, Queue::size()); + $this->assertNull(Queue::pop()); + + sleep(3); + + $this->assertSame(1, Queue::size()); + $this->assertNotNull($job = Queue::pop()); + + $this->assertInstanceOf(RabbitMQJob::class, $job); + + $body = json_decode($job->getRawBody(), true); + + $this->assertSame(TestEncryptedJob::class, $body['displayName']); + $this->assertSame('Illuminate\Queue\CallQueuedHandler@call', $body['job']); + $this->assertSame(TestEncryptedJob::class, $body['data']['commandName']); + $this->assertNotNull($job->getJobId()); + + $job->delete(); + $this->assertSame(0, Queue::size()); + } + + public function testEncryptedBulk(): void + { + $count = 100; + $jobs = []; + + for ($i = 0; $i < $count; $i++) { + $jobs[$i] = new TestEncryptedJob($i); + } + + Queue::bulk($jobs); + + sleep(1); + + $this->assertSame($count, Queue::size()); + } + public function testReleaseRaw(): void { Queue::pushRaw($payload = Str::random()); @@ -173,9 +309,9 @@ public function testReleaseRaw(): void $this->assertSame(1, Queue::size()); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(0, $job->attempts()); + $this->assertSame(1, $job->attempts()); - for ($attempt = 1; $attempt <= 3; $attempt++) { + for ($attempt = 2; $attempt <= 4; $attempt++) { $job->release(); sleep(1); @@ -186,19 +322,22 @@ public function testReleaseRaw(): void $this->assertSame($attempt, $job->attempts()); } + + $job->delete(); + $this->assertSame(0, Queue::size()); } public function testRelease(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); sleep(1); $this->assertSame(1, Queue::size()); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(0, $job->attempts()); + $this->assertSame(1, $job->attempts()); - for ($attempt = 1; $attempt <= 3; $attempt++) { + for ($attempt = 2; $attempt <= 4; $attempt++) { $job->release(); sleep(1); @@ -209,6 +348,9 @@ public function testRelease(): void $this->assertSame($attempt, $job->attempts()); } + + $job->delete(); + $this->assertSame(0, Queue::size()); } public function testReleaseWithDelayRaw(): void @@ -219,9 +361,9 @@ public function testReleaseWithDelayRaw(): void $this->assertSame(1, Queue::size()); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(0, $job->attempts()); + $this->assertSame(1, $job->attempts()); - for ($attempt = 1; $attempt <= 3; $attempt++) { + for ($attempt = 2; $attempt <= 4; $attempt++) { $job->release(4); sleep(1); @@ -237,23 +379,29 @@ public function testReleaseWithDelayRaw(): void $this->assertSame($attempt, $job->attempts()); } + + $job->delete(); + $this->assertSame(0, Queue::size()); } public function testReleaseInThePast(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); $job = Queue::pop(); $job->release(-3); sleep(1); - $this->assertInstanceOf(RabbitMQJob::class, Queue::pop()); + $this->assertInstanceOf(RabbitMQJob::class, $job = Queue::pop()); + + $job->delete(); + $this->assertSame(0, Queue::size()); } public function testReleaseAndReleaseWithDelayAttempts(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); sleep(1); @@ -265,20 +413,22 @@ public function testReleaseAndReleaseWithDelayAttempts(): void sleep(1); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(1, $job->attempts()); + $this->assertSame(2, $job->attempts()); $job->release(3); sleep(4); $this->assertNotNull($job = Queue::pop()); + $this->assertSame(3, $job->attempts()); - $this->assertSame(2, $job->attempts()); + $job->delete(); + $this->assertSame(0, Queue::size()); } public function testDelete(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); $job = Queue::pop(); @@ -289,4 +439,20 @@ public function testDelete(): void $this->assertSame(0, Queue::size()); $this->assertNull(Queue::pop()); } + + public function testFailed(): void + { + Queue::push(new TestJob); + + $job = Queue::pop(); + + $job->fail(new RuntimeException($job->resolveName().' has an exception.')); + + sleep(1); + + $this->assertSame(true, $job->hasFailed()); + $this->assertSame(true, $job->isDeleted()); + $this->assertSame(0, Queue::size()); + $this->assertNull(Queue::pop()); + } } diff --git a/tests/Functional/RabbitMQQueueTest.php b/tests/Functional/RabbitMQQueueTest.php new file mode 100644 index 00000000..f3ca4005 --- /dev/null +++ b/tests/Functional/RabbitMQQueueTest.php @@ -0,0 +1,313 @@ +connection(); + $this->assertInstanceOf(RabbitMQQueue::class, $queue); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertInstanceOf(RabbitMQQueue::class, $queue); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertInstanceOf(RabbitMQQueue::class, $queue); + } + + public function testConfigRerouteFailed(): void + { + $queue = $this->connection(); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed()); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertTrue($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed()); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed()); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed()); + } + + public function testConfigPrioritizeDelayed(): void + { + $queue = $this->connection(); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed()); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertTrue($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed()); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed()); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed()); + } + + public function testQueueMaxPriority(): void + { + $queue = $this->connection(); + $this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + $this->assertSame(2, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + $this->assertSame(20, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + $this->assertSame(2, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + $this->assertSame(2, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + } + + public function testConfigExchangeType(): void + { + $queue = $this->connection(); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType', [''])); + $this->assertSame(AMQPExchangeType::TOPIC, $this->callMethod($queue, 'getExchangeType', ['topic'])); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertSame(AMQPExchangeType::TOPIC, $this->callMethod($queue, 'getExchangeType')); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType', ['direct'])); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); + + // testing an unkown type with a default + $this->callProperty($queue, 'rabbitMQConfig')->setExchangeType('unknown'); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); + } + + public function testExchange(): void + { + $queue = $this->connection(); + $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [''])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [null])); + $this->assertSame('', $this->callMethod($queue, 'getExchange')); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertSame('application-x', $this->callMethod($queue, 'getExchange')); + $this->assertSame('application-x', $this->callMethod($queue, 'getExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [''])); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertSame('', $this->callMethod($queue, 'getExchange')); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [''])); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame('', $this->callMethod($queue, 'getExchange')); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getExchange', [''])); + } + + public function testFailedExchange(): void + { + $queue = $this->connection(); + $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [''])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [null])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange')); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertSame('failed-exchange', $this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('failed-exchange', $this->callMethod($queue, 'getFailedExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [''])); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [''])); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [null])); + $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getFailedExchange', [''])); + } + + public function testRoutingKey(): void + { + $queue = $this->connection(); + $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['.test'])); + $this->assertSame('', $this->callMethod($queue, 'getRoutingKey', [''])); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertSame('process.test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + $this->callProperty($queue, 'rabbitMQConfig')->setExchangeRoutingKey('.an.alternate.routing-key'); + $this->assertSame('an.alternate.routing-key', $this->callMethod($queue, 'getRoutingKey', ['test'])); + } + + public function testFailedRoutingKey(): void + { + $queue = $this->connection(); + $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['.test'])); + $this->assertSame('failed', $this->callMethod($queue, 'getFailedRoutingKey', [''])); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertSame('application-x.test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + $this->callProperty($queue, 'rabbitMQConfig')->setFailedRoutingKey('.an.alternate.routing-key'); + $this->assertSame('an.alternate.routing-key', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + } + + public function testConfigQuorum(): void + { + $queue = $this->connection(); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum()); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum()); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum()); + + $queue = $this->connection('rabbitmq-with-options-null'); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum()); + + $queue = $this->connection('rabbitmq-with-quorum-options'); + $this->assertTrue($this->callProperty($queue, 'rabbitMQConfig')->isQuorum()); + } + + public function testDeclareDeleteExchange(): void + { + $queue = $this->connection(); + + $name = Str::random(); + + $this->assertFalse($queue->isExchangeExists($name)); + + $queue->declareExchange($name); + $this->assertTrue($queue->isExchangeExists($name)); + + $queue->deleteExchange($name); + $this->assertFalse($queue->isExchangeExists($name)); + } + + public function testDeclareDeleteQueue(): void + { + $queue = $this->connection(); + + $name = Str::random(); + + $this->assertFalse($queue->isQueueExists($name)); + + $queue->declareQueue($name); + $this->assertTrue($queue->isQueueExists($name)); + + $queue->deleteQueue($name); + $this->assertFalse($queue->isQueueExists($name)); + } + + public function testQueueArguments(): void + { + $name = Str::random(); + + $queue = $this->connection(); + $actual = $this->callMethod($queue, 'getQueueArguments', [$name]); + $expected = []; + $this->assertEquals(array_keys($expected), array_keys($actual)); + $this->assertEquals(array_values($expected), array_values($actual)); + + $queue = $this->connection('rabbitmq-with-options'); + $actual = $this->callMethod($queue, 'getQueueArguments', [$name]); + $expected = [ + 'x-max-priority' => 20, + 'x-dead-letter-exchange' => 'failed-exchange', + 'x-dead-letter-routing-key' => sprintf('application-x.%s.failed', $name), + ]; + + $this->assertEquals(array_keys($expected), array_keys($actual)); + $this->assertEquals(array_values($expected), array_values($actual)); + + $queue = $this->connection('rabbitmq-with-quorum-options'); + $actual = $this->callMethod($queue, 'getQueueArguments', [$name]); + $expected = [ + 'x-dead-letter-exchange' => 'failed-exchange', + 'x-dead-letter-routing-key' => sprintf('application-x.%s.failed', $name), + 'x-queue-type' => 'quorum', + ]; + + $this->assertEquals(array_keys($expected), array_keys($actual)); + $this->assertEquals(array_values($expected), array_values($actual)); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $actual = $this->callMethod($queue, 'getQueueArguments', [$name]); + $expected = []; + + $this->assertEquals(array_keys($expected), array_keys($actual)); + $this->assertEquals(array_values($expected), array_values($actual)); + } + + public function testDelayQueueArguments(): void + { + $name = Str::random(); + $ttl = 12000; + + $queue = $this->connection(); + $actual = $this->callMethod($queue, 'getDelayQueueArguments', [$name, $ttl]); + $expected = [ + 'x-dead-letter-exchange' => '', + 'x-dead-letter-routing-key' => $name, + 'x-message-ttl' => $ttl, + 'x-expires' => $ttl * 2, + ]; + $this->assertEquals(array_keys($expected), array_keys($actual)); + $this->assertEquals(array_values($expected), array_values($actual)); + + $queue = $this->connection('rabbitmq-with-options'); + $actual = $this->callMethod($queue, 'getDelayQueueArguments', [$name, $ttl]); + $expected = [ + 'x-dead-letter-exchange' => 'application-x', + 'x-dead-letter-routing-key' => sprintf('process.%s', $name), + 'x-message-ttl' => $ttl, + 'x-expires' => $ttl * 2, + ]; + $this->assertEquals(array_keys($expected), array_keys($actual)); + $this->assertEquals(array_values($expected), array_values($actual)); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $actual = $this->callMethod($queue, 'getDelayQueueArguments', [$name, $ttl]); + $expected = [ + 'x-dead-letter-exchange' => '', + 'x-dead-letter-routing-key' => $name, + 'x-message-ttl' => $ttl, + 'x-expires' => $ttl * 2, + ]; + $this->assertEquals(array_keys($expected), array_keys($actual)); + $this->assertEquals(array_values($expected), array_values($actual)); + } +} diff --git a/tests/Functional/TestCase.php b/tests/Functional/TestCase.php new file mode 100644 index 00000000..8b843561 --- /dev/null +++ b/tests/Functional/TestCase.php @@ -0,0 +1,270 @@ +set('queue.default', 'rabbitmq'); + $app['config']->set('queue.connections.rabbitmq', [ + 'driver' => 'rabbitmq', + 'queue' => 'order', + 'connection' => 'default', + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'vhost' => '/', + 'user' => 'guest', + 'password' => 'guest', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => true, + 'passphrase' => null, + ], + ], + + 'worker' => 'default', + + ]); + $app['config']->set('queue.connections.rabbitmq-with-options', [ + 'driver' => 'rabbitmq', + 'queue' => 'order', + 'connection' => 'default', + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'vhost' => '/', + 'user' => 'guest', + 'password' => 'guest', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => true, + 'passphrase' => null, + ], + + 'queue' => [ + 'prioritize_delayed' => true, + 'queue_max_priority' => 20, + 'exchange' => 'application-x', + 'exchange_type' => 'topic', + 'exchange_routing_key' => 'process.%s', + 'reroute_failed' => true, + 'failed_exchange' => 'failed-exchange', + 'failed_routing_key' => 'application-x.%s.failed', + ], + ], + + 'worker' => 'default', + + ]); + $app['config']->set('queue.connections.rabbitmq-with-options-empty', [ + 'driver' => 'rabbitmq', + 'queue' => 'order', + 'connection' => 'default', + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'vhost' => '/', + 'user' => 'guest', + 'password' => 'guest', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => true, + 'passphrase' => null, + ], + + 'queue' => [ + 'prioritize_delayed' => '', + 'queue_max_priority' => '', + 'exchange' => '', + 'exchange_type' => '', + 'exchange_routing_key' => '', + 'reroute_failed' => '', + 'failed_exchange' => '', + 'failed_routing_key' => '', + 'quorum' => '', + ], + ], + + 'worker' => 'default', + + ]); + $app['config']->set('queue.connections.rabbitmq-with-options-null', [ + 'driver' => 'rabbitmq', + 'queue' => 'order', + 'connection' => 'default', + + 'hosts' => [ + [ + 'host' => null, + 'port' => null, + 'vhost' => null, + 'user' => null, + 'password' => null, + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => null, + 'passphrase' => null, + ], + + 'queue' => [ + 'prioritize_delayed' => null, + 'queue_max_priority' => null, + 'exchange' => null, + 'exchange_type' => null, + 'exchange_routing_key' => null, + 'reroute_failed' => null, + 'failed_exchange' => null, + 'failed_routing_key' => null, + 'quorum' => null, + ], + ], + + 'worker' => 'default', + + ]); + $app['config']->set('queue.connections.rabbitmq-with-quorum-options', [ + 'driver' => 'rabbitmq', + 'queue' => 'order', + 'connection' => 'default', + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'vhost' => '/', + 'user' => 'guest', + 'password' => 'guest', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => true, + 'passphrase' => null, + ], + + 'queue' => [ + 'exchange' => 'application-x', + 'exchange_type' => 'topic', + 'exchange_routing_key' => 'process.%s', + 'reroute_failed' => true, + 'failed_exchange' => 'failed-exchange', + 'failed_routing_key' => 'application-x.%s.failed', + 'quorum' => true, + ], + ], + + 'worker' => 'default', + + ]); + } + + /** + * @throws Exception + */ + protected function callMethod($object, string $method, array $parameters = []): mixed + { + try { + $className = get_class($object); + $reflection = new ReflectionClass($className); + } catch (ReflectionException $e) { + throw new Exception($e->getMessage()); + } + + $method = $reflection->getMethod($method); + $method->setAccessible(true); + + return $method->invokeArgs($object, $parameters); + } + + /** + * @throws Exception + */ + protected function callProperty($object, string $property): mixed + { + try { + $className = get_class($object); + $reflection = new ReflectionClass($className); + } catch (ReflectionException $e) { + throw new Exception($e->getMessage()); + } + + $property = $reflection->getProperty($property); + $property->setAccessible(true); + + return $property->getValue($object); + } + + public function testConnectChannel(): void + { + $queue = $this->connection(); + $this->assertFalse($queue->getConnection()->isConnected()); + + /** @var AMQPChannel $channel */ + $channel = $this->callMethod($queue, 'getChannel'); + $this->assertTrue($queue->getConnection()->isConnected()); + $this->assertSame($channel, $this->callProperty($queue, 'channel')); + $this->assertTrue($channel->is_open()); + } + + public function testReconnect(): void + { + $queue = $this->connection(); + $this->assertFalse($queue->getConnection()->isConnected()); + + // connect + $channel = $this->callMethod($queue, 'getChannel'); + $this->assertTrue($queue->getConnection()->isConnected()); + $this->assertSame($channel, $this->callProperty($queue, 'channel')); + + // close + $queue->getConnection()->close(); + $this->assertFalse($queue->getConnection()->isConnected()); + + // reconnect + $this->callMethod($queue, 'reconnect'); + $this->assertTrue($queue->getConnection()->isConnected()); + $this->assertTrue($queue->getChannel()->is_open()); + } +} diff --git a/tests/Mocks/TestEncryptedJob.php b/tests/Mocks/TestEncryptedJob.php new file mode 100644 index 00000000..1c0e1762 --- /dev/null +++ b/tests/Mocks/TestEncryptedJob.php @@ -0,0 +1,25 @@ +i = $i; + } + + public function handle(): void + { + // + } +} diff --git a/tests/Mocks/TestJob.php b/tests/Mocks/TestJob.php index 0f290fd4..f97c7e12 100644 --- a/tests/Mocks/TestJob.php +++ b/tests/Mocks/TestJob.php @@ -2,10 +2,14 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks; +use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; +use Illuminate\Foundation\Bus\Dispatchable; class TestJob implements ShouldQueue { + use Dispatchable, Queueable; + public $i; public function __construct($i = 0) diff --git a/tests/Mocks/TestSSLConnection.php b/tests/Mocks/TestSSLConnection.php new file mode 100644 index 00000000..c1586475 --- /dev/null +++ b/tests/Mocks/TestSSLConnection.php @@ -0,0 +1,14 @@ +config; + } +} diff --git a/tests/TestCase.php b/tests/TestCase.php index 5eea441e..ec49a1ac 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -40,7 +40,7 @@ protected function getEnvironmentSetUp($app): void 'cafile' => null, 'local_cert' => null, 'local_key' => null, - 'verify_peer' => true, + 'verify_peer' => false, 'passphrase' => null, ], ], @@ -50,8 +50,8 @@ protected function getEnvironmentSetUp($app): void ]); } - protected function connection(): RabbitMQQueue + protected function connection(?string $name = null): RabbitMQQueue { - return Queue::connection(); + return Queue::connection($name); } }