Skip to content

Commit 8893d88

Browse files
committed
add functional tests. fix unit tests.
1 parent e6f0f6b commit 8893d88

File tree

10 files changed

+383
-27
lines changed

10 files changed

+383
-27
lines changed

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
},
1818
"require-dev": {
1919
"phpunit/phpunit": "~6.0",
20+
"illuminate/events": "5.5.*",
2021
"mockery/mockery": "^0.9.5"
2122
},
2223
"autoload": {

src/Queue/Connectors/RabbitMQConnector.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public function connect(array $config): Queue
6565

6666
$context = $factory->createContext();
6767

68-
$this->dispatcher->listen(WorkerStopping::class, function () use ($context) {
68+
$this->dispatcher->listen(WorkerStopping::class, function () use($context) {
6969
$context->close();
7070
});
7171

src/Queue/Jobs/RabbitMQJob.php

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,12 @@ public function __construct(
3030
Container $container,
3131
RabbitMQQueue $connection,
3232
AmqpConsumer $consumer,
33-
string $queue,
34-
AmqpMessage $message,
35-
string $connectionName = null
33+
AmqpMessage $message
3634
) {
3735
$this->container = $container;
3836
$this->connection = $connection;
3937
$this->consumer = $consumer;
40-
$this->queue = $queue;
4138
$this->message = $message;
42-
$this->connectionName = $connectionName;
4339
}
4440

4541
/**

src/Queue/RabbitMQQueue.php

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,14 @@ public function push($job, $data = '', $queue = null)
7272
public function pushRaw($payload, $queueName = null, array $options = [])
7373
{
7474
try {
75-
/** @var AmqpTopic $topic */
76-
list(, $topic) = $this->declareEverything($queueName);
75+
/**
76+
* @var AmqpTopic $topic
77+
* @var AmqpQueue $queue
78+
*/
79+
list($queue, $topic) = $this->declareEverything($queueName);
7780

7881
$message = $this->context->createMessage($payload);
79-
$message->setRoutingKey($queueName);
82+
$message->setRoutingKey($queue->getQueueName());
8083
$message->setCorrelationId($this->getCorrelationId());
8184
$message->setContentType('application/json');
8285
$message->setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT);
@@ -116,14 +119,7 @@ public function pop($queueName = null)
116119
$consumer = $this->context->createConsumer($queue);
117120

118121
if ($message = $consumer->receiveNoWait()) {
119-
return new RabbitMQJob(
120-
$this->container,
121-
$this,
122-
$consumer,
123-
$queueName,
124-
$message,
125-
$this->connectionName
126-
);
122+
return new RabbitMQJob($this->container, $this, $consumer, $message);
127123
}
128124
} catch (\Exception $exception) {
129125
$this->reportConnectionError('pop', $exception);
@@ -179,7 +175,7 @@ public function getContext(): AmqpContext
179175
*
180176
* @return array [Interop\Amqp\AmqpQueue, Interop\Amqp\AmqpTopic]
181177
*/
182-
private function declareEverything(string $queueName): array
178+
private function declareEverything(string $queueName = null): array
183179
{
184180
$queueName = $queueName ?: $this->defaultQueue;
185181
$exchangeName = $this->configExchange['name'] ?: $queueName;
@@ -224,7 +220,7 @@ private function declareEverything(string $queueName): array
224220
}
225221

226222
if ($this->declareBindQueue) {
227-
$this->context->bind(new AmqpBind($queue, $topic, $queueName));
223+
$this->context->bind(new AmqpBind($queue, $topic, $queue->getQueueName()));
228224
}
229225

230226
return [$queue, $topic];
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Functional;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Illuminate\Container\Container;
7+
use Illuminate\Events\Dispatcher;
8+
use Interop\Amqp\AmqpTopic;
9+
use PHPUnit\Framework\TestCase;
10+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors\RabbitMQConnector;
11+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;
12+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
13+
14+
class SendAndReceiveDelayedMessageTest extends TestCase
15+
{
16+
public function test()
17+
{
18+
$config = [
19+
'factory_class' => AmqpConnectionFactory::class,
20+
'dsn' => null,
21+
'host' => getenv('HOST'),
22+
'port' => getenv('PORT'),
23+
'login' => 'guest',
24+
'password' => 'guest',
25+
'vhost' => '/',
26+
'queue' => 'queue_name',
27+
'exchange_declare' => true,
28+
'queue_declare' => true,
29+
'queue_declare_bind' => true,
30+
'queue_params' => [
31+
'passive' => false,
32+
'durable' => true,
33+
'exclusive' => false,
34+
'auto_delete' => false,
35+
'arguments' => null,
36+
],
37+
'exchange_params' => [
38+
'name' => null,
39+
'type' => AmqpTopic::TYPE_DIRECT,
40+
'passive' => false,
41+
'durable' => true,
42+
'auto_delete' => false,
43+
],
44+
'ssl_params' => [
45+
'ssl_on' => false,
46+
'cafile' => null,
47+
'local_cert' => null,
48+
'verify_peer' => true,
49+
'passphrase' => null,
50+
]
51+
];
52+
53+
$connector = new RabbitMQConnector(new Dispatcher());
54+
/** @var RabbitMQQueue $queue */
55+
$queue = $connector->connect($config);
56+
$queue->setContainer(new Container());
57+
58+
// we need it to declare exchange\queue on RabbitMQ side.
59+
$queue->pushRaw('something');
60+
61+
$queue->getContext()->purgeQueue($queue->getContext()->createQueue('queue_name'));
62+
63+
$expectedPayload = __METHOD__.microtime(true);
64+
65+
$queue->pushRaw($expectedPayload, null, ['delay' => 3]);
66+
67+
sleep(1);
68+
69+
$this->assertNull($queue->pop());
70+
71+
sleep(4);
72+
73+
$job = $queue->pop();
74+
75+
$this->assertInstanceOf(RabbitMQJob::class, $job);
76+
$this->assertSame($expectedPayload, $job->getRawBody());
77+
78+
$job->delete();
79+
}
80+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
<?php
2+
3+
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Functional;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Illuminate\Container\Container;
7+
use Illuminate\Events\Dispatcher;
8+
use Interop\Amqp\AmqpTopic;
9+
use PHPUnit\Framework\TestCase;
10+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors\RabbitMQConnector;
11+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;
12+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
13+
14+
class SendAndReceiveMessageTest extends TestCase
15+
{
16+
public function test()
17+
{
18+
$config = [
19+
'factory_class' => AmqpConnectionFactory::class,
20+
'dsn' => null,
21+
'host' => getenv('HOST'),
22+
'port' => getenv('PORT'),
23+
'login' => 'guest',
24+
'password' => 'guest',
25+
'vhost' => '/',
26+
'queue' => 'queue_name',
27+
'exchange_declare' => true,
28+
'queue_declare' => true,
29+
'queue_declare_bind' => true,
30+
'queue_params' => [
31+
'passive' => false,
32+
'durable' => true,
33+
'exclusive' => false,
34+
'auto_delete' => false,
35+
'arguments' => null,
36+
],
37+
'exchange_params' => [
38+
'name' => null,
39+
'type' => AmqpTopic::TYPE_DIRECT,
40+
'passive' => false,
41+
'durable' => true,
42+
'auto_delete' => false,
43+
],
44+
'ssl_params' => [
45+
'ssl_on' => false,
46+
'cafile' => null,
47+
'local_cert' => null,
48+
'verify_peer' => true,
49+
'passphrase' => null,
50+
]
51+
];
52+
53+
$connector = new RabbitMQConnector(new Dispatcher());
54+
/** @var RabbitMQQueue $queue */
55+
$queue = $connector->connect($config);
56+
$queue->setContainer(new Container());
57+
58+
// we need it to declare exchange\queue on RabbitMQ side.
59+
$queue->pushRaw('something');
60+
61+
$queue->getContext()->purgeQueue($queue->getContext()->createQueue('queue_name'));
62+
63+
$expectedPayload = __METHOD__.microtime(true);
64+
65+
$queue->pushRaw($expectedPayload);
66+
67+
sleep(1);
68+
69+
$job = $queue->pop();
70+
71+
$this->assertInstanceOf(RabbitMQJob::class, $job);
72+
$this->assertSame($expectedPayload, $job->getRawBody());
73+
74+
$job->delete();
75+
}
76+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
<?php
2+
3+
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Functional;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Enqueue\AmqpLib\AmqpContext;
7+
use Illuminate\Events\Dispatcher;
8+
use Interop\Amqp\AmqpTopic;
9+
use PhpAmqpLib\Connection\AMQPSSLConnection;
10+
use PHPUnit\Framework\TestCase;
11+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors\RabbitMQConnector;
12+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
13+
14+
class SslConnectionTest extends TestCase
15+
{
16+
public function testConnectorEstablishSecureConnectionWithRabbitMQBroker()
17+
{
18+
// $config = [
19+
// 'factory_class' => AmqpConnectionFactory::class,
20+
// 'dsn' => null,
21+
// 'host' => getenv('HOST'),
22+
// 'port' => getenv('PORT_SSL'),
23+
// 'login' => 'guest',
24+
// 'password' => 'guest',
25+
// 'vhost' => '/',
26+
//
27+
// 'queue' => 'queue_name',
28+
// 'exchange_declare' => true,
29+
// 'queue_declare' => true,
30+
// 'queue_declare_bind' => true,
31+
//
32+
// 'queue_params' => [
33+
// 'passive' => false,
34+
// 'durable' => true,
35+
// 'exclusive' => false,
36+
// 'auto_delete' => false,
37+
// 'arguments' => null,
38+
// ],
39+
// 'exchange_params' => [
40+
// 'name' => null,
41+
// 'type' => AmqpTopic::TYPE_DIRECT,
42+
// 'passive' => false,
43+
// 'durable' => true,
44+
// 'auto_delete' => false,
45+
// ],
46+
// 'ssl_params' => [
47+
// 'cafile' => getenv('RABBITMQ_SSL_CAFILE')
48+
// ]
49+
// ];
50+
//
51+
// $connector = new RabbitMQConnector(new Dispatcher());
52+
// /** @var RabbitMQQueue $queue */
53+
// $queue = $connector->connect($config);
54+
//
55+
// $this->assertInstanceOf(RabbitMQQueue::class, $queue);
56+
//
57+
// /** @var AmqpContext $context */
58+
// $context = $queue->getContext();
59+
// $this->assertInstanceOf(AmqpContext::class, $context);
60+
//
61+
// $this->assertInstanceOf(AMQPSSLConnection::class, $context->getLibChannel()->getConnection());
62+
// $this->assertTrue($context->getLibChannel()->getConnection()->isConnected());
63+
}
64+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Functional;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Enqueue\AmqpLib\AmqpContext;
7+
use Illuminate\Events\Dispatcher;
8+
use Interop\Amqp\AmqpTopic;
9+
use PhpAmqpLib\Connection\AMQPStreamConnection;
10+
use PHPUnit\Framework\TestCase;
11+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors\RabbitMQConnector;
12+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
13+
14+
class StreamConnectionTest extends TestCase
15+
{
16+
public function testConnectorEstablishSecureConnectionWithRabbitMQBroker()
17+
{
18+
$config = [
19+
'factory_class' => AmqpConnectionFactory::class,
20+
'dsn' => null,
21+
'host' => getenv('HOST'),
22+
'port' => getenv('PORT'),
23+
'login' => 'guest',
24+
'password' => 'guest',
25+
'vhost' => '/',
26+
'queue' => 'queue_name',
27+
'exchange_declare' => true,
28+
'queue_declare' => true,
29+
'queue_declare_bind' => true,
30+
'queue_params' => [
31+
'passive' => false,
32+
'durable' => true,
33+
'exclusive' => false,
34+
'auto_delete' => false,
35+
'arguments' => null,
36+
],
37+
'exchange_params' => [
38+
'name' => null,
39+
'type' => AmqpTopic::TYPE_DIRECT,
40+
'passive' => false,
41+
'durable' => true,
42+
'auto_delete' => false,
43+
],
44+
'ssl_params' => [
45+
'ssl_on' => false,
46+
'cafile' => null,
47+
'local_cert' => null,
48+
'verify_peer' => true,
49+
'passphrase' => null,
50+
]
51+
];
52+
53+
$connector = new RabbitMQConnector(new Dispatcher());
54+
/** @var RabbitMQQueue $queue */
55+
$queue = $connector->connect($config);
56+
57+
$this->assertInstanceOf(RabbitMQQueue::class, $queue);
58+
59+
/** @var AmqpContext $context */
60+
$context = $queue->getContext();
61+
$this->assertInstanceOf(AmqpContext::class, $context);
62+
63+
$this->assertInstanceOf(AMQPStreamConnection::class, $context->getLibChannel()->getConnection());
64+
$this->assertTrue($context->getLibChannel()->getConnection()->isConnected());
65+
}
66+
}

0 commit comments

Comments
 (0)