From 847c11d426e3280674fa586fd19424c14620bb20 Mon Sep 17 00:00:00 2001 From: trxo Date: Wed, 16 Jun 2021 22:55:37 +0800 Subject: [PATCH] fix msg.Body compressed and not reset, cause error in transaction message processing --- primitive/message.go | 13 +++++++------ producer/producer.go | 14 ++++++++++---- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/primitive/message.go b/primitive/message.go index b330dc1d..d17d8d8d 100644 --- a/primitive/message.go +++ b/primitive/message.go @@ -63,12 +63,13 @@ const ( ) type Message struct { - Topic string - Body []byte - Flag int32 - TransactionId string - Batch bool - Compress bool + Topic string + Body []byte + CompressedBody []byte + Flag int32 + TransactionId string + Batch bool + Compress bool // Queue is the queue that messages will be sent to. the value must be set if want to custom the queue of message, // just ignore if not. Queue *MessageQueue diff --git a/producer/producer.go b/producer/producer.go index 8ebb660f..22a2f9e0 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -323,7 +323,7 @@ func (p *defaultProducer) tryCompressMsg(msg *primitive.Message) bool { if e != nil { return false } - msg.Body = compressedBody + msg.CompressedBody = compressedBody msg.Compress = true return true } @@ -333,8 +333,14 @@ func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue, if !msg.Batch && msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex) == "" { msg.WithProperty(primitive.PropertyUniqueClientMessageIdKeyIndex, primitive.CreateUniqID()) } - sysFlag := 0 + + var ( + sysFlag = 0 + transferBody = msg.Body + ) + if p.tryCompressMsg(msg) { + transferBody = msg.CompressedBody sysFlag = primitive.SetCompressedFlag(sysFlag) } v := msg.GetProperty(primitive.PropertyTransactionPrepared) @@ -361,10 +367,10 @@ func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue, if msg.Batch { cmd = internal.ReqSendBatchMessage reqv2 := &internal.SendMessageRequestV2Header{SendMessageRequestHeader: req} - return remote.NewRemotingCommand(cmd, reqv2, msg.Body) + return remote.NewRemotingCommand(cmd, reqv2, transferBody) } - return remote.NewRemotingCommand(cmd, req, msg.Body) + return remote.NewRemotingCommand(cmd, req, transferBody) } func (p *defaultProducer) selectMessageQueue(msg *primitive.Message) *primitive.MessageQueue {