From 833a9184da0ce097ef4aac063771633283278181 Mon Sep 17 00:00:00 2001 From: roman Date: Mon, 10 Jun 2019 17:51:10 +0300 Subject: [PATCH 1/2] Implemented basic_consume --- config/rabbitmq.php | 3 + src/Queue/BasicConsumeHandler.php | 81 +++++++++++++++++++ src/Queue/RabbitMQQueue.php | 56 +++++++++++-- .../SendAndReceiveDelayedMessageTest.php | 75 ++++++++++++++++- .../Functional/SendAndReceiveMessageTest.php | 73 ++++++++++++++++- tests/Queue/RabbitMQQueueTest.php | 57 +++++++++++++ 6 files changed, 336 insertions(+), 9 deletions(-) create mode 100644 src/Queue/BasicConsumeHandler.php diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 708d8bbc..2598629a 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -74,6 +74,9 @@ 'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false), 'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false), 'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'), + + 'basic_consume' => env('RABBITMQ_QUEUE_BASIC_CONSUME', false), + 'basic_consume_timeout' => env('RABBITMQ_QUEUE_BASIC_CONSUME_TIMEOUT', false), ], ], diff --git a/src/Queue/BasicConsumeHandler.php b/src/Queue/BasicConsumeHandler.php new file mode 100644 index 00000000..3adb039d --- /dev/null +++ b/src/Queue/BasicConsumeHandler.php @@ -0,0 +1,81 @@ +context = $context; + $this->queue = $queue; + $this->options = $options; + } + + /** + * @param Container $container + * @param RabbitMQQueue $rabbitMQQueue + * @return RabbitMQJob|null + * @throws \Interop\Queue\Exception\SubscriptionConsumerNotSupportedException + */ + public function getJob(Container $container, RabbitMQQueue $rabbitMQQueue): ?RabbitMQJob + { + $this->job = null; + + if (! $this->subscriptionConsumer) { + $this->consumer = $this->context->createConsumer($this->queue); + $this->subscriptionConsumer = $this->context->createSubscriptionConsumer(); + + $this->subscriptionConsumer + ->subscribe($this->consumer, function ($message) use ($container, $rabbitMQQueue) { + $this->job = new RabbitMQJob($container, $rabbitMQQueue, $this->consumer, $message); + + return false; + }); + } + $this->subscriptionConsumer->consume($this->options['timeout'] ?? 10000); + + return $this->job; + } +} diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index c78dfe28..117336f9 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -24,6 +24,10 @@ class RabbitMQQueue extends Queue implements QueueContract protected $declaredExchanges = []; protected $declaredQueues = []; + /** + * @var BasicConsumeHandler + */ + private $jobConsumer; /** * @var AmqpContext @@ -152,18 +156,56 @@ public function release($delay, $job, $data, $queue, $attempts = 0) ]); } + + /** + * @param null $queueName + * @return RabbitMQJob|null + * @throws \Interop\Queue\Exception\SubscriptionConsumerNotSupportedException + */ + private function popBasicConsume($queueName): ?RabbitMQJob + { + [$queue] = $this->declareEverything($queueName); + + $options = []; + if ($timeout = $this->queueOptions['basic_consume_timeout'] ?? false) { + $options['timeout'] = $timeout; + } + $this->jobConsumer = $this->jobConsumer ?: new BasicConsumeHandler( + $this->context, + $queue, + $options + ); + + return $this->jobConsumer->getJob($this->container, $this); + } + + /** + * @param $queueName + * @return RabbitMQJob|null + */ + private function popPolling($queueName): ?RabbitMQJob + { + /** @var AmqpQueue $queue */ + [$queue] = $this->declareEverything($queueName); + + $consumer = $this->context->createConsumer($queue); + + if ($message = $consumer->receiveNoWait()) { + return new RabbitMQJob($this->container, $this, $consumer, $message); + } + + return null; + } + /** {@inheritdoc} */ public function pop($queueName = null) { try { - /** @var AmqpQueue $queue */ - [$queue] = $this->declareEverything($queueName); - - $consumer = $this->context->createConsumer($queue); - - if ($message = $consumer->receiveNoWait()) { - return new RabbitMQJob($this->container, $this, $consumer, $message); + if ($this->queueOptions['basic_consume'] ?? false) { + return $this->popBasicConsume($queueName); } + + return $this->popPolling($queueName); } catch (\Throwable $exception) { $this->reportConnectionError('pop', $exception); diff --git a/tests/Functional/SendAndReceiveDelayedMessageTest.php b/tests/Functional/SendAndReceiveDelayedMessageTest.php index 1a6ae946..195302be 100644 --- a/tests/Functional/SendAndReceiveDelayedMessageTest.php +++ b/tests/Functional/SendAndReceiveDelayedMessageTest.php @@ -16,7 +16,7 @@ */ class SendAndReceiveDelayedMessageTest extends TestCase { - public function test() + public function testPolling() { $config = [ 'factory_class' => AmqpConnectionFactory::class, @@ -85,6 +85,79 @@ public function test() $job->delete(); } + public function testBasicConsume() + { + $config = [ + 'factory_class' => AmqpConnectionFactory::class, + 'dsn' => null, + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'login' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + 'options' => [ + 'exchange' => [ + 'name' => null, + 'declare' => true, + 'type' => \Interop\Amqp\AmqpTopic::TYPE_DIRECT, + 'passive' => false, + 'durable' => true, + 'auto_delete' => false, + ], + + 'queue' => [ + 'name' => 'default', + 'basic_consume' => true, + 'basic_consume_timeout' => 500, + 'declare' => true, + 'bind' => true, + 'passive' => false, + 'durable' => true, + 'exclusive' => false, + 'auto_delete' => false, + 'arguments' => '[]', + ], + ], + 'ssl_params' => [ + 'ssl_on' => false, + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => true, + 'passphrase' => null, + ], + ]; + + $connector = new RabbitMQConnector(new Dispatcher()); + /** @var RabbitMQQueue $queue */ + $queue = $connector->connect($config); + $queue->setContainer($this->createDummyContainer()); + + // we need it to declare exchange\queue on RabbitMQ side. + $queue->pushRaw('something'); + + $queue->getContext()->purgeQueue($queue->getContext()->createQueue('default')); + + $expectedPayload = __METHOD__.microtime(true); + + $queue->pushRaw($expectedPayload, null, ['delay' => 3]); + + sleep(1); //value bigger than queue.basic_consume.timeout here + + //triggers basic_consume timeout here, because basic consume blocks thread until receives message + //in normal case these are 2 separate processes, so it's ok + $this->assertNull($queue->pop()); + + sleep(4); + + $job = $queue->pop(); + + $this->assertInstanceOf(RabbitMQJob::class, $job); + $this->assertSame($expectedPayload, $job->getRawBody()); + + $job->delete(); + } + private function createDummyContainer() { $container = new Container(); diff --git a/tests/Functional/SendAndReceiveMessageTest.php b/tests/Functional/SendAndReceiveMessageTest.php index b5aa6a77..706916a2 100644 --- a/tests/Functional/SendAndReceiveMessageTest.php +++ b/tests/Functional/SendAndReceiveMessageTest.php @@ -16,7 +16,7 @@ */ class SendAndReceiveMessageTest extends TestCase { - public function test() + public function testPollingConsume() { $config = [ 'factory_class' => AmqpConnectionFactory::class, @@ -85,6 +85,77 @@ public function test() $this->assertEquals(0, $queue->size()); } + public function testBasicConsume() + { + $config = [ + 'factory_class' => AmqpConnectionFactory::class, + 'dsn' => null, + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'login' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + 'options' => [ + 'exchange' => [ + 'name' => null, + 'declare' => true, + 'type' => \Interop\Amqp\AmqpTopic::TYPE_DIRECT, + 'passive' => false, + 'durable' => true, + 'auto_delete' => false, + ], + + 'queue' => [ + 'name' => 'default', + 'declare' => true, + 'bind' => true, + 'passive' => false, + 'durable' => true, + 'exclusive' => false, + 'auto_delete' => false, + 'basic_consume' => true, + 'basic_consume_timeout' => 10000, + 'arguments' => '[]', + ], + ], + 'ssl_params' => [ + 'ssl_on' => false, + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => true, + 'passphrase' => null, + ], + ]; + + $connector = new RabbitMQConnector(new Dispatcher()); + /** @var RabbitMQQueue $queue */ + $queue = $connector->connect($config); + $queue->setContainer($this->createDummyContainer()); + + // we need it to declare exchange\queue on RabbitMQ side. + $queue->pushRaw('something'); + + $queue->getContext()->purgeQueue($queue->getContext()->createQueue('default')); + + $expectedPayload = __METHOD__.microtime(true); + + $queue->pushRaw($expectedPayload); + + sleep(1); + + $this->assertEquals(1, $queue->size()); + + $job = $queue->pop(); + + $this->assertInstanceOf(RabbitMQJob::class, $job); + $this->assertSame($expectedPayload, $job->getRawBody()); + + $job->delete(); + + $this->assertEquals(0, $queue->size()); + } + private function createDummyContainer() { $container = new Container(); diff --git a/tests/Queue/RabbitMQQueueTest.php b/tests/Queue/RabbitMQQueueTest.php index ca9ebd8f..d1f10768 100644 --- a/tests/Queue/RabbitMQQueueTest.php +++ b/tests/Queue/RabbitMQQueueTest.php @@ -3,6 +3,7 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue; use Interop\Amqp\AmqpQueue; +use Interop\Amqp\AmqpSubscriptionConsumer; use Interop\Amqp\AmqpTopic; use Psr\Log\LoggerInterface; use Interop\Amqp\AmqpContext; @@ -392,6 +393,62 @@ public function testShouldLogExceptionOnPop() $queue->pop('aQueue'); } + public function testShouldUseBasicConsumeSubscriber() + { + $callback = function ($msg) { + // + }; + + $config = $this->createDummyConfig(); + $config['options']['queue']['basic_consume'] = true; + + $queue = $this->createMock(AmqpQueue::class); + + $consumer = $this->createMock(AmqpConsumer::class); + $subConsumer = $this->createMock(AmqpSubscriptionConsumer::class); + $subConsumer + ->expects($this->once()) + ->method('subscribe') + ->willReturnCallback(function ($context, $cb) use (&$callback) { + //save callback, to call it later + $callback = $cb; + }); + + $subConsumer + ->expects($this->once()) + ->method('consume') + ->willReturnCallback(function () use (&$callback) { + $callback(new \Interop\Amqp\Impl\AmqpMessage('thePayload')); + }); + + $context = $this->createAmqpContext(); + $context + ->expects($this->once()) + ->method('createSubscriptionConsumer') + ->willReturn($subConsumer); + + $context + ->expects($this->once()) + ->method('createConsumer') + ->willReturn($consumer); + + $context + ->expects($this->once()) + ->method('createQueue') + ->willReturn($queue); + + $context + ->expects($this->once()) + ->method('createTopic') + ->willReturn($this->createMock(AmqpTopic::class)); + + $queue = new RabbitMQQueue($context, $config); + $queue->setContainer($this->createDummyContainer()); + $job = $queue->pop('aQueue'); + + $this->assertInstanceOf(RabbitMQJob::class, $job); + } + /** * @return AmqpContext|\PHPUnit_Framework_MockObject_MockObject|AmqpContext */ From 2647cdf81f9808b470d44751ff21404a0a5a61f5 Mon Sep 17 00:00:00 2001 From: roman Date: Tue, 11 Jun 2019 10:45:17 +0300 Subject: [PATCH 2/2] StyleCI --- src/Queue/BasicConsumeHandler.php | 6 +++--- src/Queue/RabbitMQQueue.php | 1 - tests/Queue/RabbitMQQueueTest.php | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/Queue/BasicConsumeHandler.php b/src/Queue/BasicConsumeHandler.php index 3adb039d..89635d89 100644 --- a/src/Queue/BasicConsumeHandler.php +++ b/src/Queue/BasicConsumeHandler.php @@ -2,11 +2,11 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue; -use Illuminate\Container\Container; -use Interop\Amqp\AmqpConsumer; +use Interop\Queue\Context; use Interop\Amqp\AmqpQueue; +use Interop\Amqp\AmqpConsumer; +use Illuminate\Container\Container; use Interop\Amqp\AmqpSubscriptionConsumer; -use Interop\Queue\Context; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob; class BasicConsumeHandler diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 117336f9..1b7107c9 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -156,7 +156,6 @@ public function release($delay, $job, $data, $queue, $attempts = 0) ]); } - /** * @param null $queueName * @return RabbitMQJob|null diff --git a/tests/Queue/RabbitMQQueueTest.php b/tests/Queue/RabbitMQQueueTest.php index d1f10768..4980b862 100644 --- a/tests/Queue/RabbitMQQueueTest.php +++ b/tests/Queue/RabbitMQQueueTest.php @@ -3,7 +3,6 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue; use Interop\Amqp\AmqpQueue; -use Interop\Amqp\AmqpSubscriptionConsumer; use Interop\Amqp\AmqpTopic; use Psr\Log\LoggerInterface; use Interop\Amqp\AmqpContext; @@ -12,6 +11,7 @@ use Interop\Amqp\AmqpProducer; use PHPUnit\Framework\TestCase; use Illuminate\Container\Container; +use Interop\Amqp\AmqpSubscriptionConsumer; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob; class RabbitMQQueueTest extends TestCase