Skip to content

Commit 6c06cb3

Browse files
committed
resolve conflicts
2 parents 93522f7 + 06d96c3 commit 6c06cb3

File tree

4 files changed

+35
-7
lines changed

4 files changed

+35
-7
lines changed

src/Console/ConsumeCommand.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ class ConsumeCommand extends WorkCommand
2424
{--timeout=60 : The number of seconds a child process can run}
2525
{--tries=1 : Number of times to attempt a job before logging it failed}
2626
{--rest=0 : Number of seconds to rest between jobs}
27-
27+
28+
{--max-priority=}
2829
{--consumer-tag}
2930
{--prefetch-size=0}
3031
{--prefetch-count=1000}
@@ -40,6 +41,7 @@ public function handle(): void
4041
$consumer->setContainer($this->laravel);
4142
$consumer->setName($this->option('name'));
4243
$consumer->setConsumerTag($this->consumerTag());
44+
$consumer->setMaxPriority((int) $this->option('max-priority'));
4345
$consumer->setPrefetchSize((int) $this->option('prefetch-size'));
4446
$consumer->setPrefetchCount((int) $this->option('prefetch-count'));
4547

src/Console/QueueDeclareCommand.php

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ class QueueDeclareCommand extends Command
1111
protected $signature = 'rabbitmq:queue-declare
1212
{name : The name of the queue to declare}
1313
{connection=rabbitmq : The name of the queue connection to use}
14+
{--max-priority}
1415
{--durable=1}
1516
{--auto-delete=0}
1617
{--quorum=0}';
@@ -33,9 +34,16 @@ public function handle(RabbitMQConnector $connector): void
3334
return;
3435
}
3536

36-
$arguments = (bool) $this->option('quorum')
37-
? ['x-queue-type' => 'quorum']
38-
: [];
37+
$arguments = [];
38+
39+
$maxPriority = (int) $this->option('max-priority');
40+
if ($maxPriority) {
41+
$arguments['x-max-priority'] = $maxPriority;
42+
}
43+
44+
if($this->option('quorum')) {
45+
$arguments['x-queue-type'] = 'quorum';
46+
}
3947

4048
$queue->declareQueue(
4149
$this->argument('name'),

src/Consumer.php

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ class Consumer extends Worker
2323
/** @var int */
2424
protected $prefetchSize;
2525

26+
/** @var int */
27+
protected $maxPriority;
28+
2629
/** @var int */
2730
protected $prefetchCount;
2831

@@ -42,6 +45,11 @@ public function setConsumerTag(string $value): void
4245
$this->consumerTag = $value;
4346
}
4447

48+
public function setMaxPriority(int $value): void
49+
{
50+
$this->maxPriority = $value;
51+
}
52+
4553
public function setPrefetchSize(int $value): void
4654
{
4755
$this->prefetchSize = $value;
@@ -83,6 +91,10 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
8391
);
8492

8593
$jobClass = $connection->getJobClass();
94+
$arguments = [];
95+
if ($this->maxPriority) {
96+
$arguments['priority'] = ['I', $this->maxPriority];
97+
}
8698

8799
$this->channel->basic_consume(
88100
$queue,
@@ -113,7 +125,9 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu
113125
if ($this->supportsAsyncSignals()) {
114126
$this->resetTimeoutHandler();
115127
}
116-
}
128+
},
129+
null,
130+
$arguments
117131
);
118132

119133
while ($this->channel->is_consuming()) {

src/Queue/RabbitMQQueue.php

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -532,14 +532,18 @@ protected function createMessage($payload, int $attempts = 0): array
532532
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
533533
];
534534

535-
if ($correlationId = json_decode($payload, true, 512)['id'] ?? null) {
535+
$currentPayload = json_decode($payload, true, 512);
536+
if ($correlationId = $currentPayload['id'] ?? null) {
536537
$properties['correlation_id'] = $correlationId;
537538
}
538539

539540
if ($this->isPrioritizeDelayed()) {
540541
$properties['priority'] = $attempts;
541542
}
542-
543+
$commandData = unserialize($currentPayload['data']['command']);
544+
if (property_exists($commandData, 'priority')) {
545+
$properties['priority'] = $commandData->priority;
546+
}
543547
$message = new AMQPMessage($payload, $properties);
544548

545549
$message->set('application_headers', new AMQPTable([

0 commit comments

Comments
 (0)