Skip to content

Commit d3f459a

Browse files
committed
add admin ext and test
1 parent 7ae83c4 commit d3f459a

File tree

13 files changed

+663
-5
lines changed

13 files changed

+663
-5
lines changed

admin/admin_ext.go

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
package admin
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"github.com/apache/rocketmq-client-go/v2/errors"
8+
"github.com/apache/rocketmq-client-go/v2/internal"
9+
"github.com/apache/rocketmq-client-go/v2/internal/remote"
10+
"github.com/apache/rocketmq-client-go/v2/primitive"
11+
producer "github.com/apache/rocketmq-client-go/v2/producer"
12+
"github.com/magiconair/properties"
13+
"sync"
14+
"time"
15+
)
16+
17+
type AdminExt struct {
18+
cli internal.RMQClient
19+
opts *adminOptions
20+
closeOnce sync.Once
21+
}
22+
23+
func NewAdminExt(opts ...AdminOption) (*AdminExt, error) {
24+
defaultOpts := defaultAdminOptions()
25+
for _, opt := range opts {
26+
opt(defaultOpts)
27+
}
28+
namesrv, err := internal.NewNamesrv(defaultOpts.Resolver, defaultOpts.RemotingClientConfig)
29+
defaultOpts.Namesrv = namesrv
30+
if err != nil {
31+
return nil, err
32+
}
33+
34+
cli := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
35+
if cli == nil {
36+
return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
37+
}
38+
defaultOpts.Namesrv = cli.GetNameSrv()
39+
return &AdminExt{
40+
cli: cli,
41+
opts: defaultOpts,
42+
}, nil
43+
}
44+
45+
func (a *AdminExt) Start() {
46+
a.cli.Start()
47+
}
48+
49+
func (a *AdminExt) Close() error {
50+
a.closeOnce.Do(func() {
51+
a.cli.Shutdown()
52+
})
53+
return nil
54+
}
55+
56+
func (a *AdminExt) FetchNameSrvList() []string {
57+
a.cli.GetNameSrv().UpdateNameServerAddress()
58+
return a.cli.GetNameSrv().AddrList()
59+
}
60+
61+
func (a *AdminExt) UpdateKvConfig(nameSrvAddr *string, namespace string, key string, value string, timeoutMills time.Duration) error {
62+
request := &internal.PutKVConfigRequestHeader{}
63+
request.Key = key
64+
request.Namespace = namespace
65+
request.Value = value
66+
cmd := remote.NewRemotingCommand(internal.ReqPutKVConfig, request, nil)
67+
ctx, _ := context.WithTimeout(context.Background(), timeoutMills)
68+
res, err := a.cli.InvokeSync(ctx, *nameSrvAddr, cmd, timeoutMills)
69+
if err != nil {
70+
return err
71+
}
72+
if res.Code != internal.ResSuccess {
73+
return fmt.Errorf("update kv config name srv %s, response code: %d, remarks: %s", *nameSrvAddr, res.Code, res.Remark)
74+
}
75+
return nil
76+
}
77+
78+
func (a *AdminExt) DeleteKvConfig(nameSrvAddr *string, namespace string, key string, timeoutMills time.Duration) error {
79+
request := &internal.DeleteKVConfigRequestHeader{}
80+
request.Key = key
81+
request.Namespace = namespace
82+
cmd := remote.NewRemotingCommand(internal.ReqDeleteKVConfig, request, nil)
83+
ctx, _ := context.WithTimeout(context.Background(), timeoutMills)
84+
res, err := a.cli.InvokeSync(ctx, *nameSrvAddr, cmd, timeoutMills)
85+
if err != nil {
86+
return err
87+
}
88+
if res.Code != internal.ResSuccess {
89+
return fmt.Errorf("delete kv config name srv %s, response code: %d, remarks: %s", *nameSrvAddr, res.Code, res.Remark)
90+
}
91+
return nil
92+
}
93+
94+
func (a *AdminExt) GetKvConfig(nameSrvAddr *string, namespace string, key string, timeoutMills time.Duration) (string, error) {
95+
request := &internal.GetKVConfigRequestHeader{}
96+
request.Key = key
97+
request.Namespace = namespace
98+
cmd := remote.NewRemotingCommand(internal.ReqGetKVConfig, request, nil)
99+
ctx, _ := context.WithTimeout(context.Background(), timeoutMills)
100+
res, err := a.cli.InvokeSync(ctx, *nameSrvAddr, cmd, timeoutMills)
101+
if err != nil {
102+
return "", err
103+
}
104+
if res.Code == internal.ResQueryNotFound {
105+
return "", errors.ErrNotExisted
106+
}
107+
if res.Code != internal.ResSuccess {
108+
return "", fmt.Errorf("delete kv config name srv %s, response code: %d, remarks: %s", *nameSrvAddr, res.Code, res.Remark)
109+
}
110+
responseHeader := &internal.GetKVConfigResponseHeader{}
111+
responseHeader.Decode(res.ExtFields)
112+
return responseHeader.Value, nil
113+
}
114+
115+
func (a *AdminExt) GetBrokerConfig(brokerAddr *string, timeoutMills time.Duration) (map[string]string, error) {
116+
request := &internal.NoParameterRequestHeader{}
117+
cmd := remote.NewRemotingCommand(internal.ReqGetBrokerConfig, request, nil)
118+
res, err := a.cli.InvokeSync(context.Background(), *brokerAddr, cmd, timeoutMills)
119+
if err != nil {
120+
return nil, err
121+
}
122+
if res.Code != internal.ResSuccess {
123+
return nil, fmt.Errorf("update config fail broker response code: %d, remarks: %s", res.Code, res.Remark)
124+
}
125+
p, err := properties.LoadString(string(res.Body))
126+
if err != nil {
127+
return nil, err
128+
}
129+
return p.Map(), nil
130+
}
131+
132+
func (a *AdminExt) UpdateBrokerConfig(brokerAddr *string, config map[string]string, timeoutMills time.Duration) error {
133+
p := properties.LoadMap(config)
134+
value := p.String()
135+
body := []byte(value)
136+
request := &internal.NoParameterRequestHeader{}
137+
cmd := remote.NewRemotingCommand(internal.ReqUpdateBrokerConfig, request, body)
138+
ctx, _ := context.WithTimeout(context.Background(), timeoutMills)
139+
res, err := a.cli.InvokeSync(ctx, *brokerAddr, cmd, timeoutMills)
140+
if err != nil {
141+
return err
142+
}
143+
if res.Code != internal.ResSuccess {
144+
return fmt.Errorf("update config fail broker response code: %d, remarks: %s", res.Code, res.Remark)
145+
}
146+
return nil
147+
}
148+
149+
func (a *AdminExt) UpdateBrokerRole(brokerAddr *string, role string, timeoutMills time.Duration) error {
150+
request := &internal.UpdateBrokerRoleRequestHeader{}
151+
request.BrokerRole = role
152+
cmd := remote.NewRemotingCommand(internal.ReqUpdateBrokerRole, request, nil)
153+
res, err := a.cli.InvokeSync(context.Background(), *brokerAddr, cmd, timeoutMills)
154+
if err != nil {
155+
return err
156+
}
157+
if res.Code != internal.ResSuccess {
158+
return fmt.Errorf("update broker role fail broker %s, response code: %d, remarks: %s", *brokerAddr, res.Code, res.Remark)
159+
}
160+
return nil
161+
}
162+
163+
func (a *AdminExt) GetBrokerRuntimeStats(brokerAddr *string, needDiskCheck bool, needEarliestTime bool, timeoutMills time.Duration) (map[string]string, error) {
164+
request := &internal.GetBrokerRuntimeInfoRequestHeader{}
165+
request.NeedEarliestTime = needEarliestTime
166+
request.NeedDiskCheck = needDiskCheck
167+
cmd := remote.NewRemotingCommand(internal.ReqGetBrokerRuntimeInfo, request, nil)
168+
res, err := a.cli.InvokeSync(context.Background(), *brokerAddr, cmd, timeoutMills)
169+
if err != nil {
170+
return nil, err
171+
}
172+
if res.Code != internal.ResSuccess {
173+
return nil, fmt.Errorf("update config fail broker response code: %d, remarks: %s", res.Code, res.Remark)
174+
}
175+
kvTable := &internal.KVTable{}
176+
err1 := json.Unmarshal(res.Body, kvTable)
177+
if err1 != nil {
178+
return nil, err1
179+
}
180+
return kvTable.Table, nil
181+
}
182+
183+
func (a *AdminExt) GetBrokerClusterInfo(nameSrvAddr *string, cluster string, timeoutMills time.Duration) (*internal.ClusterInfo, error) {
184+
request := &internal.GetClusterListRequestHeader{}
185+
request.Cluster = cluster
186+
cmd := remote.NewRemotingCommand(internal.ReqGetBrokerClusterInfo, request, nil)
187+
res, err := a.cli.InvokeSync(context.Background(), *nameSrvAddr, cmd, timeoutMills)
188+
if err != nil {
189+
return nil, err
190+
}
191+
if res.Code != internal.ResSuccess {
192+
return nil, fmt.Errorf("update config fail broker response code: %d, remarks: %s", res.Code, res.Remark)
193+
}
194+
clusterInfo := &internal.ClusterInfo{}
195+
err1 := clusterInfo.Decode(string(res.Body))
196+
if err1 != nil {
197+
return nil, err1
198+
}
199+
return clusterInfo, nil
200+
}
201+
202+
func (a *AdminExt) GetTopicRoute(nameSrvAddr *string, topic string, timeoutMills time.Duration) (*internal.TopicRouteData, error) {
203+
request := &internal.GetRouteInfoRequestHeader{
204+
Topic: topic,
205+
}
206+
cmd := remote.NewRemotingCommand(internal.ReqGetRouteInfoByTopic, request, nil)
207+
res, err := a.cli.InvokeSync(context.Background(), *nameSrvAddr, cmd, timeoutMills)
208+
if err != nil {
209+
return nil, err
210+
}
211+
if res.Code == internal.ResTopicNotExist {
212+
return nil, errors.ErrTopicNotExist
213+
}
214+
if res.Code != internal.ResSuccess {
215+
return nil, fmt.Errorf("update config fail broker response code: %d, remarks: %s", res.Code, res.Remark)
216+
}
217+
routeData := &internal.TopicRouteData{}
218+
err = routeData.Decode(string(res.Body))
219+
if err != nil {
220+
return nil, err
221+
}
222+
return routeData, nil
223+
}
224+
225+
func (a *AdminExt) SyncSendMessage(ctx context.Context, brokerAddr *string, mq *primitive.MessageQueue, msg *primitive.Message, timeoutMills time.Duration) (*primitive.SendResult, error) {
226+
p, err := producer.NewBlankProducer(producer.WithGroupName("PID_admin_ext"))
227+
if err != nil {
228+
return nil, err
229+
}
230+
req := p.BuildSendRequest(mq, msg)
231+
res, err1 := a.cli.InvokeSync(ctx, *brokerAddr, req, timeoutMills)
232+
if err1 != nil {
233+
return nil, err1
234+
}
235+
resp := primitive.NewSendResult()
236+
return resp, a.cli.ProcessSendResponse(mq.BrokerName, res, resp, msg)
237+
}

0 commit comments

Comments
 (0)