Skip to content

Commit 68e35f8

Browse files
SWR-15367
1 parent 2e0ea6e commit 68e35f8

File tree

7 files changed

+394
-188
lines changed

7 files changed

+394
-188
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
},
3535
"extra": {
3636
"branch-alias": {
37-
"dev-master": "13.57-dev"
37+
"dev-master": "13.58-dev"
3838
},
3939
"laravel": {
4040
"providers": [

src/Console/ScanVhostsCommand.php

Lines changed: 93 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,6 @@ public function __construct(
4242
*/
4343
public function handle()
4444
{
45-
$this->internalStorageManager->getVhostQueues('/');
46-
47-
4845
$connectionName = (string) ($this->argument('connection') ?: $this->laravel['config']['queue.default']);
4946

5047
$connectionConfig = $this->laravel['config']['queue']['connections'][$connectionName] ?? [];
@@ -57,14 +54,26 @@ public function handle()
5754

5855
$this->loadVhosts();
5956

57+
$oldVhosts = $this->internalStorageManager->getVhosts();
58+
6059
if ($this->vhosts->isNotEmpty()) {
6160
foreach ($this->vhosts as $vhost) {
62-
$this->processVhost($vhost);
61+
$vhostDto = $this->processVhost($vhost);
62+
if (null === $vhostDto) {
63+
continue;
64+
}
65+
66+
$oldVhostIndex = array_search($vhostDto->getName(), $oldVhosts, true);
67+
if (false !== $oldVhostIndex) {
68+
unset($oldVhosts[$oldVhostIndex]);
69+
}
6370
}
6471
} else {
6572
$this->warn(sprintf('Vhosts for connection "%s" not found.', $connectionName));
6673
}
6774

75+
$this->removeOldsVhosts($oldVhosts);
76+
6877
return Command::SUCCESS;
6978
}
7079

@@ -104,45 +113,84 @@ private function loadVhosts(int $page = 1, int $pageSize = 100): void
104113

105114
/**
106115
* @param array $vhostApiData
107-
* @return void
116+
* @return VhostApiDto|null
108117
*/
109-
private function processVhost(array $vhostApiData): void
118+
private function processVhost(array $vhostApiData): ?VhostApiDto
110119
{
111120
$vhostDto = new VhostApiDto($vhostApiData);
112121
if ('' === $vhostDto->getName()) {
113-
return;
122+
return null;
114123
}
115124

116125
$indexedSuccessfully = $this->internalStorageManager->indexVhost($vhostDto);
117126
if (!$indexedSuccessfully) {
118127
$this->warn(sprintf(
119-
'Skip processing vhost: "%s". Messages ready: %d.',
128+
'Skip indexation vhost: "%s". Messages ready: %d.',
120129
$vhostDto->getName(),
121130
$vhostDto->getMessagesReady()
122131
));
123132

124-
return;
133+
return null;
125134
}
126135

127136
$this->info(sprintf(
128-
'Start processing vhost: "%s". Messages ready: %d.',
137+
'Successfully indexed vhost: "%s". Messages ready: %d.',
129138
$vhostDto->getName(),
130139
$vhostDto->getMessagesReady()
131140
));
132141

133142
$this->vhostQueues = new Collection();
143+
134144
$this->loadVhostQueues($vhostDto);
135145

146+
$oldVhostQueues = $this->internalStorageManager->getVhostQueues($vhostDto->getName());
147+
136148
if ($this->vhostQueues->isNotEmpty()) {
137149
foreach ($this->vhostQueues as $queueApiData) {
138-
$this->processVhostQueue($queueApiData);
150+
$processQueueDto = $this->processVhostQueue($queueApiData);
151+
if (null === $processQueueDto) {
152+
continue;
153+
}
154+
155+
$oldVhostQueueIndex = array_search($processQueueDto->getName(), $oldVhostQueues, true);
156+
if (false !== $oldVhostQueueIndex) {
157+
unset($oldVhostQueues[$oldVhostQueueIndex]);
158+
}
139159
}
140160
} else {
141161
$this->warn(sprintf(
142162
'Queues for vhost "%s" not found.',
143163
$vhostDto->getName()
144164
));
145165
}
166+
167+
$this->removeOldVhostQueues($vhostDto, $oldVhostQueues);
168+
169+
return $vhostDto;
170+
}
171+
172+
/**
173+
* @param array $oldVhosts
174+
* @return void
175+
*/
176+
private function removeOldsVhosts(array $oldVhosts): void
177+
{
178+
if (empty($oldVhosts)) {
179+
return;
180+
}
181+
182+
foreach ($oldVhosts as $oldVhostName) {
183+
$vhostDto = new VhostApiDto([
184+
'name' => $oldVhostName,
185+
]);
186+
187+
$this->internalStorageManager->removeVhost($vhostDto);
188+
189+
$this->warn(sprintf(
190+
'Removed from index vhost: "%s".',
191+
$vhostDto->getName()
192+
));
193+
}
146194
}
147195

148196
/**
@@ -190,31 +238,60 @@ private function loadVhostQueues(VhostApiDto $vhostDto, int $page = 1, int $page
190238
* @param array $queueApiData
191239
* @return void
192240
*/
193-
private function processVhostQueue(array $queueApiData): void
241+
private function processVhostQueue(array $queueApiData): ?QueueApiDto
194242
{
195243
$queueApiDto = new QueueApiDto($queueApiData);
196244
if ('' === $queueApiDto->getName()) {
197-
return;
245+
return null;
198246
}
199247

200248
$indexedSuccessfully = $this->internalStorageManager->indexQueue($queueApiDto);
201249
if (!$indexedSuccessfully) {
202250
$this->warn(sprintf(
203-
'Skip processing queue: "%s". Vhost: %s. Messages ready: %d.',
251+
'Skip indexation queue: "%s". Vhost: %s. Messages ready: %d.',
204252
$queueApiDto->getName(),
205253
$queueApiDto->getVhostName(),
206254
$queueApiDto->getMessagesReady()
207255
));
208256

209-
return;
257+
return null;
210258
}
211259

212260
$this->info(sprintf(
213-
'Start processing queue: "%s". Vhost: %s. Messages ready: %d.',
261+
'Successfully indexed queue: "%s". Vhost: %s. Messages ready: %d.',
214262
$queueApiDto->getName(),
215263
$queueApiDto->getVhostName(),
216264
$queueApiDto->getMessagesReady()
217265
));
266+
267+
return $queueApiDto;
268+
}
269+
270+
/**
271+
* @param VhostApiDto $vhostDto
272+
* @param array $oldVhostQueues
273+
* @return void
274+
*/
275+
private function removeOldVhostQueues(VhostApiDto $vhostDto, array $oldVhostQueues): void
276+
{
277+
if (empty($oldVhostQueues)) {
278+
return;
279+
}
280+
281+
foreach ($oldVhostQueues as $oldQueueName) {
282+
$queueApiDto = new QueueApiDto([
283+
'name' => $oldQueueName,
284+
'vhost' => $vhostDto->getName(),
285+
]);
286+
287+
$this->internalStorageManager->removeQueue($queueApiDto);
288+
289+
$this->warn(sprintf(
290+
'Removed from index queue: "%s". Vhost: %s.',
291+
$queueApiDto->getName(),
292+
$queueApiDto->getVhostName()
293+
));
294+
}
218295
}
219296
}
220297

src/Dto/QueueApiDto.php

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ class QueueApiDto
1414

1515
private int $messagesUnacknowledged = 0;
1616

17+
private int $lastProcessedAt = 0;
18+
1719
/**
1820
* @param array $data
1921
*/
@@ -67,18 +69,41 @@ public function getMessagesUnacknowledged(): int
6769
return $this->messagesUnacknowledged;
6870
}
6971

72+
/**
73+
* @return int
74+
*/
75+
public function getLastProcessedAt(): int
76+
{
77+
return $this->lastProcessedAt;
78+
}
79+
80+
/**
81+
* @param int $lastProcessedAt
82+
* @return $this
83+
*/
84+
public function setLastProcessedAt(int $lastProcessedAt): self
85+
{
86+
$this->lastProcessedAt = $lastProcessedAt;
87+
return $this;
88+
}
89+
7090
/**
7191
* @return array
7292
*/
73-
public function toInternalData(): array
93+
public function toInternalData(bool $withLastProcessedAt = false): array
7494
{
75-
return [
95+
$data = [
7696
'name' => $this->getName(),
7797
'vhost' => $this->getVhostName(),
7898
'messages' => $this->getMessages(),
7999
'messages_ready' => $this->getMessagesReady(),
80100
'messages_unacknowledged' => $this->getMessagesUnacknowledged(),
81101
];
102+
if ($withLastProcessedAt) {
103+
$data['last_processed_at'] = $this->getLastProcessedAt();
104+
}
105+
106+
return $data;
82107
}
83108
}
84109

src/Dto/VhostApiDto.php

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ class VhostApiDto
1212

1313
private int $messagesUnacknowledged = 0;
1414

15+
private int $lastProcessedAt = 0;
16+
1517
/**
1618
* @param array $data
1719
*/
@@ -64,17 +66,40 @@ public function getMessagesUnacknowledged(): int
6466
return $this->messagesUnacknowledged;
6567
}
6668

69+
/**
70+
* @return int
71+
*/
72+
public function getLastProcessedAt(): int
73+
{
74+
return $this->lastProcessedAt;
75+
}
76+
77+
/**
78+
* @param int $lastProcessedAt
79+
* @return $this
80+
*/
81+
public function setLastProcessedAt(int $lastProcessedAt): self
82+
{
83+
$this->lastProcessedAt = $lastProcessedAt;
84+
return $this;
85+
}
86+
6787
/**
6888
* @return array
6989
*/
70-
public function toInternalData(): array
90+
public function toInternalData(bool $withLastProcessedAt = false): array
7191
{
72-
return [
92+
$data = [
7393
'name' => $this->getName(),
7494
'messages' => $this->getMessages(),
7595
'messages_ready' => $this->getMessagesReady(),
7696
'messages_unacknowledged' => $this->getMessagesUnacknowledged(),
7797
];
98+
if ($withLastProcessedAt) {
99+
$data['last_processed_at'] = $this->getLastProcessedAt();
100+
}
101+
102+
return $data;
78103
}
79104
}
80105

src/LaravelQueueRabbitMQServiceProvider.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public function register(): void
8484
};
8585

8686
return new VhostsConsumer(
87+
$this->app[InternalStorageManager::class],
8788
$this->app['rabbitmq_queue'],
8889
$this->app['events'],
8990
$this->app[ExceptionHandler::class],

0 commit comments

Comments
 (0)