Skip to content

Commit 9736ba8

Browse files
author
shenhui.backend
committed
feat(produce) : reduce memory copy for on-wire protocol
1 parent 7e36c75 commit 9736ba8

File tree

3 files changed

+43
-5
lines changed

3 files changed

+43
-5
lines changed

internal/remote/codec.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"bytes"
2121
"encoding/binary"
2222
"fmt"
23+
"io"
2324
"sync/atomic"
2425

2526
jsoniter "github.com/json-iterator/go"
@@ -132,6 +133,43 @@ var (
132133
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
133134
// + len | 4bytes | 4bytes | (21 + r_len + e_len) bytes | remain bytes +
134135
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
136+
func (command *RemotingCommand) WriteTo(w io.Writer) error {
137+
var (
138+
header []byte
139+
err error
140+
)
141+
142+
switch codecType {
143+
case JsonCodecs:
144+
header, err = jsonSerializer.encodeHeader(command)
145+
case RocketMQCodecs:
146+
header, err = rocketMqSerializer.encodeHeader(command)
147+
}
148+
149+
if err != nil {
150+
return err
151+
}
152+
153+
frameSize := 4 + len(header) + len(command.Body)
154+
err = binary.Write(w, binary.BigEndian, int32(frameSize))
155+
if err != nil {
156+
return err
157+
}
158+
159+
err = binary.Write(w, binary.BigEndian, markProtocolType(int32(len(header))))
160+
if err != nil {
161+
return err
162+
}
163+
164+
_, err = w.Write(header)
165+
if err != nil {
166+
return err
167+
}
168+
169+
_, err = w.Write(command.Body)
170+
return err
171+
}
172+
135173
func encode(command *RemotingCommand) ([]byte, error) {
136174
var (
137175
header []byte

internal/remote/remote_client.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -272,11 +272,9 @@ func (c *remotingClient) sendRequest(conn *tcpConnWrapper, request *RemotingComm
272272
}
273273

274274
func (c *remotingClient) doRequest(conn *tcpConnWrapper, request *RemotingCommand) error {
275-
content, err := encode(request)
276-
if err != nil {
277-
return err
278-
}
279-
_, err = conn.Write(content)
275+
conn.Lock()
276+
defer conn.Unlock()
277+
err := request.WriteTo(conn)
280278
if err != nil {
281279
c.closeConnection(conn)
282280
return err

internal/remote/tcp_conn.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ package remote
1919
import (
2020
"context"
2121
"net"
22+
"sync"
2223

2324
"go.uber.org/atomic"
2425
)
2526

2627
// TODO: Adding TCP Connections Pool, https://github.com/apache/rocketmq-client-go/v2/issues/298
2728
type tcpConnWrapper struct {
2829
net.Conn
30+
sync.Mutex
2931
closed atomic.Bool
3032
}
3133

0 commit comments

Comments
 (0)