Skip to content

Commit c133cca

Browse files
author
yechun
committed
[ISSUE apache#999] use structs as result sets for admin method
1 parent 62d4513 commit c133cca

File tree

4 files changed

+119
-19
lines changed

4 files changed

+119
-19
lines changed

admin/admin.go

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ import (
3333
type Admin interface {
3434
CreateTopic(ctx context.Context, opts ...OptionCreate) error
3535
DeleteTopic(ctx context.Context, opts ...OptionDelete) error
36-
//TODO
37-
GroupList(ctx context.Context, brokerAddr string) (*remote.RemotingCommand, error)
38-
TopicList(ctx context.Context) (*remote.RemotingCommand, error)
36+
37+
GetAllSubscriptionGroup(ctx context.Context, brokerAddr string) (*SubscriptionGroupWrapper, error)
38+
FetchAllTopicList(ctx context.Context) (*TopicList, error)
3939
//GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error)
4040
FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error)
4141
Close() error
@@ -109,7 +109,7 @@ func NewAdmin(opts ...AdminOption) (*admin, error) {
109109
}, nil
110110
}
111111

112-
func (a *admin) GroupList(ctx context.Context, brokerAddr string) (*remote.RemotingCommand, error) {
112+
func (a *admin) GetAllSubscriptionGroup(ctx context.Context, brokerAddr string) (*SubscriptionGroupWrapper, error) {
113113
cmd := remote.NewRemotingCommand(internal.ReqGetAllSubscriptionGroupConfig, nil, nil)
114114
a.cli.RegisterACL()
115115
response, err := a.cli.InvokeSync(ctx, brokerAddr, cmd, 3*time.Second)
@@ -121,25 +121,37 @@ func (a *admin) GroupList(ctx context.Context, brokerAddr string) (*remote.Remot
121121
} else {
122122
rlog.Info("Get all group list success", map[string]interface{}{})
123123
}
124-
rpsCmd := remote.NewRemotingCommand(internal.ReqGetAllSubscriptionGroupConfig, nil, response.Body)
125-
126-
return rpsCmd, nil
124+
var subscriptionGroupWrapper SubscriptionGroupWrapper
125+
_, err = subscriptionGroupWrapper.Decode(response.Body, &subscriptionGroupWrapper)
126+
if err != nil {
127+
rlog.Error("Get all group list decode error", map[string]interface{}{
128+
rlog.LogKeyUnderlayError: err,
129+
})
130+
return nil, err
131+
}
132+
return &subscriptionGroupWrapper, nil
127133
}
128134

129-
func (a *admin) TopicList(ctx context.Context) (*remote.RemotingCommand, error) {
135+
func (a *admin) FetchAllTopicList(ctx context.Context) (*TopicList, error) {
130136
cmd := remote.NewRemotingCommand(internal.ReqGetAllTopicListFromNameServer, nil, nil)
131137
response, err := a.cli.InvokeSync(ctx, a.cli.GetNameSrv().AddrList()[0], cmd, 3*time.Second)
132138
if err != nil {
133-
rlog.Error("Get all topic list error", map[string]interface{}{
139+
rlog.Error("Fetch all topic list error", map[string]interface{}{
134140
rlog.LogKeyUnderlayError: err,
135141
})
136142
return nil, err
137143
} else {
138-
rlog.Info("Get all topic list success", map[string]interface{}{})
144+
rlog.Info("Fetch all topic list success", map[string]interface{}{})
139145
}
140-
rpsCmd := remote.NewRemotingCommand(internal.ReqGetAllTopicListFromNameServer, nil, response.Body)
141-
142-
return rpsCmd, nil
146+
var topicList TopicList
147+
_, err = topicList.Decode(response.Body, &topicList)
148+
if err != nil {
149+
rlog.Error("Fetch all topic list decode error", map[string]interface{}{
150+
rlog.LogKeyUnderlayError: err,
151+
})
152+
return nil, err
153+
}
154+
return &topicList, nil
143155
}
144156

145157
// CreateTopic create topic.

admin/response.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one or more
3+
contributor license agreements. See the NOTICE file distributed with
4+
this work for additional information regarding copyright ownership.
5+
The ASF licenses this file to You under the Apache License, Version 2.0
6+
(the "License"); you may not use this file except in compliance with
7+
the License. You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
package admin
19+
20+
import "encoding/json"
21+
22+
type RemotingSerializable struct {
23+
}
24+
25+
func (r *RemotingSerializable) Encode(obj interface{}) ([]byte, error) {
26+
jsonStr := r.ToJson(obj, false)
27+
if jsonStr != "" {
28+
return []byte(jsonStr), nil
29+
}
30+
return nil, nil
31+
}
32+
33+
func (r *RemotingSerializable) ToJson(obj interface{}, prettyFormat bool) string {
34+
if prettyFormat {
35+
jsonBytes, err := json.MarshalIndent(obj, "", " ")
36+
if err != nil {
37+
return ""
38+
}
39+
return string(jsonBytes)
40+
} else {
41+
jsonBytes, err := json.Marshal(obj)
42+
if err != nil {
43+
return ""
44+
}
45+
return string(jsonBytes)
46+
}
47+
}
48+
func (r *RemotingSerializable) Decode(data []byte, classOfT interface{}) (interface{}, error) {
49+
jsonStr := string(data)
50+
return r.FromJson(jsonStr, classOfT)
51+
}
52+
53+
func (r *RemotingSerializable) FromJson(jsonStr string, classOfT interface{}) (interface{}, error) {
54+
err := json.Unmarshal([]byte(jsonStr), classOfT)
55+
if err != nil {
56+
return nil, err
57+
}
58+
return classOfT, nil
59+
}
60+
61+
type TopicList struct {
62+
TopicList []string
63+
BrokerAddr string
64+
RemotingSerializable
65+
}
66+
67+
type SubscriptionGroupWrapper struct {
68+
SubscriptionGroupTable map[string]SubscriptionGroupConfig
69+
DataVersion DataVersion
70+
RemotingSerializable
71+
}
72+
73+
type DataVersion struct {
74+
Timestamp int64
75+
Counter int32
76+
}
77+
78+
type SubscriptionGroupConfig struct {
79+
GroupName string
80+
ConsumeEnable bool
81+
ConsumeFromMinEnable bool
82+
ConsumeBroadcastEnable bool
83+
RetryMaxTimes int
84+
RetryQueueNums int
85+
BrokerId int
86+
WhichBrokerWhenConsumeSlowly int
87+
NotifyConsumerIdsChangedEnable bool
88+
}

examples/admin/group/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ func main() {
3939
)
4040

4141
// group list
42-
groupList, err := testAdmin.GroupList(context.Background(), brokerAddr)
42+
result, err := testAdmin.GetAllSubscriptionGroup(context.Background(), brokerAddr)
4343
if err != nil {
44-
fmt.Println("GroupList error:", err.Error())
44+
fmt.Println("GetAllSubscriptionGroup error:", err.Error())
4545
}
46-
fmt.Println(string(groupList.Body))
46+
fmt.Println(result.SubscriptionGroupTable)
4747

4848
err = testAdmin.Close()
4949
if err != nil {

examples/admin/topic/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ func main() {
4040
)
4141

4242
// topic list
43-
topicList, err := testAdmin.TopicList(context.Background())
43+
result, err := testAdmin.FetchAllTopicList(context.Background())
4444
if err != nil {
45-
fmt.Println("TopicList error:", err.Error())
45+
fmt.Println("FetchAllTopicList error:", err.Error())
4646
}
47-
fmt.Println(string(topicList.Body))
47+
fmt.Println(result.TopicList)
4848

4949
//create topic
5050
err = testAdmin.CreateTopic(

0 commit comments

Comments
 (0)