Skip to content

Commit 8afd69f

Browse files
0daypwnwuxb02
andauthored
[ISSUE #927] fix processQueue remove offset (#928)
Co-authored-by: wuxb02 <wuxb02@mingyuanyun.com>
1 parent c197b50 commit 8afd69f

File tree

1 file changed

+7
-10
lines changed

1 file changed

+7
-10
lines changed

consumer/process_queue.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -99,16 +99,7 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
9999
if pq.IsDroppd() {
100100
return
101101
}
102-
if !pq.order {
103-
select {
104-
case <-pq.closeChan:
105-
return
106-
case pq.msgCh <- messages:
107-
}
108-
}
109-
110102
pq.mutex.Lock()
111-
112103
validMessageCount := 0
113104
for idx := range messages {
114105
msg := messages[idx]
@@ -126,9 +117,15 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
126117

127118
pq.cachedMsgSize.Add(int64(len(msg.Body)))
128119
}
129-
130120
pq.cachedMsgCount.Add(int64(validMessageCount))
131121
pq.mutex.Unlock()
122+
if !pq.order {
123+
select {
124+
case <-pq.closeChan:
125+
return
126+
case pq.msgCh <- messages:
127+
}
128+
}
132129

133130
if pq.cachedMsgCount.Load() > 0 && !pq.consuming {
134131
pq.consuming = true

0 commit comments

Comments
 (0)