Skip to content

Commit 96f00c4

Browse files
maqingxiang鲁扬
andauthored
[ISSUE #953] fix limiter with goroutine cover (#952)
* fix limiter with goroutine cover * fix limiter with goroutine cover Co-authored-by: 鲁扬 <qingxiang.mqx@alibaba-inc.com>
1 parent 8afd69f commit 96f00c4

File tree

1 file changed

+4
-9
lines changed

1 file changed

+4
-9
lines changed

consumer/push_consumer.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1046,10 +1046,8 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti
10461046

10471047
limiter := pc.option.Limiter
10481048
limiterOn := limiter != nil
1049-
if !limiterOn {
1050-
if _, ok := pc.crCh[mq.Topic]; !ok {
1051-
pc.crCh[mq.Topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums)
1052-
}
1049+
if _, ok := pc.crCh[mq.Topic]; !ok {
1050+
pc.crCh[mq.Topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums)
10531051
}
10541052

10551053
for count := 0; count < len(msgs); count++ {
@@ -1065,9 +1063,8 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti
10651063

10661064
if limiterOn {
10671065
limiter(utils.WithoutNamespace(mq.Topic))
1068-
} else {
1069-
pc.crCh[mq.Topic] <- struct{}{}
10701066
}
1067+
pc.crCh[mq.Topic] <- struct{}{}
10711068

10721069
go primitive.WithRecover(func() {
10731070
defer func() {
@@ -1077,9 +1074,7 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti
10771074
rlog.LogKeyConsumerGroup: pc.consumerGroup,
10781075
})
10791076
}
1080-
if !limiterOn {
1081-
<-pc.crCh[mq.Topic]
1082-
}
1077+
<-pc.crCh[mq.Topic]
10831078
}()
10841079
RETRY:
10851080
if pq.IsDroppd() {

0 commit comments

Comments
 (0)