Skip to content

Commit e1cb606

Browse files
dweeboakarnokd
authored andcommitted
2.x Add concatMapCompletable() to Observable (#5649)
* Initial implementation of Observable.concatMapCompletable() * Update serialVersionUID * Fix javadoc and verify prefetch is positive * Put back auto-changed whitespace * Put back more whitespace intellij is removing * More javadoc fixes * switch from testng to junit * Add experimental annotation and change prefetch to capacityHint
1 parent bb1e313 commit e1cb606

File tree

3 files changed

+548
-0
lines changed

3 files changed

+548
-0
lines changed

src/main/java/io/reactivex/Observable.java

+46
Original file line numberDiff line numberDiff line change
@@ -6251,6 +6251,52 @@ public final <R> Observable<R> concatMapEagerDelayError(Function<? super T, ? ex
62516251
return RxJavaPlugins.onAssembly(new ObservableConcatMapEager<T, R>(this, mapper, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, maxConcurrency, prefetch));
62526252
}
62536253

6254+
/**
6255+
* Maps each element of the upstream Observable into CompletableSources, subscribes to them one at a time in
6256+
* order and waits until the upstream and all CompletableSources complete.
6257+
* <dl>
6258+
* <dt><b>Scheduler:</b></dt>
6259+
* <dd>{@code concatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
6260+
* </dl>
6261+
*
6262+
* @param mapper
6263+
* a function that, when applied to an item emitted by the source ObservableSource, returns a CompletableSource
6264+
* @return a Completable that signals {@code onComplete} when the upstream and all CompletableSources complete
6265+
* @since 2.1.6 - experimental
6266+
*/
6267+
@Experimental
6268+
@CheckReturnValue
6269+
@SchedulerSupport(SchedulerSupport.NONE)
6270+
public final Completable concatMapCompletable(Function<? super T, ? extends CompletableSource> mapper) {
6271+
return concatMapCompletable(mapper, 2);
6272+
}
6273+
6274+
/**
6275+
* Maps each element of the upstream Observable into CompletableSources, subscribes to them one at a time in
6276+
* order and waits until the upstream and all CompletableSources complete.
6277+
* <dl>
6278+
* <dt><b>Scheduler:</b></dt>
6279+
* <dd>{@code concatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
6280+
* </dl>
6281+
*
6282+
* @param mapper
6283+
* a function that, when applied to an item emitted by the source ObservableSource, returns a CompletableSource
6284+
*
6285+
* @param capacityHint
6286+
* the number of upstream items expected to be buffered until the current CompletableSource, mapped from
6287+
* the current item, completes.
6288+
* @return a Completable that signals {@code onComplete} when the upstream and all CompletableSources complete
6289+
* @since 2.1.6 - experimental
6290+
*/
6291+
@Experimental
6292+
@CheckReturnValue
6293+
@SchedulerSupport(SchedulerSupport.NONE)
6294+
public final Completable concatMapCompletable(Function<? super T, ? extends CompletableSource> mapper, int capacityHint) {
6295+
ObjectHelper.requireNonNull(mapper, "mapper is null");
6296+
ObjectHelper.verifyPositive(capacityHint, "capacityHint");
6297+
return RxJavaPlugins.onAssembly(new ObservableConcatMapCompletable<T>(this, mapper, capacityHint));
6298+
}
6299+
62546300
/**
62556301
* Returns an Observable that concatenate each item emitted by the source ObservableSource with the values in an
62566302
* Iterable corresponding to that item that is generated by a selector.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.internal.operators.observable;
14+
15+
import io.reactivex.*;
16+
import io.reactivex.disposables.Disposable;
17+
import io.reactivex.exceptions.Exceptions;
18+
import io.reactivex.functions.Function;
19+
import io.reactivex.internal.disposables.DisposableHelper;
20+
import io.reactivex.internal.disposables.SequentialDisposable;
21+
import io.reactivex.internal.functions.ObjectHelper;
22+
import io.reactivex.internal.fuseable.QueueDisposable;
23+
import io.reactivex.internal.fuseable.SimpleQueue;
24+
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
25+
import io.reactivex.plugins.RxJavaPlugins;
26+
27+
import java.util.concurrent.atomic.AtomicInteger;
28+
29+
public final class ObservableConcatMapCompletable<T> extends Completable {
30+
31+
final ObservableSource<T> source;
32+
final Function<? super T, ? extends CompletableSource> mapper;
33+
final int bufferSize;
34+
35+
public ObservableConcatMapCompletable(ObservableSource<T> source,
36+
Function<? super T, ? extends CompletableSource> mapper, int bufferSize) {
37+
this.source = source;
38+
this.mapper = mapper;
39+
this.bufferSize = Math.max(8, bufferSize);
40+
}
41+
@Override
42+
public void subscribeActual(CompletableObserver observer) {
43+
source.subscribe(new SourceObserver<T>(observer, mapper, bufferSize));
44+
}
45+
46+
static final class SourceObserver<T> extends AtomicInteger implements Observer<T>, Disposable {
47+
48+
private static final long serialVersionUID = 6893587405571511048L;
49+
final CompletableObserver actual;
50+
final SequentialDisposable sa;
51+
final Function<? super T, ? extends CompletableSource> mapper;
52+
final CompletableObserver inner;
53+
final int bufferSize;
54+
55+
SimpleQueue<T> queue;
56+
57+
Disposable s;
58+
59+
volatile boolean active;
60+
61+
volatile boolean disposed;
62+
63+
volatile boolean done;
64+
65+
int sourceMode;
66+
67+
SourceObserver(CompletableObserver actual,
68+
Function<? super T, ? extends CompletableSource> mapper, int bufferSize) {
69+
this.actual = actual;
70+
this.mapper = mapper;
71+
this.bufferSize = bufferSize;
72+
this.inner = new InnerObserver(actual, this);
73+
this.sa = new SequentialDisposable();
74+
}
75+
@Override
76+
public void onSubscribe(Disposable s) {
77+
if (DisposableHelper.validate(this.s, s)) {
78+
this.s = s;
79+
if (s instanceof QueueDisposable) {
80+
@SuppressWarnings("unchecked")
81+
QueueDisposable<T> qd = (QueueDisposable<T>) s;
82+
83+
int m = qd.requestFusion(QueueDisposable.ANY);
84+
if (m == QueueDisposable.SYNC) {
85+
sourceMode = m;
86+
queue = qd;
87+
done = true;
88+
89+
actual.onSubscribe(this);
90+
91+
drain();
92+
return;
93+
}
94+
95+
if (m == QueueDisposable.ASYNC) {
96+
sourceMode = m;
97+
queue = qd;
98+
99+
actual.onSubscribe(this);
100+
101+
return;
102+
}
103+
}
104+
105+
queue = new SpscLinkedArrayQueue<T>(bufferSize);
106+
107+
actual.onSubscribe(this);
108+
}
109+
}
110+
@Override
111+
public void onNext(T t) {
112+
if (done) {
113+
return;
114+
}
115+
if (sourceMode == QueueDisposable.NONE) {
116+
queue.offer(t);
117+
}
118+
drain();
119+
}
120+
@Override
121+
public void onError(Throwable t) {
122+
if (done) {
123+
RxJavaPlugins.onError(t);
124+
return;
125+
}
126+
done = true;
127+
dispose();
128+
actual.onError(t);
129+
}
130+
@Override
131+
public void onComplete() {
132+
if (done) {
133+
return;
134+
}
135+
done = true;
136+
drain();
137+
}
138+
139+
void innerComplete() {
140+
active = false;
141+
drain();
142+
}
143+
144+
@Override
145+
public boolean isDisposed() {
146+
return disposed;
147+
}
148+
149+
@Override
150+
public void dispose() {
151+
disposed = true;
152+
sa.dispose();
153+
s.dispose();
154+
155+
if (getAndIncrement() == 0) {
156+
queue.clear();
157+
}
158+
}
159+
160+
void innerSubscribe(Disposable s) {
161+
sa.update(s);
162+
}
163+
164+
void drain() {
165+
if (getAndIncrement() != 0) {
166+
return;
167+
}
168+
169+
for (;;) {
170+
if (disposed) {
171+
queue.clear();
172+
return;
173+
}
174+
if (!active) {
175+
176+
boolean d = done;
177+
178+
T t;
179+
180+
try {
181+
t = queue.poll();
182+
} catch (Throwable ex) {
183+
Exceptions.throwIfFatal(ex);
184+
dispose();
185+
queue.clear();
186+
actual.onError(ex);
187+
return;
188+
}
189+
190+
boolean empty = t == null;
191+
192+
if (d && empty) {
193+
disposed = true;
194+
actual.onComplete();
195+
return;
196+
}
197+
198+
if (!empty) {
199+
CompletableSource c;
200+
201+
try {
202+
c = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null CompletableSource");
203+
} catch (Throwable ex) {
204+
Exceptions.throwIfFatal(ex);
205+
dispose();
206+
queue.clear();
207+
actual.onError(ex);
208+
return;
209+
}
210+
211+
active = true;
212+
c.subscribe(inner);
213+
}
214+
}
215+
216+
if (decrementAndGet() == 0) {
217+
break;
218+
}
219+
}
220+
}
221+
222+
static final class InnerObserver implements CompletableObserver {
223+
final CompletableObserver actual;
224+
final SourceObserver<?> parent;
225+
226+
InnerObserver(CompletableObserver actual, SourceObserver<?> parent) {
227+
this.actual = actual;
228+
this.parent = parent;
229+
}
230+
231+
@Override
232+
public void onSubscribe(Disposable s) {
233+
parent.innerSubscribe(s);
234+
}
235+
236+
@Override
237+
public void onError(Throwable t) {
238+
parent.dispose();
239+
actual.onError(t);
240+
}
241+
@Override
242+
public void onComplete() {
243+
parent.innerComplete();
244+
}
245+
}
246+
}
247+
}

0 commit comments

Comments
 (0)