Skip to content

Commit c6a3148

Browse files
authored
[ISSUE #615]putmessage should check msg in consumingMsgOrderlyTreeMap
close #615
1 parent 6cd3181 commit c6a3148

File tree

1 file changed

+4
-0
lines changed

1 file changed

+4
-0
lines changed

consumer/process_queue.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
9898
if found {
9999
continue
100100
}
101+
_, found = pq.consumingMsgOrderlyTreeMap.Get(msg.QueueOffset)
102+
if found {
103+
continue
104+
}
101105
pq.msgCache.Put(msg.QueueOffset, msg)
102106
validMessageCount++
103107
pq.queueOffsetMax = msg.QueueOffset

0 commit comments

Comments
 (0)