From 8a940f30a285fce6263b44b3edbe2fa55c2cfe68 Mon Sep 17 00:00:00 2001
From: akarnokd <akarnokd@gmail.com>
Date: Tue, 28 Jan 2020 10:27:57 +0100
Subject: [PATCH] 3.x: Add Completable.onErrorReturn[Item]

---
 .../reactivex/rxjava3/core/Completable.java   | 58 +++++++++++
 .../java/io/reactivex/rxjava3/core/Maybe.java |  6 +-
 .../completable/CompletableOnErrorReturn.java | 99 +++++++++++++++++++
 .../operators/maybe/MaybeOnErrorReturn.java   | 14 +--
 .../completable/CompletableOnErrorXTest.java  | 57 +++++++++++
 5 files changed, 224 insertions(+), 10 deletions(-)
 create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorReturn.java

diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java
index 2ab23028bd..7e28439ae2 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Completable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java
@@ -2323,6 +2323,64 @@ public final Completable onErrorResumeWith(@NonNull CompletableSource fallback)
         return onErrorResumeNext(Functions.justFunction(fallback));
     }
 
+    /**
+     * Ends the flow with a success item returned by a function for the {@link Throwable} error signaled by the current
+     * {@code Completable} instead of signaling the error via {@code onError}.
+     * <p>
+     * <img width="640" height="567" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.onErrorReturn.png" alt="">
+     * <p>
+     * You can use this to prevent errors from propagating or to supply fallback data should errors be
+     * encountered.
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code onErrorReturn} does not operate by default on a particular {@link Scheduler}.</dd>
+     * </dl>
+     *
+     * @param <T> the item type to return on error
+     * @param itemSupplier
+     *            a function that returns a single value that will be emitted as success value
+     *            the current {@code Completable} signals an {@code onError} event
+     * @return the new {@link Maybe} instance
+     * @throws NullPointerException if {@code itemSupplier} is {@code null}
+     * @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
+     * @since 3.0.0
+     */
+    @CheckReturnValue
+    @NonNull
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final <T> Maybe<T> onErrorReturn(@NonNull Function<? super Throwable, ? extends T> itemSupplier) {
+        Objects.requireNonNull(itemSupplier, "itemSupplier is null");
+        return RxJavaPlugins.onAssembly(new CompletableOnErrorReturn<>(this, itemSupplier));
+    }
+
+    /**
+     * Ends the flow with the given success item when the current {@code Completable}
+     * fails instead of signaling the error via {@code onError}.
+     * <p>
+     * <img width="640" height="567" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.onErrorReturnItem.png" alt="">
+     * <p>
+     * You can use this to prevent errors from propagating or to supply fallback data should errors be
+     * encountered.
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code onErrorReturnItem} does not operate by default on a particular {@link Scheduler}.</dd>
+     * </dl>
+     *
+     * @param <T> the item type to return on error
+     * @param item
+     *            the value that is emitted as {@code onSuccess} in case the current {@code Completable} signals an {@code onError}
+     * @return the new {@link Maybe} instance
+     * @throws NullPointerException if {@code item} is {@code null}
+     * @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
+     */
+    @CheckReturnValue
+    @NonNull
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final <T> Maybe<T> onErrorReturnItem(@NonNull T item) {
+        Objects.requireNonNull(item, "item is null");
+        return onErrorReturn(Functions.justFunction(item));
+    }
+
     /**
      * Nulls out references to the upstream producer and downstream {@link CompletableObserver} if
      * the sequence is terminated or downstream calls {@code dispose()}.
diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
index 96ff14c1e9..40902efa53 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
@@ -4467,7 +4467,7 @@ public final Maybe<T> onErrorResumeNext(@NonNull Function<? super Throwable, ? e
      * Ends the flow with a success item returned by a function for the {@link Throwable} error signaled by the current
      * {@code Maybe} instead of signaling the error via {@code onError}.
      * <p>
-     * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorReturn.png" alt="">
+     * <img width="640" height="377" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.onErrorReturn.png" alt="">
      * <p>
      * You can use this to prevent errors from propagating or to supply fallback data should errors be
      * encountered.
@@ -4494,7 +4494,7 @@ public final Maybe<T> onErrorReturn(@NonNull Function<? super Throwable, ? exten
     /**
      * Ends the flow with the given success item when the current {@code Maybe} fails instead of signaling the error via {@code onError}.
      * <p>
-     * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorReturn.png" alt="">
+     * <img width="640" height="377" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.onErrorReturnItem.png" alt="">
      * <p>
      * You can use this to prevent errors from propagating or to supply fallback data should errors be
      * encountered.
@@ -4504,7 +4504,7 @@ public final Maybe<T> onErrorReturn(@NonNull Function<? super Throwable, ? exten
      * </dl>
      *
      * @param item
-     *            the value that is emitted as {@code onSuccess} in case this {@code Maybe} signals an {@code onError}
+     *            the value that is emitted as {@code onSuccess} in case the current {@code Maybe} signals an {@code onError}
      * @return the new {@code Maybe} instance
      * @throws NullPointerException if {@code item} is {@code null}
      * @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorReturn.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorReturn.java
new file mode 100644
index 0000000000..c8fa43499f
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorReturn.java
@@ -0,0 +1,99 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.completable;
+
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.exceptions.*;
+import io.reactivex.rxjava3.functions.Function;
+import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
+
+import java.util.Objects;
+
+/**
+ * Returns a value generated via a function if the main source signals an onError.
+ * @param <T> the value type
+ * @since 3.0.0
+ */
+public final class CompletableOnErrorReturn<T> extends Maybe<T> {
+
+    final CompletableSource source;
+
+    final Function<? super Throwable, ? extends T> valueSupplier;
+
+    public CompletableOnErrorReturn(CompletableSource source,
+            Function<? super Throwable, ? extends T> valueSupplier) {
+        this.source = source;
+        this.valueSupplier = valueSupplier;
+    }
+
+    @Override
+    protected void subscribeActual(MaybeObserver<? super T> observer) {
+        source.subscribe(new OnErrorReturnMaybeObserver<>(observer, valueSupplier));
+    }
+
+    static final class OnErrorReturnMaybeObserver<T> implements CompletableObserver, Disposable {
+
+        final MaybeObserver<? super T> downstream;
+
+        final Function<? super Throwable, ? extends T> itemSupplier;
+
+        Disposable upstream;
+
+        OnErrorReturnMaybeObserver(MaybeObserver<? super T> actual,
+                Function<? super Throwable, ? extends T> itemSupplier) {
+            this.downstream = actual;
+            this.itemSupplier = itemSupplier;
+        }
+
+        @Override
+        public void dispose() {
+            upstream.dispose();
+        }
+
+        @Override
+        public boolean isDisposed() {
+            return upstream.isDisposed();
+        }
+
+        @Override
+        public void onSubscribe(Disposable d) {
+            if (DisposableHelper.validate(this.upstream, d)) {
+                this.upstream = d;
+
+                downstream.onSubscribe(this);
+            }
+        }
+
+        @Override
+        public void onError(Throwable e) {
+            T v;
+
+            try {
+                v = Objects.requireNonNull(itemSupplier.apply(e), "The itemSupplier returned a null value");
+            } catch (Throwable ex) {
+                Exceptions.throwIfFatal(ex);
+                downstream.onError(new CompositeException(e, ex));
+                return;
+            }
+
+            downstream.onSuccess(v);
+        }
+
+        @Override
+        public void onComplete() {
+            downstream.onComplete();
+        }
+    }
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorReturn.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorReturn.java
index 1a75120e84..190856d5c7 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorReturn.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorReturn.java
@@ -27,31 +27,31 @@
  */
 public final class MaybeOnErrorReturn<T> extends AbstractMaybeWithUpstream<T, T> {
 
-    final Function<? super Throwable, ? extends T> valueSupplier;
+    final Function<? super Throwable, ? extends T> itemSupplier;
 
     public MaybeOnErrorReturn(MaybeSource<T> source,
-            Function<? super Throwable, ? extends T> valueSupplier) {
+            Function<? super Throwable, ? extends T> itemSupplier) {
         super(source);
-        this.valueSupplier = valueSupplier;
+        this.itemSupplier = itemSupplier;
     }
 
     @Override
     protected void subscribeActual(MaybeObserver<? super T> observer) {
-        source.subscribe(new OnErrorReturnMaybeObserver<>(observer, valueSupplier));
+        source.subscribe(new OnErrorReturnMaybeObserver<>(observer, itemSupplier));
     }
 
     static final class OnErrorReturnMaybeObserver<T> implements MaybeObserver<T>, Disposable {
 
         final MaybeObserver<? super T> downstream;
 
-        final Function<? super Throwable, ? extends T> valueSupplier;
+        final Function<? super Throwable, ? extends T> itemSupplier;
 
         Disposable upstream;
 
         OnErrorReturnMaybeObserver(MaybeObserver<? super T> actual,
                 Function<? super Throwable, ? extends T> valueSupplier) {
             this.downstream = actual;
-            this.valueSupplier = valueSupplier;
+            this.itemSupplier = valueSupplier;
         }
 
         @Override
@@ -83,7 +83,7 @@ public void onError(Throwable e) {
             T v;
 
             try {
-                v = Objects.requireNonNull(valueSupplier.apply(e), "The valueSupplier returned a null value");
+                v = Objects.requireNonNull(itemSupplier.apply(e), "The itemSupplier returned a null value");
             } catch (Throwable ex) {
                 Exceptions.throwIfFatal(ex);
                 downstream.onError(new CompositeException(e, ex));
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorXTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorXTest.java
index 4a1333b530..531f13b8cc 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorXTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorXTest.java
@@ -15,10 +15,16 @@
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
+
 import org.junit.Test;
 
 import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.exceptions.TestException;
 import io.reactivex.rxjava3.functions.Function;
+import io.reactivex.rxjava3.internal.functions.Functions;
+import io.reactivex.rxjava3.subjects.CompletableSubject;
+import io.reactivex.rxjava3.testsupport.TestHelper;
 
 public class CompletableOnErrorXTest extends RxJavaTest {
 
@@ -46,4 +52,55 @@ public CompletableSource apply(Throwable e) throws Exception {
 
         assertEquals(0, call[0]);
     }
+
+    @Test
+    public void onErrorReturnConst() {
+        Completable.error(new TestException())
+        .onErrorReturnItem(1)
+        .test()
+        .assertResult(1);
+    }
+
+    @Test
+    public void onErrorReturn() {
+        Completable.error(new TestException())
+        .onErrorReturn(Functions.justFunction(1))
+        .test()
+        .assertResult(1);
+    }
+
+    @Test
+    public void onErrorReturnFunctionThrows() {
+        TestHelper.assertCompositeExceptions(Completable.error(new TestException())
+        .onErrorReturn(new Function<Throwable, Object>() {
+            @Override
+            public Object apply(Throwable v) throws Exception {
+                throw new IOException();
+            }
+        })
+        .to(TestHelper.testConsumer()), TestException.class, IOException.class);
+    }
+
+    @Test
+    public void onErrorReturnEmpty() {
+        Completable.complete()
+        .onErrorReturnItem(2)
+        .test()
+        .assertResult();
+    }
+
+    @Test
+    public void onErrorReturnDispose() {
+        TestHelper.checkDisposed(CompletableSubject.create().onErrorReturnItem(1));
+    }
+
+    @Test
+    public void onErrorReturnDoubleOnSubscribe() {
+        TestHelper.checkDoubleOnSubscribeCompletableToMaybe(new Function<Completable, MaybeSource<Object>>() {
+            @Override
+            public MaybeSource<Object> apply(Completable v) throws Exception {
+                return v.onErrorReturnItem(1);
+            }
+        });
+    }
 }