Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Only the latest version will get new features. Bug fixes will be provided using
You can install this package via composer using this command:

```
composer require salesmessage/php-lib-rabbitmq:^1.29 --ignore-platform-reqs
composer require salesmessage/php-lib-rabbitmq:^1.31 --ignore-platform-reqs
```

The package will automatically register itself.
Expand Down Expand Up @@ -663,7 +663,19 @@ if not all the issues with the following command:
composer fix:style
```

## Contribution

You can contribute to this package by discovering bugs and opening issues. Please, add to which version of package you
create pull request or issue. (e.g. [5.2] Fatal error on delayed job)
## Local Setup
- Configure all config items in `config/queue.php` section `connections.rabbitmq_vhosts` (see as example [rabbitmq.php](./config/rabbitmq.php))
- Create `yml` file in the project root with name `rabbit-groups.yml` and content, for example like this (you can replace `vhosts` and `queues` with `vhosts_mask` and `queues_mask`):
```yaml
groups:
test-notes:
vhosts:
- organization_200005
queues:
- local-myname.notes.200005
batch_size: 3
prefetch_count: 3
```
- Make sure that vhosts exist in RabbitMQ (if not - create them)
- Run command `php artisan lib-rabbitmq:scan-vhosts` within your project where this library is installed (this command fetches data from RabbitMQ to Redis)
- Run command for consumer `php artisan lib-rabbitmq:consume-vhosts test-notes rabbitmq_vhosts --name=mq-vhost-test-local-notes --memory=300 --timeout=0 --max-jobs=1000 --max-time=600 --async-mode=1`
6 changes: 4 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
},
"extra": {
"branch-alias": {
"dev-master": "1.30-dev"
"dev-master": "1.31-dev"
},
"laravel": {
"providers": [
Expand All @@ -43,7 +43,9 @@
}
},
"suggest": {
"ext-pcntl": "Required to use all features of the queue consumer."
"ext-pcntl": "Required to use all features of the queue consumer.",
"ext-swoole": "Required to use async mode for healthcheck (alternative is ext-openswoole).",
"ext-openswoole": "Required to use async mode for healthcheck (alternative is ext-swoole)."
},
"scripts": {
"test": [
Expand Down
27 changes: 27 additions & 0 deletions config/rabbitmq.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,33 @@
'options' => [
],

/**
* Provided on 2 levels: transport and application.
*/
'deduplication' => [
'transport' => [
'enabled' => env('RABBITMQ_DEDUP_TRANSPORT_ENABLED', false),
'ttl' => env('RABBITMQ_DEDUP_TRANSPORT_TTL', 7200),
'lock_ttl' => env('RABBITMQ_DEDUP_TRANSPORT_LOCK_TTL', 60),
/**
* Possible: ack, reject
*/
'action_on_duplication' => env('RABBITMQ_DEDUP_TRANSPORT_ACTION', 'ack'),
/**
* Possible: ack, reject, requeue
*/
'action_on_lock' => env('RABBITMQ_DEDUP_TRANSPORT_LOCK_ACTION', 'requeue'),
'connection' => [
'driver' => env('RABBITMQ_DEDUP_TRANSPORT_DRIVER', 'redis'),
'name' => env('RABBITMQ_DEDUP_TRANSPORT_CONNECTION_NAME', 'persistent'),
'key_prefix' => env('RABBITMQ_DEDUP_TRANSPORT_KEY_PREFIX', 'mq_dedup'),
],
],
'application' => [
'enabled' => env('RABBITMQ_DEDUP_APP_ENABLED', true),
],
],

/*
* Set to "horizon" if you wish to use Laravel Horizon.
*/
Expand Down
14 changes: 12 additions & 2 deletions src/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
use Salesmessage\LibRabbitMQ\Queue\RabbitMQQueue;
use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationService;
use Throwable;

class Consumer extends Worker
{
Expand Down Expand Up @@ -122,7 +123,16 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu

$jobsProcessed++;

$this->runJob($job, $connectionName, $options);
/** @var DeduplicationService $transportDedupService */
$transportDedupService = $this->container->make(DeduplicationService::class);
$transportDedupService->decorateWithDeduplication(
function () use ($job, $message, $connectionName, $queue, $options, $transportDedupService) {
$this->runJob($job, $connectionName, $options);
$transportDedupService->markAsProcessed($message, $queue);
},
$message,
$queue
);

if ($this->supportsAsyncSignals()) {
$this->resetTimeoutHandler();
Expand Down
15 changes: 15 additions & 0 deletions src/Contracts/RabbitMQConsumable.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

namespace Salesmessage\LibRabbitMQ\Contracts;

/**
* Each Laravel job should implement this interface.
*/
interface RabbitMQConsumable
{
/**
* Check duplications on the application side.
* It's mostly represented as an idempotency checker.
*/
public function isDuplicated(): bool;
}
11 changes: 9 additions & 2 deletions src/Interfaces/RabbitMQBatchable.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@

interface RabbitMQBatchable
{
/**
* Filter out jobs that have already been processed according to the application logic.
*
* @param list<static> $batch
* @return list<static>
*/
public static function getNotDuplicatedBatchedJobs(array $batch): array;

/**
* Processing jobs array of static class
*
* @param array<static> $batch
* @return mixed
* @param list<static> $batch
*/
public static function collection(array $batch): void;
}
51 changes: 42 additions & 9 deletions src/LaravelLibRabbitMQServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@
namespace Salesmessage\LibRabbitMQ;

use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Queue\Connectors\BeanstalkdConnector;
use Illuminate\Queue\Connectors\DatabaseConnector;
use Illuminate\Queue\Connectors\NullConnector;
use Illuminate\Queue\Connectors\RedisConnector;
use Illuminate\Queue\Connectors\SqsConnector;
use Illuminate\Queue\Connectors\SyncConnector;
use Illuminate\Queue\QueueManager;
use Illuminate\Support\ServiceProvider;
use Psr\Log\LoggerInterface;
use Salesmessage\LibRabbitMQ\Console\ConsumeCommand;
use Salesmessage\LibRabbitMQ\Console\ConsumeVhostsCommand;
use Salesmessage\LibRabbitMQ\Console\ScanVhostsCommand;
use Salesmessage\LibRabbitMQ\Queue\Connectors\RabbitMQVhostsConnector;
use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationService;
use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\DeduplicationStore;
use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\NullDeduplicationStore;
use Salesmessage\LibRabbitMQ\Services\Deduplication\TransportLevel\RedisDeduplicationStore;
use Salesmessage\LibRabbitMQ\Services\GroupsService;
use Salesmessage\LibRabbitMQ\Services\InternalStorageManager;
use Salesmessage\LibRabbitMQ\Services\QueueService;
Expand All @@ -36,6 +34,8 @@ public function register(): void
);

if ($this->app->runningInConsole()) {
$this->bindDeduplicationService();

$this->app->singleton('rabbitmq.consumer', function () {
$isDownForMaintenance = function () {
return $this->app->isDownForMaintenance();
Expand Down Expand Up @@ -68,7 +68,8 @@ public function register(): void
$this->app['events'],
$this->app[ExceptionHandler::class],
$isDownForMaintenance,
null
$this->app->get(DeduplicationService::class),
null,
);
});

Expand All @@ -84,15 +85,16 @@ public function register(): void
$this->app['events'],
$this->app[ExceptionHandler::class],
$isDownForMaintenance,
null
$this->app->get(DeduplicationService::class),
null,
);
});

$this->app->singleton(ConsumeVhostsCommand::class, static function ($app) {
$consumerClass = ('direct' === config('queue.connections.rabbitmq_vhosts.consumer_type'))
? VhostsDirectConsumer::class
: VhostsQueueConsumer::class;

return new ConsumeVhostsCommand(
$app[GroupsService::class],
$app[$consumerClass],
Expand Down Expand Up @@ -139,4 +141,35 @@ public function boot(): void
return new RabbitMQVhostsConnector($this->app['events']);
});
}

/**
* Config params:
* @phpstan-import-type DeduplicationConfig from DeduplicationService
*
* @return void
*/
private function bindDeduplicationService(): void
{
$this->app->bind(DeduplicationStore::class, static function () {
/** @var DeduplicationConfig $config */
$config = (array) config('queue.connections.rabbitmq_vhosts.deduplication.transport', []);
$enabled = (bool) ($config['enabled'] ?? false);
if (!$enabled) {
return new NullDeduplicationStore();
}

$connectionDriver = $config['connection']['driver'] ?? null;
if ($connectionDriver !== 'redis') {
throw new \InvalidArgumentException('For now only Redis connection is supported for deduplication');
}
$connectionName = $config['connection']['name'] ?? null;

$prefix = trim($config['connection']['key_prefix'] ?? '');
if (empty($prefix)) {
throw new \InvalidArgumentException('Key prefix is required');
}

return new RedisDeduplicationStore($connectionName, $prefix);
});
}
}
2 changes: 2 additions & 0 deletions src/Queue/RabbitMQQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use Ramsey\Uuid\Uuid;
use RuntimeException;
use Throwable;
use Salesmessage\LibRabbitMQ\Contracts\RabbitMQQueueContract;
Expand Down Expand Up @@ -521,6 +522,7 @@ protected function createMessage($payload, int $attempts = 0): array
$currentPayload = json_decode($payload, true);
if ($correlationId = $currentPayload['id'] ?? null) {
$properties['correlation_id'] = $correlationId;
$properties['message_id'] = Uuid::uuid7()->toString();
}

if ($this->getConfig()->isPrioritizeDelayed()) {
Expand Down
12 changes: 11 additions & 1 deletion src/Queue/RabbitMQQueueBatchable.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Salesmessage\LibRabbitMQ\Queue;

use PhpAmqpLib\Connection\AbstractConnection;
use Salesmessage\LibRabbitMQ\Contracts\RabbitMQConsumable;
use Salesmessage\LibRabbitMQ\Dto\ConnectionNameDto;
use Salesmessage\LibRabbitMQ\Dto\QueueApiDto;
use Salesmessage\LibRabbitMQ\Dto\VhostApiDto;
Expand Down Expand Up @@ -77,7 +78,16 @@ protected function createChannel(): AMQPChannel

public function push($job, $data = '', $queue = null)
{
$queue = $queue ?: $job->onQueue();
if (!($job instanceof RabbitMQConsumable)) {
throw new \InvalidArgumentException('Job must implement RabbitMQConsumable');
}

if (!$queue) {
if (!method_exists($job, 'onQueue')) {
throw new \InvalidArgumentException('Job must implement onQueue method');
}
$queue = $job->onQueue();
}

try {
$result = parent::push($job, $data, $queue);
Expand Down
4 changes: 2 additions & 2 deletions src/Services/Api/RabbitApiClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public function request(
$contents = $response->getBody()->getContents();

return (array) ($contents ? json_decode($contents, true) : []);
} catch (Throwable $exception) {
} catch (\Throwable $exception) {
$rethrowException = $exception;
if ($exception instanceof ClientException) {
$rethrowException = new RabbitApiClientException($exception->getMessage());
Expand Down Expand Up @@ -109,4 +109,4 @@ private function getPassword(): string
{
return (string) ($this->connectionConfig['hosts'][0]['password'] ?? '');
}
}
}
18 changes: 18 additions & 0 deletions src/Services/Deduplication/AppDeduplicationService.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

namespace Salesmessage\LibRabbitMQ\Services\Deduplication;

class AppDeduplicationService
{
public static function isEnabled(): bool
{
return (bool) static::getConfig('enabled', true);
}

protected static function getConfig(string $key, mixed $default = null): mixed
{
$value = config("queue.connections.rabbitmq_vhosts.deduplication.application.$key");

return $value !== null ? $value : $default;
}
}
Loading