Skip to content

Commit 190c450

Browse files
authored
Merge pull request #1 from vyuldashev/master
Merge with fork source
2 parents c75c934 + d06427b commit 190c450

File tree

15 files changed

+128
-81
lines changed

15 files changed

+128
-81
lines changed

.github/workflows/tests.yml

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@ jobs:
1515
matrix:
1616
php: ['8.1', '8.2', '8.3']
1717
stability: ['prefer-lowest', 'prefer-stable']
18-
laravel: ['^10.0', '^11.0']
18+
laravel: ['^10.0', '^11.0', '^12.0']
1919
exclude:
2020
- php: '8.1'
2121
laravel: '^11.0'
22+
- php: '8.1'
23+
laravel: '^12.0'
2224

2325
name: 'PHP ${{ matrix.php }} - Laravel: ${{matrix.laravel}} - ${{ matrix.stability }}'
2426

@@ -33,15 +35,8 @@ jobs:
3335
extensions: dom, curl, libxml, mbstring, zip
3436
coverage: none
3537

36-
- name: Set up Docker
37-
run: |
38-
sudo rm /usr/local/bin/docker-compose
39-
curl -L https://github.com/docker/compose/releases/download/1.24.1/docker-compose-`uname -s`-`uname -m` > docker-compose
40-
chmod +x docker-compose
41-
sudo mv docker-compose /usr/local/bin
42-
4338
- name: Start Docker container
44-
run: docker-compose up -d rabbitmq
39+
run: docker compose up -d rabbitmq
4540

4641
- name: Install dependencies
4742
run: composer update --with='laravel/framework:${{matrix.laravel}}' --${{ matrix.stability }} --prefer-dist --no-interaction --no-progress

README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,33 @@ Available protocols : `tcp`, `ssl`, `tls`
529529
],
530530
```
531531

532+
### Network Timeouts
533+
534+
For network timeouts configuration you can use option parameters.
535+
All float values are in seconds and zero value can mean infinite timeout.
536+
Example contains default values.
537+
538+
```php
539+
'connections' => [
540+
// ...
541+
542+
'rabbitmq' => [
543+
// ...
544+
545+
'options' => [
546+
// ...
547+
548+
'connection_timeout' => 3.0,
549+
'read_timeout' => 3.0,
550+
'write_timeout' => 3.0,
551+
'channel_rpc_timeout' => 0.0,
552+
],
553+
],
554+
555+
// ...
556+
],
557+
```
558+
532559
### Octane support
533560

534561
Starting with 13.3.0, this package supports [Laravel Octane](https://laravel.com/docs/octane) out of the box.

pint.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
{
22
"preset": "laravel",
33
"rules": {
4-
"nullable_type_declaration_for_default_null_value": {
5-
"use_nullable_type_declaration": false
4+
"php_unit_method_casing": {
5+
"case": "camel_case"
66
}
77
}
88
}

src/Console/ConsumeCommand.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ class ConsumeCommand extends WorkCommand
2121
{--force : Force the worker to run even in maintenance mode}
2222
{--memory=128 : The memory limit in megabytes}
2323
{--sleep=3 : Number of seconds to sleep when no job is available}
24+
{--rest=0 : Number of seconds to rest between jobs}
2425
{--timeout=60 : The number of seconds a child process can run}
2526
{--tries=1 : Number of times to attempt a job before logging it failed}
26-
{--rest=0 : Number of seconds to rest between jobs}
27+
{--json : Output the queue worker information as JSON}
2728
2829
{--max-priority=}
2930
{--consumer-tag}

src/Horizon/RabbitMQQueue.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class RabbitMQQueue extends BaseRabbitMQQueue
2626
*
2727
* @throws AMQPProtocolChannelException
2828
*/
29-
public function readyNow(string $queue = null): int
29+
public function readyNow(?string $queue = null): int
3030
{
3131
return $this->size($queue);
3232
}

src/Queue/Connection/ConfigFactory.php

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class ConfigFactory
1616
*/
1717
public static function make(array $config = []): AMQPConnectionConfig
1818
{
19-
return tap(new AMQPConnectionConfig(), function (AMQPConnectionConfig $connectionConfig) use ($config) {
19+
return tap(new AMQPConnectionConfig, function (AMQPConnectionConfig $connectionConfig) use ($config) {
2020
// Set the connection to a Lazy by default
2121
$connectionConfig->setIsLazy(! in_array(
2222
Arr::get($config, 'lazy') ?? true,
@@ -38,6 +38,7 @@ public static function make(array $config = []): AMQPConnectionConfig
3838
self::getHostFromConfig($connectionConfig, $config);
3939
self::getHeartbeatFromConfig($connectionConfig, $config);
4040
self::getNetworkProtocolFromConfig($connectionConfig, $config);
41+
self::getTimeoutsFromConfig($connectionConfig, $config);
4142
});
4243
}
4344

@@ -99,4 +100,27 @@ protected static function getNetworkProtocolFromConfig(AMQPConnectionConfig $con
99100
$connectionConfig->setNetworkProtocol($networkProtocol);
100101
}
101102
}
103+
104+
protected static function getTimeoutsFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void
105+
{
106+
$connectionTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.connection_timeout');
107+
if (is_numeric($connectionTimeout) && floatval($connectionTimeout) >= 0) {
108+
$connectionConfig->setConnectionTimeout((float) $connectionTimeout);
109+
}
110+
111+
$readTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.read_timeout');
112+
if (is_numeric($readTimeout) && floatval($readTimeout) >= 0) {
113+
$connectionConfig->setReadTimeout((float) $readTimeout);
114+
}
115+
116+
$writeTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.write_timeout');
117+
if (is_numeric($writeTimeout) && floatval($writeTimeout) >= 0) {
118+
$connectionConfig->setWriteTimeout((float) $writeTimeout);
119+
}
120+
121+
$chanelRpcTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.channel_rpc_timeout');
122+
if (is_numeric($chanelRpcTimeout) && floatval($chanelRpcTimeout) >= 0) {
123+
$connectionConfig->setChannelRPCTimeout((float) $chanelRpcTimeout);
124+
}
125+
}
102126
}

src/Queue/QueueConfigFactory.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class QueueConfigFactory
1313
*/
1414
public static function make(array $config = []): QueueConfig
1515
{
16-
return tap(new QueueConfig(), function (QueueConfig $queueConfig) use ($config) {
16+
return tap(new QueueConfig, function (QueueConfig $queueConfig) use ($config) {
1717
if (! empty($queue = Arr::get($config, 'queue'))) {
1818
$queueConfig->setQueue($queue);
1919
}

src/Queue/RabbitMQQueue.php

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ protected function publishBatch($jobs, $data = '', $queue = null): void
204204
/**
205205
* @throws AMQPProtocolChannelException
206206
*/
207-
public function bulkRaw(string $payload, string $queue = null, array $options = []): int|string|null
207+
public function bulkRaw(string $payload, ?string $queue = null, array $options = []): int|string|null
208208
{
209209
[$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options);
210210

@@ -397,7 +397,7 @@ public function deleteExchange(string $name, bool $unused = false): void
397397
*
398398
* @throws AMQPProtocolChannelException
399399
*/
400-
public function isQueueExists(string $name = null): bool
400+
public function isQueueExists(?string $name = null): bool
401401
{
402402
$queueName = $this->getQueue($name);
403403

@@ -484,7 +484,7 @@ public function bindQueue(string $queue, string $exchange, string $routingKey =
484484
/**
485485
* Purge the queue of messages.
486486
*/
487-
public function purge(string $queue = null): void
487+
public function purge(?string $queue = null): void
488488
{
489489
// create a temporary channel, so the main channel will not be closed on exception
490490
$channel = $this->createChannel();
@@ -637,7 +637,7 @@ protected function getDelayQueueArguments(string $destination, int $ttl): array
637637
/**
638638
* Get the exchange name, or empty string; as default value.
639639
*/
640-
protected function getExchange(string $exchange = null): string
640+
protected function getExchange(?string $exchange = null): string
641641
{
642642
return $exchange ?? $this->getConfig()->getExchange();
643643
}
@@ -654,7 +654,7 @@ protected function getRoutingKey(string $destination): string
654654
/**
655655
* Get the exchangeType, or AMQPExchangeType::DIRECT as default.
656656
*/
657-
protected function getExchangeType(string $type = null): string
657+
protected function getExchangeType(?string $type = null): string
658658
{
659659
$constant = AMQPExchangeType::class.'::'.Str::upper($type ?: $this->getConfig()->getExchangeType());
660660

@@ -664,7 +664,7 @@ protected function getExchangeType(string $type = null): string
664664
/**
665665
* Get the exchange for failed messages.
666666
*/
667-
protected function getFailedExchange(string $exchange = null): string
667+
protected function getFailedExchange(?string $exchange = null): string
668668
{
669669
return $exchange ?? $this->getConfig()->getFailedExchange();
670670
}
@@ -699,7 +699,7 @@ protected function isQueueDeclared(string $name): bool
699699
*
700700
* @throws AMQPProtocolChannelException
701701
*/
702-
protected function declareDestination(string $destination, string $exchange = null, string $exchangeType = AMQPExchangeType::DIRECT): void
702+
protected function declareDestination(string $destination, ?string $exchange = null, string $exchangeType = AMQPExchangeType::DIRECT): void
703703
{
704704
// When an exchange is provided and no exchange is present in RabbitMQ, create an exchange.
705705
if ($exchange && ! $this->isExchangeExists($exchange)) {

tests/Feature/ConnectorTest.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
class ConnectorTest extends \VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase
1414
{
15-
public function testLazyConnection(): void
15+
public function test_lazy_connection(): void
1616
{
1717
$this->app['config']->set('queue.connections.rabbitmq', [
1818
'driver' => 'rabbitmq',
@@ -55,7 +55,7 @@ public function testLazyConnection(): void
5555
$this->assertTrue($connection->getConnection()->isConnected());
5656
}
5757

58-
public function testLazyStreamConnection(): void
58+
public function test_lazy_stream_connection(): void
5959
{
6060
$this->app['config']->set('queue.connections.rabbitmq', [
6161
'driver' => 'rabbitmq',
@@ -98,7 +98,7 @@ public function testLazyStreamConnection(): void
9898
$this->assertTrue($connection->getConnection()->isConnected());
9999
}
100100

101-
public function testSslConnection(): void
101+
public function test_ssl_connection(): void
102102
{
103103
$this->markTestSkipped();
104104

@@ -142,7 +142,7 @@ public function testSslConnection(): void
142142
}
143143

144144
// Test to validate ssl connection params
145-
public function testNoVerificationSslConnection(): void
145+
public function test_no_verification_ssl_connection(): void
146146
{
147147
$this->app['config']->set('queue.connections.rabbitmq', [
148148
'driver' => 'rabbitmq',

tests/Feature/QueueTest.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
class QueueTest extends TestCase
1212
{
13-
public function setUp(): void
13+
protected function setUp(): void
1414
{
1515
parent::setUp();
1616

@@ -20,16 +20,16 @@ public function setUp(): void
2020
]);
2121
}
2222

23-
public function testConnection(): void
23+
public function test_connection(): void
2424
{
2525
$this->assertInstanceOf(AMQPStreamConnection::class, $this->connection()->getChannel()->getConnection());
2626
}
2727

28-
public function testWithoutReconnect(): void
28+
public function test_without_reconnect(): void
2929
{
3030
$queue = $this->connection('rabbitmq');
3131

32-
$queue->push(new TestJob());
32+
$queue->push(new TestJob);
3333
sleep(1);
3434
$this->assertSame(1, $queue->size());
3535

@@ -38,6 +38,6 @@ public function testWithoutReconnect(): void
3838
$this->assertFalse($queue->getConnection()->isConnected());
3939

4040
$this->expectException(AMQPChannelClosedException::class);
41-
$queue->push(new TestJob());
41+
$queue->push(new TestJob);
4242
}
4343
}

0 commit comments

Comments
 (0)