Skip to content

Commit 9230262

Browse files
committed
added: possibility to add custom jobClass to the config.
1 parent 9cc7c22 commit 9230262

File tree

2 files changed

+22
-3
lines changed

2 files changed

+22
-3
lines changed

src/Consumer.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,19 @@ public function daemon($connectionName, $queue, WorkerOptions $options): void
7474
null
7575
);
7676

77+
$jobClass = $connection->getJobClass();
78+
7779
$this->channel->basic_consume(
7880
$queue,
7981
$this->consumerTag,
8082
false,
8183
false,
8284
false,
8385
false,
84-
function (AMQPMessage $message) use ($connection, $options, $connectionName, $queue): void {
86+
function (AMQPMessage $message) use ($connection, $options, $connectionName, $queue, $jobClass): void {
8587
$this->gotJob = true;
8688

87-
$job = new RabbitMQJob(
89+
$job = new $jobClass(
8890
$this->container,
8991
$connection,
9092
$message,

src/Queue/RabbitMQQueue.php

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,9 +228,11 @@ public function pop($queue = null)
228228
try {
229229
$queue = $this->getQueue($queue);
230230

231+
$job = $this->getJobClass();
232+
231233
/** @var AMQPMessage|null $message */
232234
if ($message = $this->channel->basic_get($queue)) {
233-
return $this->currentJob = new RabbitMQJob(
235+
return $this->currentJob = new $job(
234236
$this->container,
235237
$this,
236238
$message,
@@ -272,6 +274,21 @@ public function getChannel(): AMQPChannel
272274
return $this->channel;
273275
}
274276

277+
/**
278+
* Gets the Job class from config or returns the default job class
279+
* when the job class does not extend the default job class an exception is thrown
280+
*
281+
* @return array|\ArrayAccess|mixed
282+
* @throws \Throwable
283+
*/
284+
public function getJobClass()
285+
{
286+
$job = Arr::get($this->options, 'job', RabbitMQJob::class);
287+
throw_if(! is_a($job, RabbitMQJob::class, true), Exception::class, sprintf('Class %s must extend: %s', $job, RabbitMQJob::class));
288+
289+
return $job;
290+
}
291+
275292
/**
276293
* Gets a queue/destination, by default the queue option set on the connection.
277294
*

0 commit comments

Comments
 (0)