diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 2b855fed..47a00f3d 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -6,6 +6,8 @@ use ErrorException; use Exception; +use Illuminate\Broadcasting\BroadcastEvent; +use Illuminate\Contracts\Encryption\Encrypter; use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Queue\Queue; use Illuminate\Support\Arr; @@ -122,7 +124,11 @@ public function size($queue = null): int */ public function push($job, $data = '', $queue = null) { - return $this->pushRaw($this->createPayload($job, $queue, $data), $queue, []); + $options = $job->options ?? []; + if (empty($options) && $job instanceof BroadcastEvent) { + $options = $job->event->options ?? []; + } + return $this->pushRaw($this->createPayload($job, $queue, $data), $queue, $options); } /** @@ -419,6 +425,88 @@ public function isQueueExists(string $name = null): bool } } + /** + * Create a payload for an object-based queue handler. + * + * @param object $job + * @param string $queue + * @return array + */ + protected function createObjectPayload($job, $queue) + { + $payload = $this->withCreatePayloadHooks($queue, [ + 'uuid' => (string) Str::uuid(), + 'displayName' => $this->getDisplayName($job), + 'job' => 'Illuminate\Queue\CallQueuedHandler@call', + 'schema' => $this->getSchema($job) ?? null, + 'busEvent' => $this->getBusEvent($job) ?? null, + 'payload' => $this->getJobPayload($job) ?? null, + 'maxTries' => $job->tries ?? null, + 'maxExceptions' => $job->maxExceptions ?? null, + 'failOnTimeout' => $job->failOnTimeout ?? false, + 'backoff' => $this->getJobBackoff($job), + 'timeout' => $job->timeout ?? null, + 'retryUntil' => $this->getJobExpiration($job), + 'data' => [ + 'commandName' => $job, + 'command' => $job, + ], + ]); + + $command = $this->jobShouldBeEncrypted($job) && $this->container->bound(Encrypter::class) + ? $this->container[Encrypter::class]->encrypt(serialize(clone $job)) + : serialize(clone $job); + + return array_merge($payload, [ + 'data' => array_merge($payload['data'], [ + 'commandName' => get_class($job), + 'command' => $command, + ]), + ]); + } + + /** + * Get the display name for the given job. + * + * @param object $job + */ + protected function getSchema($job): ?string + { + if (method_exists($job, 'displayName') && isset($job->event)) { + return $job->event->schema ?? null; + } + + return $job->schema ?? null; + } + + /** + * Get the display name for the given job. + * + * @param object $job + */ + protected function getBusEvent($job): ?string + { + if (method_exists($job, 'displayName') && isset($job->event)) { + return $job->event->busEvent ?? null; + } + + return $job->busEvent ?? null; + } + + /** + * Get the display name for the given job. + * + * @param object $job + */ + protected function getJobPayload($job): ?string + { + if (method_exists($job, 'displayName') && isset($job->event) && isset($job->event->payload)) { + return serialize($job->event->payload) ?? null; + } + + return isset($job->payload) ? serialize($job->payload) : null; + } + /** * Declare a queue in rabbitMQ, when not already declared. *