Skip to content

Commit 9bca88d

Browse files
author
dengzhiwen1
committed
fix: unlock all queues when consumer shutdown in orderly model
1 parent ae93d2a commit 9bca88d

File tree

2 files changed

+9
-7
lines changed

2 files changed

+9
-7
lines changed

consumer/consumer.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -631,12 +631,12 @@ func (dc *defaultConsumer) doUnlock(addr string, body *lockBatchRequestBody, one
631631
}
632632
} else {
633633
response, err := dc.client.InvokeSync(context.Background(), addr, request, 1*time.Second)
634-
rlog.Error("lock MessageQueue to broker invoke error", map[string]interface{}{
635-
rlog.LogKeyBroker: addr,
636-
rlog.LogKeyUnderlayError: err,
637-
})
638-
if response.Code != internal.ResSuccess {
639-
// TODO error
634+
if err != nil || response == nil || response.Code != internal.ResSuccess {
635+
rlog.Error("lock MessageQueue to broker invoke error", map[string]interface{}{
636+
rlog.LogKeyBroker: addr,
637+
rlog.LogKeyUnderlayError: err,
638+
"response": response,
639+
})
640640
}
641641
}
642642
}

consumer/push_consumer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,9 @@ func (pc *pushConsumer) Shutdown() error {
277277
pc.option.TraceDispatcher.Close()
278278
}
279279
close(pc.done)
280-
280+
if pc.consumeOrderly && pc.model == Clustering {
281+
pc.unlockAll(false)
282+
}
281283
pc.client.UnregisterConsumer(pc.consumerGroup)
282284
err = pc.defaultConsumer.shutdown()
283285
})

0 commit comments

Comments
 (0)