Skip to content

Commit 35b951d

Browse files
author
wangjinlong.1048576
committed
1. avoid cancel context too early cause trace data fail.
1 parent 0189e44 commit 35b951d

File tree

3 files changed

+13
-5
lines changed

3 files changed

+13
-5
lines changed

consumer/push_consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1134,7 +1134,7 @@ func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.Me
11341134
})
11351135
}
11361136

1137-
// jsut put consumeResult in consumerMessageCtx
1137+
// just put consumeResult in consumerMessageCtx
11381138
//interval = time.Now().Sub(beginTime)
11391139
//consumeReult := SuccessReturn
11401140
//if interval > pc.option.ConsumeTimeout {

internal/trace.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,8 +463,8 @@ func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, dat
463463

464464
var req = td.buildSendRequest(mq, msg)
465465
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
466-
defer cancel()
467466
err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
467+
cancel()
468468
resp := primitive.NewSendResult()
469469
if e != nil {
470470
rlog.Info("send trace data error.", map[string]interface{}{
@@ -479,6 +479,7 @@ func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, dat
479479
}
480480
})
481481
if err != nil {
482+
cancel()
482483
rlog.Info("send trace data error when invoke", map[string]interface{}{
483484
rlog.LogKeyUnderlayError: err,
484485
})

producer/producer.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,9 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message,
253253
return errors.Errorf("topic=%s route info not found", mq.Topic)
254254
}
255255

256-
ctx, _ = context.WithTimeout(ctx, 3*time.Second)
257-
return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
256+
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
257+
err := p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
258+
cancel()
258259
resp := primitive.NewSendResult()
259260
if err != nil {
260261
h(ctx, nil, err)
@@ -263,6 +264,12 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message,
263264
h(ctx, resp, nil)
264265
}
265266
})
267+
268+
if err != nil {
269+
cancel()
270+
}
271+
272+
return err
266273
}
267274

268275
func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ...*primitive.Message) error {
@@ -347,7 +354,7 @@ func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
347354
}
348355

349356
var (
350-
sysFlag = 0
357+
sysFlag = 0
351358
transferBody = msg.Body
352359
)
353360

0 commit comments

Comments
 (0)