-
Notifications
You must be signed in to change notification settings - Fork 7.6k
/
Copy pathDeferredScalarSubscription.java
203 lines (180 loc) · 6.71 KB
/
DeferredScalarSubscription.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.internal.subscriptions;
import org.reactivestreams.Subscriber;
/**
* A subscription that signals a single value eventually.
* <p>
* Note that the class leaks all methods of {@link java.util.concurrent.atomic.AtomicLong}.
* Use {@link #complete(Object)} to signal the single value.
* <p>
* The this atomic integer stores a bit field:<br>
* bit 0: indicates that there is a value available<br>
* bit 1: indicates that there was a request made<br>
* bit 2: indicates there was a cancellation, exclusively set<br>
* bit 3: indicates in fusion mode but no value yet, exclusively set<br>
* bit 4: indicates in fusion mode and value is available, exclusively set<br>
* bit 5: indicates in fusion mode and value has been consumed, exclusively set<br>
* Where exclusively set means any other bits are 0 when that bit is set.
* @param <T> the value type
*/
public class DeferredScalarSubscription<T> extends BasicIntQueueSubscription<T> {
private static final long serialVersionUID = -2151279923272604993L;
/** The Subscriber to emit the value to. */
protected final Subscriber<? super T> actual;
/** The value is stored here if there is no request yet or in fusion mode. */
protected T value;
/** Indicates this Subscription has no value and not requested yet. */
static final int NO_REQUEST_NO_VALUE = 0;
/** Indicates this Subscription has a value but not requested yet. */
static final int NO_REQUEST_HAS_VALUE = 1;
/** Indicates this Subscription has been requested but there is no value yet. */
static final int HAS_REQUEST_NO_VALUE = 2;
/** Indicates this Subscription has both request and value. */
static final int HAS_REQUEST_HAS_VALUE = 3;
/** Indicates the Subscription has been cancelled. */
static final int CANCELLED = 4;
/** Indicates this Subscription is in fusion mode and is currently empty. */
static final int FUSED_EMPTY = 8;
/** Indicates this Subscription is in fusion mode and has a value. */
static final int FUSED_READY = 16;
/** Indicates this Subscription is in fusion mode and its value has been consumed. */
static final int FUSED_CONSUMED = 32;
/**
* Creates a DeferredScalarSubscription by wrapping the given Subscriber.
* @param actual the Subscriber to wrap, not null (not verified)
*/
public DeferredScalarSubscription(Subscriber<? super T> actual) {
this.actual = actual;
}
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
for (;;) {
int state = get();
// if the any bits 1-31 are set, we are either in fusion mode (FUSED_*)
// or request has been called (HAS_REQUEST_*)
if ((state & ~NO_REQUEST_HAS_VALUE) != 0) {
return;
}
if (state == NO_REQUEST_HAS_VALUE) {
if (compareAndSet(NO_REQUEST_HAS_VALUE, HAS_REQUEST_HAS_VALUE)) {
T v = value;
if (v != null) {
value = null;
Subscriber<? super T> a = actual;
a.onNext(v);
if (get() != CANCELLED) {
a.onComplete();
}
}
}
return;
}
if (compareAndSet(NO_REQUEST_NO_VALUE, HAS_REQUEST_NO_VALUE)) {
return;
}
}
}
}
/**
* Completes this subscription by indicating the given value should
* be emitted when the first request arrives.
* <p>Make sure this is called exactly once.
* @param v the value to signal, not null (not validated)
*/
public final void complete(T v) {
int state = get();
for (;;) {
if (state == FUSED_EMPTY) {
value = v;
lazySet(FUSED_READY);
Subscriber<? super T> a = actual;
a.onNext(v);
if (get() != CANCELLED) {
a.onComplete();
}
return;
}
// if state is >= CANCELLED or bit zero is set (*_HAS_VALUE) case, return
if ((state & ~HAS_REQUEST_NO_VALUE) != 0) {
return;
}
if (state == HAS_REQUEST_NO_VALUE) {
lazySet(HAS_REQUEST_HAS_VALUE);
Subscriber<? super T> a = actual;
a.onNext(v);
if (get() != CANCELLED) {
a.onComplete();
}
return;
}
value = v;
if (compareAndSet(NO_REQUEST_NO_VALUE, NO_REQUEST_HAS_VALUE)) {
return;
}
state = get();
if (state == CANCELLED) {
value = null;
return;
}
}
}
@Override
public final int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
lazySet(FUSED_EMPTY);
return ASYNC;
}
return NONE;
}
@Override
public final T poll() {
if (get() == FUSED_READY) {
lazySet(FUSED_CONSUMED);
T v = value;
value = null;
return v;
}
return null;
}
@Override
public final boolean isEmpty() {
return get() != FUSED_READY;
}
@Override
public final void clear() {
lazySet(FUSED_CONSUMED);
value = null;
}
@Override
public void cancel() {
set(CANCELLED);
value = null;
}
/**
* Returns true if this Subscription has been cancelled.
* @return true if this Subscription has been cancelled
*/
public final boolean isCancelled() {
return get() == CANCELLED;
}
/**
* Atomically sets a cancelled state and returns true if
* the current thread did it successfully.
* @return true if the current thread cancelled
*/
public final boolean tryCancel() {
return getAndSet(CANCELLED) != CANCELLED;
}
}