File tree Expand file tree Collapse file tree 1 file changed +19
-0
lines changed Expand file tree Collapse file tree 1 file changed +19
-0
lines changed Original file line number Diff line number Diff line change @@ -430,6 +430,25 @@ func (dc *defaultConsumer) doBalance() {
430430 }
431431 return true
432432 })
433+ dc .truncateMessageQueueNotMyTopic ()
434+ }
435+
436+ func (dc * defaultConsumer ) truncateMessageQueueNotMyTopic () {
437+ dc .processQueueTable .Range (func (key , value interface {}) bool {
438+ mq := key .(primitive.MessageQueue )
439+ pq := value .(* processQueue )
440+ if _ , ok := dc .subscriptionDataTable .Load (mq .Topic ); ! ok {
441+ pq .WithDropped (true )
442+ if dc .removeUnnecessaryMessageQueue (& mq , pq ) {
443+ dc .processQueueTable .Delete (key )
444+ rlog .Info ("remove unnecessary mq because unsubscribed" , map [string ]interface {}{
445+ rlog .LogKeyConsumerGroup : dc .consumerGroup ,
446+ rlog .LogKeyMessageQueue : mq .String (),
447+ })
448+ }
449+ }
450+ return true
451+ })
433452}
434453
435454func (dc * defaultConsumer ) SubscriptionDataList () []* internal.SubscriptionData {
You can’t perform that action at this time.
0 commit comments