Skip to content

Commit 7ae83c4

Browse files
authored
fix: fix producer send msg timeout option does not take effect (apache#1109)
1 parent c9e197c commit 7ae83c4

File tree

6 files changed

+69
-16
lines changed

6 files changed

+69
-16
lines changed

consumer/option.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,15 @@ func WithLimiter(limiter Limiter) Option {
382382
}
383383
}
384384

385+
// WithRemotingTimeout set remote client timeout options
386+
func WithRemotingTimeout(connectionTimeout, readTimeout, writeTimeout time.Duration) Option {
387+
return func(opts *consumerOptions) {
388+
opts.ClientOptions.RemotingClientConfig.ConnectionTimeout = connectionTimeout
389+
opts.ClientOptions.RemotingClientConfig.ReadTimeout = readTimeout
390+
opts.ClientOptions.RemotingClientConfig.WriteTimeout = writeTimeout
391+
}
392+
}
393+
385394
func WithTls(useTls bool) Option {
386395
return func(opts *consumerOptions) {
387396
opts.ClientOptions.RemotingClientConfig.UseTls = useTls

consumer/option_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package consumer
33
import (
44
"reflect"
55
"testing"
6+
"time"
67
)
78

89
func getFieldString(obj interface{}, field string) string {
@@ -12,6 +13,20 @@ func getFieldString(obj interface{}, field string) string {
1213
}).String()
1314
}
1415

16+
func TestWithRemotingTimeout(t *testing.T) {
17+
opt := defaultPushConsumerOptions()
18+
WithRemotingTimeout(3*time.Second, 4*time.Second, 5*time.Second)(&opt)
19+
if timeout := opt.RemotingClientConfig.ConnectionTimeout; timeout != 3*time.Second {
20+
t.Errorf("consumer option WithRemotingTimeout connectionTimeout. want:%s, got=%s", 3*time.Second, timeout)
21+
}
22+
if timeout := opt.RemotingClientConfig.ReadTimeout; timeout != 4*time.Second {
23+
t.Errorf("consumer option WithRemotingTimeout readTimeout. want:%s, got=%s", 4*time.Second, timeout)
24+
}
25+
if timeout := opt.RemotingClientConfig.WriteTimeout; timeout != 5*time.Second {
26+
t.Errorf("consumer option WithRemotingTimeout writeTimeout. want:%s, got=%s", 5*time.Second, timeout)
27+
}
28+
}
29+
1530
func TestWithUnitName(t *testing.T) {
1631
opt := defaultPushConsumerOptions()
1732
unitName := "unsh"

internal/remote/remote_client.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func (c *remotingClient) InvokeSync(ctx context.Context, addr string, request *R
102102
c.responseTable.Store(resp.Opaque, resp)
103103
defer c.responseTable.Delete(request.Opaque)
104104

105-
err = c.sendRequest(conn, request)
105+
err = c.sendRequest(ctx, conn, request)
106106
if err != nil {
107107
return nil, err
108108
}
@@ -120,7 +120,7 @@ func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request *
120120
resp := NewResponseFuture(ctx, request.Opaque, callback)
121121
c.responseTable.Store(resp.Opaque, resp)
122122

123-
err = c.sendRequest(conn, request)
123+
err = c.sendRequest(ctx, conn, request)
124124
if err != nil {
125125
c.responseTable.Delete(request.Opaque)
126126
return err
@@ -146,11 +146,11 @@ func (c *remotingClient) InvokeOneWay(ctx context.Context, addr string, request
146146
if err != nil {
147147
return err
148148
}
149-
return c.sendRequest(conn, request)
149+
return c.sendRequest(ctx, conn, request)
150150
}
151151

152152
func (c *remotingClient) connect(ctx context.Context, addr string) (*tcpConnWrapper, error) {
153-
//it needs additional locker.
153+
// it needs additional locker.
154154
c.connectionLocker.Lock()
155155
defer c.connectionLocker.Unlock()
156156
conn, ok := c.connectionTable.Load(addr)
@@ -246,7 +246,7 @@ func (c *remotingClient) processCMD(cmd *RemotingCommand, r *tcpConnWrapper) {
246246
if res != nil {
247247
res.Opaque = cmd.Opaque
248248
res.Flag |= 1 << 0
249-
err := c.sendRequest(r, res)
249+
err := c.sendRequest(context.Background(), r, res)
250250
if err != nil {
251251
rlog.Warning("send response to broker error", map[string]interface{}{
252252
rlog.LogKeyUnderlayError: err,
@@ -297,23 +297,27 @@ func (c *remotingClient) createScanner(r io.Reader) *bufio.Scanner {
297297
return scanner
298298
}
299299

300-
func (c *remotingClient) sendRequest(conn *tcpConnWrapper, request *RemotingCommand) error {
300+
func (c *remotingClient) sendRequest(ctx context.Context, conn *tcpConnWrapper, request *RemotingCommand) error {
301301
var err error
302302
if c.interceptor != nil {
303-
err = c.interceptor(context.Background(), request, nil, func(ctx context.Context, req, reply interface{}) error {
304-
return c.doRequest(conn, request)
303+
err = c.interceptor(ctx, request, nil, func(ctx context.Context, req, reply interface{}) error {
304+
return c.doRequest(ctx, conn, request)
305305
})
306306
} else {
307-
err = c.doRequest(conn, request)
307+
err = c.doRequest(ctx, conn, request)
308308
}
309309
return err
310310
}
311311

312-
func (c *remotingClient) doRequest(conn *tcpConnWrapper, request *RemotingCommand) error {
312+
func (c *remotingClient) doRequest(ctx context.Context, conn *tcpConnWrapper, request *RemotingCommand) error {
313313
conn.Lock()
314314
defer conn.Unlock()
315315

316-
err := conn.Conn.SetWriteDeadline(time.Now().Add(c.config.WriteTimeout))
316+
deadline, ok := ctx.Deadline()
317+
if !ok {
318+
deadline = time.Now().Add(c.config.WriteTimeout)
319+
}
320+
err := conn.Conn.SetWriteDeadline(deadline)
317321
if err != nil {
318322
rlog.Error("conn error, close connection", map[string]interface{}{
319323
rlog.LogKeyUnderlayError: err,

producer/option.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,15 @@ func WithCompressLevel(level int) Option {
179179
}
180180
}
181181

182+
// WithRemotingTimeout set remote client timeout options
183+
func WithRemotingTimeout(connectionTimeout, readTimeout, writeTimeout time.Duration) Option {
184+
return func(opts *producerOptions) {
185+
opts.ClientOptions.RemotingClientConfig.ConnectionTimeout = connectionTimeout
186+
opts.ClientOptions.RemotingClientConfig.ReadTimeout = readTimeout
187+
opts.ClientOptions.RemotingClientConfig.WriteTimeout = writeTimeout
188+
}
189+
}
190+
182191
func WithTls(useTls bool) Option {
183192
return func(opts *producerOptions) {
184193
opts.ClientOptions.RemotingClientConfig.UseTls = useTls

producer/option_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package producer
33
import (
44
"reflect"
55
"testing"
6+
"time"
67
)
78

89
func getFieldString(obj interface{}, field string) string {
@@ -12,6 +13,20 @@ func getFieldString(obj interface{}, field string) string {
1213
}).String()
1314
}
1415

16+
func TestWithRemotingTimeout(t *testing.T) {
17+
opt := defaultProducerOptions()
18+
WithRemotingTimeout(3*time.Second, 4*time.Second, 5*time.Second)(&opt)
19+
if timeout := opt.RemotingClientConfig.ConnectionTimeout; timeout != 3*time.Second {
20+
t.Errorf("consumer option WithRemotingTimeout connectionTimeout. want:%s, got=%s", 3*time.Second, timeout)
21+
}
22+
if timeout := opt.RemotingClientConfig.ReadTimeout; timeout != 4*time.Second {
23+
t.Errorf("consumer option WithRemotingTimeout readTimeout. want:%s, got=%s", 4*time.Second, timeout)
24+
}
25+
if timeout := opt.RemotingClientConfig.WriteTimeout; timeout != 5*time.Second {
26+
t.Errorf("consumer option WithRemotingTimeout writeTimeout. want:%s, got=%s", 5*time.Second, timeout)
27+
}
28+
}
29+
1530
func TestWithUnitName(t *testing.T) {
1631
opt := defaultProducerOptions()
1732
unitName := "unsh"

producer/producer.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,15 @@ import (
2626
"sync/atomic"
2727
"time"
2828

29+
"github.com/google/uuid"
30+
"github.com/pkg/errors"
31+
2932
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
3033
"github.com/apache/rocketmq-client-go/v2/internal"
3134
"github.com/apache/rocketmq-client-go/v2/internal/remote"
3235
"github.com/apache/rocketmq-client-go/v2/internal/utils"
3336
"github.com/apache/rocketmq-client-go/v2/primitive"
3437
"github.com/apache/rocketmq-client-go/v2/rlog"
35-
"github.com/google/uuid"
36-
"github.com/pkg/errors"
3738
)
3839

3940
type defaultProducer struct {
@@ -355,7 +356,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
355356
producerCtx.MQ = *mq
356357
}
357358

358-
res, _err := p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)
359+
res, _err := p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), p.options.SendMsgTimeout)
359360
if _err != nil {
360361
err = _err
361362
continue
@@ -400,7 +401,7 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message,
400401
return errors.Errorf("topic=%s route info not found", mq.Topic)
401402
}
402403

403-
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
404+
ctx, cancel := context.WithTimeout(ctx, p.options.SendMsgTimeout)
404405
err := p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
405406
cancel()
406407
if err != nil {
@@ -465,7 +466,7 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message
465466
return fmt.Errorf("topic=%s route info not found", mq.Topic)
466467
}
467468

468-
_err := p.client.InvokeOneWay(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)
469+
_err := p.client.InvokeOneWay(ctx, addr, p.buildSendRequest(mq, msg), p.options.SendMsgTimeout)
469470
if _err != nil {
470471
err = _err
471472
continue

0 commit comments

Comments
 (0)