Skip to content

Commit 1b60b12

Browse files
authored
2.x: add resilient versions of parallel map(), filter() & doOnNext() (#5202)
1 parent 29bceef commit 1b60b12

11 files changed

+2173
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.parallel;
15+
16+
import org.reactivestreams.*;
17+
18+
import io.reactivex.exceptions.*;
19+
import io.reactivex.functions.*;
20+
import io.reactivex.internal.functions.ObjectHelper;
21+
import io.reactivex.internal.fuseable.ConditionalSubscriber;
22+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
23+
import io.reactivex.parallel.*;
24+
import io.reactivex.plugins.RxJavaPlugins;
25+
26+
/**
27+
* Calls a Consumer for each upstream value passing by
28+
* and handles any failure with a handler function.
29+
*
30+
* @param <T> the input value type
31+
* @since 2.0.8 - experimental
32+
*/
33+
public final class ParallelDoOnNextTry<T> extends ParallelFlowable<T> {
34+
35+
final ParallelFlowable<T> source;
36+
37+
final Consumer<? super T> onNext;
38+
39+
final BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler;
40+
41+
public ParallelDoOnNextTry(ParallelFlowable<T> source, Consumer<? super T> onNext,
42+
BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
43+
this.source = source;
44+
this.onNext = onNext;
45+
this.errorHandler = errorHandler;
46+
}
47+
48+
@Override
49+
public void subscribe(Subscriber<? super T>[] subscribers) {
50+
if (!validate(subscribers)) {
51+
return;
52+
}
53+
54+
int n = subscribers.length;
55+
@SuppressWarnings("unchecked")
56+
Subscriber<? super T>[] parents = new Subscriber[n];
57+
58+
for (int i = 0; i < n; i++) {
59+
Subscriber<? super T> a = subscribers[i];
60+
if (a instanceof ConditionalSubscriber) {
61+
parents[i] = new ParallelDoOnNextConditionalSubscriber<T>((ConditionalSubscriber<? super T>)a, onNext, errorHandler);
62+
} else {
63+
parents[i] = new ParallelDoOnNextSubscriber<T>(a, onNext, errorHandler);
64+
}
65+
}
66+
67+
source.subscribe(parents);
68+
}
69+
70+
@Override
71+
public int parallelism() {
72+
return source.parallelism();
73+
}
74+
75+
static final class ParallelDoOnNextSubscriber<T> implements ConditionalSubscriber<T>, Subscription {
76+
77+
final Subscriber<? super T> actual;
78+
79+
final Consumer<? super T> onNext;
80+
81+
final BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler;
82+
83+
Subscription s;
84+
85+
boolean done;
86+
87+
ParallelDoOnNextSubscriber(Subscriber<? super T> actual, Consumer<? super T> onNext,
88+
BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
89+
this.actual = actual;
90+
this.onNext = onNext;
91+
this.errorHandler = errorHandler;
92+
}
93+
94+
@Override
95+
public void request(long n) {
96+
s.request(n);
97+
}
98+
99+
@Override
100+
public void cancel() {
101+
s.cancel();
102+
}
103+
104+
@Override
105+
public void onSubscribe(Subscription s) {
106+
if (SubscriptionHelper.validate(this.s, s)) {
107+
this.s = s;
108+
109+
actual.onSubscribe(this);
110+
}
111+
}
112+
113+
@Override
114+
public void onNext(T t) {
115+
if (!tryOnNext(t)) {
116+
s.request(1);
117+
}
118+
}
119+
120+
@Override
121+
public boolean tryOnNext(T t) {
122+
if (done) {
123+
return false;
124+
}
125+
long retries = 0;
126+
127+
for (;;) {
128+
try {
129+
onNext.accept(t);
130+
} catch (Throwable ex) {
131+
Exceptions.throwIfFatal(ex);
132+
133+
ParallelFailureHandling h;
134+
135+
try {
136+
h = ObjectHelper.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null item");
137+
} catch (Throwable exc) {
138+
Exceptions.throwIfFatal(exc);
139+
cancel();
140+
onError(new CompositeException(ex, exc));
141+
return false;
142+
}
143+
144+
switch (h) {
145+
case RETRY:
146+
continue;
147+
case SKIP:
148+
return false;
149+
case STOP:
150+
cancel();
151+
onComplete();
152+
return false;
153+
default:
154+
cancel();
155+
onError(ex);
156+
return false;
157+
}
158+
}
159+
160+
actual.onNext(t);
161+
return true;
162+
}
163+
}
164+
165+
@Override
166+
public void onError(Throwable t) {
167+
if (done) {
168+
RxJavaPlugins.onError(t);
169+
return;
170+
}
171+
done = true;
172+
actual.onError(t);
173+
}
174+
175+
@Override
176+
public void onComplete() {
177+
if (done) {
178+
return;
179+
}
180+
done = true;
181+
actual.onComplete();
182+
}
183+
184+
}
185+
static final class ParallelDoOnNextConditionalSubscriber<T> implements ConditionalSubscriber<T>, Subscription {
186+
187+
final ConditionalSubscriber<? super T> actual;
188+
189+
final Consumer<? super T> onNext;
190+
191+
final BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler;
192+
Subscription s;
193+
194+
boolean done;
195+
196+
ParallelDoOnNextConditionalSubscriber(ConditionalSubscriber<? super T> actual,
197+
Consumer<? super T> onNext,
198+
BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
199+
this.actual = actual;
200+
this.onNext = onNext;
201+
this.errorHandler = errorHandler;
202+
}
203+
204+
@Override
205+
public void request(long n) {
206+
s.request(n);
207+
}
208+
209+
@Override
210+
public void cancel() {
211+
s.cancel();
212+
}
213+
214+
@Override
215+
public void onSubscribe(Subscription s) {
216+
if (SubscriptionHelper.validate(this.s, s)) {
217+
this.s = s;
218+
219+
actual.onSubscribe(this);
220+
}
221+
}
222+
223+
@Override
224+
public void onNext(T t) {
225+
if (!tryOnNext(t) && !done) {
226+
s.request(1);
227+
}
228+
}
229+
230+
@Override
231+
public boolean tryOnNext(T t) {
232+
if (done) {
233+
return false;
234+
}
235+
long retries = 0;
236+
237+
for (;;) {
238+
try {
239+
onNext.accept(t);
240+
} catch (Throwable ex) {
241+
Exceptions.throwIfFatal(ex);
242+
243+
ParallelFailureHandling h;
244+
245+
try {
246+
h = ObjectHelper.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null item");
247+
} catch (Throwable exc) {
248+
Exceptions.throwIfFatal(exc);
249+
cancel();
250+
onError(new CompositeException(ex, exc));
251+
return false;
252+
}
253+
254+
switch (h) {
255+
case RETRY:
256+
continue;
257+
case SKIP:
258+
return false;
259+
case STOP:
260+
cancel();
261+
onComplete();
262+
return false;
263+
default:
264+
cancel();
265+
onError(ex);
266+
return false;
267+
}
268+
}
269+
270+
return actual.tryOnNext(t);
271+
}
272+
}
273+
274+
@Override
275+
public void onError(Throwable t) {
276+
if (done) {
277+
RxJavaPlugins.onError(t);
278+
return;
279+
}
280+
done = true;
281+
actual.onError(t);
282+
}
283+
284+
@Override
285+
public void onComplete() {
286+
if (done) {
287+
return;
288+
}
289+
done = true;
290+
actual.onComplete();
291+
}
292+
293+
}
294+
}

Diff for: src/main/java/io/reactivex/internal/operators/parallel/ParallelFilter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public final void cancel() {
8888

8989
@Override
9090
public final void onNext(T t) {
91-
if (!tryOnNext(t)) {
91+
if (!tryOnNext(t) && !done) {
9292
s.request(1);
9393
}
9494
}

0 commit comments

Comments
 (0)