diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index d3f8545f..17e04c67 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -1037,8 +1037,10 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive. if !pq.IsDroppd() { msgBackFailed := make([]*primitive.MessageExt, 0) + msgBackSucceed := make([]*primitive.MessageExt, 0) if result == ConsumeSuccess { pc.stat.increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs)) + msgBackSucceed = subMsgs } else { pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs)) if pc.model == BroadCasting { @@ -1050,7 +1052,9 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive. } else { for i := 0; i < len(subMsgs); i++ { msg := subMsgs[i] - if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) { + if pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) { + msgBackSucceed = append(msgBackSucceed, msg) + } else { msg.ReconsumeTimes += 1 msgBackFailed = append(msgBackFailed, msg) } @@ -1058,7 +1062,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive. } } - offset := pq.removeMessage(subMsgs...) + offset := pq.removeMessage(msgBackSucceed...) if offset >= 0 && !pq.IsDroppd() { pc.storage.update(mq, int64(offset), true)