Skip to content

Commit e413134

Browse files
committed
fix: the message may be cleaned when the message has not been consumed, because of startTime has not set already
1 parent 25003f6 commit e413134

File tree

3 files changed

+43
-9
lines changed

3 files changed

+43
-9
lines changed

consumer/process_queue.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -259,22 +259,32 @@ func (pq *processQueue) cleanExpiredMsg(pc *pushConsumer) {
259259
"time": startTime,
260260
rlog.LogKeyUnderlayError: err,
261261
})
262+
pq.mutex.RUnlock()
262263
continue
263264
}
264265
if time.Now().UnixNano()/1e6-st <= int64(pc.option.ConsumeTimeout/time.Millisecond) {
265266
pq.mutex.RUnlock()
266267
return
267268
}
268-
}
269-
pq.mutex.RUnlock()
270-
271-
if !pc.sendMessageBack("", msg, int(3+msg.ReconsumeTimes)) {
272-
rlog.Error("send message back to broker error when clean expired messages", map[string]interface{}{
273-
rlog.LogKeyConsumerGroup: pc.consumerGroup,
269+
rlog.Info("send expire msg back. ", map[string]interface{}{
270+
rlog.LogKeyTopic: msg.Topic,
271+
rlog.LogKeyMessageId: msg.MsgId,
272+
"startTime": startTime,
273+
rlog.LogKeyStoreHost: msg.StoreHost,
274+
rlog.LogKeyQueueId: msg.Queue.QueueId,
275+
rlog.LogKeyQueueOffset: msg.QueueOffset,
274276
})
275-
continue
277+
pq.mutex.RUnlock()
278+
if !pc.sendMessageBack("", msg, int(3+msg.ReconsumeTimes)) {
279+
rlog.Error("send message back to broker error when clean expired messages", map[string]interface{}{
280+
rlog.LogKeyConsumerGroup: pc.consumerGroup,
281+
})
282+
continue
283+
}
284+
pq.removeMessage(msg)
285+
} else {
286+
pq.mutex.RUnlock()
276287
}
277-
pq.removeMessage(msg)
278288
}
279289
}
280290

consumer/push_consumer.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1133,8 +1133,19 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti
11331133

11341134
consumeRT := time.Now().Sub(beginTime)
11351135
if err != nil {
1136+
rlog.Warning("consumeMessageCurrently error", map[string]interface{}{
1137+
rlog.LogKeyUnderlayError: err,
1138+
rlog.LogKeyMessages: msgs,
1139+
rlog.LogKeyMessageQueue: mq,
1140+
rlog.LogKeyConsumerGroup: pc.consumerGroup,
1141+
})
11361142
msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn)
11371143
} else if consumeRT >= pc.option.ConsumeTimeout {
1144+
rlog.Warning("consumeMessageCurrently time out", map[string]interface{}{
1145+
rlog.LogKeyMessages: msgs,
1146+
rlog.LogKeyMessageQueue: mq,
1147+
rlog.LogKeyConsumerGroup: pc.consumerGroup,
1148+
})
11381149
msgCtx.Properties[primitive.PropCtxType] = string(primitive.TimeoutReturn)
11391150
} else if result == ConsumeSuccess {
11401151
msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
@@ -1262,7 +1273,15 @@ func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.Me
12621273
ctx = primitive.WithOrderlyCtx(ctx, orderlyCtx)
12631274

12641275
pq.lockConsume.Lock()
1265-
result, _ := pc.consumeInner(ctx, msgs)
1276+
result, err := pc.consumeInner(ctx, msgs)
1277+
if err != nil {
1278+
rlog.Warning("consumeMessage orderly error", map[string]interface{}{
1279+
rlog.LogKeyUnderlayError: err,
1280+
rlog.LogKeyMessages: msgs,
1281+
rlog.LogKeyMessageQueue: mq.String(),
1282+
rlog.LogKeyConsumerGroup: pc.consumerGroup,
1283+
})
1284+
}
12661285
pq.lockConsume.Unlock()
12671286

12681287
if result == Rollback || result == SuspendCurrentQueueAMoment {

rlog/log.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ const (
3939
LogKeyValueChangedTo = "changeTo"
4040
LogKeyPullRequest = "PullRequest"
4141
LogKeyTimeStamp = "timestamp"
42+
LogKeyMessageId = "msgId"
43+
LogKeyStoreHost = "storeHost"
44+
LogKeyQueueId = "queueId"
45+
LogKeyQueueOffset = "queueOffset"
46+
LogKeyMessages = "messages"
4247
)
4348

4449
type Logger interface {

0 commit comments

Comments
 (0)