Skip to content

Commit a8b10ce

Browse files
authored
[offset] Optimize the update offset logic (#732)
Signed-off-by: zhangyang21 <zhangyang21@xiaomi.com>
1 parent 673760e commit a8b10ce

File tree

1 file changed

+4
-5
lines changed

1 file changed

+4
-5
lines changed

consumer/push_consumer.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -764,10 +764,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
764764
"prevRequestOffset": prevRequestOffset,
765765
})
766766
}
767-
case primitive.PullNoNewMsg:
768-
rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d no more msg, current offset: %d, next offset: %d",
769-
request.mq.Topic, request.mq.QueueId, pullRequest.QueueOffset, result.NextBeginOffset), nil)
770-
case primitive.PullNoMsgMatched:
767+
case primitive.PullNoNewMsg, primitive.PullNoMsgMatched:
771768
request.nextOffset = result.NextBeginOffset
772769
pc.correctTagsOffset(request)
773770
case primitive.PullOffsetIllegal:
@@ -790,7 +787,9 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
790787
}
791788

792789
func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
793-
// TODO
790+
if pr.pq.cachedMsgCount <= 0 {
791+
pc.storage.update(pr.mq, pr.nextOffset, true)
792+
}
794793
}
795794

796795
func (pc *pushConsumer) sendMessageBack(brokerName string, msg *primitive.MessageExt, delayLevel int) bool {

0 commit comments

Comments
 (0)