Skip to content

Commit 841f74a

Browse files
authored
Merge pull request #856 from WJL3333/fix_trace_context_cancel_problem
[ISSUE #774] Avoid cancel context too early cause trace data fail.
2 parents f56028f + 35b951d commit 841f74a

File tree

3 files changed

+12
-4
lines changed

3 files changed

+12
-4
lines changed

consumer/push_consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1160,7 +1160,7 @@ func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.Me
11601160
})
11611161
}
11621162

1163-
// jsut put consumeResult in consumerMessageCtx
1163+
// just put consumeResult in consumerMessageCtx
11641164
//interval = time.Now().Sub(beginTime)
11651165
//consumeReult := SuccessReturn
11661166
//if interval > pc.option.ConsumeTimeout {

internal/trace.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,8 +463,8 @@ func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, dat
463463

464464
var req = td.buildSendRequest(mq, msg)
465465
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
466-
defer cancel()
467466
err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
467+
cancel()
468468
resp := primitive.NewSendResult()
469469
if e != nil {
470470
rlog.Info("send trace data error.", map[string]interface{}{
@@ -479,6 +479,7 @@ func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, dat
479479
}
480480
})
481481
if err != nil {
482+
cancel()
482483
rlog.Info("send trace data error when invoke", map[string]interface{}{
483484
rlog.LogKeyUnderlayError: err,
484485
})

producer/producer.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -359,8 +359,9 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message,
359359
return errors.Errorf("topic=%s route info not found", mq.Topic)
360360
}
361361

362-
ctx, _ = context.WithTimeout(ctx, 3*time.Second)
363-
return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
362+
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
363+
err := p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
364+
cancel()
364365
resp := primitive.NewSendResult()
365366
if err != nil {
366367
h(ctx, nil, err)
@@ -369,6 +370,12 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message,
369370
h(ctx, resp, nil)
370371
}
371372
})
373+
374+
if err != nil {
375+
cancel()
376+
}
377+
378+
return err
372379
}
373380

374381
func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ...*primitive.Message) error {

0 commit comments

Comments
 (0)