44use FintechFab \LaravelQueueRabbitMQ \Queue \Jobs \RabbitMQJob ;
55use Illuminate \Contracts \Queue \Queue as QueueContract ;
66use Illuminate \Queue \Queue ;
7- use Illuminate \Queue \QueueInterface ;
87use PhpAmqpLib \Channel \AMQPChannel ;
98use PhpAmqpLib \Connection \AMQPConnection ;
109use PhpAmqpLib \Message \AMQPMessage ;
@@ -45,19 +44,7 @@ public function __construct(AMQPConnection $amqpConnection, $config)
4544 */
4645 public function push ($ job , $ data = '' , $ queue = null )
4746 {
48- $ queue = $ this ->getQueueName ($ queue );
49- $ payload = $ this ->createPayload ($ job , $ data );
50- $ this ->declareQueue ($ queue );
51-
52- // push job to a queue
53- $ message = new AMQPMessage ($ payload , [
54- 'Content-Type ' => 'application/json ' ,
55- 'delivery_mode ' => 2 ,
56- ]);
57-
58- $ this ->channel ->basic_publish ($ message , $ queue , $ queue );
59-
60- return true ;
47+ return $ this ->pushRaw ($ this ->createPayload ($ job , $ data ), $ queue , []);
6148 }
6249
6350 /**
@@ -73,6 +60,9 @@ public function pushRaw($payload, $queue = null, array $options = [])
7360 {
7461 $ queue = $ this ->getQueueName ($ queue );
7562 $ this ->declareQueue ($ queue );
63+ if (isset ($ options ['delay ' ])) {
64+ $ queue = $ this ->declareDelayedQueue ($ queue , $ options ['delay ' ]);
65+ }
7666
7767 // push job to a queue
7868 $ message = new AMQPMessage ($ payload , [
@@ -98,19 +88,7 @@ public function pushRaw($payload, $queue = null, array $options = [])
9888 */
9989 public function later ($ delay , $ job , $ data = '' , $ queue = null )
10090 {
101- $ payload = $ this ->createPayload ($ job , $ data );
102- $ this ->declareQueue ($ queue );
103- $ queue = $ this ->declareDelayedQueue ($ queue , $ delay );
104-
105- // push job to a queue
106- $ message = new AMQPMessage ($ payload , [
107- 'Content-Type ' => 'application/json ' ,
108- 'delivery_mode ' => 2 ,
109- ]);
110-
111- $ this ->channel ->basic_publish ($ message , $ queue , $ queue );
112-
113- return true ;
91+ return $ this ->pushRaw ($ this ->createPayload ($ job , $ data ), $ queue , ['delay ' => $ delay ]);
11492 }
11593
11694 /**
@@ -131,7 +109,7 @@ public function pop($queue = null)
131109 $ message = $ this ->channel ->basic_get ($ queue );
132110
133111 if ($ message instanceof AMQPMessage) {
134- return new RabbitMQJob ($ this ->container , $ this ->channel , $ queue , $ message );
112+ return new RabbitMQJob ($ this ->container , $ this , $ this ->channel , $ queue , $ message );
135113 }
136114
137115 return null ;
0 commit comments