From 8a940f30a285fce6263b44b3edbe2fa55c2cfe68 Mon Sep 17 00:00:00 2001 From: akarnokd <akarnokd@gmail.com> Date: Tue, 28 Jan 2020 10:27:57 +0100 Subject: [PATCH] 3.x: Add Completable.onErrorReturn[Item] --- .../reactivex/rxjava3/core/Completable.java | 58 +++++++++++ .../java/io/reactivex/rxjava3/core/Maybe.java | 6 +- .../completable/CompletableOnErrorReturn.java | 99 +++++++++++++++++++ .../operators/maybe/MaybeOnErrorReturn.java | 14 +-- .../completable/CompletableOnErrorXTest.java | 57 +++++++++++ 5 files changed, 224 insertions(+), 10 deletions(-) create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorReturn.java diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java index 2ab23028bd..7e28439ae2 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Completable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java @@ -2323,6 +2323,64 @@ public final Completable onErrorResumeWith(@NonNull CompletableSource fallback) return onErrorResumeNext(Functions.justFunction(fallback)); } + /** + * Ends the flow with a success item returned by a function for the {@link Throwable} error signaled by the current + * {@code Completable} instead of signaling the error via {@code onError}. + * <p> + * <img width="640" height="567" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.onErrorReturn.png" alt=""> + * <p> + * You can use this to prevent errors from propagating or to supply fallback data should errors be + * encountered. + * <dl> + * <dt><b>Scheduler:</b></dt> + * <dd>{@code onErrorReturn} does not operate by default on a particular {@link Scheduler}.</dd> + * </dl> + * + * @param <T> the item type to return on error + * @param itemSupplier + * a function that returns a single value that will be emitted as success value + * the current {@code Completable} signals an {@code onError} event + * @return the new {@link Maybe} instance + * @throws NullPointerException if {@code itemSupplier} is {@code null} + * @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a> + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final <T> Maybe<T> onErrorReturn(@NonNull Function<? super Throwable, ? extends T> itemSupplier) { + Objects.requireNonNull(itemSupplier, "itemSupplier is null"); + return RxJavaPlugins.onAssembly(new CompletableOnErrorReturn<>(this, itemSupplier)); + } + + /** + * Ends the flow with the given success item when the current {@code Completable} + * fails instead of signaling the error via {@code onError}. + * <p> + * <img width="640" height="567" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.onErrorReturnItem.png" alt=""> + * <p> + * You can use this to prevent errors from propagating or to supply fallback data should errors be + * encountered. + * <dl> + * <dt><b>Scheduler:</b></dt> + * <dd>{@code onErrorReturnItem} does not operate by default on a particular {@link Scheduler}.</dd> + * </dl> + * + * @param <T> the item type to return on error + * @param item + * the value that is emitted as {@code onSuccess} in case the current {@code Completable} signals an {@code onError} + * @return the new {@link Maybe} instance + * @throws NullPointerException if {@code item} is {@code null} + * @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a> + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final <T> Maybe<T> onErrorReturnItem(@NonNull T item) { + Objects.requireNonNull(item, "item is null"); + return onErrorReturn(Functions.justFunction(item)); + } + /** * Nulls out references to the upstream producer and downstream {@link CompletableObserver} if * the sequence is terminated or downstream calls {@code dispose()}. diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index 96ff14c1e9..40902efa53 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -4467,7 +4467,7 @@ public final Maybe<T> onErrorResumeNext(@NonNull Function<? super Throwable, ? e * Ends the flow with a success item returned by a function for the {@link Throwable} error signaled by the current * {@code Maybe} instead of signaling the error via {@code onError}. * <p> - * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorReturn.png" alt=""> + * <img width="640" height="377" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.onErrorReturn.png" alt=""> * <p> * You can use this to prevent errors from propagating or to supply fallback data should errors be * encountered. @@ -4494,7 +4494,7 @@ public final Maybe<T> onErrorReturn(@NonNull Function<? super Throwable, ? exten /** * Ends the flow with the given success item when the current {@code Maybe} fails instead of signaling the error via {@code onError}. * <p> - * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorReturn.png" alt=""> + * <img width="640" height="377" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.onErrorReturnItem.png" alt=""> * <p> * You can use this to prevent errors from propagating or to supply fallback data should errors be * encountered. @@ -4504,7 +4504,7 @@ public final Maybe<T> onErrorReturn(@NonNull Function<? super Throwable, ? exten * </dl> * * @param item - * the value that is emitted as {@code onSuccess} in case this {@code Maybe} signals an {@code onError} + * the value that is emitted as {@code onSuccess} in case the current {@code Maybe} signals an {@code onError} * @return the new {@code Maybe} instance * @throws NullPointerException if {@code item} is {@code null} * @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a> diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorReturn.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorReturn.java new file mode 100644 index 0000000000..c8fa43499f --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorReturn.java @@ -0,0 +1,99 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.completable; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; + +import java.util.Objects; + +/** + * Returns a value generated via a function if the main source signals an onError. + * @param <T> the value type + * @since 3.0.0 + */ +public final class CompletableOnErrorReturn<T> extends Maybe<T> { + + final CompletableSource source; + + final Function<? super Throwable, ? extends T> valueSupplier; + + public CompletableOnErrorReturn(CompletableSource source, + Function<? super Throwable, ? extends T> valueSupplier) { + this.source = source; + this.valueSupplier = valueSupplier; + } + + @Override + protected void subscribeActual(MaybeObserver<? super T> observer) { + source.subscribe(new OnErrorReturnMaybeObserver<>(observer, valueSupplier)); + } + + static final class OnErrorReturnMaybeObserver<T> implements CompletableObserver, Disposable { + + final MaybeObserver<? super T> downstream; + + final Function<? super Throwable, ? extends T> itemSupplier; + + Disposable upstream; + + OnErrorReturnMaybeObserver(MaybeObserver<? super T> actual, + Function<? super Throwable, ? extends T> itemSupplier) { + this.downstream = actual; + this.itemSupplier = itemSupplier; + } + + @Override + public void dispose() { + upstream.dispose(); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.upstream, d)) { + this.upstream = d; + + downstream.onSubscribe(this); + } + } + + @Override + public void onError(Throwable e) { + T v; + + try { + v = Objects.requireNonNull(itemSupplier.apply(e), "The itemSupplier returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(new CompositeException(e, ex)); + return; + } + + downstream.onSuccess(v); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorReturn.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorReturn.java index 1a75120e84..190856d5c7 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorReturn.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorReturn.java @@ -27,31 +27,31 @@ */ public final class MaybeOnErrorReturn<T> extends AbstractMaybeWithUpstream<T, T> { - final Function<? super Throwable, ? extends T> valueSupplier; + final Function<? super Throwable, ? extends T> itemSupplier; public MaybeOnErrorReturn(MaybeSource<T> source, - Function<? super Throwable, ? extends T> valueSupplier) { + Function<? super Throwable, ? extends T> itemSupplier) { super(source); - this.valueSupplier = valueSupplier; + this.itemSupplier = itemSupplier; } @Override protected void subscribeActual(MaybeObserver<? super T> observer) { - source.subscribe(new OnErrorReturnMaybeObserver<>(observer, valueSupplier)); + source.subscribe(new OnErrorReturnMaybeObserver<>(observer, itemSupplier)); } static final class OnErrorReturnMaybeObserver<T> implements MaybeObserver<T>, Disposable { final MaybeObserver<? super T> downstream; - final Function<? super Throwable, ? extends T> valueSupplier; + final Function<? super Throwable, ? extends T> itemSupplier; Disposable upstream; OnErrorReturnMaybeObserver(MaybeObserver<? super T> actual, Function<? super Throwable, ? extends T> valueSupplier) { this.downstream = actual; - this.valueSupplier = valueSupplier; + this.itemSupplier = valueSupplier; } @Override @@ -83,7 +83,7 @@ public void onError(Throwable e) { T v; try { - v = Objects.requireNonNull(valueSupplier.apply(e), "The valueSupplier returned a null value"); + v = Objects.requireNonNull(itemSupplier.apply(e), "The itemSupplier returned a null value"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); downstream.onError(new CompositeException(e, ex)); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorXTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorXTest.java index 4a1333b530..531f13b8cc 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorXTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorXTest.java @@ -15,10 +15,16 @@ import static org.junit.Assert.assertEquals; +import java.io.IOException; + import org.junit.Test; import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.functions.Functions; +import io.reactivex.rxjava3.subjects.CompletableSubject; +import io.reactivex.rxjava3.testsupport.TestHelper; public class CompletableOnErrorXTest extends RxJavaTest { @@ -46,4 +52,55 @@ public CompletableSource apply(Throwable e) throws Exception { assertEquals(0, call[0]); } + + @Test + public void onErrorReturnConst() { + Completable.error(new TestException()) + .onErrorReturnItem(1) + .test() + .assertResult(1); + } + + @Test + public void onErrorReturn() { + Completable.error(new TestException()) + .onErrorReturn(Functions.justFunction(1)) + .test() + .assertResult(1); + } + + @Test + public void onErrorReturnFunctionThrows() { + TestHelper.assertCompositeExceptions(Completable.error(new TestException()) + .onErrorReturn(new Function<Throwable, Object>() { + @Override + public Object apply(Throwable v) throws Exception { + throw new IOException(); + } + }) + .to(TestHelper.testConsumer()), TestException.class, IOException.class); + } + + @Test + public void onErrorReturnEmpty() { + Completable.complete() + .onErrorReturnItem(2) + .test() + .assertResult(); + } + + @Test + public void onErrorReturnDispose() { + TestHelper.checkDisposed(CompletableSubject.create().onErrorReturnItem(1)); + } + + @Test + public void onErrorReturnDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeCompletableToMaybe(new Function<Completable, MaybeSource<Object>>() { + @Override + public MaybeSource<Object> apply(Completable v) throws Exception { + return v.onErrorReturnItem(1); + } + }); + } }