Skip to content

Commit 9283700

Browse files
authored
3.x: Add various concatXDelayError operators (#6881)
1 parent 57bd1a9 commit 9283700

11 files changed

+605
-28
lines changed

docs/Operator-Matrix.md

+14-20
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat
3232
<a name='compose'></a>`compose`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|
3333
<a name='concat'></a>`concat`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|
3434
<a name='concatArray'></a>`concatArray`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|
35-
<a name='concatArrayDelayError'></a>`concatArrayDelayError`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|
35+
<a name='concatArrayDelayError'></a>`concatArrayDelayError`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|
3636
<a name='concatArrayEager'></a>`concatArrayEager`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) <sup title='No items to keep ordered. Use mergeArray().'>([24](#notes-24))</sup>|
37-
<a name='concatArrayEagerDelayError'></a>`concatArrayEagerDelayError`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) <sup title='No items to keep ordered. Use mergeArrayDelayError().'>([25](#notes-25))</sup>|
38-
<a name='concatDelayError'></a>`concatDelayError`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|
37+
<a name='concatArrayEagerDelayError'></a>`concatArrayEagerDelayError`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) <sup title='No items to keep ordered. Use mergeArrayDelayError().'>([25](#notes-25))</sup>|
38+
<a name='concatDelayError'></a>`concatDelayError`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|
3939
<a name='concatEager'></a>`concatEager`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) <sup title='No items to keep ordered. Use merge().'>([26](#notes-26))</sup>|
4040
<a name='concatMap'></a>`concatMap`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) <sup title='Always empty thus no items to map.'>([27](#notes-27))</sup>|
4141
<a name='concatMapCompletable'></a>`concatMapCompletable`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) <sup title='Always empty thus no items to map.'>([27](#notes-27))</sup>|
@@ -237,7 +237,7 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat
237237
<a name='zip'></a>`zip`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) <sup title='Use merge().'>([111](#notes-111))</sup>|
238238
<a name='zipArray'></a>`zipArray`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) <sup title='Use mergeArray().'>([112](#notes-112))</sup>|
239239
<a name='zipWith'></a>`zipWith`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) <sup title='Use mergeWith().'>([113](#notes-113))</sup>|
240-
<a name='total'></a>**237 operators** | **215** | **209** | **115** | **100** | **78** |
240+
<a name='total'></a>**237 operators** | **215** | **209** | **116** | **103** | **80** |
241241

242242
#### Notes
243243
<a name='notes-1'></a><sup>1</sup> Use [`contains()`](#contains).<br/>
@@ -356,19 +356,13 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat
356356

357357
#### Under development
358358

359-
1. Single.concatArrayDelayError()
360-
2. Completable.concatArrayDelayError()
361-
3. Maybe.concatArrayEagerDelayError()
362-
4. Single.concatArrayEagerDelayError()
363-
5. Single.concatDelayError()
364-
6. Completable.concatDelayError()
365-
7. Single.mergeArray()
366-
8. Single.mergeArrayDelayError()
367-
9. Completable.onErrorReturn()
368-
10. Completable.onErrorReturnItem()
369-
11. Maybe.safeSubscribe()
370-
12. Single.safeSubscribe()
371-
13. Completable.safeSubscribe()
372-
14. Completable.sequenceEqual()
373-
15. Maybe.startWith()
374-
16. Single.startWith()
359+
1. Single.mergeArray()
360+
2. Single.mergeArrayDelayError()
361+
3. Completable.onErrorReturn()
362+
4. Completable.onErrorReturnItem()
363+
5. Maybe.safeSubscribe()
364+
6. Single.safeSubscribe()
365+
7. Completable.safeSubscribe()
366+
8. Completable.sequenceEqual()
367+
9. Maybe.startWith()
368+
10. Single.startWith()

src/main/java/io/reactivex/rxjava3/core/Completable.java

+91
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,27 @@ public static Completable concatArray(@NonNull CompletableSource... sources) {
202202
return RxJavaPlugins.onAssembly(new CompletableConcatArray(sources));
203203
}
204204

205+
/**
206+
* Returns a {@code Completable} which completes only when all sources complete, one after another.
207+
* <p>
208+
* <img width="640" height="322" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.concatArrayDelayError.png" alt="">
209+
* <dl>
210+
* <dt><b>Scheduler:</b></dt>
211+
* <dd>{@code concatArrayDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
212+
* </dl>
213+
* @param sources the sources to concatenate
214+
* @return the new {@code Completable} instance
215+
* @throws NullPointerException if {@code sources} is {@code null}
216+
* @since 3.0.0
217+
*/
218+
@CheckReturnValue
219+
@NonNull
220+
@SchedulerSupport(SchedulerSupport.NONE)
221+
@SafeVarargs
222+
public static Completable concatArrayDelayError(@NonNull CompletableSource... sources) {
223+
return Flowable.fromArray(sources).concatMapCompletableDelayError(Functions.identity(), true, 2);
224+
}
225+
205226
/**
206227
* Returns a {@code Completable} which completes only when all sources complete, one after another.
207228
* <p>
@@ -273,6 +294,76 @@ public static Completable concat(@NonNull Publisher<@NonNull ? extends Completab
273294
return RxJavaPlugins.onAssembly(new CompletableConcat(sources, prefetch));
274295
}
275296

297+
/**
298+
* Returns a {@code Completable} which completes only when all sources complete, one after another.
299+
* <p>
300+
* <img width="640" height="361" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.concatDelayError.png" alt="">
301+
* <dl>
302+
* <dt><b>Scheduler:</b></dt>
303+
* <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
304+
* </dl>
305+
* @param sources the sources to concatenate
306+
* @return the new {@code Completable} instance
307+
* @throws NullPointerException if {@code sources} is {@code null}
308+
* @since 3.0.0
309+
*/
310+
@CheckReturnValue
311+
@NonNull
312+
@SchedulerSupport(SchedulerSupport.NONE)
313+
public static Completable concatDelayError(@NonNull Iterable<@NonNull ? extends CompletableSource> sources) {
314+
return Flowable.fromIterable(sources).concatMapCompletableDelayError(Functions.identity());
315+
}
316+
317+
/**
318+
* Returns a {@code Completable} which completes only when all sources complete, one after another.
319+
* <p>
320+
* <img width="640" height="396" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.concatDelayError.p.png" alt="">
321+
* <dl>
322+
* <dt><b>Backpressure:</b></dt>
323+
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
324+
* and expects the other {@link Publisher} to honor it as well.</dd>
325+
* <dt><b>Scheduler:</b></dt>
326+
* <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
327+
* </dl>
328+
* @param sources the sources to concatenate
329+
* @return the new {@code Completable} instance
330+
* @throws NullPointerException if {@code sources} is {@code null}
331+
* @since 3.0.0
332+
*/
333+
@CheckReturnValue
334+
@SchedulerSupport(SchedulerSupport.NONE)
335+
@BackpressureSupport(BackpressureKind.FULL)
336+
@NonNull
337+
public static Completable concatDelayError(@NonNull Publisher<@NonNull ? extends CompletableSource> sources) {
338+
return concatDelayError(sources, 2);
339+
}
340+
341+
/**
342+
* Returns a {@code Completable} which completes only when all sources complete, one after another.
343+
* <p>
344+
* <img width="640" height="359" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.concatDelayError.pn.png" alt="">
345+
* <dl>
346+
* <dt><b>Backpressure:</b></dt>
347+
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
348+
* and expects the other {@link Publisher} to honor it as well.</dd>
349+
* <dt><b>Scheduler:</b></dt>
350+
* <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
351+
* </dl>
352+
* @param sources the sources to concatenate
353+
* @param prefetch the number of sources to prefetch from the sources
354+
* @return the new {@code Completable} instance
355+
* @throws NullPointerException if {@code sources} is {@code null}
356+
* @throws IllegalArgumentException if {@code prefetch} is non-positive
357+
* @since 3.0.0
358+
*/
359+
@CheckReturnValue
360+
@NonNull
361+
@SchedulerSupport(SchedulerSupport.NONE)
362+
@BackpressureSupport(BackpressureKind.FULL)
363+
public static Completable concatDelayError(@NonNull Publisher<@NonNull ? extends CompletableSource> sources, int prefetch) {
364+
return Flowable.fromPublisher(sources).concatMapCompletableDelayError(Functions.identity(), true, prefetch);
365+
}
366+
276367
/**
277368
* Provides an API (via a cold {@code Completable}) that bridges the reactive world with the callback-style world.
278369
* <p>

0 commit comments

Comments
 (0)