From d2677ad47fe38558f884dedde9ce35b160113c44 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sun, 26 Jan 2020 23:55:16 +0100 Subject: [PATCH] 3.x: Add various concatXDelayError operators --- docs/Operator-Matrix.md | 34 ++--- .../reactivex/rxjava3/core/Completable.java | 91 +++++++++++ .../java/io/reactivex/rxjava3/core/Maybe.java | 68 ++++++++- .../io/reactivex/rxjava3/core/Single.java | 144 +++++++++++++++++- .../CompletableConcatArrayDelayErrorTest.java | 43 ++++++ .../CompletableConcatDelayErrorTest.java | 82 ++++++++++ .../MaybeConcatArrayEagerDelayErrorTest.java | 34 +++++ .../SingleConcatArrayDelayErrorTest.java | 34 +++++ .../SingleConcatArrayEagerDelayErrorTest.java | 34 +++++ .../single/SingleConcatDelayErrorTest.java | 58 +++++++ .../io/reactivex/rxjava3/maybe/MaybeTest.java | 11 ++ 11 files changed, 605 insertions(+), 28 deletions(-) create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatArrayDelayErrorTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatDelayErrorTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeConcatArrayEagerDelayErrorTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatArrayDelayErrorTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatArrayEagerDelayErrorTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatDelayErrorTest.java diff --git a/docs/Operator-Matrix.md b/docs/Operator-Matrix.md index cd71558ddb..2ddd1cf21f 100644 --- a/docs/Operator-Matrix.md +++ b/docs/Operator-Matrix.md @@ -32,10 +32,10 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat `compose`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `concat`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `concatArray`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| -`concatArrayDelayError`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)| +`concatArrayDelayError`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `concatArrayEager`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([24](#notes-24))| -`concatArrayEagerDelayError`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([25](#notes-25))| -`concatDelayError`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)| +`concatArrayEagerDelayError`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([25](#notes-25))| +`concatDelayError`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `concatEager`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([26](#notes-26))| `concatMap`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([27](#notes-27))| `concatMapCompletable`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([27](#notes-27))| @@ -237,7 +237,7 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat `zip`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([111](#notes-111))| `zipArray`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([112](#notes-112))| `zipWith`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([113](#notes-113))| -**237 operators** | **215** | **209** | **115** | **100** | **78** | +**237 operators** | **215** | **209** | **116** | **103** | **80** | #### Notes 1 Use [`contains()`](#contains).
@@ -356,19 +356,13 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat #### Under development -1. Single.concatArrayDelayError() -2. Completable.concatArrayDelayError() -3. Maybe.concatArrayEagerDelayError() -4. Single.concatArrayEagerDelayError() -5. Single.concatDelayError() -6. Completable.concatDelayError() -7. Single.mergeArray() -8. Single.mergeArrayDelayError() -9. Completable.onErrorReturn() -10. Completable.onErrorReturnItem() -11. Maybe.safeSubscribe() -12. Single.safeSubscribe() -13. Completable.safeSubscribe() -14. Completable.sequenceEqual() -15. Maybe.startWith() -16. Single.startWith() +1. Single.mergeArray() +2. Single.mergeArrayDelayError() +3. Completable.onErrorReturn() +4. Completable.onErrorReturnItem() +5. Maybe.safeSubscribe() +6. Single.safeSubscribe() +7. Completable.safeSubscribe() +8. Completable.sequenceEqual() +9. Maybe.startWith() +10. Single.startWith() diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java index 9febbd1204..253610ef80 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Completable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java @@ -202,6 +202,27 @@ public static Completable concatArray(@NonNull CompletableSource... sources) { return RxJavaPlugins.onAssembly(new CompletableConcatArray(sources)); } + /** + * Returns a {@code Completable} which completes only when all sources complete, one after another. + *

+ * + *

+ *
Scheduler:
+ *
{@code concatArrayDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param sources the sources to concatenate + * @return the new {@code Completable} instance + * @throws NullPointerException if {@code sources} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @SafeVarargs + public static Completable concatArrayDelayError(@NonNull CompletableSource... sources) { + return Flowable.fromArray(sources).concatMapCompletableDelayError(Functions.identity(), true, 2); + } + /** * Returns a {@code Completable} which completes only when all sources complete, one after another. *

@@ -273,6 +294,76 @@ public static Completable concat(@NonNull Publisher<@NonNull ? extends Completab return RxJavaPlugins.onAssembly(new CompletableConcat(sources, prefetch)); } + /** + * Returns a {@code Completable} which completes only when all sources complete, one after another. + *

+ * + *

+ *
Scheduler:
+ *
{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param sources the sources to concatenate + * @return the new {@code Completable} instance + * @throws NullPointerException if {@code sources} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Completable concatDelayError(@NonNull Iterable<@NonNull ? extends CompletableSource> sources) { + return Flowable.fromIterable(sources).concatMapCompletableDelayError(Functions.identity()); + } + + /** + * Returns a {@code Completable} which completes only when all sources complete, one after another. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@code Completable} honors the backpressure of the downstream consumer + * and expects the other {@link Publisher} to honor it as well.
+ *
Scheduler:
+ *
{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param sources the sources to concatenate + * @return the new {@code Completable} instance + * @throws NullPointerException if {@code sources} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + @NonNull + public static Completable concatDelayError(@NonNull Publisher<@NonNull ? extends CompletableSource> sources) { + return concatDelayError(sources, 2); + } + + /** + * Returns a {@code Completable} which completes only when all sources complete, one after another. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@code Completable} honors the backpressure of the downstream consumer + * and expects the other {@link Publisher} to honor it as well.
+ *
Scheduler:
+ *
{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param sources the sources to concatenate + * @param prefetch the number of sources to prefetch from the sources + * @return the new {@code Completable} instance + * @throws NullPointerException if {@code sources} is {@code null} + * @throws IllegalArgumentException if {@code prefetch} is non-positive + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + public static Completable concatDelayError(@NonNull Publisher<@NonNull ? extends CompletableSource> sources, int prefetch) { + return Flowable.fromPublisher(sources).concatMapCompletableDelayError(Functions.identity(), true, prefetch); + } + /** * Provides an API (via a cold {@code Completable}) that bridges the reactive world with the callback-style world. *

diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index 6754f65e7b..ce02849d56 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -446,13 +446,42 @@ public static Flowable concatArrayDelayError(@NonNull MaybeSource Flowable concatArrayEager(@NonNull MaybeSource... sources) { return Flowable.fromArray(sources).concatMapEager((Function)MaybeToPublisher.instance()); } + /** + * Concatenates a sequence of {@link MaybeSource} eagerly into a {@link Flowable} sequence. + *

+ * Eager concatenation means that once an observer subscribes, this operator subscribes to all of the + * source {@code MaybeSource}s. The operator buffers the value emitted by these {@code MaybeSource}s and then drains them + * in order, each one after the previous one completes. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream.
+ *
Scheduler:
+ *
This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources a sequence of {@code MaybeSource}s that need to be eagerly concatenated + * @return the new {@code Flowable} instance with the specified concatenation behavior + * @throws NullPointerException if {@code sources} is {@code null} + * @since 3.0.0 + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @BackpressureSupport(BackpressureKind.FULL) + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + @SafeVarargs + public static Flowable concatArrayEagerDelayError(@NonNull MaybeSource... sources) { + return Flowable.fromArray(sources).concatMapEagerDelayError((Function)MaybeToPublisher.instance(), true); + } /** * Concatenates the {@link Iterable} sequence of {@link MaybeSource}s into a single sequence by subscribing to each {@code MaybeSource}, * one after the other, one at a time and delays any errors till the all inner {@code MaybeSource}s terminate * as a {@link Flowable} sequence. *

- * + * *

*
Backpressure:
*
The operator honors backpressure from downstream.
@@ -465,14 +494,12 @@ public static Flowable concatArrayEager(@NonNull MaybeSource * @return the new {@code Flowable} with the concatenating behavior * @throws NullPointerException if {@code sources} is {@code null} */ - @SuppressWarnings({ "unchecked", "rawtypes" }) @BackpressureSupport(BackpressureKind.FULL) @CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) public static Flowable concatDelayError(@NonNull Iterable<@NonNull ? extends MaybeSource> sources) { - Objects.requireNonNull(sources, "sources is null"); - return Flowable.fromIterable(sources).concatMapDelayError((Function)MaybeToPublisher.instance()); + return Flowable.fromIterable(sources).concatMapMaybeDelayError(Functions.identity()); } /** @@ -493,13 +520,42 @@ public static Flowable concatDelayError(@NonNull Iterable<@NonNull ? exte * @return the new {@code Flowable} with the concatenating behavior * @throws NullPointerException if {@code sources} is {@code null} */ - @SuppressWarnings({ "unchecked", "rawtypes" }) @BackpressureSupport(BackpressureKind.FULL) @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @NonNull public static Flowable concatDelayError(@NonNull Publisher<@NonNull ? extends MaybeSource> sources) { - return Flowable.fromPublisher(sources).concatMapDelayError((Function)MaybeToPublisher.instance()); + return Flowable.fromPublisher(sources).concatMapMaybeDelayError(Functions.identity()); + } + /** + * Concatenates the {@link Publisher} sequence of {@link MaybeSource}s into a single sequence by subscribing to each inner {@code MaybeSource}, + * one after the other, one at a time and delays any errors till the all inner and the outer {@code Publisher} terminate + * as a {@link Flowable} sequence. + *

+ * + *

+ *
Backpressure:
+ *
{@code concatDelayError} fully supports backpressure.
+ *
Scheduler:
+ *
{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the common element base type + * @param sources the {@code Publisher} sequence of {@code MaybeSource}s + * @param prefetch The number of upstream items to prefetch so that fresh items are + * ready to be mapped when a previous {@code MaybeSource} terminates. + * The operator replenishes after half of the prefetch amount has been consumed + * and turned into {@code MaybeSource}s. + * @return the new {@code Flowable} with the concatenating behavior + * @throws NullPointerException if {@code sources} is {@code null} + * @throws IllegalArgumentException if {@code prefetch} is non-positive + */ + @BackpressureSupport(BackpressureKind.FULL) + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public static Flowable concatDelayError(@NonNull Publisher<@NonNull ? extends MaybeSource> sources, int prefetch) { + return Flowable.fromPublisher(sources).concatMapMaybeDelayError(Functions.identity(), true, prefetch); } /** diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index 0a1d872b14..9f5c3446c5 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -406,10 +406,35 @@ public static Flowable concat( @NonNull @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) - @SuppressWarnings({ "unchecked", "rawtypes" }) @SafeVarargs public static Flowable concatArray(@NonNull SingleSource... sources) { - return RxJavaPlugins.onAssembly(new FlowableConcatMap(Flowable.fromArray(sources), SingleInternalHelper.toFlowable(), 2, ErrorMode.BOUNDARY)); + return Flowable.fromArray(sources).concatMap(SingleInternalHelper.toFlowable(), 2); + } + + /** + * Concatenate the single values, in a non-overlapping fashion, of the {@link SingleSource}s provided in + * an array. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@link Flowable} honors the backpressure of the downstream consumer.
+ *
Scheduler:
+ *
{@code concatArrayDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources the array of {@code SingleSource} instances + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code sources} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @SafeVarargs + public static Flowable concatArrayDelayError(@NonNull SingleSource... sources) { + return Flowable.fromArray(sources).concatMapDelayError(SingleInternalHelper.toFlowable(), true, 2); } /** @@ -440,6 +465,35 @@ public static Flowable concatArrayEager(@NonNull SingleSource + * + *

+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the + * source {@code SingleSource}s. The operator buffers the value emitted by these {@code SingleSource}s and then drains them + * in order, each one after the previous one succeeds. + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream.
+ *
Scheduler:
+ *
This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources a sequence of {@code SingleSource}s that need to be eagerly concatenated + * @return the new {@link Flowable} instance with the specified concatenation behavior + * @throws NullPointerException if {@code sources} is {@code null} + * @since 3.0.0 + */ + @BackpressureSupport(BackpressureKind.FULL) + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @SafeVarargs + public static Flowable concatArrayEagerDelayError(@NonNull SingleSource... sources) { + return Flowable.fromArray(sources).concatMapEagerDelayError(SingleInternalHelper.toFlowable(), true); + } + /** * Concatenates a {@link Publisher} sequence of {@link SingleSource}s eagerly into a single stream of values. *

@@ -469,6 +523,92 @@ public static Flowable concatEager(@NonNull Publisher<@NonNull ? extends return Flowable.fromPublisher(sources).concatMapEager(SingleInternalHelper.toFlowable()); } + /** + * Concatenates the {@link Iterable} sequence of {@link SingleSource}s into a single sequence by subscribing to each {@code SingleSource}, + * one after the other, one at a time and delays any errors till the all inner {@code SingleSource}s terminate + * as a {@link Flowable} sequence. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream.
+ *
Scheduler:
+ *
{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the common element base type + * @param sources the {@code Iterable} sequence of {@code SingleSource}s + * @return the new {@code Flowable} with the concatenating behavior + * @throws NullPointerException if {@code sources} is {@code null} + * @since 3.0.0 + */ + @BackpressureSupport(BackpressureKind.FULL) + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Flowable concatDelayError(@NonNull Iterable<@NonNull ? extends SingleSource> sources) { + return Flowable.fromIterable(sources).concatMapSingleDelayError(Functions.identity()); + } + + /** + * Concatenates the {@link Publisher} sequence of {@link SingleSource}s into a single sequence by subscribing to each inner {@code SingleSource}, + * one after the other, one at a time and delays any errors till the all inner and the outer {@code Publisher} terminate + * as a {@link Flowable} sequence. + *

+ * + *

+ *
Backpressure:
+ *
{@code concatDelayError} fully supports backpressure.
+ *
Scheduler:
+ *
{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the common element base type + * @param sources the {@code Publisher} sequence of {@code SingleSource}s + * @return the new {@code Flowable} with the concatenating behavior + * @throws NullPointerException if {@code sources} is {@code null} + * @since 3.0.0 + */ + @BackpressureSupport(BackpressureKind.FULL) + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public static Flowable concatDelayError(@NonNull Publisher<@NonNull ? extends SingleSource> sources) { + return Flowable.fromPublisher(sources).concatMapSingleDelayError(Functions.identity()); + } + + /** + * Concatenates the {@link Publisher} sequence of {@link SingleSource}s into a single sequence by subscribing to each inner {@code SingleSource}, + * one after the other, one at a time and delays any errors till the all inner and the outer {@code Publisher} terminate + * as a {@link Flowable} sequence. + *

+ * + *

+ *
Backpressure:
+ *
{@code concatDelayError} fully supports backpressure.
+ *
Scheduler:
+ *
{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the common element base type + * @param sources the {@code Publisher} sequence of {@code SingleSource}s + * @param prefetch The number of upstream items to prefetch so that fresh items are + * ready to be mapped when a previous {@code SingleSource} terminates. + * The operator replenishes after half of the prefetch amount has been consumed + * and turned into {@code SingleSource}s. + * @return the new {@code Flowable} with the concatenating behavior + * @throws NullPointerException if {@code sources} is {@code null} + * @throws IllegalArgumentException if {@code prefetch} is non-positive + * @since 3.0.0 + */ + @BackpressureSupport(BackpressureKind.FULL) + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public static Flowable concatDelayError(@NonNull Publisher<@NonNull ? extends SingleSource> sources, int prefetch) { + return Flowable.fromPublisher(sources).concatMapSingleDelayError(Functions.identity(), true, prefetch); + } + /** * Concatenates an {@link Iterable} sequence of {@link SingleSource}s eagerly into a single stream of values. *

diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatArrayDelayErrorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatArrayDelayErrorTest.java new file mode 100644 index 0000000000..229d00acc6 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatArrayDelayErrorTest.java @@ -0,0 +1,43 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.completable; + +import static org.mockito.Mockito.*; +import org.junit.Test; + +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.Action; + +public class CompletableConcatArrayDelayErrorTest { + + @Test + public void normal() throws Throwable { + Action action1 = mock(Action.class); + Action action2 = mock(Action.class); + + Completable.concatArrayDelayError( + Completable.fromAction(action1), + Completable.error(new TestException()), + Completable.fromAction(action2) + ) + .test() + .assertFailure(TestException.class); + + verify(action1).run(); + + verify(action2).run(); + } + +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatDelayErrorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatDelayErrorTest.java new file mode 100644 index 0000000000..fd3124a96c --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatDelayErrorTest.java @@ -0,0 +1,82 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.completable; + +import static org.mockito.Mockito.*; + +import java.util.Arrays; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.Action; + +public class CompletableConcatDelayErrorTest { + + @Test + public void normalIterable() throws Throwable { + Action action1 = mock(Action.class); + Action action2 = mock(Action.class); + + Completable.concatDelayError(Arrays.asList( + Completable.fromAction(action1), + Completable.error(new TestException()), + Completable.fromAction(action2) + )) + .test() + .assertFailure(TestException.class); + + verify(action1).run(); + + verify(action2).run(); + } + + @Test + public void normalPublisher() throws Throwable { + Action action1 = mock(Action.class); + Action action2 = mock(Action.class); + + Completable.concatDelayError(Flowable.fromArray( + Completable.fromAction(action1), + Completable.error(new TestException()), + Completable.fromAction(action2) + )) + .test() + .assertFailure(TestException.class); + + verify(action1).run(); + + verify(action2).run(); + } + + @Test + public void normalPublisherPrefetch() throws Throwable { + Action action1 = mock(Action.class); + Action action2 = mock(Action.class); + + Completable.concatDelayError(Flowable.fromArray( + Completable.fromAction(action1), + Completable.error(new TestException()), + Completable.fromAction(action2) + ), 1) + .test() + .assertFailure(TestException.class); + + verify(action1).run(); + + verify(action2).run(); + } + +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeConcatArrayEagerDelayErrorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeConcatArrayEagerDelayErrorTest.java new file mode 100644 index 0000000000..6608fbc424 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeConcatArrayEagerDelayErrorTest.java @@ -0,0 +1,34 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.maybe; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.exceptions.TestException; + +public class MaybeConcatArrayEagerDelayErrorTest { + + @Test + public void normal() { + Maybe.concatArrayEagerDelayError( + Maybe.just(1), + Maybe.error(new TestException()), + Maybe.empty(), + Maybe.just(2) + ) + .test() + .assertFailure(TestException.class, 1, 2); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatArrayDelayErrorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatArrayDelayErrorTest.java new file mode 100644 index 0000000000..10fec20410 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatArrayDelayErrorTest.java @@ -0,0 +1,34 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.exceptions.TestException; + +public class SingleConcatArrayDelayErrorTest { + + @Test + public void normal() { + Single.concatArrayDelayError( + Single.just(1), + Single.error(new TestException()), + Single.just(2) + ) + .test() + .assertFailure(TestException.class, 1, 2); + } + +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatArrayEagerDelayErrorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatArrayEagerDelayErrorTest.java new file mode 100644 index 0000000000..ce0df373bb --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatArrayEagerDelayErrorTest.java @@ -0,0 +1,34 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.exceptions.TestException; + +public class SingleConcatArrayEagerDelayErrorTest { + + @Test + public void normal() { + Single.concatArrayEagerDelayError( + Single.just(1), + Single.error(new TestException()), + Single.just(2) + ) + .test() + .assertFailure(TestException.class, 1, 2); + } + +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatDelayErrorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatDelayErrorTest.java new file mode 100644 index 0000000000..83ae9cbf9e --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatDelayErrorTest.java @@ -0,0 +1,58 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import java.util.Arrays; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; + +public class SingleConcatDelayErrorTest { + + @Test + public void normalIterable() { + Single.concatDelayError(Arrays.asList( + Single.just(1), + Single.error(new TestException()), + Single.just(2) + )) + .test() + .assertFailure(TestException.class, 1, 2); + } + + @Test + public void normalPublisher() { + Single.concatDelayError(Flowable.fromArray( + Single.just(1), + Single.error(new TestException()), + Single.just(2) + )) + .test() + .assertFailure(TestException.class, 1, 2); + } + + @Test + public void normalPublisherPrefetch() { + Single.concatDelayError(Flowable.fromArray( + Single.just(1), + Single.error(new TestException()), + Single.just(2) + ), 1) + .test() + .assertFailure(TestException.class, 1, 2); + } + +} diff --git a/src/test/java/io/reactivex/rxjava3/maybe/MaybeTest.java b/src/test/java/io/reactivex/rxjava3/maybe/MaybeTest.java index 7a0fe1ae7d..94cf62d641 100644 --- a/src/test/java/io/reactivex/rxjava3/maybe/MaybeTest.java +++ b/src/test/java/io/reactivex/rxjava3/maybe/MaybeTest.java @@ -2388,6 +2388,17 @@ public void concatPublisherDelayError() { .assertFailure(TestException.class, 1); } + @Test + public void concatPublisherDelayErrorPrefetch() { + Maybe.concatDelayError(Flowable.just(Maybe.empty(), Maybe.just(1), Maybe.error(new TestException())), 1) + .test() + .assertFailure(TestException.class, 1); + + Maybe.concatDelayError(Flowable.just(Maybe.error(new TestException()), Maybe.empty(), Maybe.just(1)), 1) + .test() + .assertFailure(TestException.class, 1); + } + @Test public void concatEagerArray() { PublishProcessor pp1 = PublishProcessor.create();