Skip to content

Commit 505cdc7

Browse files
committed
buffer: Use a double-buffering scheme to prevent data races
1 parent 6e2f60e commit 505cdc7

File tree

2 files changed

+42
-13
lines changed

2 files changed

+42
-13
lines changed

buffer.go

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,47 +15,69 @@ import (
1515
)
1616

1717
const defaultBufSize = 4096
18+
const maxCachedBufSize = 16 * 1024
1819

1920
// A buffer which is used for both reading and writing.
2021
// This is possible since communication on each connection is synchronous.
2122
// In other words, we can't write and read simultaneously on the same connection.
2223
// The buffer is similar to bufio.Reader / Writer but zero-copy-ish
2324
// Also highly optimized for this particular use case.
25+
// This buffer is backed by two byte slices in a double-buffering scheme
2426
type buffer struct {
2527
buf []byte // buf is a byte buffer who's length and capacity are equal.
2628
nc net.Conn
2729
idx int
2830
length int
2931
timeout time.Duration
32+
dbuf [2][]byte // dbuf is an array with the two byte slices that back this buffer
33+
flipcnt uint // flipccnt is the current buffer counter for double-buffering
3034
}
3135

3236
// newBuffer allocates and returns a new buffer.
3337
func newBuffer(nc net.Conn) buffer {
38+
fg := make([]byte, defaultBufSize)
3439
return buffer{
35-
buf: make([]byte, defaultBufSize),
36-
nc: nc,
40+
buf: fg,
41+
nc: nc,
42+
dbuf: [2][]byte{fg, nil},
3743
}
3844
}
3945

46+
// flip replaces the active buffer with the background buffer
47+
// this is a delayed flip that simply increases the buffer counter;
48+
// the actual flip will be performed the next time we call `buffer.fill`
49+
func (b *buffer) flip() {
50+
b.flipcnt += 1
51+
}
52+
4053
// fill reads into the buffer until at least _need_ bytes are in it
4154
func (b *buffer) fill(need int) error {
4255
n := b.length
56+
// fill data into its double-buffering target: if we've called
57+
// flip on this buffer, we'll be copying to the background buffer,
58+
// and then filling it with network data; otherwise we'll just move
59+
// the contents of the current buffer to the front before filling it
60+
dest := b.dbuf[b.flipcnt&1]
61+
62+
// grow buffer if necessary to fit the whole packet.
63+
if need > len(dest) {
64+
// Round up to the next multiple of the default size
65+
dest = make([]byte, ((need/defaultBufSize)+1)*defaultBufSize)
4366

44-
// move existing data to the beginning
45-
if n > 0 && b.idx > 0 {
46-
copy(b.buf[0:n], b.buf[b.idx:])
67+
// if the allocated buffer is not too large, move it to backing storage
68+
// to prevent extra allocations on applications that perform large reads
69+
if len(dest) <= maxCachedBufSize {
70+
b.dbuf[b.flipcnt&1] = dest
71+
}
4772
}
4873

49-
// grow buffer if necessary
50-
// TODO: let the buffer shrink again at some point
51-
// Maybe keep the org buf slice and swap back?
52-
if need > len(b.buf) {
53-
// Round up to the next multiple of the default size
54-
newBuf := make([]byte, ((need/defaultBufSize)+1)*defaultBufSize)
55-
copy(newBuf, b.buf)
56-
b.buf = newBuf
74+
// if we're filling the fg buffer, move the existing data to the start of it.
75+
// if we're filling the bg buffer, copy over the data
76+
if n > 0 {
77+
copy(dest[0:n], b.buf[b.idx:])
5778
}
5879

80+
b.buf = dest
5981
b.idx = 0
6082

6183
for {

rows.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,13 @@ func (rows *mysqlRows) Close() (err error) {
111111
return err
112112
}
113113

114+
// flip the buffer for this connection if we need to drain it.
115+
// note that for a successful query (i.e. one where rows.next()
116+
// has been called until it returns false), `rows.mc` will be nil
117+
// by the time the user calls `(*Rows).Close`, so we won't reach this
118+
// see: https://github.com/golang/go/commit/651ddbdb5056ded455f47f9c494c67b389622a47
119+
mc.buf.flip()
120+
114121
// Remove unread packets from stream
115122
if !rows.rs.done {
116123
err = mc.readUntilEOF()

0 commit comments

Comments
 (0)