Skip to content

Commit 6c88036

Browse files
authored
2.x: add Flowable.parallel() and parallel operators (#4974)
* 2.x: add ParallelFlowable * Fix groupBy benchmark
1 parent 3bc2823 commit 6c88036

23 files changed

+5078
-2
lines changed

src/main/java/io/reactivex/Flowable.java

+95
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
3030
import io.reactivex.internal.subscribers.*;
3131
import io.reactivex.internal.util.*;
32+
import io.reactivex.parallel.ParallelFlowable;
3233
import io.reactivex.plugins.RxJavaPlugins;
3334
import io.reactivex.schedulers.*;
3435
import io.reactivex.subscribers.*;
@@ -10363,6 +10364,100 @@ public final Flowable<T> onTerminateDetach() {
1036310364
return RxJavaPlugins.onAssembly(new FlowableDetach<T>(this));
1036410365
}
1036510366

10367+
/**
10368+
* Parallelizes the flow by creating multiple 'rails' (equal to the number of CPUs)
10369+
* and dispatches the upstream items to them in a round-robin fashion.
10370+
* <p>
10371+
* Note that the rails don't execute in parallel on their own and one needs to
10372+
* apply {@link ParallelFlowable#runOn(Scheduler)} to specify the Scheduler where
10373+
* each rail will execute.
10374+
* <p>
10375+
* To merge the parallel 'rails' back into a single sequence, use {@link ParallelFlowable#sequential()}.
10376+
* <p>
10377+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flowable.parallel.png" alt="">
10378+
* <dl>
10379+
* <dt><b>Backpressure:</b></dt>
10380+
* <dd>The operator requires the upstream to honor backpressure and each 'rail' honors backpressure
10381+
* as well.</dd>
10382+
* <dt><b>Scheduler:</b></dt>
10383+
* <dd>{@code parallel} does not operate by default on a particular {@link Scheduler}.</dd>
10384+
* </dl>
10385+
* @return the new ParallelFlowable instance
10386+
* @since 2.0.5 - experimental
10387+
*/
10388+
@BackpressureSupport(BackpressureKind.FULL)
10389+
@SchedulerSupport(SchedulerSupport.NONE)
10390+
@CheckReturnValue
10391+
@Experimental
10392+
public final ParallelFlowable<T> parallel() {
10393+
return ParallelFlowable.from(this);
10394+
}
10395+
10396+
/**
10397+
* Parallelizes the flow by creating the specified number of 'rails'
10398+
* and dispatches the upstream items to them in a round-robin fashion.
10399+
* <p>
10400+
* Note that the rails don't execute in parallel on their own and one needs to
10401+
* apply {@link ParallelFlowable#runOn(Scheduler)} to specify the Scheduler where
10402+
* each rail will execute.
10403+
* <p>
10404+
* To merge the parallel 'rails' back into a single sequence, use {@link ParallelFlowable#sequential()}.
10405+
* <p>
10406+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flowable.parallel.png" alt="">
10407+
* <dl>
10408+
* <dt><b>Backpressure:</b></dt>
10409+
* <dd>The operator requires the upstream to honor backpressure and each 'rail' honors backpressure
10410+
* as well.</dd>
10411+
* <dt><b>Scheduler:</b></dt>
10412+
* <dd>{@code parallel} does not operate by default on a particular {@link Scheduler}.</dd>
10413+
* </dl>
10414+
* @param parallelism the number of 'rails' to use
10415+
* @return the new ParallelFlowable instance
10416+
* @since 2.0.5 - experimental
10417+
*/
10418+
@BackpressureSupport(BackpressureKind.FULL)
10419+
@SchedulerSupport(SchedulerSupport.NONE)
10420+
@CheckReturnValue
10421+
@Experimental
10422+
public final ParallelFlowable<T> parallel(int parallelism) {
10423+
ObjectHelper.verifyPositive(parallelism, "parallelism");
10424+
return ParallelFlowable.from(this, parallelism);
10425+
}
10426+
10427+
/**
10428+
* Parallelizes the flow by creating the specified number of 'rails'
10429+
* and dispatches the upstream items to them in a round-robin fashion and
10430+
* uses the defined per-'rail' prefetch amount.
10431+
* <p>
10432+
* Note that the rails don't execute in parallel on their own and one needs to
10433+
* apply {@link ParallelFlowable#runOn(Scheduler)} to specify the Scheduler where
10434+
* each rail will execute.
10435+
* <p>
10436+
* To merge the parallel 'rails' back into a single sequence, use {@link ParallelFlowable#sequential()}.
10437+
* <p>
10438+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flowable.parallel.png" alt="">
10439+
* <dl>
10440+
* <dt><b>Backpressure:</b></dt>
10441+
* <dd>The operator requires the upstream to honor backpressure and each 'rail' honors backpressure
10442+
* as well.</dd>
10443+
* <dt><b>Scheduler:</b></dt>
10444+
* <dd>{@code parallel} does not operate by default on a particular {@link Scheduler}.</dd>
10445+
* </dl>
10446+
* @param parallelism the number of 'rails' to use
10447+
* @param prefetch the number of items each 'rail' should prefetch
10448+
* @return the new ParallelFlowable instance
10449+
* @since 2.0.5 - experimental
10450+
*/
10451+
@BackpressureSupport(BackpressureKind.FULL)
10452+
@SchedulerSupport(SchedulerSupport.NONE)
10453+
@CheckReturnValue
10454+
@Experimental
10455+
public final ParallelFlowable<T> parallel(int parallelism, int prefetch) {
10456+
ObjectHelper.verifyPositive(parallelism, "parallelism");
10457+
ObjectHelper.verifyPositive(prefetch, "prefetch");
10458+
return ParallelFlowable.from(this, parallelism, prefetch);
10459+
}
10460+
1036610461
/**
1036710462
* Returns a {@link ConnectableFlowable}, which is a variety of Publisher that waits until its
1036810463
* {@link ConnectableFlowable#connect connect} method is called before it begins emitting items to those
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.parallel;
15+
16+
import java.util.concurrent.Callable;
17+
18+
import org.reactivestreams.*;
19+
20+
import io.reactivex.exceptions.Exceptions;
21+
import io.reactivex.functions.BiConsumer;
22+
import io.reactivex.internal.subscribers.DeferredScalarSubscriber;
23+
import io.reactivex.internal.subscriptions.*;
24+
import io.reactivex.parallel.ParallelFlowable;
25+
import io.reactivex.plugins.RxJavaPlugins;
26+
27+
/**
28+
* Reduce the sequence of values in each 'rail' to a single value.
29+
*
30+
* @param <T> the input value type
31+
* @param <C> the collection type
32+
*/
33+
public final class ParallelCollect<T, C> extends ParallelFlowable<C> {
34+
35+
final ParallelFlowable<? extends T> source;
36+
37+
final Callable<C> initialCollection;
38+
39+
final BiConsumer<C, T> collector;
40+
41+
public ParallelCollect(ParallelFlowable<? extends T> source,
42+
Callable<C> initialCollection, BiConsumer<C, T> collector) {
43+
this.source = source;
44+
this.initialCollection = initialCollection;
45+
this.collector = collector;
46+
}
47+
48+
@Override
49+
public void subscribe(Subscriber<? super C>[] subscribers) {
50+
if (!validate(subscribers)) {
51+
return;
52+
}
53+
54+
int n = subscribers.length;
55+
@SuppressWarnings("unchecked")
56+
Subscriber<T>[] parents = new Subscriber[n];
57+
58+
for (int i = 0; i < n; i++) {
59+
60+
C initialValue;
61+
62+
try {
63+
initialValue = initialCollection.call();
64+
} catch (Throwable ex) {
65+
Exceptions.throwIfFatal(ex);
66+
reportError(subscribers, ex);
67+
return;
68+
}
69+
70+
if (initialValue == null) {
71+
reportError(subscribers, new NullPointerException("The initialSupplier returned a null value"));
72+
return;
73+
}
74+
75+
parents[i] = new ParallelCollectSubscriber<T, C>(subscribers[i], initialValue, collector);
76+
}
77+
78+
source.subscribe(parents);
79+
}
80+
81+
void reportError(Subscriber<?>[] subscribers, Throwable ex) {
82+
for (Subscriber<?> s : subscribers) {
83+
EmptySubscription.error(ex, s);
84+
}
85+
}
86+
87+
@Override
88+
public int parallelism() {
89+
return source.parallelism();
90+
}
91+
92+
static final class ParallelCollectSubscriber<T, C> extends DeferredScalarSubscriber<T, C> {
93+
94+
95+
private static final long serialVersionUID = -4767392946044436228L;
96+
97+
final BiConsumer<C, T> collector;
98+
99+
C collection;
100+
101+
boolean done;
102+
103+
ParallelCollectSubscriber(Subscriber<? super C> subscriber,
104+
C initialValue, BiConsumer<C, T> collector) {
105+
super(subscriber);
106+
this.collection = initialValue;
107+
this.collector = collector;
108+
}
109+
110+
@Override
111+
public void onSubscribe(Subscription s) {
112+
if (SubscriptionHelper.validate(this.s, s)) {
113+
this.s = s;
114+
115+
actual.onSubscribe(this);
116+
117+
s.request(Long.MAX_VALUE);
118+
}
119+
}
120+
121+
@Override
122+
public void onNext(T t) {
123+
if (done) {
124+
return;
125+
}
126+
127+
try {
128+
collector.accept(collection, t);
129+
} catch (Throwable ex) {
130+
Exceptions.throwIfFatal(ex);
131+
cancel();
132+
onError(ex);
133+
return;
134+
}
135+
}
136+
137+
@Override
138+
public void onError(Throwable t) {
139+
if (done) {
140+
RxJavaPlugins.onError(t);
141+
return;
142+
}
143+
done = true;
144+
collection = null;
145+
actual.onError(t);
146+
}
147+
148+
@Override
149+
public void onComplete() {
150+
if (done) {
151+
return;
152+
}
153+
done = true;
154+
C c = collection;
155+
collection = null;
156+
complete(c);
157+
}
158+
159+
@Override
160+
public void cancel() {
161+
super.cancel();
162+
s.cancel();
163+
}
164+
}
165+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.parallel;
15+
16+
import org.reactivestreams.*;
17+
18+
import io.reactivex.functions.Function;
19+
import io.reactivex.internal.functions.ObjectHelper;
20+
import io.reactivex.internal.operators.flowable.FlowableConcatMap;
21+
import io.reactivex.internal.util.ErrorMode;
22+
import io.reactivex.parallel.ParallelFlowable;
23+
24+
/**
25+
* Concatenates the generated Publishers on each rail.
26+
*
27+
* @param <T> the input value type
28+
* @param <R> the output value type
29+
*/
30+
public final class ParallelConcatMap<T, R> extends ParallelFlowable<R> {
31+
32+
final ParallelFlowable<T> source;
33+
34+
final Function<? super T, ? extends Publisher<? extends R>> mapper;
35+
36+
final int prefetch;
37+
38+
final ErrorMode errorMode;
39+
40+
public ParallelConcatMap(
41+
ParallelFlowable<T> source,
42+
Function<? super T, ? extends Publisher<? extends R>> mapper,
43+
int prefetch, ErrorMode errorMode) {
44+
this.source = source;
45+
this.mapper = ObjectHelper.requireNonNull(mapper, "mapper");
46+
this.prefetch = prefetch;
47+
this.errorMode = ObjectHelper.requireNonNull(errorMode, "errorMode");
48+
}
49+
50+
@Override
51+
public int parallelism() {
52+
return source.parallelism();
53+
}
54+
55+
@Override
56+
public void subscribe(Subscriber<? super R>[] subscribers) {
57+
if (!validate(subscribers)) {
58+
return;
59+
}
60+
61+
int n = subscribers.length;
62+
63+
@SuppressWarnings("unchecked")
64+
final Subscriber<T>[] parents = new Subscriber[n];
65+
66+
// FIXME cheat until we have support from RxJava2 internals
67+
Publisher<T> p = new Publisher<T>() {
68+
int i;
69+
70+
@SuppressWarnings("unchecked")
71+
@Override
72+
public void subscribe(Subscriber<? super T> s) {
73+
parents[i++] = (Subscriber<T>)s;
74+
}
75+
};
76+
77+
FlowableConcatMap<T, R> op = new FlowableConcatMap<T, R>(p, mapper, prefetch, errorMode);
78+
79+
for (int i = 0; i < n; i++) {
80+
81+
op.subscribe(subscribers[i]);
82+
// FIXME needs a FlatMap subscriber
83+
// parents[i] = FlowableConcatMap.createSubscriber(s, mapper, prefetch, errorMode);
84+
}
85+
86+
source.subscribe(parents);
87+
}
88+
}

0 commit comments

Comments
 (0)