Skip to content

Commit 9e9d31c

Browse files
authoredJun 24, 2019
3.x: Add concatMap with Scheduler guaranteeing where the mapper runs (#6538)
1 parent 94ae795 commit 9e9d31c

File tree

7 files changed

+3404
-2
lines changed

7 files changed

+3404
-2
lines changed
 

‎src/main/java/io/reactivex/Flowable.java

+110-1
Original file line numberDiff line numberDiff line change
@@ -7280,6 +7280,10 @@ public final <R> Flowable<R> compose(FlowableTransformer<? super T, ? extends R>
72807280
* that result from concatenating those resulting Publishers.
72817281
* <p>
72827282
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
7283+
* <p>
7284+
* Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread,
7285+
* on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure
7286+
* the {@code mapper} function is confined to a known thread, use the {@link #concatMap(Function, int, Scheduler)} overload.
72837287
* <dl>
72847288
* <dt><b>Backpressure:</b></dt>
72857289
* <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
@@ -7312,6 +7316,10 @@ public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<?
73127316
* that result from concatenating those resulting Publishers.
73137317
* <p>
73147318
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
7319+
* <p>
7320+
* Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread,
7321+
* on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure
7322+
* the {@code mapper} function is confined to a known thread, use the {@link #concatMap(Function, int, Scheduler)} overload.
73157323
* <dl>
73167324
* <dt><b>Backpressure:</b></dt>
73177325
* <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
@@ -7332,6 +7340,7 @@ public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<?
73327340
* @return a Flowable that emits the result of applying the transformation function to each item emitted
73337341
* by the source Publisher and concatenating the Publishers obtained from this transformation
73347342
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
7343+
* @see #concatMap(Function, int, Scheduler)
73357344
*/
73367345
@CheckReturnValue
73377346
@NonNull
@@ -7351,6 +7360,52 @@ public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<?
73517360
return RxJavaPlugins.onAssembly(new FlowableConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE));
73527361
}
73537362

7363+
/**
7364+
* Returns a new Flowable that emits items resulting from applying a function (on a designated scheduler)
7365+
* that you supply to each item emitted by the source Publisher, where that function returns a Publisher, and then emitting the items
7366+
* that result from concatenating those resulting Publishers.
7367+
* <p>
7368+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
7369+
* <p>
7370+
* The difference between {@link #concatMap(Function, int)} and this operator is that this operator guarantees the {@code mapper}
7371+
* function is executed on the specified scheduler.
7372+
* <dl>
7373+
* <dt><b>Backpressure:</b></dt>
7374+
* <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
7375+
* expected to honor backpressure as well. If the source {@code Publisher} violates the rule, the operator will
7376+
* signal a {@code MissingBackpressureException}. If any of the inner {@code Publisher}s doesn't honor
7377+
* backpressure, that <em>may</em> throw an {@code IllegalStateException} when that
7378+
* {@code Publisher} completes.</dd>
7379+
* <dt><b>Scheduler:</b></dt>
7380+
* <dd>{@code concatMap} executes the given {@code mapper} function on the provided {@link Scheduler}.</dd>
7381+
* </dl>
7382+
*
7383+
* @param <R> the type of the inner Publisher sources and thus the output type
7384+
* @param mapper
7385+
* a function that, when applied to an item emitted by the source Publisher, returns a
7386+
* Publisher
7387+
* @param prefetch
7388+
* the number of elements to prefetch from the current Flowable
7389+
* @param scheduler
7390+
* the scheduler where the {@code mapper} function will be executed
7391+
* @return a Flowable that emits the result of applying the transformation function to each item emitted
7392+
* by the source Publisher and concatenating the Publishers obtained from this transformation
7393+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
7394+
* @since 3.0.0
7395+
* @see #concatMap(Function, int)
7396+
* @see #concatMapDelayError(Function, int, boolean, Scheduler)
7397+
*/
7398+
@CheckReturnValue
7399+
@NonNull
7400+
@BackpressureSupport(BackpressureKind.FULL)
7401+
@SchedulerSupport(SchedulerSupport.CUSTOM)
7402+
public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, int prefetch, Scheduler scheduler) {
7403+
ObjectHelper.requireNonNull(mapper, "mapper is null");
7404+
ObjectHelper.verifyPositive(prefetch, "prefetch");
7405+
ObjectHelper.requireNonNull(scheduler, "scheduler");
7406+
return RxJavaPlugins.onAssembly(new FlowableConcatMapScheduler<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE, scheduler));
7407+
}
7408+
73547409
/**
73557410
* Maps the upstream items into {@link CompletableSource}s and subscribes to them one after the
73567411
* other completes.
@@ -7520,7 +7575,10 @@ public final Completable concatMapCompletableDelayError(Function<? super T, ? ex
75207575
* one at a time and emits their values in order
75217576
* while delaying any error from either this or any of the inner Publishers
75227577
* till all of them terminate.
7523-
*
7578+
* <p>
7579+
* Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread,
7580+
* on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure
7581+
* the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, int, boolean, Scheduler)} overload.
75247582
* <dl>
75257583
* <dt><b>Backpressure:</b></dt>
75267584
* <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
@@ -7535,6 +7593,7 @@ public final Completable concatMapCompletableDelayError(Function<? super T, ? ex
75357593
* @param <R> the result value type
75367594
* @param mapper the function that maps the items of this Publisher into the inner Publishers.
75377595
* @return the new Publisher instance with the concatenation behavior
7596+
* @see #concatMapDelayError(Function, int, boolean, Scheduler)
75387597
*/
75397598
@CheckReturnValue
75407599
@BackpressureSupport(BackpressureKind.FULL)
@@ -7548,6 +7607,10 @@ public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends P
75487607
* one at a time and emits their values in order
75497608
* while delaying any error from either this or any of the inner Publishers
75507609
* till all of them terminate.
7610+
* <p>
7611+
* Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread,
7612+
* on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure
7613+
* the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, int, boolean, Scheduler)} overload.
75517614
*
75527615
* <dl>
75537616
* <dt><b>Backpressure:</b></dt>
@@ -7568,6 +7631,7 @@ public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends P
75687631
* if true, all errors from the outer and inner Publisher sources are delayed until the end,
75697632
* if false, an error from the main source is signaled when the current Publisher source terminates
75707633
* @return the new Publisher instance with the concatenation behavior
7634+
* @see #concatMapDelayError(Function, int, boolean, Scheduler)
75717635
*/
75727636
@CheckReturnValue
75737637
@NonNull
@@ -7588,6 +7652,51 @@ public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends P
75887652
return RxJavaPlugins.onAssembly(new FlowableConcatMap<T, R>(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY));
75897653
}
75907654

7655+
/**
7656+
* Maps each of the upstream items into a Publisher, subscribes to them one after the other,
7657+
* one at a time and emits their values in order
7658+
* while executing the mapper function on the designated scheduler, delaying any error from either this or any of the
7659+
* inner Publishers till all of them terminate.
7660+
* <p>
7661+
* The difference between {@link #concatMapDelayError(Function, int, boolean)} and this operator is that this operator guarantees the {@code mapper}
7662+
* function is executed on the specified scheduler.
7663+
*
7664+
* <dl>
7665+
* <dt><b>Backpressure:</b></dt>
7666+
* <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
7667+
* expected to honor backpressure as well. If the source {@code Publisher} violates the rule, the operator will
7668+
* signal a {@code MissingBackpressureException}. If any of the inner {@code Publisher}s doesn't honor
7669+
* backpressure, that <em>may</em> throw an {@code IllegalStateException} when that
7670+
* {@code Publisher} completes.</dd>
7671+
* <dt><b>Scheduler:</b></dt>
7672+
* <dd>{@code concatMapDelayError} executes the given {@code mapper} function on the provided {@link Scheduler}.</dd>
7673+
* </dl>
7674+
*
7675+
* @param <R> the result value type
7676+
* @param mapper the function that maps the items of this Publisher into the inner Publishers.
7677+
* @param prefetch
7678+
* the number of elements to prefetch from the current Flowable
7679+
* @param tillTheEnd
7680+
* if true, all errors from the outer and inner Publisher sources are delayed until the end,
7681+
* if false, an error from the main source is signaled when the current Publisher source terminates
7682+
* @param scheduler
7683+
* the scheduler where the {@code mapper} function will be executed
7684+
* @return the new Publisher instance with the concatenation behavior
7685+
* @see #concatMapDelayError(Function, int, boolean)
7686+
* @since 3.0.0
7687+
*/
7688+
@CheckReturnValue
7689+
@NonNull
7690+
@BackpressureSupport(BackpressureKind.FULL)
7691+
@SchedulerSupport(SchedulerSupport.CUSTOM)
7692+
public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends Publisher<? extends R>> mapper,
7693+
int prefetch, boolean tillTheEnd, Scheduler scheduler) {
7694+
ObjectHelper.requireNonNull(mapper, "mapper is null");
7695+
ObjectHelper.verifyPositive(prefetch, "prefetch");
7696+
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
7697+
return RxJavaPlugins.onAssembly(new FlowableConcatMapScheduler<T, R>(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, scheduler));
7698+
}
7699+
75917700
/**
75927701
* Maps a sequence of values into Publishers and concatenates these Publishers eagerly into a single
75937702
* Publisher.

0 commit comments

Comments
 (0)