Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/Clients/Consumer/KafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@

public function __construct(ConsumerConfig $config, LoggerInterface|null $logger = null)
{
$this->logger = $logger ?? new NullLogger();

Check warning on line 36 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "Coalesce": @@ @@ private bool $shouldRun = true; public function __construct(ConsumerConfig $config, LoggerInterface|null $logger = null) { - $this->logger = $logger ?? new NullLogger(); + $this->logger = new NullLogger() ?? $logger; $this->setupInternalTerminationSignal($config); $config->getConf()->setErrorCb(function (RdKafkaConsumer $kafka, int $err, string $reason): void { $this->logger->error(sprintf('Kafka error: "%s": "%s"', rd_kafka_err2str($err), $reason), ['err' => $err]);

$this->setupInternalTerminationSignal($config);

Check warning on line 38 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ public function __construct(ConsumerConfig $config, LoggerInterface|null $logger = null) { $this->logger = $logger ?? new NullLogger(); - $this->setupInternalTerminationSignal($config); + $config->getConf()->setErrorCb(function (RdKafkaConsumer $kafka, int $err, string $reason): void { $this->logger->error(sprintf('Kafka error: "%s": "%s"', rd_kafka_err2str($err), $reason), ['err' => $err]); });

$config->getConf()->setErrorCb(

Check warning on line 40 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ { $this->logger = $logger ?? new NullLogger(); $this->setupInternalTerminationSignal($config); - $config->getConf()->setErrorCb(function (RdKafkaConsumer $kafka, int $err, string $reason): void { - $this->logger->error(sprintf('Kafka error: "%s": "%s"', rd_kafka_err2str($err), $reason), ['err' => $err]); - }); + $rebalanceCallback = function (RdKafkaConsumer $kafka, int $err, array|null $partitions = null): void { /** @phpstan-var array<string, TopicPartition>|null $partitions */ switch ($err) {
function (RdKafkaConsumer $kafka, int $err, string $reason): void {
$this->logger->error(
sprintf('Kafka error: "%s": "%s"', rd_kafka_err2str($err), $reason),
Expand All @@ -51,9 +51,9 @@
/** @phpstan-var array<string, TopicPartition>|null $partitions */
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
$this->logger->debug(

Check warning on line 54 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ /** @phpstan-var array<string, TopicPartition>|null $partitions */ switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: - $this->logger->debug('Assigning partitions', $partitions === null ? [] : array_map(static fn(TopicPartition $partition): string => (string) $partition->getPartition(), $partitions)); + $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
'Assigning partitions',
$partitions === null ? [] : array_map(

Check warning on line 56 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "Ternary": @@ @@ /** @phpstan-var array<string, TopicPartition>|null $partitions */ switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: - $this->logger->debug('Assigning partitions', $partitions === null ? [] : array_map(static fn(TopicPartition $partition): string => (string) $partition->getPartition(), $partitions)); + $this->logger->debug('Assigning partitions', $partitions === null ? array_map(static fn(TopicPartition $partition): string => (string) $partition->getPartition(), $partitions) : []); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:

Check warning on line 56 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "UnwrapArrayMap": @@ @@ /** @phpstan-var array<string, TopicPartition>|null $partitions */ switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: - $this->logger->debug('Assigning partitions', $partitions === null ? [] : array_map(static fn(TopicPartition $partition): string => (string) $partition->getPartition(), $partitions)); + $this->logger->debug('Assigning partitions', $partitions === null ? [] : $partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
static fn (TopicPartition $partition): string => (string) $partition->getPartition(),
$partitions,
),
Expand All @@ -77,7 +77,7 @@
$kafka->assign();
}
};
$config->getConf()->setRebalanceCb($rebalanceCallback);

Check warning on line 80 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ $kafka->assign(); } }; - $config->getConf()->setRebalanceCb($rebalanceCallback); + parent::__construct($config->getConf()); } /**

parent::__construct($config->getConf());
}
Expand Down Expand Up @@ -120,8 +120,8 @@
$consumerRecords,
): void {
$consumerRecords->add($message);
if ($processRecord !== null) {

Check warning on line 123 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "NotIdentical": @@ @@ $consumerRecords = new ConsumerRecords(); $this->doStart($timeoutMs, function (Message $message) use ($maxBatchSize, $timeoutMs, $batchTime, $processRecord, $onBatchProcessed, $consumerRecords): void { $consumerRecords->add($message); - if ($processRecord !== null) { + if ($processRecord === null) { $processRecord($message); } if ($consumerRecords->count() === $maxBatchSize) {
$processRecord($message);

Check warning on line 124 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "FunctionCallRemoval": @@ @@ $this->doStart($timeoutMs, function (Message $message) use ($maxBatchSize, $timeoutMs, $batchTime, $processRecord, $onBatchProcessed, $consumerRecords): void { $consumerRecords->add($message); if ($processRecord !== null) { - $processRecord($message); + } if ($consumerRecords->count() === $maxBatchSize) { if ($onBatchProcessed !== null && !$consumerRecords->isEmpty()) {
}

if ($consumerRecords->count() === $maxBatchSize) {
Expand All @@ -146,6 +146,11 @@
{
$this->logger->debug('Shutting down');

$this->stop();
}

public function stop(): void
{
$this->shouldRun = false;
}

Expand All @@ -160,7 +165,9 @@
callable|null $onPartitionEof = null,
callable|null $onTimedOut = null,
): void {
$this->registerSignals($this->shouldRun);
$this->shouldRun = true;
$terminationCallback = fn () => $this->shouldRun = false;
$this->registerSignals($terminationCallback);

while ($this->shouldRun) {
$message = $this->consume($timeoutMs);
Expand Down
13 changes: 8 additions & 5 deletions src/Clients/Consumer/WithSignalControl.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace SimPod\Kafka\Clients\Consumer;

use Safe\Exceptions\PcntlException;

use function Safe\pcntl_signal;
use function Safe\pcntl_sigprocmask;

Expand All @@ -22,12 +24,13 @@ private function setupInternalTerminationSignal(ConsumerConfig $config): void
$config->set('internal.termination.signal', SIGIO);
}

private function registerSignals(bool &$shouldRun): void
/**
* @param callable():mixed $terminationCallback
*
* @throws PcntlException
*/
private function registerSignals(callable $terminationCallback): void
{
$terminationCallback = static function () use (&$shouldRun): void {
$shouldRun = false;
};

pcntl_signal(SIGTERM, $terminationCallback);
pcntl_signal(SIGINT, $terminationCallback);
pcntl_signal(SIGHUP, $terminationCallback);
Expand Down
Loading