Skip to content

Commit 05929da

Browse files
authored
[ISSUE #617] retrieve transactionid from property first (#620)
1 parent f698f32 commit 05929da

File tree

2 files changed

+9
-1
lines changed

2 files changed

+9
-1
lines changed

primitive/message.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ const (
5959
PropertyTranscationCheckTimes = "TRANSACTION_CHECK_TIMES"
6060
PropertyCheckImmunityTimeInSeconds = "CHECK_IMMUNITY_TIME_IN_SECONDS"
6161
PropertyShardingKey = "SHARDING_KEY"
62+
PropertyTransactionID = "__transactionId__"
6263
)
6364

6465
type Message struct {

producer/producer.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -462,13 +462,20 @@ func (tp *transactionProducer) checkTransactionState() {
462462
if uniqueKey == "" {
463463
uniqueKey = callback.Msg.MsgId
464464
}
465+
transactionId := callback.Msg.GetProperty(primitive.PropertyTransactionID)
466+
if transactionId == "" {
467+
transactionId = callback.Header.TransactionId
468+
}
469+
if transactionId == "" {
470+
transactionId = callback.Msg.TransactionId
471+
}
465472
header := &internal.EndTransactionRequestHeader{
466473
CommitLogOffset: callback.Header.CommitLogOffset,
467474
ProducerGroup: tp.producer.group,
468475
TranStateTableOffset: callback.Header.TranStateTableOffset,
469476
FromTransactionCheck: true,
470477
MsgID: uniqueKey,
471-
TransactionId: callback.Header.TransactionId,
478+
TransactionId: transactionId,
472479
CommitOrRollback: tp.transactionState(localTransactionState),
473480
}
474481

0 commit comments

Comments
 (0)