KafkaMessageListenerContainer.java
@@ -172,6 +172,7 @@
* @author Christian Fredriksson
* @author Timofey Barabanov
* @author Janek Lasocki-Biczysko
* @author Su Ko
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -843,6 +844,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private @Nullable ConsumerRecords<K, V> remainingRecords;

// retains the last poll's records so fixTxOffsetsIfNeeded() can use nextOffsets()
private @Nullable ConsumerRecords<K, V> lastRecords;

private boolean pauseForPending;

private boolean firstPoll;
@@ -1530,9 +1534,10 @@ private void doProcessCommits() {
}

private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
+ saveLastRecordsIfNeeded(records);
+
if (records != null && records.count() > 0) {
this.receivedSome = true;
- savePositionsIfNeeded(records);
notIdle();
notIdlePartitions(records.partitions());
invokeListener(records);
@@ -1604,50 +1609,69 @@ private void notIdle() {
}
}

- private void savePositionsIfNeeded(ConsumerRecords<K, V> records) {
- if (this.fixTxOffsets) {
- this.savedPositions.clear();
- records.partitions().forEach(tp -> this.savedPositions.put(tp, this.consumer.position(tp)));
+ private void saveLastRecordsIfNeeded(@Nullable ConsumerRecords<K, V> records) {
+ if (this.fixTxOffsets && records != null && !records.nextOffsets().isEmpty()) {
+ this.lastRecords = records;
}
}

+ /**
+  * Fix transactional offsets using the {@code ConsumerRecords#nextOffsets()} API.
+  * Kafka transaction markers occupy an offset, so committing "last record offset + 1"
+  * can leave a permanent lag of one. Using {@code nextOffsets()} instead of
+  * {@code position()} ensures that:
+  * <ul>
+  * <li>transaction markers are skipped when computing the next fetch position</li>
+  * <li>partition-leader epoch information is correctly captured</li>
+  * <li>empty {@code poll()} results are properly handled</li>
+  * </ul>
+  */
private void fixTxOffsetsIfNeeded() {
- if (this.fixTxOffsets) {
- try {
- Map<TopicPartition, OffsetAndMetadata> toFix = new HashMap<>();
- this.lastCommits.forEach((tp, oamd) -> {
- long position = this.consumer.position(tp);
- Long saved = this.savedPositions.get(tp);
- if (saved != null && saved != position) {
- this.logger.debug(() -> "Skipping TX offset correction - seek(s) have been performed; "
- + "saved: " + this.savedPositions + ", "
- + "committed: " + oamd + ", "
- + "current: " + tp + "@" + position);
- return;
- }
- if (position > oamd.offset()) {
- toFix.put(tp, createOffsetAndMetadata(position));
- }
- });
- if (!toFix.isEmpty()) {
- this.logger.debug(() -> "Fixing TX offsets: " + toFix);
- if (this.kafkaTxManager == null) {
- commitOffsets(toFix);
- }
- else {
- Objects.requireNonNull(this.transactionTemplate).executeWithoutResult(
- status -> doSendOffsets(getTxProducer(), toFix));
- }
- }
- }
- catch (Exception e) {
- this.logger.error(e, () -> "Failed to correct transactional offset(s): "
- + ListenerConsumer.this.lastCommits);
- }
- finally {
- ListenerConsumer.this.lastCommits.clear();
- }
- }
+ if (!this.fixTxOffsets) {
+ return;
+ }
+
+ try {
+ if (this.lastRecords == null) {
+ this.logger.trace(() -> "No previous records available for TX offset fix.");
+ return;
+ }
+
+ Map<TopicPartition, OffsetAndMetadata> nextOffsets = this.lastRecords.nextOffsets();
+ Map<TopicPartition, OffsetAndMetadata> toFix = new HashMap<>();
+
+ // fix an offset only for partitions that have a prior commit to advance
+ nextOffsets.forEach((tp, nextOffset) -> {
+ OffsetAndMetadata committed = this.lastCommits.get(tp);
+
+ if (committed == null) {
+ this.logger.debug(() -> "No committed offset for " + tp + "; skipping TX offset fix.");
+ return;
+ }
+
+ // only fix if the next fetch offset is past the committed offset
+ if (nextOffset.offset() > committed.offset()) {
+ toFix.put(tp, nextOffset);
+ }
+ });
+
+ if (!toFix.isEmpty()) {
+ this.logger.debug(() ->
+ String.format("Fixing TX offsets for %d partitions: %s", toFix.size(), toFix));
+ if (this.kafkaTxManager == null) {
+ commitOffsets(toFix);
+ }
+ else {
+ Objects.requireNonNull(this.transactionTemplate)
+ .executeWithoutResult(status -> doSendOffsets(getTxProducer(), toFix));
+ }
+
+ ListenerConsumer.this.lastCommits.clear();
+ this.lastRecords = null;
+ }
+ }
+ catch (Exception e) {
+ this.logger.error(e, () -> "Failed to correct transactional offset(s): "
+ + ListenerConsumer.this.lastCommits);
+ }
}

private @Nullable ConsumerRecords<K, V> doPoll() {
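For context, a minimal sketch of the off-by-one this change targets; it is not part of the diff, and the topic name and consumer properties are hypothetical. After a committed transaction of three records at offsets 0..2, offset 3 holds the transaction marker, so committing "last record offset + 1" (= 3) leaves a permanent lag of one against the end offset (= 4); ConsumerRecords#nextOffsets() reports 4 directly, together with the leader epoch.

import java.time.Duration;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class NextOffsetsSketch {

	public static void main(String[] args) {
		Properties consumerProps = new Properties(); // assumes bootstrap.servers, deserializers, read_committed
		TopicPartition tp = new TopicPartition("txTopic", 0); // hypothetical topic
		try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps)) {
			consumer.assign(Set.of(tp));
			ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(500));
			// offsets 0..2 are records, 3 is the transaction marker:
			// the last record reports offset 2, but the next fetch position is 4
			OffsetAndMetadata next = records.nextOffsets().get(tp);
			System.out.println("position:    " + consumer.position(tp)); // 4, once past the marker
			System.out.println("nextOffsets: " + next);                  // 4, with leader epoch
		}
	}
}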
TransactionalContainerTests.java
@@ -116,7 +116,9 @@
TransactionalContainerTests.topic3, TransactionalContainerTests.topic3DLT, TransactionalContainerTests.topic4,
TransactionalContainerTests.topic5, TransactionalContainerTests.topic6, TransactionalContainerTests.topic7,
TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT, TransactionalContainerTests.topic9,
- TransactionalContainerTests.topic10},
+ TransactionalContainerTests.topic10, TransactionalContainerTests.topic11, TransactionalContainerTests.topic12,
+ TransactionalContainerTests.topic13, TransactionalContainerTests.topic14, TransactionalContainerTests.topic15
+ },
brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
public class TransactionalContainerTests {

@@ -146,6 +148,14 @@ public class TransactionalContainerTests {

public static final String topic11 = "txTopic11";

public static final String topic12 = "txTopic12";

public static final String topic13 = "txTopic13";

public static final String topic14 = "txTopic14";

public static final String topic15 = "txTopic15";

private static EmbeddedKafkaBroker embeddedKafka;

@BeforeAll
@@ -1232,4 +1242,216 @@ public void afterRecord(
container.stop();
pf.destroy();
}

@SuppressWarnings("unchecked")
@Test
void testFixTxOffsetsWithReadUncommitted() throws Exception {
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "txReadUncommittedTest", false);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic12);
containerProps.setGroupId("txReadUncommittedTest");
containerProps.setPollTimeout(500L);
containerProps.setIdleEventInterval(500L);
containerProps.setFixTxOffsets(true);

Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
pf.setTransactionIdPrefix("readUncommitted.");

final KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
final AtomicInteger messageCount = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
messageCount.incrementAndGet();
latch.countDown();
});

@SuppressWarnings({ "rawtypes" })
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
containerProps.setKafkaAwareTransactionManager(tm);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testFixTxOffsetsWithReadUncommitted");

AtomicReference<Map<TopicPartition, OffsetAndMetadata>> committed = new AtomicReference<>();
CountDownLatch idleLatch = new CountDownLatch(1);
container.setApplicationEventPublisher(event -> {
if (event instanceof ListenerContainerIdleEvent) {
Consumer<?, ?> consumer = ((ListenerContainerIdleEvent) event).getConsumer();
committed.set(consumer.committed(
Collections.singleton(new TopicPartition(topic12, 0))));
idleLatch.countDown();
}
});

container.start();

template.setDefaultTopic(topic12);
template.executeInTransaction(t -> {
template.sendDefault(0, 0, "msg1");
template.sendDefault(0, 0, "msg2");
template.sendDefault(0, 0, "msg3");
return null;
});

assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(idleLatch.await(60, TimeUnit.SECONDS)).isTrue();

assertThat(messageCount.get()).isGreaterThanOrEqualTo(3);
TopicPartition partition0 = new TopicPartition(topic12, 0);
assertThat(committed.get().get(partition0)).isNotNull();

// offsets 0..2 hold the three records, offset 3 holds the tx marker => next offset is 4
assertThat(committed.get().get(partition0).offset()).isGreaterThanOrEqualTo(4L);

container.stop();
pf.destroy();
}

@Test
void testFixTxOffsetsWithEmptyPollAdvance() throws Exception {
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "txEmptyPoll", false);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);

ContainerProperties containerProps = new ContainerProperties(topic13);
containerProps.setGroupId("txEmptyPoll");
containerProps.setPollTimeout(500L);
containerProps.setFixTxOffsets(true);
containerProps.setIdleEventInterval(1000L);

containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {
});

DefaultKafkaProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
pf.setTransactionIdPrefix("tx.emptyPoll.");
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);

KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testFixEmptyPoll");

AtomicReference<Map<TopicPartition, OffsetAndMetadata>> committed = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);

container.setApplicationEventPublisher(event -> {
if (event instanceof ListenerContainerIdleEvent e) {
TopicPartition tp = new TopicPartition(topic13, 0);
committed.set(e.getConsumer().committed(Set.of(tp)));
latch.countDown();
}
});

container.start();

template.setDefaultTopic(topic13);
template.executeInTransaction(t -> {
template.sendDefault(0, 0, "msg1");
template.sendDefault(0, 0, "msg2");
template.sendDefault(0, 0, "msg3");
return null;
});

assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(committed.get().get(new TopicPartition(topic13, 0)).offset())
.isEqualTo(4L);
container.stop();
}
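
// Editor's note (illustrative, not part of the PR): the empty-poll scenario above
// leans on the nextOffsets() semantics cited in the javadoc -- the map can be
// non-empty even when poll() returns no records, e.g. when the fetch only advanced
// past a transaction marker. A hedged sketch of that check (consumer wiring as in
// the sketch above; whether a given client version surfaces the advance this way
// is an assumption):
//
//   ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(500));
//   if (records.count() == 0 && !records.nextOffsets().isEmpty()) {
//       // the fetch advanced past control records without returning data;
//       // nextOffsets() still reports the new fetch position per partition
//       records.nextOffsets().forEach((tp, next) ->
//               System.out.println(tp + " advanced to " + next.offset()));
//   }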

@Test
void testFixTxOffsetsRetainsLeaderEpoch() throws Exception {
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "txLeaderEpoch", false);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);

ContainerProperties containerProps = new ContainerProperties(topic14);
containerProps.setFixTxOffsets(true);
containerProps.setIdleEventInterval(1000L);

containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {
});

DefaultKafkaProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
pf.setTransactionIdPrefix("tx.leaderEpoch.");
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);

KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);

AtomicReference<OffsetAndMetadata> committed = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);

container.setApplicationEventPublisher(event -> {
if (event instanceof ListenerContainerIdleEvent e) {
TopicPartition tp = new TopicPartition(topic14, 0);
committed.set(e.getConsumer().committed(Set.of(tp)).get(tp));
latch.countDown();
}
});

container.start();

template.setDefaultTopic(topic14);
template.executeInTransaction(t -> {
template.sendDefault(0, 0, "data");
return null;
});

assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(committed.get().leaderEpoch().isPresent()).isTrue();
container.stop();
}

@Test
void testFixLagWhenMaxPollEqualsTxBatchSize() throws Exception {
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "txTestPollLimit", false);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);

ContainerProperties containerProps = new ContainerProperties(topic15);
containerProps.setGroupId("txTestPollLimit");
containerProps.setPollTimeout(500L);
containerProps.setFixTxOffsets(true);
containerProps.setIdleEventInterval(1000L);
containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {
});

DefaultKafkaProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
pf.setTransactionIdPrefix("tx.polllimit.");

KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testFixLagPollLimit");

AtomicReference<Map<TopicPartition, OffsetAndMetadata>> committed = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);

container.setApplicationEventPublisher(event -> {
if (event instanceof ListenerContainerIdleEvent e) {
committed.set(e.getConsumer().committed(Set.of(new TopicPartition(topic15, 0))));
latch.countDown();
}
});

container.start();

template.setDefaultTopic(topic15);
template.executeInTransaction(t -> {
template.sendDefault(0, 0, "msg1");
template.sendDefault(0, 0, "msg2");
template.sendDefault(0, 0, "msg3");
return null;
});

assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(committed.get().get(new TopicPartition(topic15, 0)).offset()).isEqualTo(4L);
container.stop();
}

}
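
Outside these tests, enabling the fix in an application only needs the container property. A minimal sketch (the factory bean and generic types are illustrative, not from this PR; setFixTxOffsets and setKafkaAwareTransactionManager are the same ContainerProperties setters the tests use):

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;

public class ContainerConfig {

	@Bean
	ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
			ConsumerFactory<Integer, String> cf, KafkaTransactionManager<Integer, String> tm) {

		ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
				new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(cf);
		factory.getContainerProperties().setKafkaAwareTransactionManager(tm);
		// commit the position past transaction markers so consumer lag reads 0
		factory.getContainerProperties().setFixTxOffsets(true);
		return factory;
	}

}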