44
55use PhpAmqpLib \Connection \AbstractConnection ;
66use Salesmessage \LibRabbitMQ \Dto \ConnectionNameDto ;
7+ use Salesmessage \LibRabbitMQ \Dto \QueueApiDto ;
8+ use Salesmessage \LibRabbitMQ \Dto \VhostApiDto ;
79use Salesmessage \LibRabbitMQ \Queue \RabbitMQQueue as BaseRabbitMQQueue ;
810use PhpAmqpLib \Exception \AMQPChannelClosedException ;
911use PhpAmqpLib \Exception \AMQPConnectionClosedException ;
1012use PhpAmqpLib \Channel \AMQPChannel ;
13+ use Salesmessage \LibRabbitMQ \Services \GroupsService ;
14+ use Salesmessage \LibRabbitMQ \Services \InternalStorageManager ;
1115use Salesmessage \LibRabbitMQ \Services \VhostsService ;
1216
1317class RabbitMQQueueBatchable extends BaseRabbitMQQueue
1418{
15- protected function publishBasic ($ msg , $ exchange = '' , $ destination = '' , $ mandatory = false , $ immediate = false , $ ticket = null ): void
19+ private InternalStorageManager $ internalStorageManager ;
20+
21+ private GroupsService $ groupsService ;
22+
23+ private VhostsService $ vhostsService ;
24+
25+ /**
26+ * @param QueueConfig $config
27+ */
28+ public function __construct (QueueConfig $ config )
29+ {
30+ $ this ->internalStorageManager = app (InternalStorageManager::class);
31+ $ this ->groupsService = app (GroupsService::class);
32+ $ this ->vhostsService = app (VhostsService::class);
33+
34+ parent ::__construct ($ config );
35+ }
36+
37+ protected function publishBasic (
38+ $ msg ,
39+ $ exchange = '' ,
40+ $ destination = '' ,
41+ $ mandatory = false ,
42+ $ immediate = false ,
43+ $ ticket = null
44+ ): void
1645 {
1746 try {
1847 parent ::publishBasic ($ msg , $ exchange , $ destination , $ mandatory , $ immediate , $ ticket );
@@ -61,6 +90,10 @@ public function push($job, $data = '', $queue = null)
6190 $ result = parent ::push ($ job , $ data , $ queue );
6291 }
6392
93+ if (config ('queue.connections.rabbitmq_vhosts.immediate_indexation ' )) {
94+ $ this ->addQueueToIndex ((string ) $ queue );
95+ }
96+
6497 return $ result ;
6598 }
6699
@@ -81,10 +114,39 @@ private function createNotExistsVhost(): bool
81114 return false ;
82115 }
83116
84- /** @var VhostsService $vhostsService */
85- $ vhostsService = app (VhostsService::class);
86-
87- return $ vhostsService ->createVhost ($ dto ->getVhostName (), 'Automatically created vhost ' );
117+ return $ this ->vhostsService ->createVhost ($ dto ->getVhostName (), 'Automatically created vhost ' );
88118 }
89119
120+ /**
121+ * @param string $queue
122+ * @return bool
123+ */
124+ private function addQueueToIndex (string $ queue ): bool
125+ {
126+ if ('' === $ queue ) {
127+ return false ;
128+ }
129+
130+ $ dto = new ConnectionNameDto ($ this ->getConnectionName ());
131+ $ vhostName = $ dto ->getVhostName ();
132+ if (null === $ vhostName ) {
133+ return false ;
134+ }
135+
136+ $ groups = $ this ->groupsService ->getAllGroupsNames ();
137+
138+ $ queueApiDto = new QueueApiDto ([
139+ 'name ' => $ queue ,
140+ 'vhost ' => $ vhostName ,
141+ ]);
142+ $ isQueueActivated = $ this ->internalStorageManager ->activateQueue ($ queueApiDto , $ groups );
143+
144+ $ vhostDto = new VhostApiDto ([
145+ 'name ' => $ vhostName ,
146+ ]);
147+ $ isVhostActivated = $ this ->internalStorageManager ->activateVhost ($ vhostDto , $ groups );
148+
149+ return $ isQueueActivated && $ isVhostActivated ;
150+ }
90151}
152+
0 commit comments