Skip to content

Commit 5788999

Browse files
authored
feat: introduce KafkaProducerWrapper (#228)
1 parent 39535d4 commit 5788999

File tree

5 files changed

+103
-11
lines changed

5 files changed

+103
-11
lines changed

src/Clients/Producer/KafkaProducer.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use const RD_KAFKA_PARTITION_UA;
1616
use const RD_KAFKA_RESP_ERR_NO_ERROR;
1717

18+
/** @deprecated Use {@see \SimPod\Kafka\Clients\Producer\KafkaProducerWrapper} instead */
1819
class KafkaProducer extends Producer
1920
{
2021
// phpcs:disable Cdn77.NamingConventions.ValidConstantName.ClassConstantNotUpperCase
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace SimPod\Kafka\Clients\Producer;
6+
7+
use Closure;
8+
use InvalidArgumentException;
9+
use RdKafka\Producer;
10+
use RuntimeException;
11+
12+
use function assert;
13+
use function sprintf;
14+
15+
use const RD_KAFKA_PARTITION_UA;
16+
use const RD_KAFKA_RESP_ERR_NO_ERROR;
17+
18+
final readonly class KafkaProducerWrapper
19+
{
20+
private const int RdKafkaMsgFCopy = 0;
21+
22+
public Producer $producer;
23+
24+
/** @var (Closure(self):void)|null */
25+
private Closure|null $exitCallback;
26+
27+
/** @param (Closure(self):void)|null $exitCallback */
28+
public function __construct(ProducerConfig $config, callable|null $exitCallback = null)
29+
{
30+
$this->exitCallback = $exitCallback;
31+
$this->producer = new Producer($config->getConf());
32+
}
33+
34+
public function __destruct()
35+
{
36+
if ($this->exitCallback === null) {
37+
return;
38+
}
39+
40+
($this->exitCallback)($this);
41+
}
42+
43+
/** @param array<string, string>|null $headers */
44+
public function produce(
45+
string $topicName,
46+
int|null $partition,
47+
string $value,
48+
string|null $key = null,
49+
array|null $headers = null,
50+
int|null $timestampMs = null,
51+
): void {
52+
if ($partition < 0) {
53+
throw new InvalidArgumentException(
54+
sprintf('Invalid partition: %d. Partition number should always be non-negative or null.', $partition),
55+
);
56+
}
57+
58+
$topic = $this->producer->newTopic($topicName);
59+
$topic->producev(
60+
$partition ?? RD_KAFKA_PARTITION_UA,
61+
self::RdKafkaMsgFCopy,
62+
$value,
63+
$key,
64+
$headers,
65+
$timestampMs ?? 0,
66+
);
67+
$this->producer->poll(0);
68+
}
69+
70+
public function flushMessages(int $timeoutMs = 10000): void
71+
{
72+
$result = null;
73+
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
74+
$result = $this->producer->flush($timeoutMs);
75+
if ($result === RD_KAFKA_RESP_ERR_NO_ERROR) {
76+
break;
77+
}
78+
}
79+
80+
assert($result !== null);
81+
82+
if ($result !== RD_KAFKA_RESP_ERR_NO_ERROR) {
83+
throw new RuntimeException('Was unable to flush, messages might be lost!', $result);
84+
}
85+
}
86+
}

tests/Clients/Consumer/TestProducer.php renamed to tests/Clients/Consumer/Fixture/TestProducer.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,22 @@
22

33
declare(strict_types=1);
44

5-
namespace SimPod\Kafka\Tests\Clients\Consumer;
5+
namespace SimPod\Kafka\Tests\Clients\Consumer\Fixture;
66

7-
use SimPod\Kafka\Clients\Producer\KafkaProducer;
7+
use SimPod\Kafka\Clients\Producer\KafkaProducerWrapper;
88
use SimPod\Kafka\Clients\Producer\ProducerConfig;
99

1010
use function Safe\gethostname;
1111

1212
final class TestProducer
1313
{
14-
private KafkaProducer $producer;
14+
private KafkaProducerWrapper $producer;
1515

1616
public function __construct()
1717
{
18-
$this->producer = new KafkaProducer(
18+
$this->producer = new KafkaProducerWrapper(
1919
$this->getConfig(),
20-
static function (KafkaProducer $producer): void {
20+
static function (KafkaProducerWrapper $producer): void {
2121
$producer->flushMessages(5000);
2222
},
2323
);

tests/Clients/Consumer/KafkaBatchConsumerTest.php

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,35 +4,40 @@
44

55
namespace SimPod\Kafka\Tests\Clients\Consumer;
66

7+
use PHPUnit\Framework\Attributes\CoversClass;
78
use PHPUnit\Framework\TestCase;
89
use RdKafka\Message;
910
use SimPod\Kafka\Clients\Consumer\ConsumerConfig;
1011
use SimPod\Kafka\Clients\Consumer\ConsumerRecords;
1112
use SimPod\Kafka\Clients\Consumer\KafkaConsumer;
13+
use SimPod\Kafka\Clients\Producer\KafkaProducerWrapper;
14+
use SimPod\Kafka\Tests\Clients\Consumer\Fixture\TestProducer;
1215

1316
use function mt_rand;
1417
use function Safe\gethostname;
1518

19+
#[CoversClass(KafkaConsumer::class)]
20+
#[CoversClass(KafkaProducerWrapper::class)]
1621
final class KafkaBatchConsumerTest extends TestCase
1722
{
18-
public const string PAYLOAD = 'Tasty, chilled pudding is best flavored with juicy lime.';
19-
public const string TOPIC = 'kafka-batch-consumer';
23+
public const string Payload = 'Tasty, chilled pudding is best flavored with juicy lime.';
24+
public const string Topic = 'kafka-batch-consumer';
2025

2126
public function testMaxBatchSize(): void
2227
{
2328
$testProducer = new TestProducer();
2429
for ($i = 0; $i < 100; $i++) {
25-
$testProducer->run(self::TOPIC, self::PAYLOAD);
30+
$testProducer->run(self::Topic, self::Payload);
2631
}
2732

2833
$consumer = new KafkaConsumer($this->getConfig());
29-
$consumer->subscribe([self::TOPIC]);
34+
$consumer->subscribe([self::Topic]);
3035

3136
$consumer->startBatch(
3237
90,
3338
10000,
3439
static function (Message $message): void {
35-
self::assertSame(self::PAYLOAD, $message->payload);
40+
self::assertSame(self::Payload, $message->payload);
3641
},
3742
static function (ConsumerRecords $consumerRecords) use ($consumer): void {
3843
self::assertCount(90, $consumerRecords);

tests/Clients/Consumer/KafkaTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
use PHPUnit\Framework\TestCase;
99
use SimPod\Kafka\Clients\Consumer\ConsumerConfig;
1010
use SimPod\Kafka\Clients\Consumer\KafkaConsumer;
11-
use SimPod\Kafka\Tests\Clients\Consumer\TestProducer;
11+
use SimPod\Kafka\Tests\Clients\Consumer\Fixture\TestProducer;
1212

1313
use function mt_rand;
1414
use function Safe\gethostname;

0 commit comments

Comments
 (0)