Skip to content

Commit a249f4f

Browse files
authored
3.x: [Java 8] Add blockingStream & flatMapStream to Flowable (#6779)
1 parent 3d25617 commit a249f4f

12 files changed

+1461
-0
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

+285
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.jdk8;
15+
16+
import java.util.*;
17+
import java.util.concurrent.atomic.*;
18+
import java.util.stream.Stream;
19+
20+
import org.reactivestreams.*;
21+
22+
import io.reactivex.rxjava3.annotations.NonNull;
23+
import io.reactivex.rxjava3.core.*;
24+
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
25+
import io.reactivex.rxjava3.functions.*;
26+
import io.reactivex.rxjava3.internal.fuseable.*;
27+
import io.reactivex.rxjava3.internal.queue.SpscArrayQueue;
28+
import io.reactivex.rxjava3.internal.subscriptions.*;
29+
import io.reactivex.rxjava3.internal.util.*;
30+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
31+
32+
/**
33+
* Maps the upstream values onto {@link Stream}s and emits their items in order to the downstream.
34+
*
35+
* @param <T> the upstream element type
36+
* @param <R> the inner {@code Stream} and result element type
37+
* @since 3.0.0
38+
*/
39+
public final class FlowableFlatMapStream<T, R> extends Flowable<R> {
40+
41+
final Flowable<T> source;
42+
43+
final Function<? super T, ? extends Stream<? extends R>> mapper;
44+
45+
final int prefetch;
46+
47+
public FlowableFlatMapStream(Flowable<T> source, Function<? super T, ? extends Stream<? extends R>> mapper, int prefetch) {
48+
this.source = source;
49+
this.mapper = mapper;
50+
this.prefetch = prefetch;
51+
}
52+
53+
@Override
54+
protected void subscribeActual(Subscriber<? super R> s) {
55+
if (source instanceof Supplier) {
56+
Stream<? extends R> stream = null;
57+
try {
58+
@SuppressWarnings("unchecked")
59+
T t = ((Supplier<T>)source).get();
60+
if (t != null) {
61+
stream = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Stream");
62+
}
63+
} catch (Throwable ex) {
64+
EmptySubscription.error(ex, s);
65+
return;
66+
}
67+
68+
if (stream != null) {
69+
FlowableFromStream.subscribeStream(s, stream);
70+
} else {
71+
EmptySubscription.complete(s);
72+
}
73+
} else {
74+
source.subscribe(new FlatMapStreamSubscriber<>(s, mapper, prefetch));
75+
}
76+
}
77+
78+
static final class FlatMapStreamSubscriber<T, R> extends AtomicInteger
79+
implements FlowableSubscriber<T>, Subscription {
80+
81+
private static final long serialVersionUID = -5127032662980523968L;
82+
83+
final Subscriber<? super R> downstream;
84+
85+
final Function<? super T, ? extends Stream<? extends R>> mapper;
86+
87+
final int prefetch;
88+
89+
final AtomicLong requested;
90+
91+
SimpleQueue<T> queue;
92+
93+
Subscription upstream;
94+
95+
Iterator<? extends R> currentIterator;
96+
97+
AutoCloseable currentCloseable;
98+
99+
volatile boolean cancelled;
100+
101+
volatile boolean upstreamDone;
102+
final AtomicThrowable error;
103+
104+
long emitted;
105+
106+
int consumed;
107+
108+
int sourceMode;
109+
110+
FlatMapStreamSubscriber(Subscriber<? super R> downstream, Function<? super T, ? extends Stream<? extends R>> mapper, int prefetch) {
111+
this.downstream = downstream;
112+
this.mapper = mapper;
113+
this.prefetch = prefetch;
114+
this.requested = new AtomicLong();
115+
this.error = new AtomicThrowable();
116+
}
117+
118+
@Override
119+
public void onSubscribe(@NonNull Subscription s) {
120+
if (SubscriptionHelper.validate(this.upstream, s)) {
121+
this.upstream = s;
122+
123+
if (s instanceof QueueSubscription) {
124+
125+
@SuppressWarnings("unchecked")
126+
QueueSubscription<T> qs = (QueueSubscription<T>)s;
127+
128+
int m = qs.requestFusion(QueueFuseable.ANY | QueueFuseable.BOUNDARY);
129+
if (m == QueueFuseable.SYNC) {
130+
sourceMode = m;
131+
queue = qs;
132+
upstreamDone = true;
133+
134+
downstream.onSubscribe(this);
135+
return;
136+
}
137+
else if (m == QueueFuseable.ASYNC) {
138+
sourceMode = m;
139+
queue = qs;
140+
141+
downstream.onSubscribe(this);
142+
143+
s.request(prefetch);
144+
return;
145+
}
146+
}
147+
148+
queue = new SpscArrayQueue<>(prefetch);
149+
150+
downstream.onSubscribe(this);
151+
152+
s.request(prefetch);
153+
}
154+
}
155+
156+
@Override
157+
public void onNext(T t) {
158+
if (sourceMode != QueueFuseable.ASYNC) {
159+
if (!queue.offer(t)) {
160+
upstream.cancel();
161+
onError(new MissingBackpressureException("Queue full?!"));
162+
return;
163+
}
164+
}
165+
drain();
166+
}
167+
168+
@Override
169+
public void onError(Throwable t) {
170+
if (error.compareAndSet(null, t)) {
171+
upstreamDone = true;
172+
drain();
173+
} else {
174+
RxJavaPlugins.onError(t);
175+
}
176+
}
177+
178+
@Override
179+
public void onComplete() {
180+
upstreamDone = true;
181+
drain();
182+
}
183+
184+
@Override
185+
public void request(long n) {
186+
if (SubscriptionHelper.validate(n)) {
187+
BackpressureHelper.add(requested, n);
188+
drain();
189+
}
190+
}
191+
192+
@Override
193+
public void cancel() {
194+
cancelled = true;
195+
upstream.cancel();
196+
drain();
197+
}
198+
199+
void drain() {
200+
if (getAndIncrement() != 0) {
201+
return;
202+
}
203+
204+
int missed = 1;
205+
206+
final Subscriber<? super R> downstream = this.downstream;
207+
final SimpleQueue<T> queue = this.queue;
208+
final AtomicThrowable error = this.error;
209+
Iterator<? extends R> iterator = this.currentIterator;
210+
long requested = this.requested.get();
211+
long emitted = this.emitted;
212+
final int limit = prefetch - (prefetch >> 2);
213+
boolean canRequest = sourceMode != QueueFuseable.SYNC;
214+
215+
for (;;) {
216+
if (cancelled) {
217+
queue.clear();
218+
clearCurrentSuppressCloseError();
219+
} else {
220+
boolean isDone = upstreamDone;
221+
if (error.get() != null) {
222+
downstream.onError(error.get());
223+
cancelled = true;
224+
continue;
225+
}
226+
227+
if (iterator == null) {
228+
T t;
229+
230+
try {
231+
t = queue.poll();
232+
} catch (Throwable ex) {
233+
trySignalError(downstream, ex);
234+
continue;
235+
}
236+
237+
boolean isEmpty = t == null;
238+
239+
if (isDone && isEmpty) {
240+
downstream.onComplete();
241+
cancelled = true;
242+
}
243+
else if (!isEmpty) {
244+
if (canRequest && ++consumed == limit) {
245+
consumed = 0;
246+
upstream.request(limit);
247+
}
248+
249+
Stream<? extends R> stream;
250+
try {
251+
stream = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Stream");
252+
iterator = stream.iterator();
253+
254+
if (iterator.hasNext()) {
255+
currentIterator = iterator;
256+
currentCloseable = stream;
257+
} else {
258+
iterator = null;
259+
}
260+
} catch (Throwable ex) {
261+
trySignalError(downstream, ex);
262+
}
263+
continue;
264+
}
265+
}
266+
if (iterator != null && emitted != requested) {
267+
R item;
268+
269+
try {
270+
item = Objects.requireNonNull(iterator.next(), "The Stream.Iterator returned a null value");
271+
} catch (Throwable ex) {
272+
trySignalError(downstream, ex);
273+
continue;
274+
}
275+
276+
if (!cancelled) {
277+
downstream.onNext(item);
278+
emitted++;
279+
280+
if (!cancelled) {
281+
try {
282+
if (!iterator.hasNext()) {
283+
iterator = null;
284+
clearCurrentRethrowCloseError();
285+
}
286+
} catch (Throwable ex) {
287+
trySignalError(downstream, ex);
288+
}
289+
}
290+
}
291+
292+
continue;
293+
}
294+
}
295+
296+
this.emitted = emitted;
297+
missed = addAndGet(-missed);
298+
if (missed == 0) {
299+
break;
300+
}
301+
requested = this.requested.get();
302+
}
303+
}
304+
305+
void clearCurrentRethrowCloseError() throws Throwable {
306+
currentIterator = null;
307+
AutoCloseable ac = currentCloseable;
308+
currentCloseable = null;
309+
if (ac != null) {
310+
ac.close();
311+
}
312+
}
313+
314+
void clearCurrentSuppressCloseError() {
315+
try {
316+
clearCurrentRethrowCloseError();
317+
} catch (Throwable ex) {
318+
RxJavaPlugins.onError(ex);
319+
}
320+
}
321+
322+
void trySignalError(Subscriber<?> downstream, Throwable ex) {
323+
if (error.compareAndSet(null, ex)) {
324+
upstream.cancel();
325+
cancelled = true;
326+
downstream.onError(ex);
327+
} else {
328+
RxJavaPlugins.onError(ex);
329+
}
330+
}
331+
}
332+
}

src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromStream.java

+10
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,16 @@ public FlowableFromStream(Stream<T> stream) {
4242

4343
@Override
4444
protected void subscribeActual(Subscriber<? super T> s) {
45+
subscribeStream(s, stream);
46+
}
47+
48+
/**
49+
* Subscribes to the Stream by picking the normal or conditional stream Subscription implementation.
50+
* @param <T> the element type of the flow
51+
* @param s the subscriber to drive
52+
* @param stream the sequence to consume
53+
*/
54+
public static <T> void subscribeStream(Subscriber<? super T> s, Stream<T> stream) {
4555
Iterator<T> iterator;
4656
try {
4757
iterator = stream.iterator();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.jdk8;
15+
16+
import java.util.stream.*;
17+
18+
import org.reactivestreams.Publisher;
19+
import org.testng.annotations.Test;
20+
21+
import io.reactivex.rxjava3.core.Flowable;
22+
import io.reactivex.rxjava3.tck.BaseTck;
23+
24+
@Test
25+
public class FlatMapStream0HTckTest extends BaseTck<Integer> {
26+
27+
@Override
28+
public Publisher<Integer> createPublisher(final long elements) {
29+
return
30+
Flowable.just(1).hide().flatMapStream(v -> IntStream.range(0, (int)elements).boxed())
31+
;
32+
}
33+
34+
@Override
35+
public Publisher<Integer> createFailedPublisher() {
36+
Stream<Integer> stream = Stream.of(1);
37+
stream.forEach(v -> { });
38+
return Flowable.just(1).hide().flatMapStream(v -> stream);
39+
}
40+
}

0 commit comments

Comments
 (0)