Skip to content

Commit c7760e5

Browse files
SWR-15367
1 parent 4b02b15 commit c7760e5

File tree

4 files changed

+115
-113
lines changed

4 files changed

+115
-113
lines changed

src/Console/ScanVhostsCommand.php

Lines changed: 10 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@
88
use Illuminate\Support\Facades\Redis;
99
use Salesmessage\LibRabbitMQ\Dto\QueueApiDto;
1010
use Salesmessage\LibRabbitMQ\Dto\VhostApiDto;
11-
use Salesmessage\LibRabbitMQ\Services\Api\RabbitApiClient;
11+
use Salesmessage\LibRabbitMQ\Services\QueueService;
12+
use Salesmessage\LibRabbitMQ\Services\VhostsService;
1213
use Throwable;
1314
use Salesmessage\LibRabbitMQ\Services\InternalStorageManager;
1415

1516
class ScanVhostsCommand extends Command
1617
{
17-
protected $signature = 'lib-rabbitmq:scan-vhosts
18-
{connection? : The name of the queue connection to work}';
18+
protected $signature = 'lib-rabbitmq:scan-vhosts';
1919

2020
protected $description = 'Scan and index vhosts';
2121

@@ -24,11 +24,13 @@ class ScanVhostsCommand extends Command
2424
private Collection $vhostQueues;
2525

2626
/**
27-
* @param RabbitApiClient $apiClient
27+
* @param VhostsService $vhostsService
28+
* @param QueueService $queueService
2829
* @param InternalStorageManager $internalStorageManager
2930
*/
3031
public function __construct(
31-
private RabbitApiClient $apiClient,
32+
private VhostsService $vhostsService,
33+
private QueueService $queueService,
3234
private InternalStorageManager $internalStorageManager
3335
) {
3436
$this->vhosts = new Collection();
@@ -42,17 +44,7 @@ public function __construct(
4244
*/
4345
public function handle()
4446
{
45-
$connectionName = (string) ($this->argument('connection') ?: $this->laravel['config']['queue.default']);
46-
47-
$connectionConfig = $this->laravel['config']['queue']['connections'][$connectionName] ?? [];
48-
if (empty($connectionConfig)) {
49-
$this->error(sprintf('Config for connection "%s" not found.', $connectionName));
50-
return Command::INVALID;
51-
}
52-
53-
$this->apiClient->setConnectionConfig($connectionConfig);
54-
55-
$this->loadVhosts();
47+
$this->vhosts = $this->vhostsService->getAllVhosts();
5648

5749
$oldVhosts = $this->internalStorageManager->getVhosts();
5850

@@ -69,48 +61,14 @@ public function handle()
6961
}
7062
}
7163
} else {
72-
$this->warn(sprintf('Vhosts for connection "%s" not found.', $connectionName));
64+
$this->warn('Vhosts not found.');
7365
}
7466

7567
$this->removeOldsVhosts($oldVhosts);
7668

7769
return Command::SUCCESS;
7870
}
7971

80-
/**
81-
* @param int $page
82-
* @param int $pageSize
83-
* @return void
84-
*/
85-
private function loadVhosts(int $page = 1, int $pageSize = 100): void
86-
{
87-
try {
88-
$data = $this->apiClient->request(
89-
'GET',
90-
'/api/vhosts', [
91-
'page' => $page,
92-
'page_size' => $pageSize,
93-
'columns' => 'name,messages,messages_ready,messages_unacknowledged',
94-
]);
95-
} catch (Throwable $exception) {
96-
$data = [];
97-
98-
$this->error(sprintf('Load vhosts error: %s.', (string) $exception->getMessage()));
99-
}
100-
101-
$items = (array) ($data['items'] ?? []);
102-
if (!empty($items)) {
103-
$this->vhosts->push(...$items);
104-
}
105-
106-
$nextPage = $page + 1;
107-
$lastPage = (int) ($data['page_count'] ?? 1);
108-
if ($lastPage >= $nextPage) {
109-
$this->loadVhosts($nextPage, $pageSize);
110-
return;
111-
}
112-
}
113-
11472
/**
11573
* @param array $vhostApiData
11674
* @return VhostApiDto|null
@@ -139,9 +97,7 @@ private function processVhost(array $vhostApiData): ?VhostApiDto
13997
$vhostDto->getMessagesReady()
14098
));
14199

142-
$this->vhostQueues = new Collection();
143-
144-
$this->loadVhostQueues($vhostDto);
100+
$this->vhostQueues = $this->queueService->getAllVhostQueues($vhostDto);
145101

146102
$oldVhostQueues = $this->internalStorageManager->getVhostQueues($vhostDto->getName());
147103

@@ -193,47 +149,6 @@ private function removeOldsVhosts(array $oldVhosts): void
193149
}
194150
}
195151

196-
/**
197-
* @param VhostApiDto $vhostDto
198-
* @param int $page
199-
* @param int $pageSize
200-
* @return void
201-
*/
202-
private function loadVhostQueues(VhostApiDto $vhostDto, int $page = 1, int $pageSize = 2): void
203-
{
204-
try {
205-
$data = $this->apiClient->request(
206-
'GET',
207-
'/api/queues/' . $vhostDto->getApiName(), [
208-
'page' => $page,
209-
'page_size' => $pageSize,
210-
'columns' => 'name,vhost,messages,messages_ready,messages_unacknowledged',
211-
'disable_stats' => 'true',
212-
'enable_queue_totals' => 'true',
213-
]);
214-
} catch (Throwable $exception) {
215-
$data = [];
216-
217-
$this->error(sprintf(
218-
'Load vhost "%s" queues error: %s.',
219-
$vhostDto->getName(),
220-
(string) $exception->getMessage()
221-
));
222-
}
223-
224-
$items = (array) ($data['items'] ?? []);
225-
if (!empty($items)) {
226-
$this->vhostQueues->push(...$items);
227-
}
228-
229-
$nextPage = $page + 1;
230-
$lastPage = (int) ($data['page_count'] ?? 1);
231-
if ($lastPage >= $nextPage) {
232-
$this->loadVhostQueues($vhostDto, $nextPage, $pageSize);
233-
return;
234-
}
235-
}
236-
237152
/**
238153
* @param array $queueApiData
239154
* @return void

src/LaravelLibRabbitMQServiceProvider.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@
1616
use Salesmessage\LibRabbitMQ\Console\ScanVhostsCommand;
1717
use Salesmessage\LibRabbitMQ\Queue\Connectors\RabbitMQConnector;
1818
use Salesmessage\LibRabbitMQ\Queue\VhostsQueueManager as RabbitMQVhostsQueueManager;
19-
use Salesmessage\LibRabbitMQ\Services\Api\RabbitApiClient;
2019
use Salesmessage\LibRabbitMQ\Services\InternalStorageManager;
20+
use Salesmessage\LibRabbitMQ\Services\QueueService;
21+
use Salesmessage\LibRabbitMQ\Services\VhostsService;
2122

2223
class LaravelLibRabbitMQServiceProvider extends ServiceProvider
2324
{
@@ -102,7 +103,8 @@ public function register(): void
102103

103104
$this->app->singleton(ScanVhostsCommand::class, static function ($app) {
104105
return new ScanVhostsCommand(
105-
$app[RabbitApiClient::class],
106+
$app[VhostsService::class],
107+
$app[QueueService::class],
106108
$app[InternalStorageManager::class]
107109
);
108110
});

src/Services/QueueService.php

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
namespace Salesmessage\LibRabbitMQ\Services;
4+
5+
use Illuminate\Support\Collection;
6+
use Psr\Log\LoggerInterface;
7+
use Salesmessage\LibRabbitMQ\Dto\VhostApiDto;
8+
use Salesmessage\LibRabbitMQ\Services\Api\RabbitApiClient;
9+
10+
class QueueService
11+
{
12+
/**
13+
* @param RabbitApiClient $rabbitApiClient
14+
* @param LoggerInterface $logger
15+
*/
16+
public function __construct(
17+
private RabbitApiClient $rabbitApiClient,
18+
private LoggerInterface $logger
19+
)
20+
{
21+
$connectionConfig = (array) config('queue.connections.rabbitmq_vhosts', []);
22+
23+
$this->rabbitApiClient->setConnectionConfig($connectionConfig);
24+
}
25+
26+
/**
27+
* @param VhostApiDto $vhostDto
28+
* @param int $page
29+
* @param int $pageSize
30+
* @param Collection|null $queues
31+
* @return Collection|null
32+
* @throws \GuzzleHttp\Exception\GuzzleException
33+
* @throws \Salesmessage\LibRabbitMQ\Exceptions\RabbitApiClientException
34+
*/
35+
public function getAllVhostQueues(
36+
VhostApiDto $vhostDto,
37+
int $page = 1,
38+
int $pageSize = 500,
39+
?Collection $queues = null,
40+
): ?Collection
41+
{
42+
if (null === $queues) {
43+
$queues = new Collection();
44+
}
45+
46+
try {
47+
$data = $this->rabbitApiClient->request(
48+
'GET',
49+
'/api/queues/' . $vhostDto->getApiName(), [
50+
'page' => $page,
51+
'page_size' => $pageSize,
52+
'columns' => 'name,vhost,messages,messages_ready,messages_unacknowledged',
53+
'disable_stats' => 'true',
54+
'enable_queue_totals' => 'true',
55+
]);
56+
} catch (Throwable $exception) {
57+
$this->logger->warning('Salesmessage.LibRabbitMQ.Services.QueueService.getAllVhostQueues.exception', [
58+
'message' => $exception->getMessage(),
59+
'code' => $exception->getCode(),
60+
'trace' => $exception->getTraceAsString(),
61+
]);
62+
63+
$data = [];
64+
}
65+
66+
$items = (array) ($data['items'] ?? []);
67+
if (!empty($items)) {
68+
$queues->push(...$items);
69+
}
70+
71+
$nextPage = $page + 1;
72+
$lastPage = (int) ($data['page_count'] ?? 1);
73+
if ($lastPage >= $nextPage) {
74+
return $this->getAllVhostQueues($vhostDto, $nextPage, $pageSize, $queues);
75+
}
76+
77+
return $queues;
78+
}
79+
}
80+

src/Services/VhostsService.php

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22

33
namespace Salesmessage\LibRabbitMQ\Services;
44

5+
use Illuminate\Support\Collection;
56
use Psr\Log\LoggerInterface;
67
use Salesmessage\LibRabbitMQ\Services\Api\RabbitApiClient;
78
use Throwable;
89

910
class VhostsService
1011
{
11-
private const VHOST_PREFIX = 'organization_';
12+
public const VHOST_PREFIX = 'organization_';
1213

1314
/**
1415
* @param RabbitApiClient $rabbitApiClient
@@ -25,23 +26,31 @@ public function __construct(
2526
}
2627

2728
/**
28-
* @param array $vhosts
2929
* @param int $page
3030
* @param int $pageSize
31-
* @return array
31+
* @param Collection|null $vhosts
32+
* @return Collection
3233
*/
33-
public function getAllVhosts(array $vhosts = [], int $page = 1, int $pageSize = 500): array
34+
public function getAllVhosts(
35+
int $page = 1,
36+
int $pageSize = 500,
37+
?Collection $vhosts = null,
38+
): Collection
3439
{
40+
if (null === $vhosts) {
41+
$vhosts = new Collection();
42+
}
43+
3544
try {
3645
$data = $this->rabbitApiClient->request(
3746
'GET',
3847
'/api/vhosts', [
3948
'page' => $page,
4049
'page_size' => $pageSize,
41-
'columns' => 'name',
50+
'columns' => 'name,messages,messages_ready,messages_unacknowledged',
4251
]);
4352
} catch (Throwable $exception) {
44-
$this->logger->warning('App.Organizations.Services.Rabbitmq.getAllVhosts.exception', [
53+
$this->logger->warning('Salesmessage.LibRabbitMQ.Services.VhostsService.getAllVhosts.exception', [
4554
'message' => $exception->getMessage(),
4655
'code' => $exception->getCode(),
4756
'trace' => $exception->getTraceAsString(),
@@ -50,19 +59,15 @@ public function getAllVhosts(array $vhosts = [], int $page = 1, int $pageSize =
5059
$data = [];
5160
}
5261

53-
$vhostsChunk = (array) ($data['items'] ?? []);
54-
if (!empty($vhostsChunk)) {
55-
foreach ($vhostsChunk as $vhost) {
56-
if (isset($vhost['name']) && str_starts_with($vhost['name'], self::VHOST_PREFIX)) {
57-
$vhosts[] = $vhost['name'];
58-
}
59-
}
62+
$items = (array) ($data['items'] ?? []);
63+
if (!empty($items)) {
64+
$vhosts->push(...$items);
6065
}
6166

6267
$nextPage = $page + 1;
6368
$lastPage = (int) ($data['page_count'] ?? 1);
6469
if ($lastPage >= $nextPage) {
65-
return $this->getAllVhosts($vhosts, $nextPage, $pageSize);
70+
return $this->getAllVhosts($nextPage, $pageSize, $vhosts);
6671
}
6772

6873
return $vhosts;
@@ -96,7 +101,7 @@ public function getVhost(string $vhostName): array
96101
]
97102
);
98103
} catch (Throwable $exception) {
99-
$this->logger->warning('App.Organizations.Services.Rabbitmq.getVhost.exception', [
104+
$this->logger->warning('Salesmessage.LibRabbitMQ.Services.VhostsService.getVhost.exception', [
100105
'vhost_name' => $vhostName,
101106
'message' => $exception->getMessage(),
102107
'code' => $exception->getCode(),
@@ -142,7 +147,7 @@ public function createVhost(string $vhostName, string $description): bool
142147
);
143148
$isCreated = true;
144149
} catch (Throwable $exception) {
145-
$this->logger->warning('App.Organizations.Services.Rabbitmq.createVhost.exception', [
150+
$this->logger->warning('Salesmessage.LibRabbitMQ.Services.VhostsService.createVhost.exception', [
146151
'vhost_name' => $vhostName,
147152
'message' => $exception->getMessage(),
148153
'code' => $exception->getCode(),

0 commit comments

Comments
 (0)