77use PhpAmqpLib \Channel \AMQPChannel ;
88use PhpAmqpLib \Connection \AMQPConnection ;
99use PhpAmqpLib \Message \AMQPMessage ;
10+ use PhpAmqpLib \Wire \AMQPTable ;
1011
1112class RabbitMQQueue extends Queue implements QueueInterface
1213{
@@ -94,15 +95,15 @@ public function later($delay, $job, $data = '', $queue = null)
9495 {
9596 $ payload = $ this ->createPayload ($ job , $ data );
9697 $ this ->declareQueue ($ queue );
97- $ this ->declareDelayedQueue ($ queue , $ delay );
98+ $ queue = $ this ->declareDelayedQueue ($ queue , $ delay );
9899
99100 // push job to a queue
100101 $ message = new AMQPMessage ($ payload , [
101102 'Content-Type ' => 'application/json ' ,
102103 'delivery_mode ' => 2 ,
103104 ]);
104105
105- $ this ->channel ->basic_publish ($ message , $ this -> configExchange [ ' name ' ] );
106+ $ this ->channel ->basic_publish ($ message , $ queue );
106107
107108 return true ;
108109 }
@@ -152,9 +153,11 @@ public function getChannel()
152153 */
153154 public function declareQueue ($ name )
154155 {
156+ $ name = $ this ->getQueueName ($ name );
157+
155158 // declare queue
156159 $ this ->channel ->queue_declare (
157- $ this -> getQueueName ( $ name) ,
160+ $ name ,
158161 $ this ->configQueue ['passive ' ],
159162 $ this ->configQueue ['durable ' ],
160163 $ this ->configQueue ['exclusive ' ],
@@ -171,12 +174,14 @@ public function declareQueue($name)
171174 );
172175
173176 // bind queue to the exchange
174- $ this ->channel ->queue_bind ($ this -> getQueueName ( $ name) , $ this ->configExchange ['name ' ]);
177+ $ this ->channel ->queue_bind ($ name , $ this ->configExchange ['name ' ], $ name );
175178 }
176179
177180 /**
178181 * @param string $destination
179182 * @param DateTime|int $delay
183+ *
184+ * @return string
180185 */
181186 public function declareDelayedQueue ($ destination , $ delay )
182187 {
@@ -192,15 +197,15 @@ public function declareDelayedQueue($destination, $delay)
192197 $ this ->configQueue ['exclusive ' ],
193198 $ this ->configQueue ['auto_delete ' ],
194199 false ,
195- [
196- 'x-dead-letter-exchange ' => $ this ->configExchange ['name ' ],
197- 'x-dead-letter-routing-key ' => $ destination ,
200+ new AMQPTable ([
201+ 'x-dead-letter-exchange ' => $ destination ,
198202 'x-message-ttl ' => $ delay * 1000 ,
199- ]
203+ ])
200204 );
201205
202- // bind queue to the exchange
203- $ this ->channel ->queue_bind ($ name , $ this ->configExchange ['name ' ]);
206+ $ this ->channel ->queue_bind ($ name , $ this ->configExchange ['name ' ], $ name );
207+
208+ return $ name ;
204209 }
205210
206211}
0 commit comments