Skip to content

Commit dadb49a

Browse files
authored
2.x: improve the parallel() mode test coverage, improve its code (#5006)
1 parent 6c88036 commit dadb49a

26 files changed

+2306
-259
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,13 @@ protected void subscribeActual(Subscriber<? super U> s) {
4949
if (FlowableScalarXMap.tryScalarXMapSubscribe(source, s, mapper)) {
5050
return;
5151
}
52-
source.subscribe(new MergeSubscriber<T, U>(s, mapper, delayErrors, maxConcurrency, bufferSize));
52+
source.subscribe(subscribe(s, mapper, delayErrors, maxConcurrency, bufferSize));
53+
}
54+
55+
public static <T, U> Subscriber<T> subscribe(Subscriber<? super U> s,
56+
Function<? super T, ? extends Publisher<? extends U>> mapper,
57+
boolean delayErrors, int maxConcurrency, int bufferSize) {
58+
return new MergeSubscriber<T, U>(s, mapper, delayErrors, maxConcurrency, bufferSize);
5359
}
5460

5561
static final class MergeSubscriber<T, U> extends AtomicInteger implements Subscription, Subscriber<T> {

src/main/java/io/reactivex/internal/operators/parallel/ParallelCollect.java

+7-11
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import io.reactivex.exceptions.Exceptions;
2121
import io.reactivex.functions.BiConsumer;
22+
import io.reactivex.internal.functions.ObjectHelper;
2223
import io.reactivex.internal.subscribers.DeferredScalarSubscriber;
2324
import io.reactivex.internal.subscriptions.*;
2425
import io.reactivex.parallel.ParallelFlowable;
@@ -34,12 +35,12 @@ public final class ParallelCollect<T, C> extends ParallelFlowable<C> {
3435

3536
final ParallelFlowable<? extends T> source;
3637

37-
final Callable<C> initialCollection;
38+
final Callable<? extends C> initialCollection;
3839

39-
final BiConsumer<C, T> collector;
40+
final BiConsumer<? super C, ? super T> collector;
4041

4142
public ParallelCollect(ParallelFlowable<? extends T> source,
42-
Callable<C> initialCollection, BiConsumer<C, T> collector) {
43+
Callable<? extends C> initialCollection, BiConsumer<? super C, ? super T> collector) {
4344
this.source = source;
4445
this.initialCollection = initialCollection;
4546
this.collector = collector;
@@ -60,18 +61,13 @@ public void subscribe(Subscriber<? super C>[] subscribers) {
6061
C initialValue;
6162

6263
try {
63-
initialValue = initialCollection.call();
64+
initialValue = ObjectHelper.requireNonNull(initialCollection.call(), "The initialSupplier returned a null value");
6465
} catch (Throwable ex) {
6566
Exceptions.throwIfFatal(ex);
6667
reportError(subscribers, ex);
6768
return;
6869
}
6970

70-
if (initialValue == null) {
71-
reportError(subscribers, new NullPointerException("The initialSupplier returned a null value"));
72-
return;
73-
}
74-
7571
parents[i] = new ParallelCollectSubscriber<T, C>(subscribers[i], initialValue, collector);
7672
}
7773

@@ -94,14 +90,14 @@ static final class ParallelCollectSubscriber<T, C> extends DeferredScalarSubscri
9490

9591
private static final long serialVersionUID = -4767392946044436228L;
9692

97-
final BiConsumer<C, T> collector;
93+
final BiConsumer<? super C, ? super T> collector;
9894

9995
C collection;
10096

10197
boolean done;
10298

10399
ParallelCollectSubscriber(Subscriber<? super C> subscriber,
104-
C initialValue, BiConsumer<C, T> collector) {
100+
C initialValue, BiConsumer<? super C, ? super T> collector) {
105101
super(subscriber);
106102
this.collection = initialValue;
107103
this.collector = collector;

src/main/java/io/reactivex/internal/operators/parallel/ParallelConcatMap.java

+1-17
Original file line numberDiff line numberDiff line change
@@ -63,24 +63,8 @@ public void subscribe(Subscriber<? super R>[] subscribers) {
6363
@SuppressWarnings("unchecked")
6464
final Subscriber<T>[] parents = new Subscriber[n];
6565

66-
// FIXME cheat until we have support from RxJava2 internals
67-
Publisher<T> p = new Publisher<T>() {
68-
int i;
69-
70-
@SuppressWarnings("unchecked")
71-
@Override
72-
public void subscribe(Subscriber<? super T> s) {
73-
parents[i++] = (Subscriber<T>)s;
74-
}
75-
};
76-
77-
FlowableConcatMap<T, R> op = new FlowableConcatMap<T, R>(p, mapper, prefetch, errorMode);
78-
7966
for (int i = 0; i < n; i++) {
80-
81-
op.subscribe(subscribers[i]);
82-
// FIXME needs a FlatMap subscriber
83-
// parents[i] = FlowableConcatMap.createSubscriber(s, mapper, prefetch, errorMode);
67+
parents[i] = FlowableConcatMap.subscribe(subscribers[i], mapper, prefetch, errorMode);
8468
}
8569

8670
source.subscribe(parents);

src/main/java/io/reactivex/internal/operators/parallel/ParallelFilter.java

+103-30
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.reactivex.exceptions.Exceptions;
1919
import io.reactivex.functions.Predicate;
20+
import io.reactivex.internal.fuseable.ConditionalSubscriber;
2021
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2122
import io.reactivex.parallel.ParallelFlowable;
2223
import io.reactivex.plugins.RxJavaPlugins;
@@ -48,7 +49,12 @@ public void subscribe(Subscriber<? super T>[] subscribers) {
4849
Subscriber<? super T>[] parents = new Subscriber[n];
4950

5051
for (int i = 0; i < n; i++) {
51-
parents[i] = new ParallelFilterSubscriber<T>(subscribers[i], predicate);
52+
Subscriber<? super T> a = subscribers[i];
53+
if (a instanceof ConditionalSubscriber) {
54+
parents[i] = new ParallelFilterConditionalSubscriber<T>((ConditionalSubscriber<? super T>)a, predicate);
55+
} else {
56+
parents[i] = new ParallelFilterSubscriber<T>(a, predicate);
57+
}
5258
}
5359

5460
source.subscribe(parents);
@@ -59,31 +65,44 @@ public int parallelism() {
5965
return source.parallelism();
6066
}
6167

62-
static final class ParallelFilterSubscriber<T> implements Subscriber<T>, Subscription {
63-
64-
final Subscriber<? super T> actual;
65-
68+
abstract static class BaseFilterSubscriber<T> implements ConditionalSubscriber<T>, Subscription {
6669
final Predicate<? super T> predicate;
6770

6871
Subscription s;
6972

7073
boolean done;
7174

72-
ParallelFilterSubscriber(Subscriber<? super T> actual, Predicate<? super T> predicate) {
73-
this.actual = actual;
75+
BaseFilterSubscriber(Predicate<? super T> predicate) {
7476
this.predicate = predicate;
7577
}
7678

7779
@Override
78-
public void request(long n) {
80+
public final void request(long n) {
7981
s.request(n);
8082
}
8183

8284
@Override
83-
public void cancel() {
85+
public final void cancel() {
8486
s.cancel();
8587
}
8688

89+
@Override
90+
public final void onNext(T t) {
91+
if (!tryOnNext(t)) {
92+
s.request(1);
93+
}
94+
}
95+
}
96+
97+
static final class ParallelFilterSubscriber<T> extends BaseFilterSubscriber<T> {
98+
99+
final Subscriber<? super T> actual;
100+
101+
ParallelFilterSubscriber(Subscriber<? super T> actual, Predicate<? super T> predicate) {
102+
super(predicate);
103+
this.actual = actual;
104+
}
105+
87106
@Override
88107
public void onSubscribe(Subscription s) {
89108
if (SubscriptionHelper.validate(this.s, s)) {
@@ -94,26 +113,83 @@ public void onSubscribe(Subscription s) {
94113
}
95114

96115
@Override
97-
public void onNext(T t) {
116+
public boolean tryOnNext(T t) {
117+
if (!done) {
118+
boolean b;
119+
120+
try {
121+
b = predicate.test(t);
122+
} catch (Throwable ex) {
123+
Exceptions.throwIfFatal(ex);
124+
cancel();
125+
onError(ex);
126+
return false;
127+
}
128+
129+
if (b) {
130+
actual.onNext(t);
131+
return true;
132+
}
133+
}
134+
return false;
135+
}
136+
137+
@Override
138+
public void onError(Throwable t) {
98139
if (done) {
140+
RxJavaPlugins.onError(t);
99141
return;
100142
}
101-
boolean b;
102-
103-
try {
104-
b = predicate.test(t);
105-
} catch (Throwable ex) {
106-
Exceptions.throwIfFatal(ex);
107-
cancel();
108-
onError(ex);
109-
return;
143+
done = true;
144+
actual.onError(t);
145+
}
146+
147+
@Override
148+
public void onComplete() {
149+
if (!done) {
150+
done = true;
151+
actual.onComplete();
110152
}
153+
}
154+
}
111155

112-
if (b) {
113-
actual.onNext(t);
114-
} else {
115-
s.request(1);
156+
static final class ParallelFilterConditionalSubscriber<T> extends BaseFilterSubscriber<T> {
157+
158+
final ConditionalSubscriber<? super T> actual;
159+
160+
ParallelFilterConditionalSubscriber(ConditionalSubscriber<? super T> actual, Predicate<? super T> predicate) {
161+
super(predicate);
162+
this.actual = actual;
163+
}
164+
165+
@Override
166+
public void onSubscribe(Subscription s) {
167+
if (SubscriptionHelper.validate(this.s, s)) {
168+
this.s = s;
169+
170+
actual.onSubscribe(this);
171+
}
172+
}
173+
174+
@Override
175+
public boolean tryOnNext(T t) {
176+
if (!done) {
177+
boolean b;
178+
179+
try {
180+
b = predicate.test(t);
181+
} catch (Throwable ex) {
182+
Exceptions.throwIfFatal(ex);
183+
cancel();
184+
onError(ex);
185+
return false;
186+
}
187+
188+
if (b) {
189+
return actual.tryOnNext(t);
190+
}
116191
}
192+
return false;
117193
}
118194

119195
@Override
@@ -128,12 +204,9 @@ public void onError(Throwable t) {
128204

129205
@Override
130206
public void onComplete() {
131-
if (done) {
132-
return;
207+
if (!done) {
208+
done = true;
209+
actual.onComplete();
133210
}
134-
done = true;
135-
actual.onComplete();
136211
}
137-
138-
}
139-
}
212+
}}

src/main/java/io/reactivex/internal/operators/parallel/ParallelFlatMap.java

+1-17
Original file line numberDiff line numberDiff line change
@@ -67,24 +67,8 @@ public void subscribe(Subscriber<? super R>[] subscribers) {
6767
@SuppressWarnings("unchecked")
6868
final Subscriber<T>[] parents = new Subscriber[n];
6969

70-
// FIXME cheat until we have support from RxJava2 internals
71-
Publisher<T> p = new Publisher<T>() {
72-
int i;
73-
74-
@SuppressWarnings("unchecked")
75-
@Override
76-
public void subscribe(Subscriber<? super T> s) {
77-
parents[i++] = (Subscriber<T>)s;
78-
}
79-
};
80-
81-
FlowableFlatMap<T, R> op = new FlowableFlatMap<T, R>(p, mapper, delayError, maxConcurrency, prefetch);
82-
8370
for (int i = 0; i < n; i++) {
84-
85-
op.subscribe(subscribers[i]);
86-
// FIXME needs a FlatMap subscriber
87-
// parents[i] = FlowableFlatMap.createSubscriber(s, mapper, delayError, maxConcurrency, prefetch);
71+
parents[i] = FlowableFlatMap.subscribe(subscribers[i], mapper, delayError, maxConcurrency, prefetch);
8872
}
8973

9074
source.subscribe(parents);

src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java

+4-15
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import org.reactivestreams.*;
1919

20-
import io.reactivex.exceptions.Exceptions;
20+
import io.reactivex.exceptions.*;
2121
import io.reactivex.internal.fuseable.*;
2222
import io.reactivex.internal.queue.SpscArrayQueue;
2323
import io.reactivex.internal.subscriptions.SubscriptionHelper;
@@ -187,8 +187,8 @@ public void cancel() {
187187
public void onNext(T t) {
188188
if (sourceMode == QueueSubscription.NONE) {
189189
if (!queue.offer(t)) {
190-
cancel();
191-
onError(new IllegalStateException("Queue is full?"));
190+
s.cancel();
191+
onError(new MissingBackpressureException("Queue is full?"));
192192
return;
193193
}
194194
}
@@ -344,18 +344,7 @@ void drainSync() {
344344
return;
345345
}
346346

347-
boolean empty;
348-
349-
try {
350-
empty = q.isEmpty();
351-
} catch (Throwable ex) {
352-
Exceptions.throwIfFatal(ex);
353-
s.cancel();
354-
for (Subscriber<? super T> s : a) {
355-
s.onError(ex);
356-
}
357-
return;
358-
}
347+
boolean empty = q.isEmpty();
359348

360349
if (empty) {
361350
for (Subscriber<? super T> s : a) {

src/main/java/io/reactivex/internal/operators/parallel/ParallelJoin.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
/**
3030
* Merges the individual 'rails' of the source ParallelFlowable, unordered,
31-
* into a single regular Publisher sequence (exposed as Px).
31+
* into a single regular Publisher sequence (exposed as Flowable).
3232
*
3333
* @param <T> the value type
3434
*/
@@ -123,7 +123,7 @@ void onNext(JoinInnerSubscriber<T> inner, T value) {
123123
}
124124
inner.request(1);
125125
} else {
126-
SimpleQueue<T> q = inner.getQueue();
126+
SimplePlainQueue<T> q = inner.getQueue();
127127

128128
if (!q.offer(value)) {
129129
cancelAll();
@@ -140,10 +140,13 @@ void onNext(JoinInnerSubscriber<T> inner, T value) {
140140
return;
141141
}
142142
} else {
143-
SimpleQueue<T> q = inner.getQueue();
143+
SimplePlainQueue<T> q = inner.getQueue();
144144

145-
// FIXME overflow handling
146-
q.offer(value);
145+
if (!q.offer(value)) {
146+
cancelAll();
147+
onError(new MissingBackpressureException("Queue full?!"));
148+
return;
149+
}
147150

148151
if (getAndIncrement() != 0) {
149152
return;

0 commit comments

Comments
 (0)