Skip to content

Commit 2d8660e

Browse files
authored
3.x: Add safeSubscribe to Maybe, Single & Completable (#6887)
1 parent e077ed8 commit 2d8660e

File tree

11 files changed

+837
-19
lines changed

11 files changed

+837
-19
lines changed

docs/Operator-Matrix.md

+9-18
Large diffs are not rendered by default.

src/main/java/io/reactivex/rxjava3/core/Completable.java

+25
Original file line numberDiff line numberDiff line change
@@ -2651,6 +2651,31 @@ public final Completable retryWhen(@NonNull Function<? super Flowable<Throwable>
26512651
return fromPublisher(toFlowable().retryWhen(handler));
26522652
}
26532653

2654+
/**
2655+
* Wraps the given {@link CompletableObserver}, catches any {@link RuntimeException}s thrown by its
2656+
* {@link CompletableObserver#onSubscribe(Disposable)}, {@link CompletableObserver#onError(Throwable)}
2657+
* or {@link CompletableObserver#onComplete()} methods and routes those to the global
2658+
* error handler via {@link RxJavaPlugins#onError(Throwable)}.
2659+
* <p>
2660+
* By default, the {@code Completable} protocol forbids the {@code onXXX} methods to throw, but some
2661+
* {@code CompletableObserver} implementation may do it anyway, causing undefined behavior in the
2662+
* upstream. This method and the underlying safe wrapper ensures such misbehaving consumers don't
2663+
* disrupt the protocol.
2664+
* <dl>
2665+
* <dt><b>Scheduler:</b></dt>
2666+
* <dd>{@code safeSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
2667+
* </dl>
2668+
* @param observer the potentially misbehaving {@code CompletableObserver}
2669+
* @throws NullPointerException if {@code observer} is {@code null}
2670+
* @see #subscribe(Action, Consumer)
2671+
* @since 3.0.0
2672+
*/
2673+
@SchedulerSupport(SchedulerSupport.NONE)
2674+
public final void safeSubscribe(@NonNull CompletableObserver observer) {
2675+
Objects.requireNonNull(observer, "observer is null");
2676+
subscribe(new SafeCompletableObserver(observer));
2677+
}
2678+
26542679
/**
26552680
* Returns a {@code Completable} which first runs the other {@link CompletableSource}
26562681
* then the current {@code Completable} if the other completed normally.

src/main/java/io/reactivex/rxjava3/core/Maybe.java

+25
Original file line numberDiff line numberDiff line change
@@ -4860,6 +4860,31 @@ public final Maybe<T> retryWhen(
48604860
return toFlowable().retryWhen(handler).singleElement();
48614861
}
48624862

4863+
/**
4864+
* Wraps the given {@link MaybeObserver}, catches any {@link RuntimeException}s thrown by its
4865+
* {@link MaybeObserver#onSubscribe(Disposable)}, {@link MaybeObserver#onSuccess(Object)},
4866+
* {@link MaybeObserver#onError(Throwable)} or {@link MaybeObserver#onComplete()} methods
4867+
* and routes those to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
4868+
* <p>
4869+
* By default, the {@code Maybe} protocol forbids the {@code onXXX} methods to throw, but some
4870+
* {@code MaybeObserver} implementation may do it anyway, causing undefined behavior in the
4871+
* upstream. This method and the underlying safe wrapper ensures such misbehaving consumers don't
4872+
* disrupt the protocol.
4873+
* <dl>
4874+
* <dt><b>Scheduler:</b></dt>
4875+
* <dd>{@code safeSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
4876+
* </dl>
4877+
* @param observer the potentially misbehaving {@code MaybeObserver}
4878+
* @throws NullPointerException if {@code observer} is {@code null}
4879+
* @see #subscribe(Consumer,Consumer, Action)
4880+
* @since 3.0.0
4881+
*/
4882+
@SchedulerSupport(SchedulerSupport.NONE)
4883+
public final void safeSubscribe(@NonNull MaybeObserver<? super T> observer) {
4884+
Objects.requireNonNull(observer, "observer is null");
4885+
subscribe(new SafeMaybeObserver<>(observer));
4886+
}
4887+
48634888
/**
48644889
* Returns a {@link Flowable} which first runs the other {@link CompletableSource}
48654890
* then the current {@code Maybe} if the other completed normally.

src/main/java/io/reactivex/rxjava3/core/Single.java

+25
Original file line numberDiff line numberDiff line change
@@ -4287,6 +4287,31 @@ public final Single<T> retryWhen(@NonNull Function<? super Flowable<Throwable>,
42874287
return toSingle(toFlowable().retryWhen(handler));
42884288
}
42894289

4290+
/**
4291+
* Wraps the given {@link SingleObserver}, catches any {@link RuntimeException}s thrown by its
4292+
* {@link SingleObserver#onSubscribe(Disposable)}, {@link SingleObserver#onSuccess(Object)} or
4293+
* {@link SingleObserver#onError(Throwable)} methods* and routes those to the global error handler
4294+
* via {@link RxJavaPlugins#onError(Throwable)}.
4295+
* <p>
4296+
* By default, the {@code Single} protocol forbids the {@code onXXX} methods to throw, but some
4297+
* {@code SingleObserver} implementation may do it anyway, causing undefined behavior in the
4298+
* upstream. This method and the underlying safe wrapper ensures such misbehaving consumers don't
4299+
* disrupt the protocol.
4300+
* <dl>
4301+
* <dt><b>Scheduler:</b></dt>
4302+
* <dd>{@code safeSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
4303+
* </dl>
4304+
* @param observer the potentially misbehaving {@code SingleObserver}
4305+
* @throws NullPointerException if {@code observer} is {@code null}
4306+
* @see #subscribe(Consumer,Consumer)
4307+
* @since 3.0.0
4308+
*/
4309+
@SchedulerSupport(SchedulerSupport.NONE)
4310+
public final void safeSubscribe(@NonNull SingleObserver<? super T> observer) {
4311+
Objects.requireNonNull(observer, "observer is null");
4312+
subscribe(new SafeSingleObserver<>(observer));
4313+
}
4314+
42904315
/**
42914316
* Returns a {@link Flowable} which first runs the other {@link CompletableSource}
42924317
* then the current {@code Single} if the other completed normally.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.observers;
15+
16+
import io.reactivex.rxjava3.annotations.NonNull;
17+
import io.reactivex.rxjava3.core.CompletableObserver;
18+
import io.reactivex.rxjava3.disposables.Disposable;
19+
import io.reactivex.rxjava3.exceptions.*;
20+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
21+
22+
/**
23+
* Wraps another {@link CompletableObserver} and catches exceptions thrown by its
24+
* {@code onSubscribe}, {@code onError} or
25+
* {@code onComplete} methods despite the protocol forbids it.
26+
* <p>
27+
* Such exceptions are routed to the {@link RxJavaPlugins#onError(Throwable)} handler.
28+
*
29+
* @since 3.0.0
30+
*/
31+
public final class SafeCompletableObserver implements CompletableObserver {
32+
33+
final CompletableObserver downstream;
34+
35+
boolean onSubscribeFailed;
36+
37+
public SafeCompletableObserver(CompletableObserver downstream) {
38+
this.downstream = downstream;
39+
}
40+
41+
@Override
42+
public void onSubscribe(@NonNull Disposable d) {
43+
try {
44+
downstream.onSubscribe(d);
45+
} catch (Throwable ex) {
46+
Exceptions.throwIfFatal(ex);
47+
onSubscribeFailed = true;
48+
d.dispose();
49+
RxJavaPlugins.onError(ex);
50+
}
51+
}
52+
53+
@Override
54+
public void onError(@NonNull Throwable e) {
55+
if (onSubscribeFailed) {
56+
RxJavaPlugins.onError(e);
57+
} else {
58+
try {
59+
downstream.onError(e);
60+
} catch (Throwable ex) {
61+
Exceptions.throwIfFatal(ex);
62+
RxJavaPlugins.onError(new CompositeException(e, ex));
63+
}
64+
}
65+
}
66+
67+
@Override
68+
public void onComplete() {
69+
if (!onSubscribeFailed) {
70+
try {
71+
downstream.onComplete();
72+
} catch (Throwable ex) {
73+
Exceptions.throwIfFatal(ex);
74+
RxJavaPlugins.onError(ex);
75+
}
76+
}
77+
}
78+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.observers;
15+
16+
import io.reactivex.rxjava3.annotations.NonNull;
17+
import io.reactivex.rxjava3.core.MaybeObserver;
18+
import io.reactivex.rxjava3.disposables.Disposable;
19+
import io.reactivex.rxjava3.exceptions.*;
20+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
21+
22+
/**
23+
* Wraps another {@link MaybeObserver} and catches exceptions thrown by its
24+
* {@code onSubscribe}, {@code onSuccess}, {@code onError} or
25+
* {@code onComplete} methods despite the protocol forbids it.
26+
* <p>
27+
* Such exceptions are routed to the {@link RxJavaPlugins#onError(Throwable)} handler.
28+
*
29+
* @param <T> the element type of the sequence
30+
* @since 3.0.0
31+
*/
32+
public final class SafeMaybeObserver<T> implements MaybeObserver<T> {
33+
34+
final MaybeObserver<? super T> downstream;
35+
36+
boolean onSubscribeFailed;
37+
38+
public SafeMaybeObserver(MaybeObserver<? super T> downstream) {
39+
this.downstream = downstream;
40+
}
41+
42+
@Override
43+
public void onSubscribe(@NonNull Disposable d) {
44+
try {
45+
downstream.onSubscribe(d);
46+
} catch (Throwable ex) {
47+
Exceptions.throwIfFatal(ex);
48+
onSubscribeFailed = true;
49+
d.dispose();
50+
RxJavaPlugins.onError(ex);
51+
}
52+
}
53+
54+
@Override
55+
public void onSuccess(@NonNull T t) {
56+
if (!onSubscribeFailed) {
57+
try {
58+
downstream.onSuccess(t);
59+
} catch (Throwable ex) {
60+
Exceptions.throwIfFatal(ex);
61+
RxJavaPlugins.onError(ex);
62+
}
63+
}
64+
}
65+
66+
@Override
67+
public void onError(@NonNull Throwable e) {
68+
if (onSubscribeFailed) {
69+
RxJavaPlugins.onError(e);
70+
} else {
71+
try {
72+
downstream.onError(e);
73+
} catch (Throwable ex) {
74+
Exceptions.throwIfFatal(ex);
75+
RxJavaPlugins.onError(new CompositeException(e, ex));
76+
}
77+
}
78+
}
79+
80+
@Override
81+
public void onComplete() {
82+
if (!onSubscribeFailed) {
83+
try {
84+
downstream.onComplete();
85+
} catch (Throwable ex) {
86+
Exceptions.throwIfFatal(ex);
87+
RxJavaPlugins.onError(ex);
88+
}
89+
}
90+
}
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.observers;
15+
16+
import io.reactivex.rxjava3.annotations.NonNull;
17+
import io.reactivex.rxjava3.core.SingleObserver;
18+
import io.reactivex.rxjava3.disposables.Disposable;
19+
import io.reactivex.rxjava3.exceptions.*;
20+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
21+
22+
/**
23+
* Wraps another {@link SingleObserver} and catches exceptions thrown by its
24+
* {@code onSubscribe}, {@code onSuccess} or {@code onError} methods despite
25+
* the protocol forbids it.
26+
* <p>
27+
* Such exceptions are routed to the {@link RxJavaPlugins#onError(Throwable)} handler.
28+
*
29+
* @param <T> the element type of the sequence
30+
* @since 3.0.0
31+
*/
32+
public final class SafeSingleObserver<T> implements SingleObserver<T> {
33+
34+
final SingleObserver<? super T> downstream;
35+
36+
boolean onSubscribeFailed;
37+
38+
public SafeSingleObserver(SingleObserver<? super T> downstream) {
39+
this.downstream = downstream;
40+
}
41+
42+
@Override
43+
public void onSubscribe(@NonNull Disposable d) {
44+
try {
45+
downstream.onSubscribe(d);
46+
} catch (Throwable ex) {
47+
Exceptions.throwIfFatal(ex);
48+
onSubscribeFailed = true;
49+
d.dispose();
50+
RxJavaPlugins.onError(ex);
51+
}
52+
}
53+
54+
@Override
55+
public void onSuccess(@NonNull T t) {
56+
if (!onSubscribeFailed) {
57+
try {
58+
downstream.onSuccess(t);
59+
} catch (Throwable ex) {
60+
Exceptions.throwIfFatal(ex);
61+
RxJavaPlugins.onError(ex);
62+
}
63+
}
64+
}
65+
66+
@Override
67+
public void onError(@NonNull Throwable e) {
68+
if (onSubscribeFailed) {
69+
RxJavaPlugins.onError(e);
70+
} else {
71+
try {
72+
downstream.onError(e);
73+
} catch (Throwable ex) {
74+
Exceptions.throwIfFatal(ex);
75+
RxJavaPlugins.onError(new CompositeException(e, ex));
76+
}
77+
}
78+
}
79+
}

0 commit comments

Comments
 (0)