-
Notifications
You must be signed in to change notification settings - Fork 7.6k
/
Copy pathObservablePublish.java
385 lines (353 loc) · 15.3 KB
/
ObservablePublish.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.internal.operators.observable;
import java.util.concurrent.atomic.*;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.fuseable.HasUpstreamObservableSource;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.plugins.RxJavaPlugins;
/**
* A connectable observable which shares a single subscription to an underlying source and
* dispatches each source value to all of its currently subscribed observers.
* @param <T> the value type
*/
public final class ObservablePublish<T> extends ConnectableObservable<T> implements HasUpstreamObservableSource<T> {
/** The source observable. */
final ObservableSource<T> source;
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
final AtomicReference<PublishObserver<T>> current;
/**
 * The source that incoming observers are subscribed to by subscribeActual;
 * create() supplies a PublishSource that shares the same reference as {@code current}.
 */
final ObservableSource<T> onSubscribe;
/**
 * Creates an ObservablePublish instance that publishes the values of the given source observable.
 * @param <T> the source value type
 * @param source the source observable
 * @return the connectable observable, as returned by {@code RxJavaPlugins.onAssembly}
 */
public static <T> ConnectableObservable<T> create(ObservableSource<T> source) {
    // A single reference cell is handed both to the operator and to the source that
    // serves its subscribers, so both sides always observe the same connection.
    AtomicReference<PublishObserver<T>> connectionRef = new AtomicReference<PublishObserver<T>>();
    ConnectableObservable<T> publish =
            new ObservablePublish<T>(new PublishSource<T>(connectionRef), source, connectionRef);
    return RxJavaPlugins.onAssembly(publish);
}
/**
 * Assembles the operator from its three collaborators.
 * @param onSubscribe the source that incoming observers are subscribed to
 * @param source the upstream observable that gets connected
 * @param current the shared holder of the active connection
 */
private ObservablePublish(ObservableSource<T> onSubscribe, ObservableSource<T> source,
        final AtomicReference<PublishObserver<T>> current) {
    this.source = source;
    this.current = current;
    this.onSubscribe = onSubscribe;
}
/**
 * Returns the upstream source this operator was constructed with.
 * @return the source observable
 */
@Override
public ObservableSource<T> source() {
    return this.source;
}
/**
 * Hands the incoming observer over to the internal {@code onSubscribe} source.
 * @param observer the observer to subscribe
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    this.onSubscribe.subscribe(observer);
}
/**
 * Obtains (or creates) the active connection, passes it to the callback as a
 * Disposable and, if this call is the one that flipped the connection's
 * {@code shouldConnect} flag, subscribes the connection to the source.
 * @param connection the callback that receives the Disposable of the connection;
 *        anything it throws goes through {@code Exceptions.throwIfFatal} and is
 *        then rethrown via {@code ExceptionHelper.wrapOrThrow}
 */
@Override
public void connect(Consumer<? super Disposable> connection) {
    PublishObserver<T> conn;
    // Concurrent connect/disconnect and termination may swap the shared reference
    // at any time, so keep retrying until we hold a usable connection.
    for (;;) {
        conn = current.get();
        if (conn != null && !conn.isDisposed()) {
            // an undisposed subscriber-to-source already exists, use it
            break;
        }
        // none yet, or the existing one has been disposed: try to install a fresh one
        PublishObserver<T> fresh = new PublishObserver<T>(current);
        if (current.compareAndSet(conn, fresh)) {
            conn = fresh;
            break;
        }
        // the reference changed under us (e.g. a subscriber installed its own), re-read
    }

    // With concurrent connect() calls only the one that flips the flag subscribes to the source.
    final boolean doConnect = !conn.shouldConnect.get() && conn.shouldConnect.compareAndSet(false, true);

    /*
     * The callback is invoked before subscribing to the source: a synchronous
     * source may not return from subscribe(), so this is the consumer's chance
     * to get hold of the Disposable.
     *
     * Each connection has its own PublishObserver, therefore repeated connect()
     * calls on the same connection hand out the same Disposable, and a Disposable
     * from an older connection cannot disconnect a newer one after a
     * connect-disconnect-connect sequence.
     *
     * Note that disconnecting a running source asynchronously might leave child
     * observers without any terminal event.
     */
    try {
        connection.accept(conn);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        throw ExceptionHelper.wrapOrThrow(ex);
    }

    if (doConnect) {
        source.subscribe(conn);
    }
}
/**
 * The observer that gets subscribed to the source and relays its events to the
 * registered child observers; it doubles as the Disposable of the connection.
 * @param <T> the value type
 */
@SuppressWarnings("rawtypes")
static final class PublishObserver<T>
        implements Observer<T>, Disposable {
    /** Shared empty array used whenever no child observer is registered. */
    static final InnerDisposable[] EMPTY = new InnerDisposable[0];
    /** Separate empty array whose identity marks a terminated or disposed PublishObserver. */
    static final InnerDisposable[] TERMINATED = new InnerDisposable[0];

    /** The shared holder of the currently connected PublishObserver. */
    final AtomicReference<PublishObserver<T>> current;
    /** The registered child observers; the array is replaced as a whole on every change. */
    final AtomicReference<InnerDisposable<T>[]> observers;
    /**
     * Flipped from false to true by connect so that only one caller
     * performs the subscription to the source.
     */
    final AtomicBoolean shouldConnect;
    /** Holds the Disposable handed over by the source in onSubscribe. */
    final AtomicReference<Disposable> upstream = new AtomicReference<Disposable>();

    /**
     * Creates a PublishObserver with no child observers.
     * @param current the shared holder this instance clears itself from on termination
     */
    @SuppressWarnings("unchecked")
    PublishObserver(AtomicReference<PublishObserver<T>> current) {
        this.current = current;
        this.shouldConnect = new AtomicBoolean();
        this.observers = new AtomicReference<InnerDisposable<T>[]>(EMPTY);
    }

    /**
     * Marks this observer as terminated; the first call to do so also removes it
     * from the shared holder (if still there) and disposes the upstream.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void dispose() {
        InnerDisposable[] previous = observers.getAndSet(TERMINATED);
        if (previous == TERMINATED) {
            // already terminated or disposed earlier
            return;
        }
        current.compareAndSet(this, null);
        DisposableHelper.dispose(upstream);
    }

    /**
     * @return true if the observers array is the TERMINATED marker
     */
    @Override
    public boolean isDisposed() {
        return TERMINATED == observers.get();
    }

    /**
     * Stores the upstream Disposable through {@code DisposableHelper.setOnce}.
     * @param d the Disposable provided by the source
     */
    @Override
    public void onSubscribe(Disposable d) {
        DisposableHelper.setOnce(upstream, d);
    }

    /**
     * Forwards the value to every child observer registered at the time of the call.
     * @param t the value from the source
     */
    @Override
    public void onNext(T t) {
        InnerDisposable<T>[] snapshot = observers.get();
        for (int i = 0; i < snapshot.length; i++) {
            snapshot[i].child.onNext(t);
        }
    }

    /**
     * Terminates this observer and forwards the error to the child observers that
     * were registered; with no child observers the error goes to
     * {@code RxJavaPlugins.onError} instead.
     * @param e the error from the source
     */
    @SuppressWarnings("unchecked")
    @Override
    public void onError(Throwable e) {
        current.compareAndSet(this, null);
        InnerDisposable<T>[] targets = observers.getAndSet(TERMINATED);
        if (targets.length == 0) {
            RxJavaPlugins.onError(e);
            return;
        }
        for (InnerDisposable<T> inner : targets) {
            inner.child.onError(e);
        }
    }

    /**
     * Terminates this observer and completes the child observers that were registered.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void onComplete() {
        current.compareAndSet(this, null);
        InnerDisposable<T>[] targets = observers.getAndSet(TERMINATED);
        for (InnerDisposable<T> inner : targets) {
            inner.child.onComplete();
        }
    }

    /**
     * Atomically tries to register a new InnerDisposable with this observer.
     * @param producer the producer to add
     * @return true if it was added, false if this observer is already terminated
     */
    @SuppressWarnings("unchecked")
    boolean add(InnerDisposable<T> producer) {
        // retry until the swap succeeds or termination is observed
        while (true) {
            InnerDisposable<T>[] before = observers.get();
            if (before == TERMINATED) {
                // terminated (or disposed) in the meantime, refuse the newcomer
                return false;
            }
            // build a copy that is one longer, with the newcomer in the last slot
            int n = before.length;
            InnerDisposable<T>[] after = new InnerDisposable[n + 1];
            System.arraycopy(before, 0, after, 0, n);
            after[n] = producer;
            if (observers.compareAndSet(before, after)) {
                return true;
            }
            // a concurrent add, remove or termination got in first, try again
        }
    }

    /**
     * Atomically removes the given producer from the observers array, if present.
     * @param producer the producer to remove
     */
    @SuppressWarnings("unchecked")
    void remove(InnerDisposable<T> producer) {
        // retry until the swap succeeds or there is nothing left to remove
        while (true) {
            InnerDisposable<T>[] before = observers.get();
            int n = before.length;
            if (n == 0) {
                // empty or terminated, nothing to remove
                return;
            }
            int index = indexOf(before, producer);
            if (index < 0) {
                // not registered here
                return;
            }
            InnerDisposable<T>[] after;
            if (n != 1) {
                // copy everything except the slot at index into a shorter array
                after = new InnerDisposable[n - 1];
                System.arraycopy(before, 0, after, 0, index);
                System.arraycopy(before, index + 1, after, index, n - index - 1);
            } else {
                // the producer was the only entry, reuse the shared empty array
                after = EMPTY;
            }
            if (observers.compareAndSet(before, after)) {
                return;
            }
            // a concurrent add, remove or termination got in first, try again
        }
    }

    /**
     * Linear search for the first element that equals the target.
     * @param array the array to scan
     * @param target the element to look for
     * @return the index of the first match or -1 if there is none
     */
    private static int indexOf(InnerDisposable[] array, InnerDisposable target) {
        for (int i = 0; i < array.length; i++) {
            if (array[i].equals(target)) {
                return i;
            }
        }
        return -1;
    }
}
/**
 * The Disposable handed to a child Observer; it tracks the child's disposed state.
 * The reference value held by this object is {@code null} until a parent is set,
 * the parent PublishObserver afterwards, and this instance itself once disposed.
 * @param <T> the value type
 */
static final class InnerDisposable<T>
        extends AtomicReference<Object>
        implements Disposable {
    private static final long serialVersionUID = -1100270633763673112L;

    /** The actual child observer. */
    final Observer<? super T> child;

    /**
     * @param child the child observer this disposable belongs to
     */
    InnerDisposable(Observer<? super T> child) {
        this.child = child;
    }

    /**
     * @return true once the held reference is this instance itself
     */
    @Override
    public boolean isDisposed() {
        return this == get();
    }

    /**
     * Marks this instance as disposed and, if a parent had been set and this is
     * the first dispose, removes this instance from that parent.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void dispose() {
        Object previous = getAndSet(this);
        if (previous == null || previous == this) {
            // no parent set yet, or already disposed
            return;
        }
        ((PublishObserver<T>) previous).remove(this);
    }

    /**
     * Records the parent unless a value is already held (e.g. this instance was
     * disposed first), in which case this instance is removed from the given parent.
     * @param p the PublishObserver this instance has been added to
     */
    void setParent(PublishObserver<T> p) {
        if (compareAndSet(null, p)) {
            return;
        }
        p.remove(this);
    }
}
/**
 * The source that child observers are subscribed to; it registers each child
 * with the PublishObserver found in (or newly installed into) the shared holder.
 * @param <T> the value type
 */
static final class PublishSource<T> implements ObservableSource<T> {
    /** The shared holder of the current PublishObserver. */
    private final AtomicReference<PublishObserver<T>> curr;

    /**
     * @param curr the shared holder of the current PublishObserver
     */
    PublishSource(AtomicReference<PublishObserver<T>> curr) {
        this.curr = curr;
    }

    /**
     * Gives the child its Disposable and then registers it with an undisposed
     * PublishObserver, installing a new one into the shared holder when needed.
     * @param child the observer to register
     */
    @Override
    public void subscribe(Observer<? super T> child) {
        // wrap the child so it can be tracked and disposed individually
        InnerDisposable<T> inner = new InnerDisposable<T>(child);
        child.onSubscribe(inner);

        // Connections may come and go concurrently while the child subscribes,
        // so keep retrying until the child is registered with one of them.
        for (;;) {
            PublishObserver<T> conn = curr.get();
            boolean usable = conn != null && !conn.isDisposed();
            if (!usable) {
                // nothing there, or it is disposed: try to install a fresh subscriber-to-source
                PublishObserver<T> fresh = new PublishObserver<T>(curr);
                if (!curr.compareAndSet(conn, fresh)) {
                    // someone else replaced it, or the current one has just finished
                    continue;
                }
                conn = fresh;
            }

            // add() is atomic with respect to other adds and to the termination
            // of the subscriber-to-source
            if (conn.add(inner)) {
                inner.setParent(conn);
                return;
            }

            /*
             * This PublishObserver terminated before the child could be added,
             * so go around again and try with a newer one.
             *
             * Note: with concurrent disconnects this can leave child observers
             * that never receive anything unless the operator is connected again.
             * An alternative, in the manner of PublishSubject, would be to
             * terminate such child observers immediately with the terminal event
             * of the finished connection. That, however, would introduce another
             * surprise: right after a termination one could no longer prepare a
             * new connection by subscribing child observers asynchronously before
             * the next connect call.
             */
        }
    }
}
}