Skip to content

Commit a343f1b

Browse files
cserwendengzhiwen1
andauthored
fix consumer stopped consuming when panic in consumeListener (apache#910)
Co-authored-by: dengzhiwen1 <dengzhiwen1@xiaomi.com>
1 parent 3f06d9a commit a343f1b

File tree

1 file changed

+11
-6
lines changed

1 file changed

+11
-6
lines changed

consumer/push_consumer.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1038,15 +1038,23 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
10381038
}
10391039

10401040
go primitive.WithRecover(func() {
1041+
defer func() {
1042+
if err := recover(); err != nil {
1043+
rlog.Error("consumeMessageCurrently panic", map[string]interface{}{
1044+
rlog.LogKeyUnderlayError: err,
1045+
rlog.LogKeyConsumerGroup: pc.consumerGroup,
1046+
})
1047+
}
1048+
if !limiterOn {
1049+
<-pc.crCh
1050+
}
1051+
}()
10411052
RETRY:
10421053
if pq.IsDroppd() {
10431054
rlog.Info("the message queue not be able to consume, because it was dropped", map[string]interface{}{
10441055
rlog.LogKeyMessageQueue: mq.String(),
10451056
rlog.LogKeyConsumerGroup: pc.consumerGroup,
10461057
})
1047-
if !limiterOn {
1048-
<-pc.crCh
1049-
}
10501058
return
10511059
}
10521060

@@ -1126,9 +1134,6 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
11261134
"message": subMsgs,
11271135
})
11281136
}
1129-
if !limiterOn {
1130-
<-pc.crCh
1131-
}
11321137
})
11331138
}
11341139
}

0 commit comments

Comments
 (0)