diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index f611178907..163db82ce3 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -172,6 +172,7 @@
  * @author Christian Fredriksson
  * @author Timofey Barabanov
  * @author Janek Lasocki-Biczysko
+ * @author Su Ko
  */
 public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
 		extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -843,6 +844,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
 
 		private @Nullable ConsumerRecords<K, V> remainingRecords;
 
+		// used to fix the committed offset after a transaction marker
+		private @Nullable ConsumerRecords<K, V> lastRecords;
+
 		private boolean pauseForPending;
 
 		private boolean firstPoll;
 
@@ -1530,9 +1534,10 @@ private void doProcessCommits() {
 		}
 
 		private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
+			saveLastRecordsIfNeeded(records);
+
 			if (records != null && records.count() > 0) {
 				this.receivedSome = true;
-				savePositionsIfNeeded(records);
 				notIdle();
 				notIdlePartitions(records.partitions());
 				invokeListener(records);
@@ -1604,50 +1609,69 @@ private void notIdle() {
 			}
 		}
 
-		private void savePositionsIfNeeded(ConsumerRecords<K, V> records) {
-			if (this.fixTxOffsets) {
-				this.savedPositions.clear();
-				records.partitions().forEach(tp -> this.savedPositions.put(tp, this.consumer.position(tp)));
+		private void saveLastRecordsIfNeeded(@Nullable ConsumerRecords<K, V> records) {
+			if (this.fixTxOffsets && records != null && !records.nextOffsets().isEmpty()) {
+				this.lastRecords = records;
 			}
 		}
 
+		/**
+		 * Fix transactional offsets using the {@code ConsumerRecords#nextOffsets()} API.
+		 * Kafka transaction markers can otherwise leave the committed offset behind the
+		 * consumer position, so consumer lag appears stuck on idle partitions.
+		 * Using {@code nextOffsets()} instead of {@code position()} ensures that:
+		 * - transaction markers are accounted for automatically
+		 * - the partition leader epoch is captured correctly
+		 * - empty {@code poll()} results are handled properly
+		 */
 		private void fixTxOffsetsIfNeeded() {
-			if (this.fixTxOffsets) {
-				try {
-					Map<TopicPartition, OffsetAndMetadata> toFix = new HashMap<>();
-					this.lastCommits.forEach((tp, oamd) -> {
-						long position = this.consumer.position(tp);
-						Long saved = this.savedPositions.get(tp);
-						if (saved != null && saved != position) {
-							this.logger.debug(() -> "Skipping TX offset correction - seek(s) have been performed; "
-									+ "saved: " + this.savedPositions + ", "
-									+ "committed: " + oamd + ", "
-									+ "current: " + tp + "@" + position);
-							return;
-						}
-						if (position > oamd.offset()) {
-							toFix.put(tp, createOffsetAndMetadata(position));
-						}
-					});
-					if (!toFix.isEmpty()) {
-						this.logger.debug(() -> "Fixing TX offsets: " + toFix);
-						if (this.kafkaTxManager == null) {
-							commitOffsets(toFix);
-						}
-						else {
-							Objects.requireNonNull(this.transactionTemplate).executeWithoutResult(
-									status -> doSendOffsets(getTxProducer(), toFix));
-						}
-					}
-				}
-				catch (Exception e) {
-					this.logger.error(e, () -> "Failed to correct transactional offset(s): "
-							+ ListenerConsumer.this.lastCommits);
-				}
-				finally {
-					ListenerConsumer.this.lastCommits.clear();
-					ListenerConsumer.this.savedPositions.clear();
-				}
+			if (!this.fixTxOffsets) {
+				return;
+			}
+			try {
+				if (this.lastRecords == null) {
+					this.logger.trace(() -> "No previous records available for TX offset fix.");
+					return;
+				}
+				Map<TopicPartition, OffsetAndMetadata> nextOffsets = this.lastRecords.nextOffsets();
+				Map<TopicPartition, OffsetAndMetadata> toFix = new HashMap<>();
+				// fix offsets only for partitions that actually had records processed
+				nextOffsets.forEach((tp, nextOffset) -> {
+					OffsetAndMetadata committed = this.lastCommits.get(tp);
+					if (committed == null) {
+						this.logger.debug(() -> "No committed offset for " + tp + "; skipping TX offset fix.");
+						return;
+					}
+					// nextOffset comes from nextOffsets(), so it is never null; fix only
+					// when it has advanced past the last committed offset
+					if (nextOffset.offset() > committed.offset()) {
+						toFix.put(tp, nextOffset);
+					}
+				});
+				if (!toFix.isEmpty()) {
+					this.logger.debug(() ->
+							String.format("Fixing TX offsets for %d partitions: %s", toFix.size(), toFix));
+					if (this.kafkaTxManager == null) {
+						commitOffsets(toFix);
+					}
+					else {
+						Objects.requireNonNull(this.transactionTemplate)
+								.executeWithoutResult(status -> doSendOffsets(getTxProducer(), toFix));
+					}
+					ListenerConsumer.this.lastCommits.clear();
+					this.lastRecords = null;
+				}
+			}
+			catch (Exception e) {
+				this.logger.error(e, () -> "Failed to correct transactional offset(s): "
+						+ ListenerConsumer.this.lastCommits);
 			}
 		}
 
 		private @Nullable ConsumerRecords<K, V> doPoll() {
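
Note: `ConsumerRecords#nextOffsets()` was introduced by KIP-1094 (Apache Kafka 4.0) and returns, per polled partition, the offset of the next record the consumer will fetch together with the leader epoch, already accounting for control records such as transaction markers. A minimal stand-alone sketch of the idea behind the change above; this is not part of the patch, and the topic, group, and bootstrap address are illustrative:

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NextOffsetsSketch {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "nextOffsetsSketch");
		props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
			consumer.subscribe(List.of("someTxTopic"));
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
			// After a transactional batch, position() points past the commit marker,
			// but the offset of the last data record does not. nextOffsets() reports
			// the post-marker offset (with leader epoch) directly, and may include a
			// partition even when the poll returned no data records for it.
			Map<TopicPartition, OffsetAndMetadata> next = records.nextOffsets();
			if (!next.isEmpty()) {
				consumer.commitSync(next);
			}
		}
	}
}
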
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
index 92f175b39b..93ec673afe 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
@@ -116,7 +116,9 @@
 		TransactionalContainerTests.topic3, TransactionalContainerTests.topic3DLT, TransactionalContainerTests.topic4,
 		TransactionalContainerTests.topic5, TransactionalContainerTests.topic6, TransactionalContainerTests.topic7,
 		TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT, TransactionalContainerTests.topic9,
-		TransactionalContainerTests.topic10},
+		TransactionalContainerTests.topic10, TransactionalContainerTests.topic11, TransactionalContainerTests.topic12,
+		TransactionalContainerTests.topic13, TransactionalContainerTests.topic14, TransactionalContainerTests.topic15},
 		brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
 public class TransactionalContainerTests {
@@ -146,6 +148,14 @@ public class TransactionalContainerTests {
 
 	public static final String topic11 = "txTopic11";
 
+	public static final String topic12 = "txTopic12";
+
+	public static final String topic13 = "txTopic13";
+
+	public static final String topic14 = "txTopic14";
+
+	public static final String topic15 = "txTopic15";
+
 	private static EmbeddedKafkaBroker embeddedKafka;
 
 	@BeforeAll
@@ -1232,4 +1242,216 @@ public void afterRecord(
 		container.stop();
 		pf.destroy();
 	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	void testFixTxOffsetsWithReadUncommitted() throws Exception {
+		Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "txReadUncommittedTest", false);
+		props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
+		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
+		ContainerProperties containerProps = new ContainerProperties(topic12);
+		containerProps.setGroupId("txReadUncommittedTest");
+		containerProps.setPollTimeout(500L);
+		containerProps.setIdleEventInterval(500L);
+		containerProps.setFixTxOffsets(true);
+
+		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
+		DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
+		pf.setTransactionIdPrefix("readUncommitted.");
+
+		final KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
+		final AtomicInteger messageCount = new AtomicInteger();
+		final CountDownLatch latch = new CountDownLatch(1);
+		containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
+			messageCount.incrementAndGet();
+			latch.countDown();
+		});
+
+		@SuppressWarnings({ "rawtypes" })
+		KafkaTransactionManager tm = new KafkaTransactionManager(pf);
+		containerProps.setKafkaAwareTransactionManager(tm);
+		KafkaMessageListenerContainer<Integer, String> container =
+				new KafkaMessageListenerContainer<>(cf, containerProps);
+		container.setBeanName("testFixTxOffsetsWithReadUncommitted");
+
+		AtomicReference<Map<TopicPartition, OffsetAndMetadata>> committed = new AtomicReference<>();
+		CountDownLatch idleLatch = new CountDownLatch(1);
+		container.setApplicationEventPublisher(event -> {
+			if (event instanceof ListenerContainerIdleEvent) {
+				Consumer<?, ?> consumer = ((ListenerContainerIdleEvent) event).getConsumer();
+				committed.set(consumer.committed(
+						Collections.singleton(new TopicPartition(topic12, 0))));
+				idleLatch.countDown();
+			}
+		});
+
+		container.start();
+
+		template.setDefaultTopic(topic12);
+		template.executeInTransaction(t -> {
+			template.sendDefault(0, 0, "msg1");
+			template.sendDefault(0, 0, "msg2");
+			template.sendDefault(0, 0, "msg3");
+			return null;
+		});
+
+		assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
+		assertThat(idleLatch.await(60, TimeUnit.SECONDS)).isTrue();
+
+		assertThat(messageCount.get()).isGreaterThanOrEqualTo(3);
+		TopicPartition partition0 = new TopicPartition(topic12, 0);
+		assertThat(committed.get().get(partition0)).isNotNull();
+
+		// offsets 0, 1, 2 are data records, offset 3 is the tx marker => next offset 4
+		assertThat(committed.get().get(partition0).offset()).isGreaterThanOrEqualTo(4L);
+
+		container.stop();
+		pf.destroy();
+	}
+
+	@Test
+	void testFixTxOffsetsWithEmptyPollAdvance() throws Exception {
+		Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "txEmptyPoll", false);
+		props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+		props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
+		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
+
+		ContainerProperties containerProps = new ContainerProperties(topic13);
+		containerProps.setGroupId("txEmptyPoll");
+		containerProps.setPollTimeout(500L);
+		containerProps.setFixTxOffsets(true);
+		containerProps.setIdleEventInterval(1000L);
+
+		containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {
+		});
+
+		DefaultKafkaProducerFactory<Integer, String> pf =
+				new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
+		pf.setTransactionIdPrefix("tx.emptyPoll.");
+		KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
+
+		KafkaMessageListenerContainer<Integer, String> container =
+				new KafkaMessageListenerContainer<>(cf, containerProps);
+		container.setBeanName("testFixEmptyPoll");
+
+		AtomicReference<Map<TopicPartition, OffsetAndMetadata>> committed = new AtomicReference<>();
+		CountDownLatch latch = new CountDownLatch(1);
+
+		container.setApplicationEventPublisher(event -> {
+			if (event instanceof ListenerContainerIdleEvent e) {
+				TopicPartition tp = new TopicPartition(topic13, 0);
+				committed.set(e.getConsumer().committed(Set.of(tp)));
+				latch.countDown();
+			}
+		});
+
+		container.start();
+
+		template.setDefaultTopic(topic13);
+		template.executeInTransaction(t -> {
+			template.sendDefault(0, 0, "msg1");
+			template.sendDefault(0, 0, "msg2");
+			template.sendDefault(0, 0, "msg3");
+			return null;
+		});
+
+		assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
+		assertThat(committed.get().get(new TopicPartition(topic13, 0)).offset())
+				.isEqualTo(4L);
+		container.stop();
+		pf.destroy();
+	}
+
+	@Test
+	void testFixTxOffsetsRetainsLeaderEpoch() throws Exception {
+		Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "txLeaderEpoch", false);
+		props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
+
+		ContainerProperties containerProps = new ContainerProperties(topic14);
+		containerProps.setFixTxOffsets(true);
+		containerProps.setIdleEventInterval(1000L);
+
+		containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {
+		});
+
+		DefaultKafkaProducerFactory<Integer, String> pf =
+				new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
+		pf.setTransactionIdPrefix("tx.leaderEpoch.");
+		KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
+
+		KafkaMessageListenerContainer<Integer, String> container =
+				new KafkaMessageListenerContainer<>(cf, containerProps);
+
+		AtomicReference<OffsetAndMetadata> committed = new AtomicReference<>();
+		CountDownLatch latch = new CountDownLatch(1);
+
+		container.setApplicationEventPublisher(event -> {
+			if (event instanceof ListenerContainerIdleEvent e) {
+				TopicPartition tp = new TopicPartition(topic14, 0);
+				committed.set(e.getConsumer().committed(Set.of(tp)).get(tp));
+				latch.countDown();
+			}
+		});
+
+		container.start();
+
+		template.setDefaultTopic(topic14);
+		template.executeInTransaction(t -> {
+			template.sendDefault(0, 0, "data");
+			return null;
+		});
+
+		assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
+		assertThat(committed.get().leaderEpoch()).isPresent();
+		container.stop();
+		pf.destroy();
+	}
+
+	@Test
+	void testFixLagWhenMaxPollEqualsTxBatchSize() throws Exception {
+		Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "txTestPollLimit", false);
+		props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+		props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
+		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
+
+		ContainerProperties containerProps = new ContainerProperties(topic15);
+		containerProps.setGroupId("txTestPollLimit");
+		containerProps.setPollTimeout(500L);
+		containerProps.setFixTxOffsets(true);
+		containerProps.setIdleEventInterval(1000L);
+		containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {
+		});
+
+		DefaultKafkaProducerFactory<Integer, String> pf =
+				new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
+		pf.setTransactionIdPrefix("tx.polllimit.");
+
+		KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
+		KafkaMessageListenerContainer<Integer, String> container =
+				new KafkaMessageListenerContainer<>(cf, containerProps);
+		container.setBeanName("testFixLagPollLimit");
+
+		AtomicReference<Map<TopicPartition, OffsetAndMetadata>> committed = new AtomicReference<>();
+		CountDownLatch latch = new CountDownLatch(1);
+
+		container.setApplicationEventPublisher(event -> {
+			if (event instanceof ListenerContainerIdleEvent e) {
+				committed.set(e.getConsumer().committed(Set.of(new TopicPartition(topic15, 0))));
+				latch.countDown();
+			}
+		});
+
+		container.start();
+
+		template.setDefaultTopic(topic15);
+		template.executeInTransaction(t -> {
+			template.sendDefault(0, 0, "msg1");
+			template.sendDefault(0, 0, "msg2");
+			template.sendDefault(0, 0, "msg3");
+			return null;
+		});
+
+		assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
+		// max.poll.records = 3 returns only the three data records; the committed
+		// offset must still advance past the marker at offset 3
+		assertThat(committed.get().get(new TopicPartition(topic15, 0)).offset()).isEqualTo(4L);
+		container.stop();
+		pf.destroy();
+	}
+
 }
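
For application code, the behavior exercised by these tests is enabled through the existing `ContainerProperties#setFixTxOffsets` property. A minimal wiring sketch, assuming a transactional producer writes to the topic; the topic, group, and deserializer choices are illustrative and not part of the patch:

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

public class FixTxOffsetsWiring {

	public static void main(String[] args) {
		Map<String, Object> props = Map.of(
				ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
				ConsumerConfig.GROUP_ID_CONFIG, "someGroup",
				ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed",
				ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
				ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		ContainerProperties containerProps = new ContainerProperties("someTxTopic");
		// with fixTxOffsets enabled, the container commits the post-marker offset
		// once the consumer goes idle, so lag does not appear stuck at 1 when the
		// last records in a partition belong to a committed transaction
		containerProps.setFixTxOffsets(true);
		containerProps.setMessageListener((MessageListener<String, String>) rec -> {
			// process the record
		});
		KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(
				new DefaultKafkaConsumerFactory<>(props), containerProps);
		container.start();
	}
}
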