@@ -72,11 +72,13 @@ public function pushRaw($payload, $queue = null, array $options = [])
7272 $ queue = $ this ->declareDelayedQueue ($ queue , $ options ['delay ' ]);
7373 }
7474
75- // push job to a queue
76- $ message = new AMQPMessage ($ payload , [
77- 'Content-Type ' => 'application/json ' ,
78- 'delivery_mode ' => 2 ,
79- ]);
75+ $ defaultProperties = [
76+ 'content_type ' => 'application/json ' ,
77+ 'delivery_mode ' => AMQPMessage::DELIVERY_MODE_PERSISTENT ,
78+ ];
79+
80+ // push job to a queue
81+ $ message = new AMQPMessage ($ payload , $ defaultProperties + $ options );
8082
8183 // push task to a queue
8284 $ this ->channel ->basic_publish ($ message , $ exchange , $ queue );
@@ -218,4 +220,31 @@ private function declareDelayedQueue($destination, $delay)
218220 return $ name ;
219221 }
220222
223+ /**
224+ * Consume
225+ *
226+ * @param string $name
227+ * @param \Closure $callback
228+ */
229+ public function consume ($ name , \Closure $ callback )
230+ {
231+ $ name = $ this ->getQueueName ($ name );
232+ $ this ->channel ->basic_consume ($ name , '' , false , false , false , false , function (AMQPMessage $ message ) use ($ callback ) {
233+ $ deliveryTag = $ message ->delivery_info ['delivery_tag ' ];
234+ /** @var AMQPChannel $channel */
235+ $ channel = $ message ->delivery_info ['channel ' ];
236+
237+ $ result = call_user_func ($ callback , $ message );
238+
239+ if ($ result === true ) {
240+ $ channel ->basic_ack ($ deliveryTag );
241+ } else {
242+ $ channel ->basic_nack ($ deliveryTag );
243+ }
244+ });
245+
246+ while (count ($ this ->channel ->callbacks )) {
247+ $ this ->channel ->wait ();
248+ }
249+ }
221250}
0 commit comments