Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add concatMap with Scheduler guaranteeing where the mapper runs #6538

Merged
merged 1 commit into from
Jun 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 110 additions & 1 deletion src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7280,6 +7280,10 @@ public final <R> Flowable<R> compose(FlowableTransformer<? super T, ? extends R>
* that result from concatenating those resulting Publishers.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
* <p>
* Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread,
* on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure
* the {@code mapper} function is confined to a known thread, use the {@link #concatMap(Function, int, Scheduler)} overload.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
Expand Down Expand Up @@ -7312,6 +7316,10 @@ public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<?
* that result from concatenating those resulting Publishers.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
* <p>
* Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread,
* on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure
* the {@code mapper} function is confined to a known thread, use the {@link #concatMap(Function, int, Scheduler)} overload.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
Expand All @@ -7332,6 +7340,7 @@ public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<?
* @return a Flowable that emits the result of applying the transformation function to each item emitted
* by the source Publisher and concatenating the Publishers obtained from this transformation
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #concatMap(Function, int, Scheduler)
*/
@CheckReturnValue
@NonNull
Expand All @@ -7351,6 +7360,52 @@ public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<?
return RxJavaPlugins.onAssembly(new FlowableConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE));
}

/**
* Returns a new Flowable that emits items resulting from applying a function (on a designated scheduler)
* that you supply to each item emitted by the source Publisher, where that function returns a Publisher, and then emitting the items
* that result from concatenating those resulting Publishers.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
* <p>
* The difference between {@link #concatMap(Function, int)} and this operator is that this operator guarantees the {@code mapper}
* function is executed on the specified scheduler.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
* expected to honor backpressure as well. If the source {@code Publisher} violates the rule, the operator will
* signal a {@code MissingBackpressureException}. If any of the inner {@code Publisher}s doesn't honor
* backpressure, that <em>may</em> throw an {@code IllegalStateException} when that
* {@code Publisher} completes.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMap} executes the given {@code mapper} function on the provided {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the type of the inner Publisher sources and thus the output type
* @param mapper
* a function that, when applied to an item emitted by the source Publisher, returns a
* Publisher
* @param prefetch
* the number of elements to prefetch from the current Flowable
* @param scheduler
* the scheduler where the {@code mapper} function will be executed
* @return a Flowable that emits the result of applying the transformation function to each item emitted
* by the source Publisher and concatenating the Publishers obtained from this transformation
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since 3.0.0
* @see #concatMap(Function, int)
* @see #concatMapDelayError(Function, int, boolean, Scheduler)
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, int prefetch, Scheduler scheduler) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(prefetch, "prefetch");
ObjectHelper.requireNonNull(scheduler, "scheduler");
return RxJavaPlugins.onAssembly(new FlowableConcatMapScheduler<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE, scheduler));
}

/**
* Maps the upstream items into {@link CompletableSource}s and subscribes to them one after the
* other completes.
Expand Down Expand Up @@ -7520,7 +7575,10 @@ public final Completable concatMapCompletableDelayError(Function<? super T, ? ex
* one at a time and emits their values in order
* while delaying any error from either this or any of the inner Publishers
* till all of them terminate.
*
* <p>
* Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread,
* on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure
* the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, int, boolean, Scheduler)} overload.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
Expand All @@ -7535,6 +7593,7 @@ public final Completable concatMapCompletableDelayError(Function<? super T, ? ex
* @param <R> the result value type
* @param mapper the function that maps the items of this Publisher into the inner Publishers.
* @return the new Publisher instance with the concatenation behavior
* @see #concatMapDelayError(Function, int, boolean, Scheduler)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
Expand All @@ -7548,6 +7607,10 @@ public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends P
* one at a time and emits their values in order
* while delaying any error from either this or any of the inner Publishers
* till all of them terminate.
* <p>
* Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread,
* on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure
* the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, int, boolean, Scheduler)} overload.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
Expand All @@ -7568,6 +7631,7 @@ public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends P
* if true, all errors from the outer and inner Publisher sources are delayed until the end,
* if false, an error from the main source is signaled when the current Publisher source terminates
* @return the new Publisher instance with the concatenation behavior
* @see #concatMapDelayError(Function, int, boolean, Scheduler)
*/
@CheckReturnValue
@NonNull
Expand All @@ -7588,6 +7652,51 @@ public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends P
return RxJavaPlugins.onAssembly(new FlowableConcatMap<T, R>(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY));
}

/**
* Maps each of the upstream items into a Publisher, subscribes to them one after the other,
* one at a time and emits their values in order
* while executing the mapper function on the designated scheduler, delaying any error from either this or any of the
* inner Publishers till all of them terminate.
* <p>
* The difference between {@link #concatMapDelayError(Function, int, boolean)} and this operator is that this operator guarantees the {@code mapper}
* function is executed on the specified scheduler.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
* expected to honor backpressure as well. If the source {@code Publisher} violates the rule, the operator will
* signal a {@code MissingBackpressureException}. If any of the inner {@code Publisher}s doesn't honor
* backpressure, that <em>may</em> throw an {@code IllegalStateException} when that
* {@code Publisher} completes.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapDelayError} executes the given {@code mapper} function on the provided {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param mapper the function that maps the items of this Publisher into the inner Publishers.
* @param prefetch
* the number of elements to prefetch from the current Flowable
* @param tillTheEnd
* if true, all errors from the outer and inner Publisher sources are delayed until the end,
* if false, an error from the main source is signaled when the current Publisher source terminates
* @param scheduler
* the scheduler where the {@code mapper} function will be executed
* @return the new Publisher instance with the concatenation behavior
* @see #concatMapDelayError(Function, int, boolean)
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends Publisher<? extends R>> mapper,
int prefetch, boolean tillTheEnd, Scheduler scheduler) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(prefetch, "prefetch");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableConcatMapScheduler<T, R>(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, scheduler));
}

/**
* Maps a sequence of values into Publishers and concatenates these Publishers eagerly into a single
* Publisher.
Expand Down
Loading