diff --git a/internal/remote/codec.go b/internal/remote/codec.go index f756c11f..1c6e0a59 100644 --- a/internal/remote/codec.go +++ b/internal/remote/codec.go @@ -20,6 +20,7 @@ import ( "bytes" "encoding/binary" "fmt" + "io" "sync/atomic" jsoniter "github.com/json-iterator/go" @@ -132,6 +133,43 @@ var ( // +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ // + len | 4bytes | 4bytes | (21 + r_len + e_len) bytes | remain bytes + // +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +func (command *RemotingCommand) WriteTo(w io.Writer) error { + var ( + header []byte + err error + ) + + switch codecType { + case JsonCodecs: + header, err = jsonSerializer.encodeHeader(command) + case RocketMQCodecs: + header, err = rocketMqSerializer.encodeHeader(command) + } + + if err != nil { + return err + } + + frameSize := 4 + len(header) + len(command.Body) + err = binary.Write(w, binary.BigEndian, int32(frameSize)) + if err != nil { + return err + } + + err = binary.Write(w, binary.BigEndian, markProtocolType(int32(len(header)))) + if err != nil { + return err + } + + _, err = w.Write(header) + if err != nil { + return err + } + + _, err = w.Write(command.Body) + return err +} + func encode(command *RemotingCommand) ([]byte, error) { var ( header []byte diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go index dae364e1..5e2aa835 100644 --- a/internal/remote/remote_client.go +++ b/internal/remote/remote_client.go @@ -272,11 +272,9 @@ func (c *remotingClient) sendRequest(conn *tcpConnWrapper, request *RemotingComm } func (c *remotingClient) doRequest(conn *tcpConnWrapper, request *RemotingCommand) error { - content, err := encode(request) - if err != nil { - return err - } - _, err = conn.Write(content) + conn.Lock() + defer conn.Unlock() + err := request.WriteTo(conn) if err != nil { c.closeConnection(conn) return err diff --git a/internal/remote/tcp_conn.go b/internal/remote/tcp_conn.go index dda7dcf3..ae340c6b 100644 --- a/internal/remote/tcp_conn.go +++ b/internal/remote/tcp_conn.go @@ -19,6 +19,7 @@ package remote import ( "context" "net" + "sync" "go.uber.org/atomic" ) @@ -26,6 +27,7 @@ import ( // TODO: Adding TCP Connections Pool, https://github.com/apache/rocketmq-client-go/v2/issues/298 type tcpConnWrapper struct { net.Conn + sync.Mutex closed atomic.Bool }