Skip to content

Commit 1d86ea6

Browse files
NeonToo筱瑜
andauthored
support RequestCode GET_CONSUMER_STATUS_FROM_CLIENT (apache#985)
Co-authored-by: 筱瑜 <maoyu.dmy@alibaba-inc.com>
1 parent 8ea107c commit 1d86ea6

File tree

7 files changed

+173
-0
lines changed

7 files changed

+173
-0
lines changed

consumer/mock_offset_store.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

consumer/offset_store.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type OffsetStore interface {
5959
read(mq *primitive.MessageQueue, t readType) int64
6060
readWithException(mq *primitive.MessageQueue, t readType) (int64, error)
6161
update(mq *primitive.MessageQueue, offset int64, increaseOnly bool)
62+
getMQOffsetMap(topic string) map[primitive.MessageQueue]int64
6263
}
6364

6465
type OffsetSerializeWrapper struct {
@@ -228,6 +229,18 @@ func (local *localFileOffsetStore) remove(mq *primitive.MessageQueue) {
228229
// nothing to do
229230
}
230231

232+
func (local *localFileOffsetStore) getMQOffsetMap(topic string) map[primitive.MessageQueue]int64 {
233+
copyOffsetTable := make(map[primitive.MessageQueue]int64)
234+
local.OffsetTable.Range(func(key, value interface{}) bool {
235+
if key.(MessageQueueKey).Topic != topic {
236+
return true
237+
}
238+
copyOffsetTable[primitive.MessageQueue(key.(MessageQueueKey))] = value.(int64)
239+
return true
240+
})
241+
return copyOffsetTable
242+
}
243+
231244
type remoteBrokerOffsetStore struct {
232245
group string
233246
OffsetTable map[primitive.MessageQueue]int64 `json:"OffsetTable"`
@@ -353,6 +366,17 @@ func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset int6
353366
}
354367
}
355368

369+
func (r *remoteBrokerOffsetStore) getMQOffsetMap(topic string) map[primitive.MessageQueue]int64 {
370+
copyOffsetTable := make(map[primitive.MessageQueue]int64)
371+
for mq, offset := range r.OffsetTable {
372+
if mq.Topic != topic {
373+
continue
374+
}
375+
copyOffsetTable[mq] = offset
376+
}
377+
return copyOffsetTable
378+
}
379+
356380
func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) {
357381
broker := r.namesrv.FindBrokerAddrByName(mq.BrokerName)
358382
if broker == "" {

consumer/pull_consumer.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -853,6 +853,15 @@ func (pc *defaultPullConsumer) consumeMessageConcurrently(pq *processQueue, mq *
853853
}
854854
}
855855

856+
func (pc *defaultPullConsumer) GetConsumerStatus(topic string) *internal.ConsumerStatus {
857+
consumerStatus := internal.NewConsumerStatus()
858+
mqOffsetMap := pc.storage.getMQOffsetMap(topic)
859+
if mqOffsetMap != nil {
860+
consumerStatus.MQOffsetMap = mqOffsetMap
861+
}
862+
return consumerStatus
863+
}
864+
856865
func (pc *defaultPullConsumer) validate() error {
857866
if err := internal.ValidateGroup(pc.consumerGroup); err != nil {
858867
return err

consumer/push_consumer.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,15 @@ func (pc *pushConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, broker
430430
return res
431431
}
432432

433+
func (pc *pushConsumer) GetConsumerStatus(topic string) *internal.ConsumerStatus {
434+
consumerStatus := internal.NewConsumerStatus()
435+
mqOffsetMap := pc.storage.getMQOffsetMap(topic)
436+
if mqOffsetMap != nil {
437+
consumerStatus.MQOffsetMap = mqOffsetMap
438+
}
439+
return consumerStatus
440+
}
441+
433442
func (pc *pushConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo {
434443
info := internal.NewConsumerRunningInfo()
435444

internal/client.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ type InnerConsumer interface {
9292
GetModel() string
9393
GetWhere() string
9494
ResetOffset(topic string, table map[primitive.MessageQueue]int64)
95+
GetConsumerStatus(topic string) *ConsumerStatus
9596
}
9697

9798
func DefaultClientOptions() ClientOptions {
@@ -377,6 +378,32 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
377378
res.Code = ResSuccess
378379
return res
379380
})
381+
382+
client.remoteClient.RegisterRequestFunc(ReqGetConsumerStatsFromClient, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
383+
rlog.Info("receive get consumer status from client request...", map[string]interface{}{
384+
rlog.LogKeyBroker: addr.String(),
385+
rlog.LogKeyTopic: req.ExtFields["topic"],
386+
rlog.LogKeyConsumerGroup: req.ExtFields["group"],
387+
})
388+
389+
header := new(GetConsumerStatusRequestHeader)
390+
header.Decode(req.ExtFields)
391+
res := remote.NewRemotingCommand(ResError, nil, nil)
392+
393+
consumerStatus := client.getConsumerStatus(header.topic, header.group)
394+
if consumerStatus != nil {
395+
res.Code = ResSuccess
396+
data, err := consumerStatus.Encode()
397+
if err != nil {
398+
res.Remark = fmt.Sprintf("Failed to encode consumer status: %s", err.Error())
399+
} else {
400+
res.Body = data
401+
}
402+
} else {
403+
res.Remark = "there is unexpected error when get consumer status, please check log"
404+
}
405+
return res
406+
})
380407
}
381408
return client
382409
}
@@ -905,6 +932,15 @@ func (c *rmqClient) resetOffset(topic string, group string, offsetTable map[prim
905932
consumer.(InnerConsumer).ResetOffset(topic, offsetTable)
906933
}
907934

935+
func (c *rmqClient) getConsumerStatus(topic string, group string) *ConsumerStatus {
936+
consumer, exist := c.consumerMap.Load(group)
937+
if !exist {
938+
rlog.Warning("group "+group+" do not exists", nil)
939+
return nil
940+
}
941+
return consumer.(InnerConsumer).GetConsumerStatus(topic)
942+
}
943+
908944
func (c *rmqClient) getConsumerRunningInfo(group string, stack bool) *ConsumerRunningInfo {
909945
consumer, exist := c.consumerMap.Load(group)
910946
if !exist {

internal/model.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,54 @@ func NewConsumerRunningInfo() *ConsumerRunningInfo {
295295
}
296296
}
297297

298+
type ConsumerStatus struct {
299+
MQOffsetMap map[primitive.MessageQueue]int64
300+
}
301+
302+
func (status ConsumerStatus) Encode() ([]byte, error) {
303+
mapJson := ""
304+
keys := make([]primitive.MessageQueue, 0)
305+
306+
for k := range status.MQOffsetMap {
307+
keys = append(keys, k)
308+
}
309+
310+
sort.Slice(keys, func(i, j int) bool {
311+
q1 := keys[i]
312+
q2 := keys[j]
313+
com := strings.Compare(q1.Topic, q2.Topic)
314+
if com != 0 {
315+
return com < 0
316+
}
317+
318+
com = strings.Compare(q1.BrokerName, q2.BrokerName)
319+
if com != 0 {
320+
return com < 0
321+
}
322+
323+
return q1.QueueId < q2.QueueId
324+
})
325+
326+
for idx := range keys {
327+
dataK, err := json.Marshal(keys[idx])
328+
if err != nil {
329+
return nil, err
330+
}
331+
dataV, err := json.Marshal(status.MQOffsetMap[keys[idx]])
332+
mapJson = fmt.Sprintf("%s,%s:%s", mapJson, string(dataK), string(dataV))
333+
}
334+
mapJson = strings.TrimLeft(mapJson, ",")
335+
jsonData := fmt.Sprintf("{\"%s\":%s}",
336+
"messageQueueTable", fmt.Sprintf("{%s}", mapJson))
337+
return []byte(jsonData), nil
338+
}
339+
340+
func NewConsumerStatus() *ConsumerStatus {
341+
return &ConsumerStatus{
342+
MQOffsetMap: make(map[primitive.MessageQueue]int64),
343+
}
344+
}
345+
298346
type ConsumeMessageDirectlyResult struct {
299347
Order bool `json:"order"`
300348
AutoCommit bool `json:"autoCommit"`

internal/request.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ const (
5050
ReqDeleteTopicInBroker = int16(215)
5151
ReqDeleteTopicInNameSrv = int16(216)
5252
ReqResetConsumerOffset = int16(220)
53+
ReqGetConsumerStatsFromClient = int16(221)
5354
ReqGetConsumerRunningInfo = int16(307)
5455
ReqConsumeMessageDirectly = int16(309)
5556
ReqSendReplyMessage = int16(324)
@@ -490,6 +491,38 @@ func (request *ConsumeMessageDirectlyHeader) Decode(properties map[string]string
490491
}
491492
}
492493

494+
type GetConsumerStatusRequestHeader struct {
495+
topic string
496+
group string
497+
clientAddr string
498+
}
499+
500+
func (request *GetConsumerStatusRequestHeader) Encode() map[string]string {
501+
return map[string]string{
502+
"topic": request.topic,
503+
"group": request.group,
504+
"clientAddr": request.clientAddr,
505+
}
506+
}
507+
508+
func (request *GetConsumerStatusRequestHeader) Decode(properties map[string]string) {
509+
if len(properties) == 0 {
510+
return
511+
}
512+
513+
if v, existed := properties["topic"]; existed {
514+
request.topic = v
515+
}
516+
517+
if v, existed := properties["group"]; existed {
518+
request.group = v
519+
}
520+
521+
if v, existed := properties["clientAddr"]; existed {
522+
request.clientAddr = v
523+
}
524+
}
525+
493526
type ReplyMessageRequestHeader struct {
494527
producerGroup string
495528
topic string

0 commit comments

Comments
 (0)