@@ -22,7 +22,9 @@ class RabbitMQQueue extends Queue implements QueueContract
2222 protected $ channel ;
2323
2424 protected $ declareExchange ;
25+ protected $ declaredExchanges = [];
2526 protected $ declareBindQueue ;
27+ protected $ declaredQueues = [];
2628
2729 protected $ defaultQueue ;
2830 protected $ configQueue ;
@@ -40,7 +42,7 @@ class RabbitMQQueue extends Queue implements QueueContract
4042
4143 /**
4244 * @param AMQPStreamConnection $amqpConnection
43- * @param array $config
45+ * @param array $config
4446 */
4547 public function __construct (AMQPStreamConnection $ amqpConnection , $ config )
4648 {
@@ -69,7 +71,7 @@ public function size($queue = null)
6971 * Push a new job onto the queue.
7072 *
7173 * @param string $job
72- * @param mixed $data
74+ * @param mixed $data
7375 * @param string $queue
7476 *
7577 * @return bool
@@ -84,7 +86,7 @@ public function push($job, $data = '', $queue = null)
8486 *
8587 * @param string $payload
8688 * @param string $queue
87- * @param array $options
89+ * @param array $options
8890 *
8991 * @return mixed
9092 */
@@ -99,7 +101,7 @@ public function pushRaw($payload, $queue = null, array $options = [])
99101 }
100102
101103 $ headers = [
102- 'Content-Type ' => 'application/json ' ,
104+ 'Content-Type ' => 'application/json ' ,
103105 'delivery_mode ' => 2 ,
104106 ];
105107
@@ -123,9 +125,9 @@ public function pushRaw($payload, $queue = null, array $options = [])
123125 * Push a new job onto the queue after a delay.
124126 *
125127 * @param \DateTime|int $delay
126- * @param string $job
127- * @param mixed $data
128- * @param string $queue
128+ * @param string $job
129+ * @param mixed $data
130+ * @param string $queue
129131 *
130132 * @return mixed
131133 */
@@ -184,7 +186,7 @@ private function declareQueue($name)
184186 $ name = $ this ->getQueueName ($ name );
185187 $ exchange = $ this ->configExchange ['name ' ] ?: $ name ;
186188
187- if ($ this ->declareExchange ) {
189+ if ($ this ->declareExchange && ! in_array ( $ exchange , $ this -> declaredExchanges ) ) {
188190 // declare exchange
189191 $ this ->channel ->exchange_declare (
190192 $ exchange ,
@@ -193,9 +195,11 @@ private function declareQueue($name)
193195 $ this ->configExchange ['durable ' ],
194196 $ this ->configExchange ['auto_delete ' ]
195197 );
198+
199+ $ this ->declaredExchanges [] = $ exchange ;
196200 }
197201
198- if ($ this ->declareBindQueue ) {
202+ if ($ this ->declareBindQueue && ! in_array ( $ name , $ this -> declaredQueues ) ) {
199203 // declare queue
200204 $ this ->channel ->queue_declare (
201205 $ name ,
@@ -207,13 +211,15 @@ private function declareQueue($name)
207211
208212 // bind queue to the exchange
209213 $ this ->channel ->queue_bind ($ name , $ exchange , $ name );
214+
215+ $ this ->declaredQueues [] = $ name ;
210216 }
211217
212218 return [$ name , $ exchange ];
213219 }
214220
215221 /**
216- * @param string $destination
222+ * @param string $destination
217223 * @param DateTime|int $delay
218224 *
219225 * @return string
@@ -223,32 +229,36 @@ private function declareDelayedQueue($destination, $delay)
223229 $ delay = $ this ->getSeconds ($ delay );
224230 $ destination = $ this ->getQueueName ($ destination );
225231 $ destinationExchange = $ this ->configExchange ['name ' ] ?: $ destination ;
226- $ name = $ this ->getQueueName ($ destination ). '_deferred_ ' . $ delay ;
232+ $ name = $ this ->getQueueName ($ destination ) . '_deferred_ ' . $ delay ;
227233 $ exchange = $ this ->configExchange ['name ' ] ?: $ destination ;
228234
229235 // declare exchange
230- $ this ->channel ->exchange_declare (
231- $ exchange ,
232- $ this ->configExchange ['type ' ],
233- $ this ->configExchange ['passive ' ],
234- $ this ->configExchange ['durable ' ],
235- $ this ->configExchange ['auto_delete ' ]
236- );
236+ if (!in_array ($ exchange , $ this ->declaredExchanges )) {
237+ $ this ->channel ->exchange_declare (
238+ $ exchange ,
239+ $ this ->configExchange ['type ' ],
240+ $ this ->configExchange ['passive ' ],
241+ $ this ->configExchange ['durable ' ],
242+ $ this ->configExchange ['auto_delete ' ]
243+ );
244+ }
237245
238246 // declare queue
239- $ this ->channel ->queue_declare (
240- $ name ,
241- $ this ->configQueue ['passive ' ],
242- $ this ->configQueue ['durable ' ],
243- $ this ->configQueue ['exclusive ' ],
244- $ this ->configQueue ['auto_delete ' ],
245- false ,
246- new AMQPTable ([
247- 'x-dead-letter-exchange ' => $ destinationExchange ,
248- 'x-dead-letter-routing-key ' => $ destination ,
249- 'x-message-ttl ' => $ delay * 1000 ,
250- ])
251- );
247+ if (!in_array ($ name , $ this ->declaredQueues )) {
248+ $ this ->channel ->queue_declare (
249+ $ name ,
250+ $ this ->configQueue ['passive ' ],
251+ $ this ->configQueue ['durable ' ],
252+ $ this ->configQueue ['exclusive ' ],
253+ $ this ->configQueue ['auto_delete ' ],
254+ false ,
255+ new AMQPTable ([
256+ 'x-dead-letter-exchange ' => $ destinationExchange ,
257+ 'x-dead-letter-routing-key ' => $ destination ,
258+ 'x-message-ttl ' => $ delay * 1000 ,
259+ ])
260+ );
261+ }
252262
253263 // bind queue to the exchange
254264 $ this ->channel ->queue_bind ($ name , $ exchange , $ name );
0 commit comments