From f1c6f30cd985be4c15acf7b65ac15f7d0fe28337 Mon Sep 17 00:00:00 2001 From: nick Date: Sun, 11 Apr 2021 09:38:37 +0200 Subject: [PATCH 1/3] small fixes --- src/Consumer/AbstractKafkaConsumer.php | 13 ++--- src/Consumer/KafkaConsumerBuilder.php | 20 ++----- .../KafkaConsumerBuilderInterface.php | 9 ---- .../KafkaConsumerRebalanceCallbackTest.php | 6 +-- .../Consumer/KafkaConsumerBuilderTest.php | 34 ++++-------- tests/Unit/Consumer/KafkaConsumerTest.php | 52 +++++++------------ tests/Unit/Producer/KafkaProducerTest.php | 6 +-- 7 files changed, 39 insertions(+), 101 deletions(-) diff --git a/src/Consumer/AbstractKafkaConsumer.php b/src/Consumer/AbstractKafkaConsumer.php index dd42091..98723f2 100644 --- a/src/Consumer/AbstractKafkaConsumer.php +++ b/src/Consumer/AbstractKafkaConsumer.php @@ -94,12 +94,7 @@ public function consume(int $timeoutMs = 10000, bool $autoDecode = true): KafkaC throw new KafkaConsumerConsumeException(KafkaConsumerConsumeException::NOT_SUBSCRIBED_EXCEPTION_MESSAGE); } - if (null === $rdKafkaMessage = $this->kafkaConsume($timeoutMs)) { - throw new KafkaConsumerEndOfPartitionException( - kafka_err2str(RD_KAFKA_RESP_ERR__PARTITION_EOF), - RD_KAFKA_RESP_ERR__PARTITION_EOF - ); - } + $rdKafkaMessage = $this->kafkaConsume($timeoutMs); if (RD_KAFKA_RESP_ERR__PARTITION_EOF === $rdKafkaMessage->err) { throw new KafkaConsumerEndOfPartitionException($rdKafkaMessage->getErrorString(), $rdKafkaMessage->err); @@ -211,9 +206,11 @@ protected function getAllTopicPartitions(string $topic): array $partitions = []; $topicMetadata = $this->getMetadataForTopic($topic); + $metaPartitions = $topicMetadata->getPartitions(); - foreach ($topicMetadata->getPartitions() as $partition) { - $partitions[] = $partition->getId(); + while ($metaPartitions->valid()) { + $partitions[] = $metaPartitions->current()->getId(); + $metaPartitions->next(); } return $partitions; diff --git a/src/Consumer/KafkaConsumerBuilder.php b/src/Consumer/KafkaConsumerBuilder.php index a5dbf09..2985f39 100644 --- a/src/Consumer/KafkaConsumerBuilder.php +++ b/src/Consumer/KafkaConsumerBuilder.php @@ -24,7 +24,8 @@ final class KafkaConsumerBuilder implements KafkaConsumerBuilderInterface private $config = [ 'enable.auto.offset.store' => false, 'enable.auto.commit' => false, - 'auto.offset.reset' => 'earliest' + 'enable.partition.eof' => true, + 'auto.offset.reset' => 'earliest', ]; /** @@ -199,21 +200,6 @@ public function withRebalanceCallback(callable $rebalanceCallback): KafkaConsume return $that; } - /** - * Only applicable for the high level consumer - * Callback that is going to be called when you call consume - * - * @param callable $consumeCallback - * @return KafkaConsumerBuilderInterface - */ - public function withConsumeCallback(callable $consumeCallback): KafkaConsumerBuilderInterface - { - $that = clone $this; - $that->consumeCallback = $consumeCallback; - - return $that; - } - /** * Callback for log related events * @@ -300,7 +286,7 @@ private function registerCallbacks(KafkaConfiguration $conf): void } if (null !== $this->logCallback) { - //$conf->setLogCb($this->logCallback); + $conf->setLogCb($this->logCallback); } if (null !== $this->offsetCommitCallback) { diff --git a/src/Consumer/KafkaConsumerBuilderInterface.php b/src/Consumer/KafkaConsumerBuilderInterface.php index 0ddc43b..e828696 100644 --- a/src/Consumer/KafkaConsumerBuilderInterface.php +++ b/src/Consumer/KafkaConsumerBuilderInterface.php @@ -83,15 +83,6 @@ public function withErrorCallback(callable $errorCallback): self; */ public function withRebalanceCallback(callable $rebalanceCallback): self; - /** - * Only applicable for the high level consumer - * Callback that is going to be called when you call consume - * - * @param callable $consumeCallback - * @return KafkaConsumerBuilderInterface - */ - public function withConsumeCallback(callable $consumeCallback): self; - /** * Set callback that is being called on offset commits * diff --git a/tests/Unit/Callback/KafkaConsumerRebalanceCallbackTest.php b/tests/Unit/Callback/KafkaConsumerRebalanceCallbackTest.php index e6c5b8a..4e5e436 100644 --- a/tests/Unit/Callback/KafkaConsumerRebalanceCallbackTest.php +++ b/tests/Unit/Callback/KafkaConsumerRebalanceCallbackTest.php @@ -43,8 +43,7 @@ public function testInvokeAssign() $consumer ->expects(self::once()) ->method('assign') - ->with($partitions) - ->willReturn(null); + ->with($partitions); call_user_func( @@ -62,8 +61,7 @@ public function testInvokeRevoke() $consumer ->expects(self::once()) ->method('assign') - ->with(null) - ->willReturn(null); + ->with(null); call_user_func(new KafkaConsumerRebalanceCallback(), $consumer, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS); } diff --git a/tests/Unit/Consumer/KafkaConsumerBuilderTest.php b/tests/Unit/Consumer/KafkaConsumerBuilderTest.php index 66fe6b0..0519da4 100644 --- a/tests/Unit/Consumer/KafkaConsumerBuilderTest.php +++ b/tests/Unit/Consumer/KafkaConsumerBuilderTest.php @@ -8,7 +8,6 @@ use PhpKafka\Message\Decoder\DecoderInterface; use PhpKafka\Consumer\TopicSubscription; use PhpKafka\Exception\KafkaConsumerBuilderException; -use PhpKafka\Consumer\KafkaConsumerInterface; use PHPUnit\Framework\TestCase; /** @@ -110,6 +109,7 @@ public function testAddConfig(): void 'group.id' => 'test-group', 'enable.auto.offset.store' => true, 'enable.auto.commit' => false, + 'enable.partition.eof' => true, 'auto.offset.reset' => 'earliest' ], $reflectionProperty->getValue($clone) @@ -170,6 +170,7 @@ public function testSetErrorCallback(): void $consumer = $clone ->withAdditionalBroker('localhost') ->withSubscription('test') + ->withLogCallback($callback) ->build(); $conf = $consumer->getConfiguration(); self::assertArrayHasKey('error_cb', $conf); @@ -196,6 +197,7 @@ public function testSetRebalanceCallback(): void $consumer = $clone ->withAdditionalBroker('localhost') ->withSubscription('test') + ->withLogCallback($callback) ->build(); $conf = $consumer->getConfiguration(); self::assertArrayHasKey('rebalance_cb', $conf); @@ -222,6 +224,7 @@ public function testSetOffsetCommitCallback(): void $consumer = $clone ->withAdditionalBroker('localhost') ->withSubscription('test') + ->withLogCallback($callback) ->build(); $conf = $consumer->getConfiguration(); self::assertArrayHasKey('offset_commit_cb', $conf); @@ -273,29 +276,6 @@ public function testBuildSuccess(): void ->withAdditionalSubscription('test-topic') ->withRebalanceCallback($callback) ->withOffsetCommitCallback($callback) - ->withConsumeCallback($callback) - ->withErrorCallback($callback) - ->withLogCallback($callback) - ->build(); - - self::assertInstanceOf(KafkaConsumerInterface::class, $consumer); - self::assertInstanceOf(KafkaConsumer::class, $consumer); - } - - /** - * @return void - */ - public function testBuildHighLevelSuccess(): void - { - $callback = function ($kafka, $errId, $msg) { - // Anonymous test method, no logic required - }; - - /** @var $consumer KafkaConsumer */ - $consumer = $this->kafkaConsumerBuilder - ->withAdditionalBroker('localhost') - ->withAdditionalSubscription('test-topic') - ->withRebalanceCallback($callback) ->withErrorCallback($callback) ->withLogCallback($callback) ->build(); @@ -303,8 +283,12 @@ public function testBuildHighLevelSuccess(): void $conf = $consumer->getConfiguration(); self::assertInstanceOf(KafkaConsumerInterface::class, $consumer); - self::assertInstanceOf(KafkaConsumerInterface::class, $consumer); + self::assertInstanceOf(KafkaConsumer::class, $consumer); self::assertArrayHasKey('enable.auto.commit', $conf); self::assertEquals($conf['enable.auto.commit'], 'false'); + self::assertArrayHasKey('rebalance_cb', $conf); + self::assertArrayHasKey('offset_commit_cb', $conf); + self::assertArrayHasKey('error_cb', $conf); + self::assertArrayHasKey('log_cb', $conf); } } diff --git a/tests/Unit/Consumer/KafkaConsumerTest.php b/tests/Unit/Consumer/KafkaConsumerTest.php index 3acbbd0..03bced4 100644 --- a/tests/Unit/Consumer/KafkaConsumerTest.php +++ b/tests/Unit/Consumer/KafkaConsumerTest.php @@ -87,10 +87,23 @@ public function testSubscribeSuccessWithAssignmentWithPartitions(): void */ public function testSubscribeSuccessWithAssignmentWithOffsetOnly(): void { - $partitions = [ - $this->getMetadataPartitionMock(1), - $this->getMetadataPartitionMock(2) - ]; + $partitionCollection = $this->createMock(SkcMetadataCollection::class); + $partitionCollection->expects(self::exactly(2))->method('next'); + $partitionCollection + ->expects(self::exactly(3)) + ->method('valid') + ->willReturnOnConsecutiveCalls( + true, + true, + false + ); + $partitionCollection + ->expects(self::exactly(2)) + ->method('current') + ->willReturnOnConsecutiveCalls( + $this->getMetadataPartitionMock(1), + $this->getMetadataPartitionMock(2) + ); /** @var SkcConsumerTopic|MockObject $rdKafkaConsumerTopicMock */ $rdKafkaConsumerTopicMock = $this->createMock(SkcConsumerTopic::class); @@ -100,7 +113,7 @@ public function testSubscribeSuccessWithAssignmentWithOffsetOnly(): void $rdKafkaMetadataTopicMock ->expects(self::once()) ->method('getPartitions') - ->willReturn($partitions); + ->willReturn($partitionCollection); /** @var SkcMetadata|MockObject $rdKafkaMetadataMock */ $rdKafkaMetadataMock = $this->createMock(SkcMetadata::class); @@ -745,35 +758,6 @@ function (string $topic, int $partition, int &$lowOffset, int &$highOffset, int $this->assertEquals(5, $lowOffset); } - /** - * @throws KafkaConsumerConsumeException - * @throws KafkaConsumerEndOfPartitionException - * @throws KafkaConsumerSubscriptionException - * @throws KafkaConsumerTimeoutException - * @return void - */ - public function testConsumeThrowsEofExceptionIfQueueConsumeReturnsNull(): void - { - self::expectException(KafkaConsumerEndOfPartitionException::class); - self::expectExceptionCode(RD_KAFKA_RESP_ERR__PARTITION_EOF); - self::expectExceptionMessage(kafka_err2str(RD_KAFKA_RESP_ERR__PARTITION_EOF)); - - $rdKafkaConsumerMock = $this->createMock(SkcConsumer::class); - $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); - $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); - - $kafkaConsumer = new KafkaConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); - - $rdKafkaConsumerMock - ->expects(self::once()) - ->method('consume') - ->with(10000) - ->willReturn(null); - - $kafkaConsumer->subscribe(); - $kafkaConsumer->consume(); - } - /** * @throws KafkaConsumerConsumeException * @throws KafkaConsumerEndOfPartitionException diff --git a/tests/Unit/Producer/KafkaProducerTest.php b/tests/Unit/Producer/KafkaProducerTest.php index 1b29279..621124a 100644 --- a/tests/Unit/Producer/KafkaProducerTest.php +++ b/tests/Unit/Producer/KafkaProducerTest.php @@ -336,8 +336,7 @@ public function testBeginTransactionSuccess(): void $this->rdKafkaProducerMock ->expects(self::once()) ->method('initTransactions') - ->with(10000) - ->willReturn(RD_KAFKA_RESP_ERR_NO_ERROR); + ->with(10000); $this->rdKafkaProducerMock ->expects(self::once()) ->method('beginTransaction'); @@ -353,8 +352,7 @@ public function testBeginTransactionConsecutiveSuccess(): void $this->rdKafkaProducerMock ->expects(self::once()) ->method('initTransactions') - ->with(10000) - ->willReturn(RD_KAFKA_RESP_ERR_NO_ERROR); + ->with(10000); $this->rdKafkaProducerMock ->expects(self::exactly(2)) ->method('beginTransaction'); From e12786b519b289243ac570a4120b6f8a7c2604f4 Mon Sep 17 00:00:00 2001 From: nick Date: Sun, 11 Apr 2021 09:46:29 +0200 Subject: [PATCH 2/3] fix return type --- src/Consumer/AbstractKafkaConsumer.php | 4 ++-- src/Consumer/KafkaConsumer.php | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Consumer/AbstractKafkaConsumer.php b/src/Consumer/AbstractKafkaConsumer.php index 98723f2..973b941 100644 --- a/src/Consumer/AbstractKafkaConsumer.php +++ b/src/Consumer/AbstractKafkaConsumer.php @@ -235,7 +235,7 @@ private function getConsumerMessage(SkcMessage $message): KafkaConsumerMessageIn /** * @param integer $timeoutMs - * @return null|SkcMessage + * @return SkcMessage */ - abstract protected function kafkaConsume(int $timeoutMs): ?SkcMessage; + abstract protected function kafkaConsume(int $timeoutMs): SkcMessage; } diff --git a/src/Consumer/KafkaConsumer.php b/src/Consumer/KafkaConsumer.php index ddf93ba..0fc14da 100644 --- a/src/Consumer/KafkaConsumer.php +++ b/src/Consumer/KafkaConsumer.php @@ -181,10 +181,10 @@ public function close(): void /** * @param integer $timeoutMs - * @return SkcMessage|null + * @return SkcMessage * @throws SkcException */ - protected function kafkaConsume(int $timeoutMs): ?SkcMessage + protected function kafkaConsume(int $timeoutMs): SkcMessage { return $this->consumer->consume($timeoutMs); } From 0c3a1925d3324f06f146e500a5cfd0826dce1170 Mon Sep 17 00:00:00 2001 From: nick Date: Sun, 11 Apr 2021 10:09:41 +0200 Subject: [PATCH 3/3] update docker comments --- docker/dev/php/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/dev/php/Dockerfile b/docker/dev/php/Dockerfile index 221a4a2..c8b2402 100644 --- a/docker/dev/php/Dockerfile +++ b/docker/dev/php/Dockerfile @@ -26,7 +26,7 @@ RUN echo "$HOST_USER:x:$HOST_USER_ID:82:Linux User,,,:/home/$HOST_USER:" >> /etc echo "ALL ALL=NOPASSWD: ALL" >> /etc/sudoers && \ addgroup $HOST_USER www-data -# COMPOSER: install binary and prestissimo +# COMPOSER: install binary RUN curl -sS https://getcomposer.org/installer | php -- --install-dir=/usr/bin --filename=composer # PHP: Install php extensions