-
Notifications
You must be signed in to change notification settings - Fork 7.6k
/
Copy pathSpscLinkedArrayQueue.java
286 lines (249 loc) · 10.6 KB
/
SpscLinkedArrayQueue.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
/*
* The code was inspired by the similarly named JCTools class:
* https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic
*/
package io.reactivex.internal.queue;
import java.util.concurrent.atomic.*;
import io.reactivex.internal.fuseable.SimpleQueue;
import io.reactivex.internal.util.Pow2;
/**
 * A single-producer single-consumer array-backed queue which can allocate new arrays in case the consumer is slower
 * than the producer.
 * <p>
 * Every backing array is allocated with one slot more than the (power-of-two) capacity. That trailing slot never
 * holds an element; when the producer runs out of room it stores a reference to a freshly allocated array there and
 * marks the current element slot with a private {@code HAS_NEXT} token so the consumer knows to follow the link.
 * <p>
 * All {@code offer} variants must be called from one producer thread only and {@code poll}/{@code peek}/{@code clear}
 * from one consumer thread only.
 *
 * @param <T> the contained value type
 */
public final class SpscLinkedArrayQueue<T> implements SimpleQueue<T> {
    /** Upper bound of the producer look-ahead step, overridable through a system property. */
    static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096);

    // ---- producer side ----
    final AtomicLong producerIndex = new AtomicLong();
    int producerLookAheadStep;
    long producerLookAhead;
    final int producerMask;
    AtomicReferenceArray<Object> producerBuffer;

    // ---- consumer side ----
    final int consumerMask;
    AtomicReferenceArray<Object> consumerBuffer;
    final AtomicLong consumerIndex = new AtomicLong();

    /** Token stored in an element slot to tell the consumer to continue in the linked buffer. */
    private static final Object HAS_NEXT = new Object();

    /**
     * Constructs a queue whose backing arrays hold a power-of-two number of elements.
     *
     * @param bufferSize the requested capacity of a single backing array; values below 8 are treated as 8 and
     *                   the result is rounded by {@code Pow2.roundToPowerOfTwo}
     */
    public SpscLinkedArrayQueue(final int bufferSize) {
        int p2capacity = Pow2.roundToPowerOfTwo(Math.max(8, bufferSize));
        int mask = p2capacity - 1;
        // one extra slot at the end is reserved for the link to the next buffer
        AtomicReferenceArray<Object> buffer = new AtomicReferenceArray<Object>(p2capacity + 1);
        producerBuffer = buffer;
        producerMask = mask;
        adjustLookAheadStep(p2capacity);
        consumerBuffer = buffer;
        consumerMask = mask;
        producerLookAhead = mask - 1; // we know it's all empty to start with
        soProducerIndex(0L);
    }

    /**
     * {@inheritDoc}
     * <p>
     * This implementation is correct for single producer thread use only.
     *
     * @throws NullPointerException if {@code e} is null
     */
    @Override
    public boolean offer(final T e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }
        // local load of field to avoid repeated loads after volatile reads
        final AtomicReferenceArray<Object> buffer = producerBuffer;
        final long index = lpProducerIndex();
        final int mask = producerMask;
        final int offset = calcWrappedOffset(index, mask);

        if (index < producerLookAhead) {
            // a previous look-ahead already established that this slot is free
            return writeToQueue(buffer, e, index, offset);
        }

        final int lookAheadStep = producerLookAheadStep;
        // go around the buffer or resize if full (unless we hit max capacity)
        final int lookAheadElementOffset = calcWrappedOffset(index + lookAheadStep, mask);
        if (null == lvElement(buffer, lookAheadElementOffset)) { // LoadLoad
            producerLookAhead = index + lookAheadStep - 1; // joy, there's plenty of room
            return writeToQueue(buffer, e, index, offset);
        }
        if (null == lvElement(buffer, calcWrappedOffset(index + 1, mask))) { // buffer is not full
            return writeToQueue(buffer, e, index, offset);
        }
        resize(buffer, index, offset, e, mask); // add a buffer and link old to new
        return true;
    }

    /**
     * Stores the element into the given slot and then publishes the advanced producer index.
     *
     * @param buffer the current producer buffer
     * @param e the element to store
     * @param index the current producer index
     * @param offset the slot of {@code index} inside {@code buffer}
     * @return always true
     */
    private boolean writeToQueue(final AtomicReferenceArray<Object> buffer, final T e, final long index, final int offset) {
        soElement(buffer, offset, e);// StoreStore
        soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms
        return true;
    }

    /**
     * Allocates a new buffer of the same length, stores the element there, links the old buffer to the new one
     * and finally marks the slot in the old buffer with {@code HAS_NEXT}.
     *
     * @param oldBuffer the full producer buffer
     * @param currIndex the current producer index
     * @param offset the slot of {@code currIndex} (identical in both buffers)
     * @param e the element to store
     * @param mask the producer mask
     */
    private void resize(final AtomicReferenceArray<Object> oldBuffer, final long currIndex, final int offset, final T e,
            final long mask) {
        final int capacity = oldBuffer.length();
        final AtomicReferenceArray<Object> newBuffer = new AtomicReferenceArray<Object>(capacity);
        producerBuffer = newBuffer;
        producerLookAhead = currIndex + mask - 1;
        soElement(newBuffer, offset, e);// StoreStore
        soNext(oldBuffer, newBuffer);
        // the marker is written last so the element and the link are in place before the consumer can see it
        soElement(oldBuffer, offset, HAS_NEXT);
        soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms
    }

    /**
     * Stores the reference to the next buffer into the trailing link slot of the current buffer.
     *
     * @param curr the buffer to link from
     * @param next the buffer to link to
     */
    private void soNext(AtomicReferenceArray<Object> curr, AtomicReferenceArray<Object> next) {
        soElement(curr, calcDirectOffset(curr.length() - 1), next);
    }

    /**
     * Reads the next buffer from the trailing link slot of the given buffer and clears that slot.
     * <p>
     * Clearing the link keeps an exhausted buffer from holding a strong reference to its successor; the callers
     * switch {@code consumerBuffer} to the returned buffer right away, so the link is never read again.
     *
     * @param curr the buffer the consumer is leaving
     * @return the buffer that was linked from {@code curr}
     */
    @SuppressWarnings("unchecked")
    private AtomicReferenceArray<Object> lvNextBufferAndUnlink(AtomicReferenceArray<Object> curr) {
        final int linkOffset = calcDirectOffset(curr.length() - 1);
        final AtomicReferenceArray<Object> next = (AtomicReferenceArray<Object>)lvElement(curr, linkOffset);
        soElement(curr, linkOffset, null);
        return next;
    }

    /**
     * {@inheritDoc}
     * <p>
     * This implementation is correct for single consumer thread use only.
     */
    @SuppressWarnings("unchecked")
    @Override
    public T poll() {
        // local load of field to avoid repeated loads after volatile reads
        final AtomicReferenceArray<Object> buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final int mask = consumerMask;
        final int offset = calcWrappedOffset(index, mask);
        final Object e = lvElement(buffer, offset);// LoadLoad

        if (e == HAS_NEXT) {
            // the producer moved on to a new buffer at this index: follow the link
            return newBufferPoll(lvNextBufferAndUnlink(buffer), index, mask);
        }
        if (null != e) {
            soElement(buffer, offset, null);// StoreStore
            soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
            return (T) e;
        }
        return null;
    }

    /**
     * Switches the consumer to the given buffer and polls the element at the current consumer index from it.
     *
     * @param nextBuffer the buffer to continue consuming from
     * @param index the current consumer index
     * @param mask the consumer mask
     * @return the polled element or null if the slot is empty
     */
    @SuppressWarnings("unchecked")
    private T newBufferPoll(AtomicReferenceArray<Object> nextBuffer, final long index, final int mask) {
        consumerBuffer = nextBuffer;
        final int offsetInNew = calcWrappedOffset(index, mask);
        final T n = (T) lvElement(nextBuffer, offsetInNew);// LoadLoad
        if (null == n) {
            return null;
        }
        soElement(nextBuffer, offsetInNew, null);// StoreStore
        soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
        return n;
    }

    /**
     * Returns the element at the current consumer index without removing it.
     * <p>
     * If the slot holds the {@code HAS_NEXT} marker, the consumer is switched to the linked buffer first.
     * This implementation is correct for single consumer thread use only.
     *
     * @return the next element or null if the slot at the consumer index is empty
     */
    @SuppressWarnings("unchecked")
    public T peek() {
        final AtomicReferenceArray<Object> buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final int mask = consumerMask;
        final int offset = calcWrappedOffset(index, mask);
        final Object e = lvElement(buffer, offset);// LoadLoad
        if (e == HAS_NEXT) {
            return newBufferPeek(lvNextBufferAndUnlink(buffer), index, mask);
        }
        return (T) e;
    }

    /**
     * Switches the consumer to the given buffer and reads (without removing) the element at the consumer index.
     *
     * @param nextBuffer the buffer to continue consuming from
     * @param index the current consumer index
     * @param mask the consumer mask
     * @return the element in the new buffer or null if the slot is empty
     */
    @SuppressWarnings("unchecked")
    private T newBufferPeek(AtomicReferenceArray<Object> nextBuffer, final long index, final int mask) {
        consumerBuffer = nextBuffer;
        final int offsetInNew = calcWrappedOffset(index, mask);
        return (T) lvElement(nextBuffer, offsetInNew);// LoadLoad
    }

    /**
     * Removes all elements by polling until both {@code poll()} returns null and the indices are equal.
     */
    @Override
    public void clear() {
        while (poll() != null || !isEmpty()) { } // NOPMD
    }

    /**
     * Returns the difference between the producer and the consumer index.
     *
     * @return the number of elements, possibly over-estimated under concurrent use
     */
    public int size() {
        /*
         * It is possible for a thread to be interrupted or reschedule between the read of the producer and
         * consumer indices, therefore protection is required to ensure size is within valid range. In the
         * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
         * index BEFORE the producer index.
         */
        long after = lvConsumerIndex();
        while (true) {
            final long before = after;
            final long currentProducerIndex = lvProducerIndex();
            after = lvConsumerIndex();
            // retry until the consumer index did not move while the producer index was read
            if (before == after) {
                return (int) (currentProducerIndex - after);
            }
        }
    }

    /**
     * Checks whether the producer and consumer indices are equal.
     *
     * @return true if both indices are the same
     */
    @Override
    public boolean isEmpty() {
        return lvProducerIndex() == lvConsumerIndex();
    }

    /** Sets the look-ahead step to a quarter of the capacity, capped at {@link #MAX_LOOK_AHEAD_STEP}. */
    private void adjustLookAheadStep(int capacity) {
        producerLookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP);
    }

    private long lvProducerIndex() {
        return producerIndex.get();
    }

    private long lvConsumerIndex() {
        return consumerIndex.get();
    }

    private long lpProducerIndex() {
        return producerIndex.get();
    }

    private long lpConsumerIndex() {
        return consumerIndex.get();
    }

    private void soProducerIndex(long v) {
        producerIndex.lazySet(v);
    }

    private void soConsumerIndex(long v) {
        consumerIndex.lazySet(v);
    }

    /** Maps a running index onto a slot of the element area by applying the mask. */
    private static int calcWrappedOffset(long index, int mask) {
        return calcDirectOffset((int)index & mask);
    }

    private static int calcDirectOffset(int index) {
        return index;
    }

    private static void soElement(AtomicReferenceArray<Object> buffer, int offset, Object e) {
        buffer.lazySet(offset, e);
    }

    private static Object lvElement(AtomicReferenceArray<Object> buffer, int offset) {
        return buffer.get(offset);
    }

    /**
     * Offer two elements at the same time.
     * <p>Don't use the regular offer() with this at all!
     * @param first the first value, not null
     * @param second the second value, not null
     * @return true if the queue accepted the two new values
     * @throws NullPointerException if {@code first} or {@code second} is null
     */
    @Override
    public boolean offer(T first, T second) {
        // a null element would advance the producer index without a visible value and stall the consumer
        if (null == first || null == second) {
            throw new NullPointerException("Null is not a valid element");
        }
        final AtomicReferenceArray<Object> buffer = producerBuffer;
        final long p = lvProducerIndex();
        final int m = producerMask;

        final int firstOffset = calcWrappedOffset(p, m);
        // masked so the second element can never land on the trailing link slot
        final int secondOffset = calcWrappedOffset(p + 1, m);

        if (null == lvElement(buffer, calcWrappedOffset(p + 2, m))) {
            // second is written before first so both are in place once the consumer can see the first
            soElement(buffer, secondOffset, second);
            soElement(buffer, firstOffset, first);
            soProducerIndex(p + 2);
        } else {
            final int capacity = buffer.length();
            final AtomicReferenceArray<Object> newBuffer = new AtomicReferenceArray<Object>(capacity);
            producerBuffer = newBuffer;

            soElement(newBuffer, secondOffset, second);// StoreStore
            soElement(newBuffer, firstOffset, first);
            soNext(buffer, newBuffer);

            // the marker is written last so the elements and the link are in place before it becomes visible
            soElement(buffer, firstOffset, HAS_NEXT);

            soProducerIndex(p + 2);// this ensures correctness on 32bit platforms
        }

        return true;
    }
}