Skip to content

Commit 8a6bf14

Browse files
authored
2.x: Add fusion (perf++) to ObservableSwitchMap inner source (#5919)
1 parent e25ab24 commit 8a6bf14

File tree

7 files changed

+342
-76
lines changed

7 files changed

+342
-76
lines changed

src/jmh/java/io/reactivex/xmapz/ObservableSwitchMapCompletablePerf.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ public class ObservableSwitchMapCompletablePerf {
3232
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
3333
public int count;
3434

35-
Observable<Integer> switchMapToObservableEmpty;
35+
Observable<Integer> observableConvert;
3636

37-
Completable switchMapCompletableEmpty;
37+
Completable observableDedicated;
3838

3939
Observable<Integer> observablePlain;
4040

@@ -53,15 +53,15 @@ public Observable<? extends Integer> apply(Integer v)
5353
}
5454
});
5555

56-
switchMapToObservableEmpty = source.switchMap(new Function<Integer, Observable<? extends Integer>>() {
56+
observableConvert = source.switchMap(new Function<Integer, Observable<? extends Integer>>() {
5757
@Override
5858
public Observable<? extends Integer> apply(Integer v)
5959
throws Exception {
6060
return Completable.complete().toObservable();
6161
}
6262
});
6363

64-
switchMapCompletableEmpty = source.switchMapCompletable(new Function<Integer, Completable>() {
64+
observableDedicated = source.switchMapCompletable(new Function<Integer, Completable>() {
6565
@Override
6666
public Completable apply(Integer v)
6767
throws Exception {
@@ -76,12 +76,12 @@ public Object observablePlain(Blackhole bh) {
7676
}
7777

7878
@Benchmark
79-
public Object switchMapToObservableEmpty(Blackhole bh) {
80-
return switchMapToObservableEmpty.subscribeWith(new PerfConsumer(bh));
79+
public Object observableConvert(Blackhole bh) {
80+
return observableConvert.subscribeWith(new PerfConsumer(bh));
8181
}
8282

8383
@Benchmark
84-
public Object switchMapCompletableEmpty(Blackhole bh) {
85-
return switchMapCompletableEmpty.subscribeWith(new PerfConsumer(bh));
84+
public Object observableDedicated(Blackhole bh) {
85+
return observableDedicated.subscribeWith(new PerfConsumer(bh));
8686
}
8787
}

src/main/java/io/reactivex/internal/observers/DeferredScalarDisposable.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,15 @@ public final void complete(T value) {
7272
if ((state & (FUSED_READY | FUSED_CONSUMED | TERMINATED | DISPOSED)) != 0) {
7373
return;
7474
}
75+
Observer<? super T> a = actual;
7576
if (state == FUSED_EMPTY) {
7677
this.value = value;
7778
lazySet(FUSED_READY);
79+
a.onNext(null);
7880
} else {
7981
lazySet(TERMINATED);
82+
a.onNext(value);
8083
}
81-
Observer<? super T> a = actual;
82-
a.onNext(value);
8384
if (get() != DISPOSED) {
8485
a.onComplete();
8586
}

src/main/java/io/reactivex/internal/operators/completable/CompletableToObservable.java

+42-3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
import io.reactivex.*;
1717
import io.reactivex.disposables.Disposable;
18+
import io.reactivex.internal.disposables.DisposableHelper;
19+
import io.reactivex.internal.observers.BasicQueueDisposable;
1820

1921
/**
2022
* Wraps a Completable and exposes it as an Observable.
@@ -34,8 +36,12 @@ protected void subscribeActual(Observer<? super T> observer) {
3436
source.subscribe(new ObserverCompletableObserver(observer));
3537
}
3638

37-
static final class ObserverCompletableObserver implements CompletableObserver {
38-
private final Observer<?> observer;
39+
static final class ObserverCompletableObserver extends BasicQueueDisposable<Void>
40+
implements CompletableObserver {
41+
42+
final Observer<?> observer;
43+
44+
Disposable upstream;
3945

4046
ObserverCompletableObserver(Observer<?> observer) {
4147
this.observer = observer;
@@ -53,7 +59,40 @@ public void onError(Throwable e) {
5359

5460
@Override
5561
public void onSubscribe(Disposable d) {
56-
observer.onSubscribe(d);
62+
if (DisposableHelper.validate(upstream, d)) {
63+
this.upstream = d;
64+
observer.onSubscribe(this);
65+
}
66+
}
67+
68+
@Override
69+
public int requestFusion(int mode) {
70+
return mode & ASYNC;
71+
}
72+
73+
@Override
74+
public Void poll() throws Exception {
75+
return null; // always empty
76+
}
77+
78+
@Override
79+
public boolean isEmpty() {
80+
return true;
81+
}
82+
83+
@Override
84+
public void clear() {
85+
// always empty
86+
}
87+
88+
@Override
89+
public void dispose() {
90+
upstream.dispose();
91+
}
92+
93+
@Override
94+
public boolean isDisposed() {
95+
return upstream.isDisposed();
5796
}
5897
}
5998
}

src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java

+98-52
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
import io.reactivex.functions.Function;
2222
import io.reactivex.internal.disposables.DisposableHelper;
2323
import io.reactivex.internal.functions.ObjectHelper;
24-
import io.reactivex.internal.queue.*;
24+
import io.reactivex.internal.fuseable.*;
25+
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
2526
import io.reactivex.internal.util.AtomicThrowable;
2627
import io.reactivex.plugins.RxJavaPlugins;
2728

@@ -181,6 +182,8 @@ void drain() {
181182
}
182183

183184
final Observer<? super R> a = actual;
185+
final AtomicReference<SwitchMapInnerObserver<T, R>> active = this.active;
186+
final boolean delayErrors = this.delayErrors;
184187

185188
int missing = 1;
186189

@@ -218,66 +221,85 @@ void drain() {
218221
SwitchMapInnerObserver<T, R> inner = active.get();
219222

220223
if (inner != null) {
221-
SpscLinkedArrayQueue<R> q = inner.queue;
222-
223-
if (inner.done) {
224-
boolean empty = q.isEmpty();
225-
if (delayErrors) {
226-
if (empty) {
227-
active.compareAndSet(inner, null);
228-
continue;
224+
SimpleQueue<R> q = inner.queue;
225+
226+
if (q != null) {
227+
if (inner.done) {
228+
boolean empty = q.isEmpty();
229+
if (delayErrors) {
230+
if (empty) {
231+
active.compareAndSet(inner, null);
232+
continue;
233+
}
234+
} else {
235+
Throwable ex = errors.get();
236+
if (ex != null) {
237+
a.onError(errors.terminate());
238+
return;
239+
}
240+
if (empty) {
241+
active.compareAndSet(inner, null);
242+
continue;
243+
}
229244
}
230-
} else {
231-
Throwable ex = errors.get();
232-
if (ex != null) {
233-
a.onError(errors.terminate());
245+
}
246+
247+
boolean retry = false;
248+
249+
for (;;) {
250+
if (cancelled) {
234251
return;
235252
}
236-
if (empty) {
237-
active.compareAndSet(inner, null);
238-
continue;
253+
if (inner != active.get()) {
254+
retry = true;
255+
break;
239256
}
240-
}
241-
}
242257

243-
boolean retry = false;
258+
if (!delayErrors) {
259+
Throwable ex = errors.get();
260+
if (ex != null) {
261+
a.onError(errors.terminate());
262+
return;
263+
}
264+
}
244265

245-
for (;;) {
246-
if (cancelled) {
247-
return;
248-
}
249-
if (inner != active.get()) {
250-
retry = true;
251-
break;
252-
}
266+
boolean d = inner.done;
267+
R v;
253268

254-
if (!delayErrors) {
255-
Throwable ex = errors.get();
256-
if (ex != null) {
257-
a.onError(errors.terminate());
258-
return;
269+
try {
270+
v = q.poll();
271+
} catch (Throwable ex) {
272+
Exceptions.throwIfFatal(ex);
273+
errors.addThrowable(ex);
274+
active.compareAndSet(inner, null);
275+
if (!delayErrors) {
276+
disposeInner();
277+
s.dispose();
278+
done = true;
279+
} else {
280+
inner.cancel();
281+
}
282+
v = null;
283+
retry = true;
259284
}
260-
}
285+
boolean empty = v == null;
261286

262-
boolean d = inner.done;
263-
R v = q.poll();
264-
boolean empty = v == null;
287+
if (d && empty) {
288+
active.compareAndSet(inner, null);
289+
retry = true;
290+
break;
291+
}
265292

266-
if (d && empty) {
267-
active.compareAndSet(inner, null);
268-
retry = true;
269-
break;
270-
}
293+
if (empty) {
294+
break;
295+
}
271296

272-
if (empty) {
273-
break;
297+
a.onNext(v);
274298
}
275299

276-
a.onNext(v);
277-
}
278-
279-
if (retry) {
280-
continue;
300+
if (retry) {
301+
continue;
302+
}
281303
}
282304
}
283305

@@ -306,25 +328,49 @@ static final class SwitchMapInnerObserver<T, R> extends AtomicReference<Disposab
306328
private static final long serialVersionUID = 3837284832786408377L;
307329
final SwitchMapObserver<T, R> parent;
308330
final long index;
309-
final SpscLinkedArrayQueue<R> queue;
331+
332+
final int bufferSize;
333+
334+
volatile SimpleQueue<R> queue;
310335

311336
volatile boolean done;
312337

313338
SwitchMapInnerObserver(SwitchMapObserver<T, R> parent, long index, int bufferSize) {
314339
this.parent = parent;
315340
this.index = index;
316-
this.queue = new SpscLinkedArrayQueue<R>(bufferSize);
341+
this.bufferSize = bufferSize;
317342
}
318343

319344
@Override
320345
public void onSubscribe(Disposable s) {
321-
DisposableHelper.setOnce(this, s);
346+
if (DisposableHelper.setOnce(this, s)) {
347+
if (s instanceof QueueDisposable) {
348+
@SuppressWarnings("unchecked")
349+
QueueDisposable<R> qd = (QueueDisposable<R>) s;
350+
351+
int m = qd.requestFusion(QueueDisposable.ANY);
352+
if (m == QueueDisposable.SYNC) {
353+
queue = qd;
354+
done = true;
355+
parent.drain();
356+
return;
357+
}
358+
if (m == QueueDisposable.ASYNC) {
359+
queue = qd;
360+
return;
361+
}
362+
}
363+
364+
queue = new SpscLinkedArrayQueue<R>(bufferSize);
365+
}
322366
}
323367

324368
@Override
325369
public void onNext(R t) {
326370
if (index == parent.unique) {
327-
queue.offer(t);
371+
if (t != null) {
372+
queue.offer(t);
373+
}
328374
parent.drain();
329375
}
330376
}

src/main/java/io/reactivex/internal/operators/single/SingleToObservable.java

+8-11
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.reactivex.annotations.Experimental;
1717
import io.reactivex.disposables.Disposable;
1818
import io.reactivex.internal.disposables.DisposableHelper;
19+
import io.reactivex.internal.observers.DeferredScalarDisposable;
1920

2021
/**
2122
* Wraps a Single and exposes it as an Observable.
@@ -48,14 +49,14 @@ public static <T> SingleObserver<T> create(Observer<? super T> downstream) {
4849
}
4950

5051
static final class SingleToObservableObserver<T>
51-
implements SingleObserver<T>, Disposable {
52-
53-
final Observer<? super T> actual;
52+
extends DeferredScalarDisposable<T>
53+
implements SingleObserver<T> {
5454

55+
private static final long serialVersionUID = 3786543492451018833L;
5556
Disposable d;
5657

5758
SingleToObservableObserver(Observer<? super T> actual) {
58-
this.actual = actual;
59+
super(actual);
5960
}
6061

6162
@Override
@@ -69,23 +70,19 @@ public void onSubscribe(Disposable d) {
6970

7071
@Override
7172
public void onSuccess(T value) {
72-
actual.onNext(value);
73-
actual.onComplete();
73+
complete(value);
7474
}
7575

7676
@Override
7777
public void onError(Throwable e) {
78-
actual.onError(e);
78+
error(e);
7979
}
8080

8181
@Override
8282
public void dispose() {
83+
super.dispose();
8384
d.dispose();
8485
}
8586

86-
@Override
87-
public boolean isDisposed() {
88-
return d.isDisposed();
89-
}
9087
}
9188
}

0 commit comments

Comments
 (0)