@@ -143,6 +143,8 @@ func (p *defaultProducer) SendSync(ctx context.Context, msgs ...*primitive.Messa
143143 return nil , err
144144 }
145145
146+ p .messagesWithNamespace (msgs ... )
147+
146148 msg := p .encodeBatch (msgs ... )
147149
148150 resp := primitive .NewSendResult ()
@@ -179,10 +181,6 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
179181 err error
180182 )
181183
182- if p .options .Namespace != "" {
183- msg .Topic = p .options .Namespace + "%" + msg .Topic
184- }
185-
186184 var producerCtx * primitive.ProducerCtx
187185 for retryCount := 0 ; retryCount < retryTime ; retryCount ++ {
188186 mq := p .selectMessageQueue (msg )
@@ -217,6 +215,8 @@ func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context,
217215 return err
218216 }
219217
218+ p .messagesWithNamespace (msgs ... )
219+
220220 msg := p .encodeBatch (msgs ... )
221221
222222 if p .interceptor != nil {
@@ -230,9 +230,7 @@ func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context,
230230}
231231
232232func (p * defaultProducer ) sendAsync (ctx context.Context , msg * primitive.Message , h func (context.Context , * primitive.SendResult , error )) error {
233- if p .options .Namespace != "" {
234- msg .Topic = p .options .Namespace + "%" + msg .Topic
235- }
233+
236234 mq := p .selectMessageQueue (msg )
237235 if mq == nil {
238236 return errors .Errorf ("the topic=%s route info not found" , msg .Topic )
@@ -260,6 +258,8 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ...*primitive.Mes
260258 return err
261259 }
262260
261+ p .messagesWithNamespace (msgs ... )
262+
263263 msg := p .encodeBatch (msgs ... )
264264
265265 if p .interceptor != nil {
@@ -275,10 +275,6 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ...*primitive.Mes
275275func (p * defaultProducer ) sendOneWay (ctx context.Context , msg * primitive.Message ) error {
276276 retryTime := 1 + p .options .RetryTimes
277277
278- if p .options .Namespace != "" {
279- msg .Topic = p .options .Namespace + "%" + msg .Topic
280- }
281-
282278 var err error
283279 for retryCount := 0 ; retryCount < retryTime ; retryCount ++ {
284280 mq := p .selectMessageQueue (msg )
@@ -302,6 +298,26 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message
302298 return err
303299}
304300
301+ func (p * defaultProducer ) withNamespace (resource string ) string {
302+
303+ if p .options .Namespace != "" {
304+ return p .options .Namespace + "%" + resource
305+ }
306+
307+ return resource
308+ }
309+
310+ func (p * defaultProducer ) messagesWithNamespace (msgs ... * primitive.Message ) {
311+
312+ if p .options .Namespace == "" {
313+ return
314+ }
315+
316+ for _ , msg := range msgs {
317+ msg .Topic = p .withNamespace (msg .Topic )
318+ }
319+ }
320+
305321func (p * defaultProducer ) tryCompressMsg (msg * primitive.Message ) bool {
306322 if msg .Compress {
307323 return true
0 commit comments