Skip to content

Commit 84004a6

Browse files
authored
2.x: Add Flowable.concatMapCompletable{DelayError} operator (ReactiveX#5871)
1 parent 8068404 commit 84004a6

File tree

3 files changed

+845
-0
lines changed

3 files changed

+845
-0
lines changed

Diff for: src/main/java/io/reactivex/Flowable.java

+163
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.reactivex.internal.functions.*;
2626
import io.reactivex.internal.fuseable.*;
2727
import io.reactivex.internal.operators.flowable.*;
28+
import io.reactivex.internal.operators.mixed.*;
2829
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
2930
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
3031
import io.reactivex.internal.subscribers.*;
@@ -6886,6 +6887,168 @@ public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<?
68866887
return RxJavaPlugins.onAssembly(new FlowableConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE));
68876888
}
68886889

6890+
/**
6891+
* Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the
6892+
* other completes.
6893+
* <p>
6894+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
6895+
* <dl>
6896+
* <dt><b>Backpressure:</b></dt>
6897+
* <dd>The operator expects the upstream to support backpressure. If this {@code Flowable} violates the rule, the operator will
6898+
* signal a {@code MissingBackpressureException}.</dd>
6899+
* <dt><b>Scheduler:</b></dt>
6900+
* <dd>{@code concatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
6901+
* </dl>
6902+
* @param mapper the function called with the upstream item and should return
6903+
* a {@code CompletableSource} to become the next source to
6904+
* be subscribed to
6905+
* @return a new Completable instance
6906+
* @since 2.1.11 - experimental
6907+
* @see #concatMapCompletableDelayError(Function)
6908+
*/
6909+
@CheckReturnValue
6910+
@SchedulerSupport(SchedulerSupport.NONE)
6911+
@BackpressureSupport(BackpressureKind.FULL)
6912+
@Experimental
6913+
public final Completable concatMapCompletable(Function<? super T, ? extends CompletableSource> mapper) {
6914+
return concatMapCompletable(mapper, 2);
6915+
}
6916+
6917+
/**
6918+
* Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the
6919+
* other completes.
6920+
* <p>
6921+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
6922+
* <dl>
6923+
* <dt><b>Backpressure:</b></dt>
6924+
* <dd>The operator expects the upstream to support backpressure. If this {@code Flowable} violates the rule, the operator will
6925+
* signal a {@code MissingBackpressureException}.</dd>
6926+
* <dt><b>Scheduler:</b></dt>
6927+
* <dd>{@code concatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
6928+
* </dl>
6929+
* @param mapper the function called with the upstream item and should return
6930+
* a {@code CompletableSource} to become the next source to
6931+
* be subscribed to
6932+
* @param prefetch The number of upstream items to prefetch so that fresh items are
6933+
* ready to be mapped when a previous {@code CompletableSource} terminates.
6934+
* The operator replenishes after half of the prefetch amount has been consumed
6935+
* and turned into {@code CompletableSource}s.
6936+
* @return a new Completable instance
6937+
* @since 2.1.11 - experimental
6938+
* @see #concatMapCompletableDelayError(Function, boolean, int)
6939+
*/
6940+
@CheckReturnValue
6941+
@SchedulerSupport(SchedulerSupport.NONE)
6942+
@BackpressureSupport(BackpressureKind.FULL)
6943+
@Experimental
6944+
public final Completable concatMapCompletable(Function<? super T, ? extends CompletableSource> mapper, int prefetch) {
6945+
ObjectHelper.requireNonNull(mapper, "mapper is null");
6946+
ObjectHelper.verifyPositive(prefetch, "prefetch");
6947+
return RxJavaPlugins.onAssembly(new FlowableConcatMapCompletable<T>(this, mapper, ErrorMode.IMMEDIATE, prefetch));
6948+
}
6949+
6950+
/**
6951+
* Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the
6952+
* other terminates, delaying all errors till both this {@code Flowable} and all
6953+
* inner {@code CompletableSource}s terminate.
6954+
* <p>
6955+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
6956+
* <dl>
6957+
* <dt><b>Backpressure:</b></dt>
6958+
* <dd>The operator expects the upstream to support backpressure. If this {@code Flowable} violates the rule, the operator will
6959+
* signal a {@code MissingBackpressureException}.</dd>
6960+
* <dt><b>Scheduler:</b></dt>
6961+
* <dd>{@code concatMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
6962+
* </dl>
6963+
* @param mapper the function called with the upstream item and should return
6964+
* a {@code CompletableSource} to become the next source to
6965+
* be subscribed to
6966+
* @return a new Completable instance
6967+
* @since 2.1.11 - experimental
6968+
* @see #concatMapCompletable(Function, int)
6969+
*/
6970+
@CheckReturnValue
6971+
@SchedulerSupport(SchedulerSupport.NONE)
6972+
@BackpressureSupport(BackpressureKind.FULL)
6973+
@Experimental
6974+
public final Completable concatMapCompletableDelayError(Function<? super T, ? extends CompletableSource> mapper) {
6975+
return concatMapCompletableDelayError(mapper, true, 2);
6976+
}
6977+
6978+
/**
6979+
* Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the
6980+
* other terminates, optionally delaying all errors till both this {@code Flowable} and all
6981+
* inner {@code CompletableSource}s terminate.
6982+
* <p>
6983+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
6984+
* <dl>
6985+
* <dt><b>Backpressure:</b></dt>
6986+
* <dd>The operator expects the upstream to support backpressure. If this {@code Flowable} violates the rule, the operator will
6987+
* signal a {@code MissingBackpressureException}.</dd>
6988+
* <dt><b>Scheduler:</b></dt>
6989+
* <dd>{@code concatMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
6990+
* </dl>
6991+
* @param mapper the function called with the upstream item and should return
6992+
* a {@code CompletableSource} to become the next source to
6993+
* be subscribed to
6994+
* @param tillTheEnd If {@code true}, errors from this {@code Flowable} or any of the
6995+
* inner {@code CompletableSource}s are delayed until all
6996+
* of them terminate. If {@code false}, an error from this
6997+
* {@code Flowable} is delayed until the current inner
6998+
* {@code CompletableSource} terminates and only then is
6999+
* it emitted to the downstream.
7000+
* @return a new Completable instance
7001+
* @since 2.1.11 - experimental
7002+
* @see #concatMapCompletable(Function)
7003+
*/
7004+
@CheckReturnValue
7005+
@SchedulerSupport(SchedulerSupport.NONE)
7006+
@BackpressureSupport(BackpressureKind.FULL)
7007+
@Experimental
7008+
public final Completable concatMapCompletableDelayError(Function<? super T, ? extends CompletableSource> mapper, boolean tillTheEnd) {
7009+
return concatMapCompletableDelayError(mapper, tillTheEnd, 2);
7010+
}
7011+
7012+
/**
7013+
* Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the
7014+
* other terminates, optionally delaying all errors till both this {@code Flowable} and all
7015+
* inner {@code CompletableSource}s terminate.
7016+
* <p>
7017+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
7018+
* <dl>
7019+
* <dt><b>Backpressure:</b></dt>
7020+
* <dd>The operator expects the upstream to support backpressure. If this {@code Flowable} violates the rule, the operator will
7021+
* signal a {@code MissingBackpressureException}.</dd>
7022+
* <dt><b>Scheduler:</b></dt>
7023+
* <dd>{@code concatMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
7024+
* </dl>
7025+
* @param mapper the function called with the upstream item and should return
7026+
* a {@code CompletableSource} to become the next source to
7027+
* be subscribed to
7028+
* @param tillTheEnd If {@code true}, errors from this {@code Flowable} or any of the
7029+
* inner {@code CompletableSource}s are delayed until all
7030+
* of them terminate. If {@code false}, an error from this
7031+
* {@code Flowable} is delayed until the current inner
7032+
* {@code CompletableSource} terminates and only then is
7033+
* it emitted to the downstream.
7034+
* @param prefetch The number of upstream items to prefetch so that fresh items are
7035+
* ready to be mapped when a previous {@code CompletableSource} terminates.
7036+
* The operator replenishes after half of the prefetch amount has been consumed
7037+
* and turned into {@code CompletableSource}s.
7038+
* @return a new Completable instance
7039+
* @since 2.1.11 - experimental
7040+
* @see #concatMapCompletable(Function, int)
7041+
*/
7042+
@CheckReturnValue
7043+
@SchedulerSupport(SchedulerSupport.NONE)
7044+
@BackpressureSupport(BackpressureKind.FULL)
7045+
@Experimental
7046+
public final Completable concatMapCompletableDelayError(Function<? super T, ? extends CompletableSource> mapper, boolean tillTheEnd, int prefetch) {
7047+
ObjectHelper.requireNonNull(mapper, "mapper is null");
7048+
ObjectHelper.verifyPositive(prefetch, "prefetch");
7049+
return RxJavaPlugins.onAssembly(new FlowableConcatMapCompletable<T>(this, mapper, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, prefetch));
7050+
}
7051+
68897052
/**
68907053
* Maps each of the items into a Publisher, subscribes to them one after the other,
68917054
* one at a time and emits their values in order

0 commit comments

Comments
 (0)