Skip to content

Commit 4efd770

Browse files
authored
Fix msg lost if consumer crash when send msg back failed. (#860)
Co-authored-by: shannon.dl <shannon.dl@alibaba-inc.com>
1 parent d3be7e5 commit 4efd770

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

consumer/push_consumer.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1046,8 +1046,10 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
10461046

10471047
if !pq.IsDroppd() {
10481048
msgBackFailed := make([]*primitive.MessageExt, 0)
1049+
msgBackSucceed := make([]*primitive.MessageExt, 0)
10491050
if result == ConsumeSuccess {
10501051
pc.stat.increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
1052+
msgBackSucceed = subMsgs
10511053
} else {
10521054
pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
10531055
if pc.model == BroadCasting {
@@ -1059,15 +1061,17 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
10591061
} else {
10601062
for i := 0; i < len(subMsgs); i++ {
10611063
msg := subMsgs[i]
1062-
if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
1064+
if pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
1065+
msgBackSucceed = append(msgBackSucceed, msg)
1066+
} else {
10631067
msg.ReconsumeTimes += 1
10641068
msgBackFailed = append(msgBackFailed, msg)
10651069
}
10661070
}
10671071
}
10681072
}
10691073

1070-
offset := pq.removeMessage(subMsgs...)
1074+
offset := pq.removeMessage(msgBackSucceed...)
10711075

10721076
if offset >= 0 && !pq.IsDroppd() {
10731077
pc.storage.update(mq, int64(offset), true)

0 commit comments

Comments
 (0)