@@ -188,11 +188,18 @@ public function laterRaw($delay, $payload, $queue = null, $attempts = 0)
188188 {
189189 $ ttl = $ this ->secondsUntil ($ delay ) * 1000 ;
190190
191+ // default options
192+ $ options = ['delay ' => $ delay , 'attempts ' => $ attempts ];
193+
191194 // When no ttl just publish a new message to the exchange or queue
192195 if ($ ttl <= 0 ) {
193- return $ this ->pushRaw ($ payload , $ queue , [ ' delay ' => $ delay , ' attempts ' => $ attempts ] );
196+ return $ this ->pushRaw ($ payload , $ queue , $ options );
194197 }
195198
199+ // Create a main queue to handle delayed messages
200+ [$ mainDestination , $ exchange , $ exchangeType , $ attempts ] = $ this ->publishProperties ($ queue , $ options );
201+ $ this ->declareDestination ($ mainDestination , $ exchange , $ exchangeType );
202+
196203 $ destination = $ this ->getQueue ($ queue ).'.delay. ' .$ ttl ;
197204
198205 $ this ->declareQueue ($ destination , true , false , $ this ->getDelayQueueArguments ($ this ->getQueue ($ queue ), $ ttl ));
@@ -754,9 +761,9 @@ protected function getRoutingKey(string $destination): string
754761 protected function getExchangeType (?string $ type = null ): string
755762 {
756763 return @constant (AMQPExchangeType::class.':: ' .Str::upper ($ type ?: Arr::get (
757- $ this ->options ,
758- 'exchange_type '
759- ) ?: 'direct ' )) ?: AMQPExchangeType::DIRECT ;
764+ $ this ->options ,
765+ 'exchange_type '
766+ ) ?: 'direct ' )) ?: AMQPExchangeType::DIRECT ;
760767 }
761768
762769 /**
0 commit comments