Skip to content

Commit 9bb554b

Browse files
committed
Merge pull request vyuldashev#27 from ivan1986/fix_data_not_array
Fix data not array
2 parents b67565f + 58259b2 commit 9bb554b

File tree

2 files changed

+28
-52
lines changed

2 files changed

+28
-52
lines changed

src/FintechFab/LaravelQueueRabbitMQ/Queue/Jobs/RabbitMQJob.php

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
<?php namespace FintechFab\LaravelQueueRabbitMQ\Queue\Jobs;
22

3+
use FintechFab\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
34
use Illuminate\Queue\Jobs\Job;
45
use PhpAmqpLib\Channel\AMQPChannel;
56
use PhpAmqpLib\Message\AMQPMessage;
@@ -10,11 +11,13 @@ class RabbitMQJob extends Job
1011

1112
protected $channel;
1213
protected $queue;
14+
protected $connection;
1315
protected $message;
1416

15-
public function __construct($container, AMQPChannel $channel, $queue, AMQPMessage $message)
17+
public function __construct($container, RabbitMQQueue $connection, AMQPChannel $channel, $queue, AMQPMessage $message)
1618
{
1719
$this->container = $container;
20+
$this->connection = $connection;
1821
$this->channel = $channel;
1922
$this->queue = $queue;
2023
$this->message = $message;
@@ -79,17 +82,10 @@ public function release($delay = 0)
7982
$attempts = $this->attempts();
8083

8184
// write attempts to body
82-
$body['data']['attempts'] = $attempts + 1;
85+
$body['attempts'] = $attempts + 1;
8386

84-
$job = $body['job'];
85-
$data = $body['data'];
86-
87-
// push back to a queue
88-
if ($delay > 0) {
89-
Queue::later($delay, $job, $data, $this->getQueue());
90-
} else {
91-
Queue::push($job, $data, $this->getQueue());
92-
}
87+
$this->connection->pushRaw(json_encode($body), $this->getQueue(),
88+
$delay > 0 ? [ 'delay' => $delay ] : []);
9389
}
9490

9591
/**
@@ -101,7 +97,7 @@ public function attempts()
10197
{
10298
$body = json_decode($this->message->body, true);
10399

104-
return isset($body['data']['attempts']) ? $body['data']['attempts'] : 0;
100+
return isset($body['attempts']) ? $body['attempts'] : 0;
105101
}
106102

107103
/**

src/FintechFab/LaravelQueueRabbitMQ/Queue/RabbitMQQueue.php

Lines changed: 20 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -44,69 +44,49 @@ public function __construct(AMQPConnection $amqpConnection, $config)
4444
*/
4545
public function push($job, $data = '', $queue = null)
4646
{
47-
$queue = $this->getQueueName($queue);
48-
$payload = $this->createPayload($job, $data);
49-
$this->declareQueue($queue);
50-
51-
// push job to a queue
52-
$message = new AMQPMessage($payload, [
53-
'Content-Type' => 'application/json',
54-
'delivery_mode' => 2,
55-
]);
56-
57-
$this->channel->basic_publish($message, $queue, $queue);
58-
59-
return true;
47+
return $this->pushRaw($this->createPayload($job, $data), $queue, []);
6048
}
6149

6250
/**
63-
* Push a raw payload onto the queue.
51+
* Push a new job onto the queue after a delay.
6452
*
65-
* @param string $payload
66-
* @param string $queue
67-
* @param array $options
53+
* @param \DateTime|int $delay
54+
* @param string $job
55+
* @param mixed $data
56+
* @param string $queue
6857
*
6958
* @return mixed
7059
*/
71-
public function pushRaw($payload, $queue = null, array $options = [])
60+
public function later($delay, $job, $data = '', $queue = null)
7261
{
73-
$queue = $this->getQueueName($queue);
74-
$this->declareQueue($queue);
75-
76-
// push job to a queue
77-
$message = new AMQPMessage($payload, [
78-
'Content-Type' => 'application/json',
79-
'delivery_mode' => 2,
80-
]);
81-
82-
// push task to a queue
83-
$this->channel->basic_publish($message, $queue, $queue);
84-
85-
return true;
62+
return $this->pushRaw($this->createPayload($job, $data), $queue, ['delay' => $delay]);
8663
}
8764

8865
/**
89-
* Push a new job onto the queue after a delay.
66+
* Push a raw payload onto the queue.
9067
*
91-
* @param \DateTime|int $delay
92-
* @param string $job
93-
* @param mixed $data
94-
* @param string $queue
68+
* @param string $payload
69+
* @param string $queue
70+
* @param array $options
9571
*
9672
* @return mixed
9773
*/
98-
public function later($delay, $job, $data = '', $queue = null)
74+
public function pushRaw($payload, $queue = null, array $options = [])
9975
{
100-
$payload = $this->createPayload($job, $data);
76+
$queue = $this->getQueueName($queue);
10177
$this->declareQueue($queue);
102-
$queue = $this->declareDelayedQueue($queue, $delay);
78+
if (isset($options['delay']))
79+
{
80+
$queue = $this->declareDelayedQueue($queue, $options['delay']);
81+
}
10382

10483
// push job to a queue
10584
$message = new AMQPMessage($payload, [
10685
'Content-Type' => 'application/json',
10786
'delivery_mode' => 2,
10887
]);
10988

89+
// push task to a queue
11090
$this->channel->basic_publish($message, $queue, $queue);
11191

11292
return true;
@@ -130,7 +110,7 @@ public function pop($queue = null)
130110
$message = $this->channel->basic_get($queue);
131111

132112
if ($message instanceof AMQPMessage) {
133-
return new RabbitMQJob($this->container, $this->channel, $queue, $message);
113+
return new RabbitMQJob($this->container, $this, $this->channel, $queue, $message);
134114
}
135115

136116
return null;

0 commit comments

Comments
 (0)