Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Fix msg lost if consumer crash when send msg back failed.
  • Loading branch information
ShannonDing committed Jul 21, 2022
commit b212ff29e562c3d5ef1c10f728535eb4f1fe7570
8 changes: 6 additions & 2 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,8 +1037,10 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.

if !pq.IsDroppd() {
msgBackFailed := make([]*primitive.MessageExt, 0)
msgBackSucceed := make([]*primitive.MessageExt, 0)
if result == ConsumeSuccess {
pc.stat.increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
msgBackSucceed = subMsgs
} else {
pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
if pc.model == BroadCasting {
Expand All @@ -1050,15 +1052,17 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
} else {
for i := 0; i < len(subMsgs); i++ {
msg := subMsgs[i]
if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
if pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
msgBackSucceed = append(msgBackSucceed, msg)
} else {
msg.ReconsumeTimes += 1
msgBackFailed = append(msgBackFailed, msg)
}
}
}
}

offset := pq.removeMessage(subMsgs...)
offset := pq.removeMessage(msgBackSucceed...)

if offset >= 0 && !pq.IsDroppd() {
pc.storage.update(mq, int64(offset), true)
Expand Down