// +build !appengine

/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package buffer

import (
	"errors"
	"math/bits"
	"runtime"
	"sync"
	"sync/atomic"
	"unsafe"
)

type queue struct {
	// arr is an array of pointers to the items stored in this queue.
	arr []unsafe.Pointer
	// size is the maximum number of elements this queue may store before it
	// wraps around and overwrites older values. Must be a power of two.
	size uint32
	// mask is always size - 1. A bitwise AND with this mask takes the place
	// of a modulo operation in Push.
	mask uint32
	// Each Push operation into this queue increments the acquired counter
	// before proceeding with the actual write to arr. This counter is also
	// used by the Drain operation's drainWait subroutine to wait for all
	// pushes to complete.
	acquired uint32
	// After a Push operation completes, the written counter is incremented.
	// Also used by drainWait to wait for all pushes to complete.
	written uint32
}

// newQueue allocates and returns a new *queue. size must be a power of two.
func newQueue(size uint32) *queue {
	return &queue{
		arr:  make([]unsafe.Pointer, size),
		size: size,
		mask: size - 1,
	}
}
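
// Illustrative sketch (not part of the original change): for a power-of-two
// size, masking with size-1 is equivalent to a modulo, which is why newQueue
// requires a power of two. For example, with size == 8 (mask == 7 == 0b111),
// any counter value c satisfies c&7 == c%8; e.g. 10&7 == 2 and 10%8 == 2.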

// drainWait blocks the caller until all Pushes on this queue are complete.
func (q *queue) drainWait() {
	for atomic.LoadUint32(&q.acquired) != atomic.LoadUint32(&q.written) {
		runtime.Gosched()
	}
}

// A queuePair holds two queues. At any given time, Pushes go into the queue
// referenced by queuePair.q. The active queue is switched when there's a
// drain operation on the circular buffer.
type queuePair struct {
	// q0 and q1 are the two underlying queues; exactly one is active at any
	// given time.
	q0 unsafe.Pointer
	q1 unsafe.Pointer
	// q points to the currently active queue (either q0 or q1); it is read
	// atomically by Push and swapped atomically by switchQueues.
	q unsafe.Pointer
}

// newQueuePair allocates and returns a new *queuePair with its internal
// queues allocated.
func newQueuePair(size uint32) *queuePair {
	qp := &queuePair{}
	qp.q0 = unsafe.Pointer(newQueue(size))
	qp.q1 = unsafe.Pointer(newQueue(size))
	qp.q = qp.q0
	return qp
}

// switchQueues redirects future Pushes to the other queue in the pair, so
// that Push never blocks. Returns a pointer to the old queue that was in
// place before the switch.
func (qp *queuePair) switchQueues() *queue {
	// Even though we have mutual exclusion across drainers (thanks to
	// drainMutex.Lock in Drain), Push operations may access qp.q while we're
	// writing to it.
	if atomic.CompareAndSwapPointer(&qp.q, qp.q0, qp.q1) {
		return (*queue)(qp.q0)
	}

	atomic.CompareAndSwapPointer(&qp.q, qp.q1, qp.q0)
	return (*queue)(qp.q1)
}
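
// Illustrative sketch (not part of the original change): consecutive calls
// toggle between the pair's queues, each returning the queue that was active
// before the call.
//
//	qp := newQueuePair(8)    // qp.q starts out pointing at qp.q0
//	old := qp.switchQueues() // returns q0; qp.q now points at q1
//	old = qp.switchQueues()  // returns q1; qp.q points back at q0
//	_ = old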

// In order to avoid expensive modulo operations, we require the maximum
// number of elements in the circular buffer (N) to be a power of two so that
// a bitwise AND mask can be used instead. Since a CircularBuffer is a
// collection of queuePairs (see below), we need to divide N evenly among
// them; since powers of two are only divisible by other powers of two, we
// use floorCPUCount() (itself a power of two) queuePairs within each
// CircularBuffer.
//
// Rounding the number of CPUs down to a power of two (rather than up) was
// found to be optimal in experiments.
func floorCPUCount() uint32 {
	floorExponent := bits.Len32(uint32(runtime.NumCPU())) - 1
	if floorExponent < 0 {
		floorExponent = 0
	}
	return 1 << uint32(floorExponent)
}
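
// Worked example (illustrative, not part of the original change): with
// runtime.NumCPU() == 6, bits.Len32(6) == 3, so floorExponent == 2 and
// floorCPUCount returns 1 << 2 == 4: the largest power of two that does not
// exceed the CPU count.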

var numCircularBufferPairs = floorCPUCount()

// CircularBuffer is a lock-free data structure that supports Push and Drain
// operations.
//
// Note that CircularBuffer is built for performance more than reliability.
// That is, some Push operations may fail without retries in some situations
// (such as during a Drain operation). Order of pushes is not maintained
// either; that is, if A was pushed before B, the Drain operation may return
// an array with B before A. These restrictions are acceptable within gRPC's
// profiling, but if your use case does not permit these relaxed constraints,
// or if performance is not a primary concern, you should probably use a
// lock-based data structure such as internal/buffer.UnboundedBuffer.
type CircularBuffer struct {
	drainMutex sync.Mutex
	qp         []*queuePair
	// qpn is a monotonically increasing counter that's used to determine
	// which queuePair a Push operation should write to. This approach's
	// performance was found to be better than writing to a random queue.
	qpn uint32
	// qpMask is always len(qp) - 1; it masks qpn into a valid index into qp.
	qpMask uint32
}

var errInvalidCircularBufferSize = errors.New("buffer size is not a power of two")

// NewCircularBuffer allocates a circular buffer of size size and returns a
// reference to the struct. Only circular buffers of size 2^k are allowed
// (this saves us from having to do expensive modulo operations).
func NewCircularBuffer(size uint32) (*CircularBuffer, error) {
	if size == 0 || size&(size-1) != 0 {
		return nil, errInvalidCircularBufferSize
	}

	n := numCircularBufferPairs
	if size/numCircularBufferPairs < 8 {
		// If each circular buffer is going to hold less than a very small
		// number of items (say, 8), using multiple circular buffers is very
		// likely wasteful. Instead, fall back to one circular buffer holding
		// everything.
		n = 1
	}

	cb := &CircularBuffer{
		qp:     make([]*queuePair, n),
		qpMask: n - 1,
	}

	for i := uint32(0); i < n; i++ {
		cb.qp[i] = newQueuePair(size / n)
	}

	return cb, nil
}
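
// Illustrative sketch (not part of the original change): only power-of-two
// sizes are accepted.
//
//	cb, err := NewCircularBuffer(4096) // ok: 4096 == 2^12
//	cb, err = NewCircularBuffer(1000)  // error: errInvalidCircularBufferSize
//	_, _ = cb, err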

// Push pushes an element into the circular buffer. It is guaranteed to
// complete in a finite number of steps (it is lock-free). It does not
// guarantee that push order will be retained, nor that the operation will
// succeed if a Drain operation concurrently begins execution.
func (cb *CircularBuffer) Push(x interface{}) {
	n := atomic.AddUint32(&cb.qpn, 1) & cb.qpMask
	qptr := atomic.LoadPointer(&cb.qp[n].q)
	q := (*queue)(qptr)

	acquired := atomic.AddUint32(&q.acquired, 1) - 1

	// If true, it means that we have incremented acquired before any
	// queuePair was switched, and therefore before any drainWait completion.
	// Therefore, it is safe to proceed with the Push operation on this queue.
	// Otherwise, it means that a Drain operation has begun execution, but we
	// don't know how far along the process it is. If it is past the drainWait
	// check, it is not safe to proceed with the Push operation. We choose to
	// drop this sample entirely instead of retrying, as retrying may
	// potentially send the Push operation into a spin loop (we want to
	// guarantee completion of the Push operation within a finite time).
	// Before exiting, we increment written so that any existing drainWaits
	// can proceed.
	if atomic.LoadPointer(&cb.qp[n].q) != qptr {
		atomic.AddUint32(&q.written, 1)
		return
	}

	// At this point, we're definitely writing to the right queue. That is,
	// one of the following is true:
	//   1. No drainer is in execution on this queue.
	//   2. A drainer is in execution on this queue and it is waiting at the
	//      acquired == written barrier.
	//
	// Say two Pushes A and B happen on the same queue, q.size apart; i.e.
	// they get the same index:
	//
	//   index_A == index_B
	//   acquired_A + q.size == acquired_B
	//
	// We say "B has wrapped around A" when this happens. In this case, since
	// A occurred before B, B's Push should be the final value. However, we
	// accommodate A being the final value because wrap-arounds are extremely
	// rare and accounting for them requires an additional counter and a
	// significant performance penalty. Note that the approach below never
	// leads to any data corruption.
	index := acquired & q.mask
	atomic.StorePointer(&q.arr[index], unsafe.Pointer(&x))

	// Allows any drainWait checks to proceed.
	atomic.AddUint32(&q.written, 1)
}
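
// Worked example (illustrative, not part of the original change): with
// q.size == 4 (q.mask == 3), a Push that observes acquired == 5 writes to
// index 5 & 3 == 1, the same slot written by the earlier Push that observed
// acquired == 1; the older element is silently overwritten, as described
// above.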

// dereferenceAppend dereferences non-nil pointers from arr into result. The
// range of elements copied from arr is [from, to). It assumes that the result
// slice is already allocated and is large enough to hold all the elements
// that might be copied, and that there is mutual exclusion on the array of
// pointers.
func dereferenceAppend(result []interface{}, arr []unsafe.Pointer, from, to uint32) []interface{} {
	for i := from; i < to; i++ {
		// We have mutual exclusion on arr; there's no need for atomics.
		x := (*interface{})(arr[i])
		if x != nil {
			result = append(result, *x)
		}
	}
	return result
}

// Drain allocates and returns an array of things Pushed into the circular
// buffer. Push order is not maintained; that is, if B was Pushed after A,
// Drain may return B at a lower index than A in the returned array.
func (cb *CircularBuffer) Drain() []interface{} {
	cb.drainMutex.Lock()

	qs := make([]*queue, len(cb.qp))
	for i := 0; i < len(cb.qp); i++ {
		qs[i] = cb.qp[i].switchQueues()
	}

	var wg sync.WaitGroup
	wg.Add(len(qs))
	for i := 0; i < len(qs); i++ {
		go func(qi int) {
			qs[qi].drainWait()
			wg.Done()
		}(i)
	}
	wg.Wait()

	result := make([]interface{}, 0)
	for i := 0; i < len(qs); i++ {
		if qs[i].acquired < qs[i].size {
			result = dereferenceAppend(result, qs[i].arr, 0, qs[i].acquired)
		} else {
			result = dereferenceAppend(result, qs[i].arr, 0, qs[i].size)
		}
	}

	for i := 0; i < len(qs); i++ {
		atomic.StoreUint32(&qs[i].acquired, 0)
		atomic.StoreUint32(&qs[i].written, 0)
	}

	cb.drainMutex.Unlock()
	return result
}
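
// exampleCircularBufferUsage is an illustrative sketch, not part of the
// original change: it exercises only the exported API above with arbitrary
// values, showing concurrent Pushes followed by a single Drain.
func exampleCircularBufferUsage() []interface{} {
	cb, err := NewCircularBuffer(4096) // size must be a power of two
	if err != nil {
		return nil
	}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// A Push may be silently dropped if a Drain begins concurrently;
			// that is an accepted trade-off of this data structure.
			cb.Push(i)
		}(i)
	}
	wg.Wait()

	// The returned elements are in no particular order relative to Push
	// order.
	return cb.Drain()
}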