Skip to content

Commit 74dabab

Browse files
committed
feat(consumer): add a way to stop consumer
1 parent 9c2f544 commit 74dabab

File tree

2 files changed

+16
-6
lines changed

2 files changed

+16
-6
lines changed

src/Clients/Consumer/KafkaConsumer.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,11 @@ public function shutdown(): void
146146
{
147147
$this->logger->debug('Shutting down');
148148

149+
$this->stop();
150+
}
151+
152+
public function stop(): void
153+
{
149154
$this->shouldRun = false;
150155
}
151156

@@ -160,7 +165,9 @@ private function doStart(
160165
callable|null $onPartitionEof = null,
161166
callable|null $onTimedOut = null,
162167
): void {
163-
$this->registerSignals($this->shouldRun);
168+
$this->shouldRun = true;
169+
$terminationCallback = fn () => $this->shouldRun = false;
170+
$this->registerSignals($terminationCallback);
164171

165172
while ($this->shouldRun) {
166173
$message = $this->consume($timeoutMs);

src/Clients/Consumer/WithSignalControl.php

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
namespace SimPod\Kafka\Clients\Consumer;
66

7+
use Safe\Exceptions\PcntlException;
8+
79
use function Safe\pcntl_signal;
810
use function Safe\pcntl_sigprocmask;
911

@@ -22,12 +24,13 @@ private function setupInternalTerminationSignal(ConsumerConfig $config): void
2224
$config->set('internal.termination.signal', SIGIO);
2325
}
2426

25-
private function registerSignals(bool &$shouldRun): void
27+
/**
28+
* @param callable():mixed $terminationCallback
29+
*
30+
* @throws PcntlException
31+
*/
32+
private function registerSignals(callable $terminationCallback): void
2633
{
27-
$terminationCallback = static function () use (&$shouldRun): void {
28-
$shouldRun = false;
29-
};
30-
3134
pcntl_signal(SIGTERM, $terminationCallback);
3235
pcntl_signal(SIGINT, $terminationCallback);
3336
pcntl_signal(SIGHUP, $terminationCallback);

0 commit comments

Comments
 (0)