Skip to content

Commit 83f60c1

Browse files
author
WJL3333
authored
[ISSUE #759]
ISSUE #759] Change ResetOffsetBody response parse method to support fastjson schema
1 parent de5f561 commit 83f60c1

File tree

2 files changed

+122
-5
lines changed

2 files changed

+122
-5
lines changed

internal/model.go

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -295,12 +295,39 @@ type ResetOffsetBody struct {
295295
OffsetTable map[primitive.MessageQueue]int64 `json:"offsetTable"`
296296
}
297297

298+
// Decode note: the origin implementation for parse json is in gson format.
299+
// this func should support both gson and fastjson schema.
298300
func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
301+
validJSON := gjson.ValidBytes(body)
302+
303+
var offsetTable map[primitive.MessageQueue]int64
304+
305+
if validJSON {
306+
offsetTable = parseGsonFormat(body)
307+
} else {
308+
offsetTable = parseFastJsonFormat(body)
309+
}
310+
311+
resetOffsetBody.OffsetTable = offsetTable
312+
}
313+
314+
func parseGsonFormat(body []byte) map[primitive.MessageQueue]int64 {
299315
result := gjson.ParseBytes(body)
316+
300317
rlog.Debug("offset table string "+result.Get("offsetTable").String(), nil)
301318

302319
offsetTable := make(map[primitive.MessageQueue]int64, 0)
303-
offsetTableArray := strings.Split(result.Get("offsetTable").String(), "],[")
320+
321+
offsetStr := result.Get("offsetTable").String()
322+
if len(offsetStr) <= 2 {
323+
rlog.Warning("parse reset offset table json get nothing in body", map[string]interface{}{
324+
"origin json": offsetStr,
325+
})
326+
return offsetTable
327+
}
328+
329+
offsetTableArray := strings.Split(offsetStr, "],[")
330+
304331
for index, v := range offsetTableArray {
305332
kvArray := strings.Split(v, "},")
306333

@@ -315,7 +342,7 @@ func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
315342
rlog.Error("Unmarshal offset error", map[string]interface{}{
316343
rlog.LogKeyUnderlayError: err,
317344
})
318-
return
345+
return nil
319346
}
320347

321348
if index == 0 {
@@ -329,9 +356,51 @@ func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
329356
rlog.Error("Unmarshal message queue error", map[string]interface{}{
330357
rlog.LogKeyUnderlayError: err,
331358
})
332-
return
359+
return nil
333360
}
334361
offsetTable[*kObj] = offset
335362
}
336-
resetOffsetBody.OffsetTable = offsetTable
363+
364+
return offsetTable
365+
}
366+
367+
func parseFastJsonFormat(body []byte) map[primitive.MessageQueue]int64 {
368+
offsetTable := make(map[primitive.MessageQueue]int64)
369+
370+
jsonStr := string(body)
371+
offsetStr := gjson.Get(jsonStr, "offsetTable").String()
372+
373+
if len(offsetStr) <= 2 {
374+
rlog.Warning("parse reset offset table json get nothing in body", map[string]interface{}{
375+
"origin json": jsonStr,
376+
})
377+
return offsetTable
378+
}
379+
380+
trimStr := offsetStr[2 : len(offsetStr)-1]
381+
382+
split := strings.Split(trimStr, ",{")
383+
384+
for _, v := range split {
385+
tuple := strings.Split(v, "}:")
386+
387+
queueStr := "{" + tuple[0] + "}"
388+
389+
var err error
390+
// ignore err for now
391+
offset, err := strconv.Atoi(tuple[1])
392+
393+
var queue primitive.MessageQueue
394+
err = json.Unmarshal([]byte(queueStr), &queue)
395+
396+
if err != nil {
397+
rlog.Error("parse reset offset table json get nothing in body", map[string]interface{}{
398+
"origin json": jsonStr,
399+
})
400+
}
401+
402+
offsetTable[queue] = int64(offset)
403+
}
404+
405+
return offsetTable
337406
}

internal/model_test.go

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) {
419419
}
420420

421421
func TestRestOffsetBody_MarshalJSON(t *testing.T) {
422-
Convey("test ResetOffset Body Decode", t, func() {
422+
Convey("test ResetOffset Body Decode gson json schema", t, func() {
423423
body := "{\"offsetTable\":[[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":5},23354233],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":4},23354245],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":7},23354203],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":6},23354312],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":1},23373517],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":0},23373350],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":3},23373424],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":2},23373382]]}"
424424
resetOffsetBody := new(ResetOffsetBody)
425425
resetOffsetBody.Decode([]byte(body))
@@ -433,4 +433,52 @@ func TestRestOffsetBody_MarshalJSON(t *testing.T) {
433433
}
434434
So(offsetTable[messageQueue], ShouldEqual, 23354233)
435435
})
436+
437+
Convey("test ResetOffset Body Decode fast json schema", t, func() {
438+
body := "{\"offsetTable\":{{\"brokerName\":\"RaftNode00\",\"queueId\":0,\"topic\":\"topicB\"}:11110,{\"brokerName\":\"RaftNode00\",\"queueId\":1,\"topic\":\"topicB\"}:0,{\"brokerName\":\"RaftNode00\",\"queueId\":2,\"topic\":\"topicB\"}:0,{\"brokerName\":\"RaftNode00\",\"queueId\":3,\"topic\":\"topicB\"}:0}}"
439+
resetOffsetBody := new(ResetOffsetBody)
440+
resetOffsetBody.Decode([]byte(body))
441+
offsetTable := resetOffsetBody.OffsetTable
442+
So(offsetTable, ShouldNotBeNil)
443+
So(len(offsetTable), ShouldEqual, 4)
444+
messageQueue := primitive.MessageQueue{
445+
Topic: "topicB",
446+
BrokerName: "RaftNode00",
447+
QueueId: 0,
448+
}
449+
So(offsetTable[messageQueue], ShouldEqual, 11110)
450+
})
451+
452+
Convey("test ResetOffset Body Decode fast json schema with one item", t, func() {
453+
body := "{\"offsetTable\":{{\"brokerName\":\"RaftNode00\",\"queueId\":0,\"topic\":\"topicB\"}:11110}}"
454+
resetOffsetBody := new(ResetOffsetBody)
455+
resetOffsetBody.Decode([]byte(body))
456+
offsetTable := resetOffsetBody.OffsetTable
457+
So(offsetTable, ShouldNotBeNil)
458+
So(len(offsetTable), ShouldEqual, 1)
459+
messageQueue := primitive.MessageQueue{
460+
Topic: "topicB",
461+
BrokerName: "RaftNode00",
462+
QueueId: 0,
463+
}
464+
So(offsetTable[messageQueue], ShouldEqual, 11110)
465+
})
466+
467+
Convey("test ResetOffset Body Decode empty fast json ", t, func() {
468+
body := "{\"offsetTable\":{}}"
469+
resetOffsetBody := new(ResetOffsetBody)
470+
resetOffsetBody.Decode([]byte(body))
471+
offsetTable := resetOffsetBody.OffsetTable
472+
So(offsetTable, ShouldNotBeNil)
473+
So(len(offsetTable), ShouldEqual, 0)
474+
})
475+
476+
Convey("test ResetOffset Body Decode empty gson json ", t, func() {
477+
body := "{\"offsetTable\":[]}"
478+
resetOffsetBody := new(ResetOffsetBody)
479+
resetOffsetBody.Decode([]byte(body))
480+
offsetTable := resetOffsetBody.OffsetTable
481+
So(offsetTable, ShouldNotBeNil)
482+
So(len(offsetTable), ShouldEqual, 0)
483+
})
436484
}

0 commit comments

Comments
 (0)