Skip to content

Commit 3eb9052

Browse files
committed
Fix the usage bug of namespace
Signed-off-by: zhangyang21 <zhangyang21@xiaomi.com>
1 parent dfa26d1 commit 3eb9052

File tree

2 files changed

+83
-12
lines changed

2 files changed

+83
-12
lines changed

producer/producer.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ func (p *defaultProducer) SendSync(ctx context.Context, msgs ...*primitive.Messa
143143
return nil, err
144144
}
145145

146+
p.messagesWithNamespace(msgs...)
147+
146148
msg := p.encodeBatch(msgs...)
147149

148150
resp := primitive.NewSendResult()
@@ -179,10 +181,6 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
179181
err error
180182
)
181183

182-
if p.options.Namespace != "" {
183-
msg.Topic = p.options.Namespace + "%" + msg.Topic
184-
}
185-
186184
var producerCtx *primitive.ProducerCtx
187185
for retryCount := 0; retryCount < retryTime; retryCount++ {
188186
mq := p.selectMessageQueue(msg)
@@ -217,6 +215,8 @@ func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context,
217215
return err
218216
}
219217

218+
p.messagesWithNamespace(msgs...)
219+
220220
msg := p.encodeBatch(msgs...)
221221

222222
if p.interceptor != nil {
@@ -230,9 +230,7 @@ func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context,
230230
}
231231

232232
func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error {
233-
if p.options.Namespace != "" {
234-
msg.Topic = p.options.Namespace + "%" + msg.Topic
235-
}
233+
236234
mq := p.selectMessageQueue(msg)
237235
if mq == nil {
238236
return errors.Errorf("the topic=%s route info not found", msg.Topic)
@@ -260,6 +258,8 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ...*primitive.Mes
260258
return err
261259
}
262260

261+
p.messagesWithNamespace(msgs...)
262+
263263
msg := p.encodeBatch(msgs...)
264264

265265
if p.interceptor != nil {
@@ -275,10 +275,6 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ...*primitive.Mes
275275
func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message) error {
276276
retryTime := 1 + p.options.RetryTimes
277277

278-
if p.options.Namespace != "" {
279-
msg.Topic = p.options.Namespace + "%" + msg.Topic
280-
}
281-
282278
var err error
283279
for retryCount := 0; retryCount < retryTime; retryCount++ {
284280
mq := p.selectMessageQueue(msg)
@@ -302,6 +298,17 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message
302298
return err
303299
}
304300

301+
func (p *defaultProducer) messagesWithNamespace(msgs ...*primitive.Message) {
302+
303+
if p.options.Namespace == "" {
304+
return
305+
}
306+
307+
for _, msg := range msgs {
308+
msg.Topic = p.options.Namespace + "%" + msg.Topic
309+
}
310+
}
311+
305312
func (p *defaultProducer) tryCompressMsg(msg *primitive.Message) bool {
306313
if msg.Compress {
307314
return true

producer/producer_test.go

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ import (
3030
)
3131

3232
const (
33-
topic = "TopicTest"
33+
topic = "TopicTest"
34+
namespaceTopic = "Test%TopicTest"
3435
)
3536

3637
func TestShutdown(t *testing.T) {
@@ -83,6 +84,16 @@ func mockB4Send(p *defaultProducer) {
8384
},
8485
},
8586
})
87+
p.publishInfo.Store(namespaceTopic, &internal.TopicPublishInfo{
88+
HaveTopicRouterInfo: true,
89+
MqList: []*primitive.MessageQueue{
90+
{
91+
Topic: namespaceTopic,
92+
BrokerName: "aa",
93+
QueueId: 0,
94+
},
95+
},
96+
})
8697
p.options.Namesrv.AddBroker(&internal.TopicRouteData{
8798
BrokerDataList: []*internal.BrokerData{
8899
{
@@ -245,3 +256,56 @@ func TestOneway(t *testing.T) {
245256
err = p.SendOneWay(ctx, msg)
246257
assert.Nil(t, err)
247258
}
259+
260+
func TestSyncWithNamespace(t *testing.T) {
261+
p, _ := NewDefaultProducer(
262+
WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
263+
WithRetry(2),
264+
WithQueueSelector(NewManualQueueSelector()),
265+
WithNamespace("Test"),
266+
)
267+
268+
ctrl := gomock.NewController(t)
269+
defer ctrl.Finish()
270+
client := internal.NewMockRMQClient(ctrl)
271+
p.client = client
272+
273+
client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
274+
client.EXPECT().Start().Return()
275+
err := p.Start()
276+
assert.Nil(t, err)
277+
278+
ctx := context.Background()
279+
msg := &primitive.Message{
280+
Topic: topic,
281+
Body: []byte("this is a message body"),
282+
Queue: &primitive.MessageQueue{
283+
Topic: namespaceTopic,
284+
BrokerName: "aa",
285+
QueueId: 0,
286+
},
287+
}
288+
msg.WithProperty("key", "value")
289+
290+
expectedResp := &primitive.SendResult{
291+
Status: primitive.SendOK,
292+
MsgID: "111",
293+
QueueOffset: 0,
294+
OffsetMsgID: "0",
295+
}
296+
297+
mockB4Send(p)
298+
299+
client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
300+
client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
301+
func(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) {
302+
resp.Status = expectedResp.Status
303+
resp.MsgID = expectedResp.MsgID
304+
resp.QueueOffset = expectedResp.QueueOffset
305+
resp.OffsetMsgID = expectedResp.OffsetMsgID
306+
})
307+
resp, err := p.SendSync(ctx, msg)
308+
assert.Nil(t, err)
309+
assert.Equal(t, expectedResp, resp)
310+
assert.Equal(t, namespaceTopic, msg.Topic)
311+
}

0 commit comments

Comments
 (0)