Skip to content

Commit 3f06d9a

Browse files
author
Nick
authored
when queue dropped,should'not consume the process queue.(#905) (#906)
* when queue dropped,should'not consume the process queue.(#905) * poll method use private properties
1 parent de2dc05 commit 3f06d9a

File tree

2 files changed

+25
-0
lines changed

2 files changed

+25
-0
lines changed

consumer/pull_consumer.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ func (cr *ConsumeRequest) GetMsgList() []*primitive.MessageExt {
5656
return cr.msgList
5757
}
5858

59+
func (cr *ConsumeRequest) GetMQ() *primitive.MessageQueue {
60+
return cr.messageQueue
61+
}
62+
63+
func (cr *ConsumeRequest) GetPQ() *processQueue {
64+
return cr.processQueue
65+
}
66+
5967
type defaultPullConsumer struct {
6068
*defaultConsumer
6169

@@ -193,6 +201,14 @@ func (pc *defaultPullConsumer) Poll(ctx context.Context, timeout time.Duration)
193201
case <-ctx.Done():
194202
return nil, ErrNoNewMsg
195203
case cr := <-pc.consumeRequestCache:
204+
if cr.processQueue.IsDroppd() {
205+
rlog.Info("defaultPullConsumer poll the message queue not be able to consume, because it was dropped", map[string]interface{}{
206+
rlog.LogKeyMessageQueue: cr.messageQueue.String(),
207+
rlog.LogKeyConsumerGroup: pc.consumerGroup,
208+
})
209+
return nil, ErrNoNewMsg
210+
}
211+
196212
if len(cr.GetMsgList()) == 0 {
197213
return nil, ErrNoNewMsg
198214
}
@@ -780,6 +796,13 @@ func (pc *defaultPullConsumer) consumeMessageCurrently(pq *processQueue, mq *pri
780796
if msgList == nil {
781797
return
782798
}
799+
if pq.IsDroppd() {
800+
rlog.Info("defaultPullConsumer consumeMessageCurrently the message queue not be able to consume, because it was dropped", map[string]interface{}{
801+
rlog.LogKeyMessageQueue: mq.String(),
802+
rlog.LogKeyConsumerGroup: pc.consumerGroup,
803+
})
804+
return
805+
}
783806
cr := &ConsumeRequest{
784807
messageQueue: mq,
785808
processQueue: pq,

examples/consumer/pull/poll/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ func poll() {
9797
}
9898
// todo LOGIC CODE HERE
9999
log.Println("msgList: ", cr.GetMsgList())
100+
log.Println("messageQueue: ", cr.GetMQ())
101+
log.Println("processQueue: ", cr.GetPQ())
100102
// pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeRetryLater)
101103
pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeSuccess)
102104
}

0 commit comments

Comments
 (0)