diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 37c80375..4aad6d20 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -13,12 +13,14 @@ jobs: strategy: fail-fast: true matrix: - php: ['8.0', '8.1', '8.2'] - stability: ['prefer-lowest', prefer-stable] - laravel: ['^9.0', '^10.0'] + php: ['8.1', '8.2', '8.3', '8.4'] + stability: ['prefer-lowest', 'prefer-stable'] + laravel: ['^10.0', '^11.0', '^12.0'] exclude: - - php: '8.0' - laravel: '^10.0' + - php: '8.1' + laravel: '^11.0' + - php: '8.1' + laravel: '^12.0' name: 'PHP ${{ matrix.php }} - Laravel: ${{matrix.laravel}} - ${{ matrix.stability }}' @@ -33,15 +35,8 @@ jobs: extensions: dom, curl, libxml, mbstring, zip coverage: none - - name: Set up Docker - run: | - sudo rm /usr/local/bin/docker-compose - curl -L https://github.com/docker/compose/releases/download/1.24.1/docker-compose-`uname -s`-`uname -m` > docker-compose - chmod +x docker-compose - sudo mv docker-compose /usr/local/bin - - name: Start Docker container - run: docker-compose up -d rabbitmq + run: docker compose up -d rabbitmq - name: Install dependencies run: composer update --with='laravel/framework:${{matrix.laravel}}' --${{ matrix.stability }} --prefer-dist --no-interaction --no-progress @@ -50,4 +45,4 @@ jobs: run: ./vendor/bin/pint --test - name: Execute tests - run: sleep 10 && vendor/bin/phpunit --verbose + run: sleep 10 && vendor/bin/phpunit diff --git a/.gitignore b/.gitignore index 00dea824..a2dcf885 100644 --- a/.gitignore +++ b/.gitignore @@ -4,5 +4,5 @@ composer.lock .phpstorm.meta.php phpunit.xml -.phpunit.result.cache +.phpunit.* .php_cs.cache diff --git a/CHANGELOG-14x.md b/CHANGELOG-14x.md new file mode 100644 index 00000000..c94b3dd2 --- /dev/null +++ b/CHANGELOG-14x.md @@ -0,0 +1,8 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v13.3.0...master) + +## [14.0.0] +- First release compatible with Laravel 11 diff --git a/README.md b/README.md index e02f99c0..30a10df9 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ RabbitMQ Queue driver for Laravel ====================== [![Latest Stable Version](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/v/stable?format=flat-square)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) -[![Build Status](https://github.com/vyuldashev/laravel-queue-rabbitmq/workflows/Tests/badge.svg)](https://github.com/vyuldashev/laravel-queue-rabbitmq/actions) +[![Build Status](https://github.com/vyuldashev/laravel-queue-rabbitmq/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/vyuldashev/laravel-queue-rabbitmq/actions/workflows/tests.yml) [![Total Downloads](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/downloads?format=flat-square)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) [![License](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/license?format=flat-square)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) @@ -509,6 +509,53 @@ If for some reason you don't want the connection lazy you can turn it off by set ], ``` +### Network Protocol + +By default, the network protocol used for connection is tcp. +If for some reason you want to use another network protocol, you can add the extra value in your config options. +Available protocols : `tcp`, `ssl`, `tls` + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'network_protocol' => 'tcp', + ], + + // ... +], +``` + +### Network Timeouts + +For network timeouts configuration you can use option parameters. +All float values are in seconds and zero value can mean infinite timeout. +Example contains default values. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'options' => [ + // ... + + 'connection_timeout' => 3.0, + 'read_timeout' => 3.0, + 'write_timeout' => 3.0, + 'channel_rpc_timeout' => 0.0, + ], + ], + + // ... +], +``` + ### Octane support Starting with 13.3.0, this package supports [Laravel Octane](https://laravel.com/docs/octane) out of the box. diff --git a/composer.json b/composer.json index 20cf5310..caf8a0fc 100644 --- a/composer.json +++ b/composer.json @@ -11,16 +11,16 @@ "require": { "php": "^8.0", "ext-json": "*", - "illuminate/queue": "^9.0|^10.0", - "php-amqplib/php-amqplib": "^v3.2" + "illuminate/queue": "^10.0|^11.0|^12.0", + "php-amqplib/php-amqplib": "^v3.6" }, "require-dev": { - "phpunit/phpunit": "^9.3", + "phpunit/phpunit": "^10.0|^11.0", "mockery/mockery": "^1.0", "laravel/horizon": "^5.0", - "orchestra/testbench": "^7.0|^8.0", + "orchestra/testbench": "^7.0|^8.0|^9.0|^10.0", "laravel/pint": "^1.2", - "laravel/framework": "^9.0|^10.0" + "laravel/framework": "^9.0|^10.0|^11.0|^12.0" }, "autoload": { "psr-4": { diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 7e166ab8..d213fd63 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -1,27 +1,16 @@ - - - - src/ - - - - - - - - ./tests/ - - - - - - - - - + + + + ./tests/ + + + + + + + + + + diff --git a/pint.json b/pint.json new file mode 100644 index 00000000..05f4b41e --- /dev/null +++ b/pint.json @@ -0,0 +1,8 @@ +{ + "preset": "laravel", + "rules": { + "php_unit_method_casing": { + "case": "camel_case" + } + } +} diff --git a/src/Console/ConsumeCommand.php b/src/Console/ConsumeCommand.php index 50aab1d4..4072132a 100644 --- a/src/Console/ConsumeCommand.php +++ b/src/Console/ConsumeCommand.php @@ -21,9 +21,10 @@ class ConsumeCommand extends WorkCommand {--force : Force the worker to run even in maintenance mode} {--memory=128 : The memory limit in megabytes} {--sleep=3 : Number of seconds to sleep when no job is available} + {--rest=0 : Number of seconds to rest between jobs} {--timeout=60 : The number of seconds a child process can run} {--tries=1 : Number of times to attempt a job before logging it failed} - {--rest=0 : Number of seconds to rest between jobs} + {--json : Output the queue worker information as JSON} {--max-priority=} {--consumer-tag} diff --git a/src/Horizon/RabbitMQQueue.php b/src/Horizon/RabbitMQQueue.php index e4266a76..e2ca400d 100644 --- a/src/Horizon/RabbitMQQueue.php +++ b/src/Horizon/RabbitMQQueue.php @@ -26,7 +26,7 @@ class RabbitMQQueue extends BaseRabbitMQQueue * * @throws AMQPProtocolChannelException */ - public function readyNow(string $queue = null): int + public function readyNow(?string $queue = null): int { return $this->size($queue); } @@ -48,7 +48,7 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []): int|string|null { - $payload = (new JobPayload($payload))->prepare($this->lastPushed)->value; + $payload = (new JobPayload($payload))->prepare($this->lastPushed ?? null)->value; return tap(parent::pushRaw($payload, $queue, $options), function () use ($queue, $payload): void { $this->event($this->getQueue($queue), new JobPushed($payload)); diff --git a/src/Queue/Connection/ConfigFactory.php b/src/Queue/Connection/ConfigFactory.php index c6e24699..06c8080d 100644 --- a/src/Queue/Connection/ConfigFactory.php +++ b/src/Queue/Connection/ConfigFactory.php @@ -16,7 +16,7 @@ class ConfigFactory */ public static function make(array $config = []): AMQPConnectionConfig { - return tap(new AMQPConnectionConfig(), function (AMQPConnectionConfig $connectionConfig) use ($config) { + return tap(new AMQPConnectionConfig, function (AMQPConnectionConfig $connectionConfig) use ($config) { // Set the connection to a Lazy by default $connectionConfig->setIsLazy(! in_array( Arr::get($config, 'lazy') ?? true, @@ -37,6 +37,8 @@ public static function make(array $config = []): AMQPConnectionConfig self::getHostFromConfig($connectionConfig, $config); self::getHeartbeatFromConfig($connectionConfig, $config); + self::getNetworkProtocolFromConfig($connectionConfig, $config); + self::getTimeoutsFromConfig($connectionConfig, $config); }); } @@ -74,7 +76,8 @@ protected static function getSLLOptionsFromConfig(AMQPConnectionConfig $connecti if ($key = Arr::get($sslConfig, 'local_key')) { $connectionConfig->setSslKey($key); } - if ($verifyPeer = Arr::get($sslConfig, 'verify_peer')) { + if (Arr::has($sslConfig, 'verify_peer')) { + $verifyPeer = Arr::get($sslConfig, 'verify_peer'); $connectionConfig->setSslVerify($verifyPeer); } if ($passphrase = Arr::get($sslConfig, 'passphrase')) { @@ -90,4 +93,34 @@ protected static function getHeartbeatFromConfig(AMQPConnectionConfig $connectio $connectionConfig->setHeartbeat((int) $heartbeat); } } + + protected static function getNetworkProtocolFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + if ($networkProtocol = Arr::get($config, 'network_protocol')) { + $connectionConfig->setNetworkProtocol($networkProtocol); + } + } + + protected static function getTimeoutsFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $connectionTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.connection_timeout'); + if (is_numeric($connectionTimeout) && floatval($connectionTimeout) >= 0) { + $connectionConfig->setConnectionTimeout((float) $connectionTimeout); + } + + $readTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.read_timeout'); + if (is_numeric($readTimeout) && floatval($readTimeout) >= 0) { + $connectionConfig->setReadTimeout((float) $readTimeout); + } + + $writeTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.write_timeout'); + if (is_numeric($writeTimeout) && floatval($writeTimeout) >= 0) { + $connectionConfig->setWriteTimeout((float) $writeTimeout); + } + + $chanelRpcTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.channel_rpc_timeout'); + if (is_numeric($chanelRpcTimeout) && floatval($chanelRpcTimeout) >= 0) { + $connectionConfig->setChannelRPCTimeout((float) $chanelRpcTimeout); + } + } } diff --git a/src/Queue/Connection/ConnectionFactory.php b/src/Queue/Connection/ConnectionFactory.php index df19f223..f531e761 100644 --- a/src/Queue/Connection/ConnectionFactory.php +++ b/src/Queue/Connection/ConnectionFactory.php @@ -126,7 +126,6 @@ protected static function createStreamConnection($connection, AMQPConnectionConf 'keepalive' => $config->isKeepalive(), 'heartbeat' => $config->getHeartbeat(), ], - $config->getNetworkProtocol(), $config ); } @@ -170,7 +169,7 @@ protected static function getSslOptions(AMQPConnectionConfig $config): array 'ciphers' => $config->getSslCiphers(), 'security_level' => $config->getSslSecurityLevel(), ], static function ($value) { - return null !== $value; + return $value !== null; }); } @@ -200,10 +199,10 @@ protected static function assertSSLConnection($connection): void self::assertExtendedOf($connection, self::CONNECTION_SUB_TYPE_SSL); } - protected static function assertExtendedOf($connection, string $abstract): void + protected static function assertExtendedOf($connection, string $parent): void { - if (! is_subclass_of($connection, $abstract)) { - throw new AMQPLogicException(sprintf('The connection must extend: %s', class_basename($abstract))); + if (! is_subclass_of($connection, $parent) && $connection !== $parent) { + throw new AMQPLogicException(sprintf('The connection must extend: %s', class_basename($parent))); } } diff --git a/src/Queue/QueueConfigFactory.php b/src/Queue/QueueConfigFactory.php index 87fc2fac..6f2befc5 100644 --- a/src/Queue/QueueConfigFactory.php +++ b/src/Queue/QueueConfigFactory.php @@ -13,7 +13,7 @@ class QueueConfigFactory */ public static function make(array $config = []): QueueConfig { - return tap(new QueueConfig(), function (QueueConfig $queueConfig) use ($config) { + return tap(new QueueConfig, function (QueueConfig $queueConfig) use ($config) { if (! empty($queue = Arr::get($config, 'queue'))) { $queueConfig->setQueue($queue); } diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index cf52eb80..04377a0d 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -7,12 +7,15 @@ use ErrorException; use Exception; use Illuminate\Contracts\Queue\Queue as QueueContract; +use Illuminate\Contracts\Queue\ShouldBeEncrypted; use Illuminate\Queue\Queue; use Illuminate\Support\Arr; +use Illuminate\Support\Facades\Crypt; use Illuminate\Support\Str; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; use PhpAmqpLib\Exception\AMQPChannelClosedException; +use PhpAmqpLib\Exception\AMQPConnectionBlockedException; use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Exception\AMQPRuntimeException; @@ -54,19 +57,19 @@ class RabbitMQQueue extends Queue implements QueueContract, RabbitMQQueueContrac /** * Current job being processed. */ - protected RabbitMQJob $currentJob; + protected ?RabbitMQJob $currentJob = null; /** * Holds the Configuration */ - protected QueueConfig $config; + protected QueueConfig $rabbitMQConfig; /** * RabbitMQQueue constructor. */ public function __construct(QueueConfig $config) { - $this->config = $config; + $this->rabbitMQConfig = $config; $this->dispatchAfterCommit = $config->isDispatchAfterCommit(); } @@ -201,7 +204,7 @@ protected function publishBatch($jobs, $data = '', $queue = null): void /** * @throws AMQPProtocolChannelException */ - public function bulkRaw(string $payload, string $queue = null, array $options = []): int|string|null + public function bulkRaw(string $payload, ?string $queue = null, array $options = []): int|string|null { [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); @@ -290,7 +293,7 @@ public function setConnection(AbstractConnection $connection): RabbitMQQueue */ public function getJobClass(): string { - $job = $this->getConfig()->getAbstractJob(); + $job = $this->getRabbitMQConfig()->getAbstractJob(); throw_if( ! is_a($job, RabbitMQJob::class, true), @@ -303,12 +306,10 @@ public function getJobClass(): string /** * Gets a queue/destination, by default the queue option set on the connection. - * - * @param null $queue */ public function getQueue($queue = null): string { - return $queue ?: $this->getConfig()->getQueue(); + return $queue ?: $this->getRabbitMQConfig()->getQueue(); } /** @@ -396,7 +397,7 @@ public function deleteExchange(string $name, bool $unused = false): void * * @throws AMQPProtocolChannelException */ - public function isQueueExists(string $name = null): bool + public function isQueueExists(?string $name = null): bool { $queueName = $this->getQueue($name); @@ -483,7 +484,7 @@ public function bindQueue(string $queue, string $exchange, string $routingKey = /** * Purge the queue of messages. */ - public function purge(string $queue = null): void + public function purge(?string $queue = null): void { // create a temporary channel, so the main channel will not be closed on exception $channel = $this->createChannel(); @@ -522,11 +523,16 @@ protected function createMessage($payload, int $attempts = 0): array $properties['correlation_id'] = $correlationId; } - if ($this->getConfig()->isPrioritizeDelayed()) { + if ($this->getRabbitMQConfig()->isPrioritizeDelayed()) { $properties['priority'] = $attempts; } if (isset($currentPayload['data']['command'])) { + // If the command data is encrypted, decrypt it first before attempting to unserialize + if (is_subclass_of($currentPayload['data']['commandName'], ShouldBeEncrypted::class)) { + $currentPayload['data']['command'] = Crypt::decrypt($currentPayload['data']['command']); + } + $commandData = unserialize($currentPayload['data']['command']); if (property_exists($commandData, 'priority')) { $properties['priority'] = $commandData->priority; @@ -599,16 +605,16 @@ protected function getQueueArguments(string $destination): array // Messages with a priority which is higher than the queue's maximum, are treated as if they were // published with the maximum priority. // Quorum queues does not support priority. - if ($this->getConfig()->isPrioritizeDelayed() && ! $this->getConfig()->isQuorum()) { - $arguments['x-max-priority'] = $this->getConfig()->getQueueMaxPriority(); + if ($this->getRabbitMQConfig()->isPrioritizeDelayed() && ! $this->getRabbitMQConfig()->isQuorum()) { + $arguments['x-max-priority'] = $this->getRabbitMQConfig()->getQueueMaxPriority(); } - if ($this->getConfig()->isRerouteFailed()) { + if ($this->getRabbitMQConfig()->isRerouteFailed()) { $arguments['x-dead-letter-exchange'] = $this->getFailedExchange(); $arguments['x-dead-letter-routing-key'] = $this->getFailedRoutingKey($destination); } - if ($this->getConfig()->isQuorum()) { + if ($this->getRabbitMQConfig()->isQuorum()) { $arguments['x-queue-type'] = 'quorum'; } @@ -633,7 +639,7 @@ protected function getDelayQueueArguments(string $destination, int $ttl): array */ protected function getExchange(?string $exchange = null): string { - return $exchange ?? $this->getConfig()->getExchange(); + return $exchange ?? $this->getRabbitMQConfig()->getExchange(); } /** @@ -642,7 +648,7 @@ protected function getExchange(?string $exchange = null): string */ protected function getRoutingKey(string $destination): string { - return ltrim(sprintf($this->getConfig()->getExchangeRoutingKey(), $destination), '.'); + return ltrim(sprintf($this->getRabbitMQConfig()->getExchangeRoutingKey(), $destination), '.'); } /** @@ -650,7 +656,7 @@ protected function getRoutingKey(string $destination): string */ protected function getExchangeType(?string $type = null): string { - $constant = AMQPExchangeType::class.'::'.Str::upper($type ?: $this->getConfig()->getExchangeType()); + $constant = AMQPExchangeType::class.'::'.Str::upper($type ?: $this->getRabbitMQConfig()->getExchangeType()); return defined($constant) ? constant($constant) : AMQPExchangeType::DIRECT; } @@ -660,7 +666,7 @@ protected function getExchangeType(?string $type = null): string */ protected function getFailedExchange(?string $exchange = null): string { - return $exchange ?? $this->getConfig()->getFailedExchange(); + return $exchange ?? $this->getRabbitMQConfig()->getFailedExchange(); } /** @@ -669,7 +675,7 @@ protected function getFailedExchange(?string $exchange = null): string */ protected function getFailedRoutingKey(string $destination): string { - return ltrim(sprintf($this->getConfig()->getFailedRoutingKey(), $destination), '.'); + return ltrim(sprintf($this->getRabbitMQConfig()->getFailedRoutingKey(), $destination), '.'); } /** @@ -729,11 +735,16 @@ protected function publishProperties($queue, array $options = []): array return [$destination, $exchange, $exchangeType, $attempts]; } - protected function getConfig(): QueueConfig + protected function getRabbitMQConfig(): QueueConfig { - return $this->config; + return $this->rabbitMQConfig; } + /** + * @throws AMQPChannelClosedException + * @throws AMQPConnectionClosedException + * @throws AMQPConnectionBlockedException + */ protected function publishBasic($msg, $exchange = '', $destination = '', $mandatory = false, $immediate = false, $ticket = null): void { $this->getChannel()->basic_publish($msg, $exchange, $destination, $mandatory, $immediate, $ticket); diff --git a/tests/Feature/ConnectorTest.php b/tests/Feature/ConnectorTest.php index 3ecede98..91660ad2 100644 --- a/tests/Feature/ConnectorTest.php +++ b/tests/Feature/ConnectorTest.php @@ -3,10 +3,12 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Feature; use Illuminate\Queue\QueueManager; +use PhpAmqpLib\Connection\AMQPConnectionConfig; use PhpAmqpLib\Connection\AMQPLazyConnection; use PhpAmqpLib\Connection\AMQPSSLConnection; use PhpAmqpLib\Connection\AMQPStreamConnection; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; +use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestSSLConnection; class ConnectorTest extends \VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase { @@ -138,4 +140,48 @@ public function testSslConnection(): void $this->assertTrue($connection->getConnection()->isConnected()); $this->assertTrue($connection->getChannel()->is_open()); } + + // Test to validate ssl connection params + public function testNoVerificationSslConnection(): void + { + $this->app['config']->set('queue.connections.rabbitmq', [ + 'driver' => 'rabbitmq', + 'queue' => env('RABBITMQ_QUEUE', 'default'), + 'connection' => TestSSLConnection::class, + 'secure' => true, + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT_SSL'), + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => getenv('RABBITMQ_SSL_CAFILE'), + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => false, + 'passphrase' => null, + ], + ], + + 'worker' => env('RABBITMQ_WORKER', 'default'), + ]); + + /** @var QueueManager $queue */ + $queue = $this->app['queue']; + + /** @var RabbitMQQueue $connection */ + $connection = $queue->connection('rabbitmq'); + $this->assertInstanceOf(RabbitMQQueue::class, $connection); + $this->assertInstanceOf(AMQPSSLConnection::class, $connection->getConnection()); + /** @var AMQPConnectionConfig */ + $config = $connection->getConnection()->getConfig(); + $this->assertFalse($config->getSslVerify()); + } } diff --git a/tests/Feature/QueueTest.php b/tests/Feature/QueueTest.php index 5ecfb978..ee324c9a 100644 --- a/tests/Feature/QueueTest.php +++ b/tests/Feature/QueueTest.php @@ -10,7 +10,7 @@ class QueueTest extends TestCase { - public function setUp(): void + protected function setUp(): void { parent::setUp(); @@ -29,7 +29,7 @@ public function testWithoutReconnect(): void { $queue = $this->connection('rabbitmq'); - $queue->push(new TestJob()); + $queue->push(new TestJob); sleep(1); $this->assertSame(1, $queue->size()); @@ -38,6 +38,6 @@ public function testWithoutReconnect(): void $this->assertFalse($queue->getConnection()->isConnected()); $this->expectException(AMQPChannelClosedException::class); - $queue->push(new TestJob()); + $queue->push(new TestJob); } } diff --git a/tests/Feature/SslQueueTest.php b/tests/Feature/SslQueueTest.php index 7233ee09..11c93f5a 100644 --- a/tests/Feature/SslQueueTest.php +++ b/tests/Feature/SslQueueTest.php @@ -4,13 +4,14 @@ use PhpAmqpLib\Connection\AMQPSSLConnection; -/** - * @group functional - */ class SslQueueTest extends TestCase { - public function setUp(): void + protected bool $interactsWithConnection = false; + + protected function setUp(): void { + parent::setUp(); + $this->markTestSkipped(); } diff --git a/tests/Feature/TestCase.php b/tests/Feature/TestCase.php index 299e7024..f8a9cb05 100644 --- a/tests/Feature/TestCase.php +++ b/tests/Feature/TestCase.php @@ -8,20 +8,28 @@ use PhpAmqpLib\Exception\AMQPProtocolChannelException; use RuntimeException; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob; +use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestEncryptedJob; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestJob; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase as BaseTestCase; abstract class TestCase extends BaseTestCase { + /** + * Set to false for skipped tests. + */ + protected bool $interactsWithConnection = true; + /** * @throws AMQPProtocolChannelException */ - public function setUp(): void + protected function setUp(): void { parent::setUp(); - if ($this->connection()->isQueueExists()) { - $this->connection()->purge(); + if ($this->interactsWithConnection) { + if ($this->connection()->isQueueExists()) { + $this->connection()->purge(); + } } } @@ -30,11 +38,13 @@ public function setUp(): void */ protected function tearDown(): void { - if ($this->connection()->isQueueExists()) { - $this->connection()->purge(); - } + if ($this->interactsWithConnection) { + if ($this->connection()->isQueueExists()) { + $this->connection()->purge(); + } - self::assertSame(0, Queue::size()); + self::assertSame(0, Queue::size()); + } parent::tearDown(); } @@ -69,7 +79,7 @@ public function testPushRaw(): void public function testPush(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); sleep(1); @@ -110,7 +120,7 @@ public function testPushAfterCommit(): void $this->assertSame(0, Queue::size()); $this->assertNull(Queue::pop()); - $transaction->commit('FakeDBConnection'); + $transaction->commit('FakeDBConnection', 1, 0); sleep(1); @@ -153,7 +163,7 @@ public function testLaterRaw(): void public function testLater(): void { - Queue::later(3, new TestJob()); + Queue::later(3, new TestJob); sleep(1); @@ -194,6 +204,103 @@ public function testBulk(): void $this->assertSame($count, Queue::size()); } + public function testPushEncrypted(): void + { + Queue::push(new TestEncryptedJob); + + sleep(1); + + $this->assertSame(1, Queue::size()); + $this->assertNotNull($job = Queue::pop()); + $this->assertSame(1, $job->attempts()); + $this->assertInstanceOf(RabbitMQJob::class, $job); + $this->assertSame(TestEncryptedJob::class, $job->resolveName()); + $this->assertNotNull($job->getJobId()); + + $payload = $job->payload(); + + $this->assertSame(TestEncryptedJob::class, $payload['displayName']); + $this->assertSame('Illuminate\Queue\CallQueuedHandler@call', $payload['job']); + $this->assertNull($payload['maxTries']); + $this->assertNull($payload['backoff']); + $this->assertNull($payload['timeout']); + $this->assertNull($payload['retryUntil']); + $this->assertSame($job->getJobId(), $payload['id']); + + $job->delete(); + $this->assertSame(0, Queue::size()); + } + + public function testPushEncryptedAfterCommit(): void + { + $transaction = new DatabaseTransactionsManager; + + $this->app->singleton('db.transactions', function ($app) use ($transaction) { + $transaction->begin('FakeDBConnection', 1); + + return $transaction; + }); + + TestEncryptedJob::dispatch()->afterCommit(); + + sleep(1); + $this->assertSame(0, Queue::size()); + $this->assertNull(Queue::pop()); + + $transaction->commit('FakeDBConnection', 1, 0); + + sleep(1); + + $this->assertSame(1, Queue::size()); + $this->assertNotNull($job = Queue::pop()); + + $job->delete(); + $this->assertSame(0, Queue::size()); + } + + public function testEncryptedLater(): void + { + Queue::later(3, new TestEncryptedJob); + + sleep(1); + + $this->assertSame(0, Queue::size()); + $this->assertNull(Queue::pop()); + + sleep(3); + + $this->assertSame(1, Queue::size()); + $this->assertNotNull($job = Queue::pop()); + + $this->assertInstanceOf(RabbitMQJob::class, $job); + + $body = json_decode($job->getRawBody(), true); + + $this->assertSame(TestEncryptedJob::class, $body['displayName']); + $this->assertSame('Illuminate\Queue\CallQueuedHandler@call', $body['job']); + $this->assertSame(TestEncryptedJob::class, $body['data']['commandName']); + $this->assertNotNull($job->getJobId()); + + $job->delete(); + $this->assertSame(0, Queue::size()); + } + + public function testEncryptedBulk(): void + { + $count = 100; + $jobs = []; + + for ($i = 0; $i < $count; $i++) { + $jobs[$i] = new TestEncryptedJob($i); + } + + Queue::bulk($jobs); + + sleep(1); + + $this->assertSame($count, Queue::size()); + } + public function testReleaseRaw(): void { Queue::pushRaw($payload = Str::random()); @@ -222,7 +329,7 @@ public function testReleaseRaw(): void public function testRelease(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); sleep(1); @@ -279,7 +386,7 @@ public function testReleaseWithDelayRaw(): void public function testReleaseInThePast(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); $job = Queue::pop(); $job->release(-3); @@ -294,7 +401,7 @@ public function testReleaseInThePast(): void public function testReleaseAndReleaseWithDelayAttempts(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); sleep(1); @@ -321,7 +428,7 @@ public function testReleaseAndReleaseWithDelayAttempts(): void public function testDelete(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); $job = Queue::pop(); @@ -335,7 +442,7 @@ public function testDelete(): void public function testFailed(): void { - Queue::push(new TestJob()); + Queue::push(new TestJob); $job = Queue::pop(); diff --git a/tests/Functional/RabbitMQQueueTest.php b/tests/Functional/RabbitMQQueueTest.php index 7a106a08..f3ca4005 100644 --- a/tests/Functional/RabbitMQQueueTest.php +++ b/tests/Functional/RabbitMQQueueTest.php @@ -24,50 +24,50 @@ public function testConnection(): void public function testConfigRerouteFailed(): void { $queue = $this->connection(); - $this->assertFalse($this->callProperty($queue, 'config')->isRerouteFailed()); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed()); $queue = $this->connection('rabbitmq-with-options'); - $this->assertTrue($this->callProperty($queue, 'config')->isRerouteFailed()); + $this->assertTrue($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed()); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertFalse($this->callProperty($queue, 'config')->isRerouteFailed()); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed()); $queue = $this->connection('rabbitmq-with-options-null'); - $this->assertFalse($this->callProperty($queue, 'config')->isRerouteFailed()); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed()); } public function testConfigPrioritizeDelayed(): void { $queue = $this->connection(); - $this->assertFalse($this->callProperty($queue, 'config')->isPrioritizeDelayed()); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed()); $queue = $this->connection('rabbitmq-with-options'); - $this->assertTrue($this->callProperty($queue, 'config')->isPrioritizeDelayed()); + $this->assertTrue($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed()); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertFalse($this->callProperty($queue, 'config')->isPrioritizeDelayed()); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed()); $queue = $this->connection('rabbitmq-with-options-null'); - $this->assertFalse($this->callProperty($queue, 'config')->isPrioritizeDelayed()); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed()); } public function testQueueMaxPriority(): void { $queue = $this->connection(); - $this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority()); - $this->assertSame(2, $this->callProperty($queue, 'config')->getQueueMaxPriority()); + $this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + $this->assertSame(2, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); $queue = $this->connection('rabbitmq-with-options'); - $this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority()); - $this->assertSame(20, $this->callProperty($queue, 'config')->getQueueMaxPriority()); + $this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + $this->assertSame(20, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority()); - $this->assertSame(2, $this->callProperty($queue, 'config')->getQueueMaxPriority()); + $this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + $this->assertSame(2, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); $queue = $this->connection('rabbitmq-with-options-null'); - $this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority()); - $this->assertSame(2, $this->callProperty($queue, 'config')->getQueueMaxPriority()); + $this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); + $this->assertSame(2, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority()); } public function testConfigExchangeType(): void @@ -87,8 +87,8 @@ public function testConfigExchangeType(): void $queue = $this->connection('rabbitmq-with-options-null'); $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); - //testing an unkown type with a default - $this->callProperty($queue, 'config')->setExchangeType('unknown'); + // testing an unkown type with a default + $this->callProperty($queue, 'rabbitMQConfig')->setExchangeType('unknown'); $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); } @@ -161,7 +161,7 @@ public function testRoutingKey(): void $queue = $this->connection('rabbitmq-with-options-null'); $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); - $this->callProperty($queue, 'config')->setExchangeRoutingKey('.an.alternate.routing-key'); + $this->callProperty($queue, 'rabbitMQConfig')->setExchangeRoutingKey('.an.alternate.routing-key'); $this->assertSame('an.alternate.routing-key', $this->callMethod($queue, 'getRoutingKey', ['test'])); } @@ -180,26 +180,26 @@ public function testFailedRoutingKey(): void $queue = $this->connection('rabbitmq-with-options-null'); $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); - $this->callProperty($queue, 'config')->setFailedRoutingKey('.an.alternate.routing-key'); + $this->callProperty($queue, 'rabbitMQConfig')->setFailedRoutingKey('.an.alternate.routing-key'); $this->assertSame('an.alternate.routing-key', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); } public function testConfigQuorum(): void { $queue = $this->connection(); - $this->assertFalse($this->callProperty($queue, 'config')->isQuorum()); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum()); $queue = $this->connection('rabbitmq-with-options'); - $this->assertFalse($this->callProperty($queue, 'config')->isQuorum()); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum()); $queue = $this->connection('rabbitmq-with-options-empty'); - $this->assertFalse($this->callProperty($queue, 'config')->isQuorum()); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum()); $queue = $this->connection('rabbitmq-with-options-null'); - $this->assertFalse($this->callProperty($queue, 'config')->isQuorum()); + $this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum()); $queue = $this->connection('rabbitmq-with-quorum-options'); - $this->assertTrue($this->callProperty($queue, 'config')->isQuorum()); + $this->assertTrue($this->callProperty($queue, 'rabbitMQConfig')->isQuorum()); } public function testDeclareDeleteExchange(): void diff --git a/tests/Mocks/TestEncryptedJob.php b/tests/Mocks/TestEncryptedJob.php new file mode 100644 index 00000000..1c0e1762 --- /dev/null +++ b/tests/Mocks/TestEncryptedJob.php @@ -0,0 +1,25 @@ +i = $i; + } + + public function handle(): void + { + // + } +} diff --git a/tests/Mocks/TestSSLConnection.php b/tests/Mocks/TestSSLConnection.php new file mode 100644 index 00000000..c1586475 --- /dev/null +++ b/tests/Mocks/TestSSLConnection.php @@ -0,0 +1,14 @@ +config; + } +} diff --git a/tests/TestCase.php b/tests/TestCase.php index 7d50fa67..ec49a1ac 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -40,7 +40,7 @@ protected function getEnvironmentSetUp($app): void 'cafile' => null, 'local_cert' => null, 'local_key' => null, - 'verify_peer' => true, + 'verify_peer' => false, 'passphrase' => null, ], ], @@ -50,7 +50,7 @@ protected function getEnvironmentSetUp($app): void ]); } - protected function connection(string $name = null): RabbitMQQueue + protected function connection(?string $name = null): RabbitMQQueue { return Queue::connection($name); }