Skip to content

Commit f6f6d82

Browse files
authoredMar 3, 2018
2.x: Add Observable switchMapX and concatMapX operators (#5875)
* 2.x: Add Observable switchMapX and concatMapX operators * Rename local variables to match their datatype better.
1 parent ebab903 commit f6f6d82

19 files changed

+4949
-288
lines changed
 

‎src/main/java/io/reactivex/Observable.java

+547-3
Large diffs are not rendered by default.

‎src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -255,10 +255,10 @@ void drain() {
255255
consumed = c;
256256
}
257257

258-
SingleSource<? extends R> ms;
258+
SingleSource<? extends R> ss;
259259

260260
try {
261-
ms = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null SingleSource");
261+
ss = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null SingleSource");
262262
} catch (Throwable ex) {
263263
Exceptions.throwIfFatal(ex);
264264
upstream.cancel();
@@ -270,7 +270,7 @@ void drain() {
270270
}
271271

272272
state = STATE_ACTIVE;
273-
ms.subscribe(inner);
273+
ss.subscribe(inner);
274274
break;
275275
} else if (s == STATE_RESULT_VALUE) {
276276
long e = emitted;

‎src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingle.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,10 @@ public void onNext(T t) {
115115
current.dispose();
116116
}
117117

118-
SingleSource<? extends R> ms;
118+
SingleSource<? extends R> ss;
119119

120120
try {
121-
ms = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null SingleSource");
121+
ss = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null SingleSource");
122122
} catch (Throwable ex) {
123123
Exceptions.throwIfFatal(ex);
124124
upstream.cancel();
@@ -135,7 +135,7 @@ public void onNext(T t) {
135135
break;
136136
}
137137
if (inner.compareAndSet(current, observer)) {
138-
ms.subscribe(observer);
138+
ss.subscribe(observer);
139139
break;
140140
}
141141
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.mixed;
15+
16+
import java.util.concurrent.atomic.*;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.annotations.Experimental;
20+
import io.reactivex.disposables.Disposable;
21+
import io.reactivex.exceptions.*;
22+
import io.reactivex.functions.Function;
23+
import io.reactivex.internal.disposables.DisposableHelper;
24+
import io.reactivex.internal.functions.ObjectHelper;
25+
import io.reactivex.internal.fuseable.SimplePlainQueue;
26+
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
27+
import io.reactivex.internal.util.*;
28+
import io.reactivex.plugins.RxJavaPlugins;
29+
30+
/**
31+
* Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the
32+
* other completes or terminates (in error-delaying mode).
33+
* @param <T> the upstream value type
34+
* @since 2.1.11 - experimental
35+
*/
36+
@Experimental
37+
public final class ObservableConcatMapCompletable<T> extends Completable {
38+
39+
final Observable<T> source;
40+
41+
final Function<? super T, ? extends CompletableSource> mapper;
42+
43+
final ErrorMode errorMode;
44+
45+
final int prefetch;
46+
47+
public ObservableConcatMapCompletable(Observable<T> source,
48+
Function<? super T, ? extends CompletableSource> mapper,
49+
ErrorMode errorMode,
50+
int prefetch) {
51+
this.source = source;
52+
this.mapper = mapper;
53+
this.errorMode = errorMode;
54+
this.prefetch = prefetch;
55+
}
56+
57+
@Override
58+
protected void subscribeActual(CompletableObserver s) {
59+
source.subscribe(new ConcatMapCompletableObserver<T>(s, mapper, errorMode, prefetch));
60+
}
61+
62+
static final class ConcatMapCompletableObserver<T>
63+
extends AtomicInteger
64+
implements Observer<T>, Disposable {
65+
66+
private static final long serialVersionUID = 3610901111000061034L;
67+
68+
final CompletableObserver downstream;
69+
70+
final Function<? super T, ? extends CompletableSource> mapper;
71+
72+
final ErrorMode errorMode;
73+
74+
final AtomicThrowable errors;
75+
76+
final ConcatMapInnerObserver inner;
77+
78+
final int prefetch;
79+
80+
final SimplePlainQueue<T> queue;
81+
82+
Disposable upstream;
83+
84+
volatile boolean active;
85+
86+
volatile boolean done;
87+
88+
volatile boolean disposed;
89+
90+
ConcatMapCompletableObserver(CompletableObserver downstream,
91+
Function<? super T, ? extends CompletableSource> mapper,
92+
ErrorMode errorMode, int prefetch) {
93+
this.downstream = downstream;
94+
this.mapper = mapper;
95+
this.errorMode = errorMode;
96+
this.prefetch = prefetch;
97+
this.errors = new AtomicThrowable();
98+
this.inner = new ConcatMapInnerObserver(this);
99+
this.queue = new SpscLinkedArrayQueue<T>(prefetch);
100+
}
101+
102+
@Override
103+
public void onSubscribe(Disposable s) {
104+
if (DisposableHelper.validate(upstream, s)) {
105+
this.upstream = s;
106+
downstream.onSubscribe(this);
107+
}
108+
}
109+
110+
@Override
111+
public void onNext(T t) {
112+
queue.offer(t);
113+
drain();
114+
}
115+
116+
@Override
117+
public void onError(Throwable t) {
118+
if (errors.addThrowable(t)) {
119+
if (errorMode == ErrorMode.IMMEDIATE) {
120+
disposed = true;
121+
inner.dispose();
122+
t = errors.terminate();
123+
if (t != ExceptionHelper.TERMINATED) {
124+
downstream.onError(t);
125+
}
126+
if (getAndIncrement() == 0) {
127+
queue.clear();
128+
}
129+
} else {
130+
done = true;
131+
drain();
132+
}
133+
} else {
134+
RxJavaPlugins.onError(t);
135+
}
136+
}
137+
138+
@Override
139+
public void onComplete() {
140+
done = true;
141+
drain();
142+
}
143+
144+
@Override
145+
public void dispose() {
146+
disposed = true;
147+
upstream.dispose();
148+
inner.dispose();
149+
if (getAndIncrement() == 0) {
150+
queue.clear();
151+
}
152+
}
153+
154+
@Override
155+
public boolean isDisposed() {
156+
return disposed;
157+
}
158+
159+
void innerError(Throwable ex) {
160+
if (errors.addThrowable(ex)) {
161+
if (errorMode == ErrorMode.IMMEDIATE) {
162+
disposed = true;
163+
upstream.dispose();
164+
ex = errors.terminate();
165+
if (ex != ExceptionHelper.TERMINATED) {
166+
downstream.onError(ex);
167+
}
168+
if (getAndIncrement() == 0) {
169+
queue.clear();
170+
}
171+
} else {
172+
active = false;
173+
drain();
174+
}
175+
} else {
176+
RxJavaPlugins.onError(ex);
177+
}
178+
}
179+
180+
void innerComplete() {
181+
active = false;
182+
drain();
183+
}
184+
185+
void drain() {
186+
if (getAndIncrement() != 0) {
187+
return;
188+
}
189+
190+
do {
191+
if (disposed) {
192+
queue.clear();
193+
return;
194+
}
195+
196+
if (!active) {
197+
198+
if (errorMode == ErrorMode.BOUNDARY) {
199+
if (errors.get() != null) {
200+
disposed = true;
201+
queue.clear();
202+
Throwable ex = errors.terminate();
203+
downstream.onError(ex);
204+
return;
205+
}
206+
}
207+
208+
boolean d = done;
209+
T v = queue.poll();
210+
boolean empty = v == null;
211+
212+
if (d && empty) {
213+
disposed = true;
214+
Throwable ex = errors.terminate();
215+
if (ex != null) {
216+
downstream.onError(ex);
217+
} else {
218+
downstream.onComplete();
219+
}
220+
return;
221+
}
222+
223+
if (!empty) {
224+
225+
CompletableSource cs;
226+
227+
try {
228+
cs = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null CompletableSource");
229+
} catch (Throwable ex) {
230+
Exceptions.throwIfFatal(ex);
231+
disposed = true;
232+
queue.clear();
233+
upstream.dispose();
234+
errors.addThrowable(ex);
235+
ex = errors.terminate();
236+
downstream.onError(ex);
237+
return;
238+
}
239+
active = true;
240+
cs.subscribe(inner);
241+
}
242+
}
243+
} while (decrementAndGet() != 0);
244+
}
245+
246+
static final class ConcatMapInnerObserver extends AtomicReference<Disposable>
247+
implements CompletableObserver {
248+
249+
private static final long serialVersionUID = 5638352172918776687L;
250+
251+
final ConcatMapCompletableObserver<?> parent;
252+
253+
ConcatMapInnerObserver(ConcatMapCompletableObserver<?> parent) {
254+
this.parent = parent;
255+
}
256+
257+
@Override
258+
public void onSubscribe(Disposable d) {
259+
DisposableHelper.replace(this, d);
260+
}
261+
262+
@Override
263+
public void onError(Throwable e) {
264+
parent.innerError(e);
265+
}
266+
267+
@Override
268+
public void onComplete() {
269+
parent.innerComplete();
270+
}
271+
272+
void dispose() {
273+
DisposableHelper.dispose(this);
274+
}
275+
}
276+
}
277+
}

0 commit comments

Comments
 (0)