Skip to content

Commit bb4ec9f

Browse files
author
nick
committed
when queue dropped,should'not consume the process queue.(#905)
1 parent 2630383 commit bb4ec9f

File tree

1 file changed

+23
-0
lines changed

1 file changed

+23
-0
lines changed

consumer/pull_consumer.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ func (cr *ConsumeRequest) GetMsgList() []*primitive.MessageExt {
5656
return cr.msgList
5757
}
5858

59+
func (cr *ConsumeRequest) GetMQ() *primitive.MessageQueue {
60+
return cr.messageQueue
61+
}
62+
63+
func (cr *ConsumeRequest) GetPQ() *processQueue {
64+
return cr.processQueue
65+
}
66+
5967
type defaultPullConsumer struct {
6068
*defaultConsumer
6169

@@ -193,6 +201,14 @@ func (pc *defaultPullConsumer) Poll(ctx context.Context, timeout time.Duration)
193201
case <-ctx.Done():
194202
return nil, ErrNoNewMsg
195203
case cr := <-pc.consumeRequestCache:
204+
if cr.GetPQ().IsDroppd() {
205+
rlog.Info("defaultPullConsumer poll the message queue not be able to consume, because it was dropped", map[string]interface{}{
206+
rlog.LogKeyMessageQueue: cr.GetMQ().String(),
207+
rlog.LogKeyConsumerGroup: pc.consumerGroup,
208+
})
209+
return nil, ErrNoNewMsg
210+
}
211+
196212
if len(cr.GetMsgList()) == 0 {
197213
return nil, ErrNoNewMsg
198214
}
@@ -780,6 +796,13 @@ func (pc *defaultPullConsumer) consumeMessageCurrently(pq *processQueue, mq *pri
780796
if msgList == nil {
781797
return
782798
}
799+
if pq.IsDroppd() {
800+
rlog.Info("defaultPullConsumer consumeMessageCurrently the message queue not be able to consume, because it was dropped", map[string]interface{}{
801+
rlog.LogKeyMessageQueue: mq.String(),
802+
rlog.LogKeyConsumerGroup: pc.consumerGroup,
803+
})
804+
return
805+
}
783806
cr := &ConsumeRequest{
784807
messageQueue: mq,
785808
processQueue: pq,

0 commit comments

Comments
 (0)