From 00ce78b19d07b2a8dac3924810832248d059dd6d Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 24 Jun 2019 14:34:37 +0200 Subject: [PATCH] 3.x: Add concatMap with Scheduler guaranteeing where the mapper runs --- src/main/java/io/reactivex/Flowable.java | 111 +- src/main/java/io/reactivex/Observable.java | 90 ++ .../flowable/FlowableConcatMapScheduler.java | 553 +++++++++ .../ObservableConcatMapScheduler.java | 557 +++++++++ .../FlowableConcatMapSchedulerTest.java | 1078 +++++++++++++++++ .../flowable/FlowableConcatTest.java | 2 +- .../ObservableConcatMapSchedulerTest.java | 1015 ++++++++++++++++ 7 files changed, 3404 insertions(+), 2 deletions(-) create mode 100644 src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapScheduler.java create mode 100644 src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapScheduler.java create mode 100644 src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapSchedulerTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapSchedulerTest.java diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 1445198ccc..ae3d546037 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -7280,6 +7280,10 @@ public final Flowable compose(FlowableTransformer * that result from concatenating those resulting Publishers. *

* + *

+ * Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread, + * on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure + * the {@code mapper} function is confined to a known thread, use the {@link #concatMap(Function, int, Scheduler)} overload. *

*
Backpressure:
*
The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are @@ -7312,6 +7316,10 @@ public final Flowable concatMap(Function * + *

+ * Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread, + * on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure + * the {@code mapper} function is confined to a known thread, use the {@link #concatMap(Function, int, Scheduler)} overload. *

*
Backpressure:
*
The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are @@ -7332,6 +7340,7 @@ public final Flowable concatMap(FunctionReactiveX operators documentation: FlatMap + * @see #concatMap(Function, int, Scheduler) */ @CheckReturnValue @NonNull @@ -7351,6 +7360,52 @@ public final Flowable concatMap(Function(this, mapper, prefetch, ErrorMode.IMMEDIATE)); } + /** + * Returns a new Flowable that emits items resulting from applying a function (on a designated scheduler) + * that you supply to each item emitted by the source Publisher, where that function returns a Publisher, and then emitting the items + * that result from concatenating those resulting Publishers. + *

+ * + *

+ * The difference between {@link #concatMap(Function, int)} and this operator is that this operator guarantees the {@code mapper} + * function is executed on the specified scheduler. + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are + * expected to honor backpressure as well. If the source {@code Publisher} violates the rule, the operator will + * signal a {@code MissingBackpressureException}. If any of the inner {@code Publisher}s doesn't honor + * backpressure, that may throw an {@code IllegalStateException} when that + * {@code Publisher} completes.
+ *
Scheduler:
+ *
{@code concatMap} executes the given {@code mapper} function on the provided {@link Scheduler}.
+ *
+ * + * @param the type of the inner Publisher sources and thus the output type + * @param mapper + * a function that, when applied to an item emitted by the source Publisher, returns a + * Publisher + * @param prefetch + * the number of elements to prefetch from the current Flowable + * @param scheduler + * the scheduler where the {@code mapper} function will be executed + * @return a Flowable that emits the result of applying the transformation function to each item emitted + * by the source Publisher and concatenating the Publishers obtained from this transformation + * @see ReactiveX operators documentation: FlatMap + * @since 3.0.0 + * @see #concatMap(Function, int) + * @see #concatMapDelayError(Function, int, boolean, Scheduler) + */ + @CheckReturnValue + @NonNull + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Flowable concatMap(Function> mapper, int prefetch, Scheduler scheduler) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + ObjectHelper.requireNonNull(scheduler, "scheduler"); + return RxJavaPlugins.onAssembly(new FlowableConcatMapScheduler(this, mapper, prefetch, ErrorMode.IMMEDIATE, scheduler)); + } + /** * Maps the upstream items into {@link CompletableSource}s and subscribes to them one after the * other completes. @@ -7520,7 +7575,10 @@ public final Completable concatMapCompletableDelayError(Function + * Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread, + * on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure + * the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, int, boolean, Scheduler)} overload. *
*
Backpressure:
*
The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are @@ -7535,6 +7593,7 @@ public final Completable concatMapCompletableDelayError(Function the result value type * @param mapper the function that maps the items of this Publisher into the inner Publishers. * @return the new Publisher instance with the concatenation behavior + * @see #concatMapDelayError(Function, int, boolean, Scheduler) */ @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @@ -7548,6 +7607,10 @@ public final Flowable concatMapDelayError(Function + * Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread, + * on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure + * the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, int, boolean, Scheduler)} overload. * *
*
Backpressure:
@@ -7568,6 +7631,7 @@ public final Flowable concatMapDelayError(Function Flowable concatMapDelayError(Function(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY)); } + /** + * Maps each of the upstream items into a Publisher, subscribes to them one after the other, + * one at a time and emits their values in order + * while executing the mapper function on the designated scheduler, delaying any error from either this or any of the + * inner Publishers till all of them terminate. + *

+ * The difference between {@link #concatMapDelayError(Function, int, boolean)} and this operator is that this operator guarantees the {@code mapper} + * function is executed on the specified scheduler. + * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are + * expected to honor backpressure as well. If the source {@code Publisher} violates the rule, the operator will + * signal a {@code MissingBackpressureException}. If any of the inner {@code Publisher}s doesn't honor + * backpressure, that may throw an {@code IllegalStateException} when that + * {@code Publisher} completes.
+ *
Scheduler:
+ *
{@code concatMapDelayError} executes the given {@code mapper} function on the provided {@link Scheduler}.
+ *
+ * + * @param the result value type + * @param mapper the function that maps the items of this Publisher into the inner Publishers. + * @param prefetch + * the number of elements to prefetch from the current Flowable + * @param tillTheEnd + * if true, all errors from the outer and inner Publisher sources are delayed until the end, + * if false, an error from the main source is signaled when the current Publisher source terminates + * @param scheduler + * the scheduler where the {@code mapper} function will be executed + * @return the new Publisher instance with the concatenation behavior + * @see #concatMapDelayError(Function, int, boolean) + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Flowable concatMapDelayError(Function> mapper, + int prefetch, boolean tillTheEnd, Scheduler scheduler) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + return RxJavaPlugins.onAssembly(new FlowableConcatMapScheduler(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, scheduler)); + } + /** * Maps a sequence of values into Publishers and concatenates these Publishers eagerly into a single * Publisher. diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 656e82369f..90f5375291 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -6448,6 +6448,10 @@ public final Observable compose(ObservableTransformer * + *

+ * Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread, + * on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure + * the {@code mapper} function is confined to a known thread, use the {@link #concatMap(Function, int, Scheduler)} overload. *

*
Scheduler:
*
{@code concatMap} does not operate by default on a particular {@link Scheduler}.
@@ -6460,6 +6464,7 @@ public final Observable compose(ObservableTransformerReactiveX operators documentation: FlatMap + * @see #concatMap(Function, int, Scheduler) */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @@ -6473,6 +6478,10 @@ public final Observable concatMap(Function * + *

+ * Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread, + * on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure + * the {@code mapper} function is confined to a known thread, use the {@link #concatMap(Function, int, Scheduler)} overload. *

*
Scheduler:
*
{@code concatMap} does not operate by default on a particular {@link Scheduler}.
@@ -6487,6 +6496,7 @@ public final Observable concatMap(FunctionReactiveX operators documentation: FlatMap + * @see #concatMap(Function, int, Scheduler) */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @@ -6504,6 +6514,41 @@ public final Observable concatMap(Function(this, mapper, prefetch, ErrorMode.IMMEDIATE)); } + /** + * Returns a new Observable that emits items resulting from applying a function that you supply to each item + * emitted by the source ObservableSource, where that function returns an ObservableSource, and then emitting the items + * that result from concatenating those resulting ObservableSources. + *

+ * + *

+ * The difference between {@link #concatMap(Function, int)} and this operator is that this operator guarantees the {@code mapper} + * function is executed on the specified scheduler. + *

+ *
Scheduler:
+ *
{@code concatMap} executes the given {@code mapper} function on the provided {@link Scheduler}.
+ *
+ * + * @param the type of the inner ObservableSource sources and thus the output type + * @param mapper + * a function that, when applied to an item emitted by the source ObservableSource, returns an + * ObservableSource + * @param prefetch + * the number of elements to prefetch from the current Observable + * @param scheduler + * the scheduler where the {@code mapper} function will be executed + * @return an Observable that emits the result of applying the transformation function to each item emitted + * by the source ObservableSource and concatenating the ObservableSources obtained from this transformation + * @see ReactiveX operators documentation: FlatMap + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Observable concatMap(Function> mapper, int prefetch, Scheduler scheduler) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + return RxJavaPlugins.onAssembly(new ObservableConcatMapScheduler(this, mapper, prefetch, ErrorMode.IMMEDIATE, scheduler)); + } + /** * Maps each of the items into an ObservableSource, subscribes to them one after the other, * one at a time and emits their values in order @@ -6511,6 +6556,10 @@ public final Observable concatMap(Function * + *

+ * Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread, + * on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure + * the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, int, boolean, Scheduler)} overload. *

*
Scheduler:
*
{@code concatMapDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -6519,6 +6568,7 @@ public final Observable concatMap(Function the result value type * @param mapper the function that maps the items of this ObservableSource into the inner ObservableSources. * @return the new ObservableSource instance with the concatenation behavior + * @see #concatMapDelayError(Function, int, boolean, Scheduler) */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @@ -6533,6 +6583,10 @@ public final Observable concatMapDelayError(Function * + *

+ * Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread, + * on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure + * the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, int, boolean, Scheduler)} overload. *

*
Scheduler:
*
{@code concatMapDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -6546,6 +6600,7 @@ public final Observable concatMapDelayError(Function Observable concatMapDelayError(Function(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY)); } + /** + * Maps each of the items into an ObservableSource, subscribes to them one after the other, + * one at a time and emits their values in order + * while delaying any error from either this or any of the inner ObservableSources + * till all of them terminate. + *

+ * + *

+ *
Scheduler:
+ *
{@code concatMapDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the result value type + * @param mapper the function that maps the items of this ObservableSource into the inner ObservableSources. + * @param prefetch + * the number of elements to prefetch from the current Observable + * @param tillTheEnd + * if true, all errors from the outer and inner ObservableSource sources are delayed until the end, + * if false, an error from the main source is signalled when the current ObservableSource source terminates + * @param scheduler + * the scheduler where the {@code mapper} function will be executed + * @return the new ObservableSource instance with the concatenation behavior + * @see #concatMapDelayError(Function, int, boolean) + * @since 3.0.0 + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Observable concatMapDelayError(Function> mapper, + int prefetch, boolean tillTheEnd, Scheduler scheduler) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + return RxJavaPlugins.onAssembly(new ObservableConcatMapScheduler(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, scheduler)); + } + /** * Maps a sequence of values into ObservableSources and concatenates these ObservableSources eagerly into a single * ObservableSource. diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapScheduler.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapScheduler.java new file mode 100644 index 0000000000..8b55a5b9d5 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapScheduler.java @@ -0,0 +1,553 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.internal.operators.flowable; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.*; +import io.reactivex.internal.operators.flowable.FlowableConcatMap.*; +import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; + +public final class FlowableConcatMapScheduler extends AbstractFlowableWithUpstream { + + final Function> mapper; + + final int prefetch; + + final ErrorMode errorMode; + + final Scheduler scheduler; + + public FlowableConcatMapScheduler(Flowable source, + Function> mapper, + int prefetch, ErrorMode errorMode, Scheduler scheduler) { + super(source); + this.mapper = mapper; + this.prefetch = prefetch; + this.errorMode = errorMode; + this.scheduler = scheduler; + } + + @Override + protected void subscribeActual(Subscriber s) { + switch (errorMode) { + case BOUNDARY: + source.subscribe(new ConcatMapDelayed(s, mapper, prefetch, false, scheduler.createWorker())); + break; + case END: + source.subscribe(new ConcatMapDelayed(s, mapper, prefetch, true, scheduler.createWorker())); + break; + default: + source.subscribe(new ConcatMapImmediate(s, mapper, prefetch, scheduler.createWorker())); + } + } + + abstract static class BaseConcatMapSubscriber + extends AtomicInteger + implements FlowableSubscriber, ConcatMapSupport, Subscription, Runnable { + + private static final long serialVersionUID = -3511336836796789179L; + + final ConcatMapInner inner; + + final Function> mapper; + + final int prefetch; + + final int limit; + + final Scheduler.Worker worker; + + Subscription upstream; + + int consumed; + + SimpleQueue queue; + + volatile boolean done; + + volatile boolean cancelled; + + final AtomicThrowable errors; + + volatile boolean active; + + int sourceMode; + + BaseConcatMapSubscriber( + Function> mapper, + int prefetch, Scheduler.Worker worker) { + this.mapper = mapper; + this.prefetch = prefetch; + this.limit = prefetch - (prefetch >> 2); + this.inner = new ConcatMapInner(this); + this.errors = new AtomicThrowable(); + this.worker = worker; + } + + @Override + public final void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.upstream, s)) { + this.upstream = s; + + if (s instanceof QueueSubscription) { + @SuppressWarnings("unchecked") QueueSubscription f = (QueueSubscription)s; + int m = f.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY); + if (m == QueueSubscription.SYNC) { + sourceMode = m; + queue = f; + done = true; + + subscribeActual(); + + schedule(); + return; + } + if (m == QueueSubscription.ASYNC) { + sourceMode = m; + queue = f; + + subscribeActual(); + + s.request(prefetch); + return; + } + } + + queue = new SpscArrayQueue(prefetch); + + subscribeActual(); + + s.request(prefetch); + } + } + + abstract void schedule(); + + abstract void subscribeActual(); + + @Override + public final void onNext(T t) { + if (sourceMode != QueueSubscription.ASYNC) { + if (!queue.offer(t)) { + upstream.cancel(); + onError(new IllegalStateException("Queue full?!")); + return; + } + } + schedule(); + } + + @Override + public final void onComplete() { + done = true; + schedule(); + } + + @Override + public final void innerComplete() { + active = false; + schedule(); + } + + } + + static final class ConcatMapImmediate + extends BaseConcatMapSubscriber { + + private static final long serialVersionUID = 7898995095634264146L; + + final Subscriber downstream; + + final AtomicInteger wip; + + ConcatMapImmediate(Subscriber actual, + Function> mapper, + int prefetch, Scheduler.Worker worker) { + super(mapper, prefetch, worker); + this.downstream = actual; + this.wip = new AtomicInteger(); + } + + @Override + void subscribeActual() { + downstream.onSubscribe(this); + } + + @Override + public void onError(Throwable t) { + if (errors.addThrowable(t)) { + inner.cancel(); + + if (getAndIncrement() == 0) { + downstream.onError(errors.terminate()); + worker.dispose(); + } + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void innerNext(R value) { + if (get() == 0 && compareAndSet(0, 1)) { + downstream.onNext(value); + if (compareAndSet(1, 0)) { + return; + } + downstream.onError(errors.terminate()); + worker.dispose(); + } + } + + @Override + public void innerError(Throwable e) { + if (errors.addThrowable(e)) { + upstream.cancel(); + + if (getAndIncrement() == 0) { + downstream.onError(errors.terminate()); + worker.dispose(); + } + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void request(long n) { + inner.request(n); + } + + @Override + public void cancel() { + if (!cancelled) { + cancelled = true; + + inner.cancel(); + upstream.cancel(); + worker.dispose(); + } + } + + @Override + void schedule() { + if (wip.getAndIncrement() == 0) { + worker.schedule(this); + } + } + + @Override + public void run() { + for (;;) { + if (cancelled) { + return; + } + + if (!active) { + boolean d = done; + + T v; + + try { + v = queue.poll(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + upstream.cancel(); + errors.addThrowable(e); + downstream.onError(errors.terminate()); + worker.dispose(); + return; + } + + boolean empty = v == null; + + if (d && empty) { + downstream.onComplete(); + worker.dispose(); + return; + } + + if (!empty) { + Publisher p; + + try { + p = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null Publisher"); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + + upstream.cancel(); + errors.addThrowable(e); + downstream.onError(errors.terminate()); + worker.dispose(); + return; + } + + if (sourceMode != QueueSubscription.SYNC) { + int c = consumed + 1; + if (c == limit) { + consumed = 0; + upstream.request(c); + } else { + consumed = c; + } + } + + if (p instanceof Supplier) { + @SuppressWarnings("unchecked") + Supplier supplier = (Supplier) p; + + R vr; + + try { + vr = supplier.get(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + upstream.cancel(); + errors.addThrowable(e); + downstream.onError(errors.terminate()); + worker.dispose(); + return; + } + + if (vr == null) { + continue; + } + + if (inner.isUnbounded()) { + if (get() == 0 && compareAndSet(0, 1)) { + downstream.onNext(vr); + if (!compareAndSet(1, 0)) { + downstream.onError(errors.terminate()); + worker.dispose(); + return; + } + } + continue; + } else { + active = true; + inner.setSubscription(new WeakScalarSubscription(vr, inner)); + } + + } else { + active = true; + p.subscribe(inner); + } + } + } + if (wip.decrementAndGet() == 0) { + break; + } + } + } + } + + static final class ConcatMapDelayed + extends BaseConcatMapSubscriber { + + private static final long serialVersionUID = -2945777694260521066L; + + final Subscriber downstream; + + final boolean veryEnd; + + ConcatMapDelayed(Subscriber actual, + Function> mapper, + int prefetch, boolean veryEnd, Scheduler.Worker worker) { + super(mapper, prefetch, worker); + this.downstream = actual; + this.veryEnd = veryEnd; + } + + @Override + void subscribeActual() { + downstream.onSubscribe(this); + } + + @Override + public void onError(Throwable t) { + if (errors.addThrowable(t)) { + done = true; + schedule(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void innerNext(R value) { + downstream.onNext(value); + } + + @Override + public void innerError(Throwable e) { + if (errors.addThrowable(e)) { + if (!veryEnd) { + upstream.cancel(); + done = true; + } + active = false; + schedule(); + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void request(long n) { + inner.request(n); + } + + @Override + public void cancel() { + if (!cancelled) { + cancelled = true; + + inner.cancel(); + upstream.cancel(); + worker.dispose(); + } + } + + @Override + void schedule() { + if (getAndIncrement() == 0) { + worker.schedule(this); + } + } + + @Override + public void run() { + + for (;;) { + if (cancelled) { + return; + } + + if (!active) { + + boolean d = done; + + if (d && !veryEnd) { + Throwable ex = errors.get(); + if (ex != null) { + downstream.onError(errors.terminate()); + worker.dispose(); + return; + } + } + + T v; + + try { + v = queue.poll(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + upstream.cancel(); + errors.addThrowable(e); + downstream.onError(errors.terminate()); + worker.dispose(); + return; + } + + boolean empty = v == null; + + if (d && empty) { + Throwable ex = errors.terminate(); + if (ex != null) { + downstream.onError(ex); + } else { + downstream.onComplete(); + } + worker.dispose(); + return; + } + + if (!empty) { + Publisher p; + + try { + p = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null Publisher"); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + + upstream.cancel(); + errors.addThrowable(e); + downstream.onError(errors.terminate()); + worker.dispose(); + return; + } + + if (sourceMode != QueueSubscription.SYNC) { + int c = consumed + 1; + if (c == limit) { + consumed = 0; + upstream.request(c); + } else { + consumed = c; + } + } + + if (p instanceof Supplier) { + @SuppressWarnings("unchecked") + Supplier supplier = (Supplier) p; + + R vr; + + try { + vr = supplier.get(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + errors.addThrowable(e); + if (!veryEnd) { + upstream.cancel(); + downstream.onError(errors.terminate()); + worker.dispose(); + return; + } + vr = null; + } + + if (vr == null) { + continue; + } + + if (inner.isUnbounded()) { + downstream.onNext(vr); + continue; + } else { + active = true; + inner.setSubscription(new WeakScalarSubscription(vr, inner)); + } + } else { + active = true; + p.subscribe(inner); + } + } + } + if (decrementAndGet() == 0) { + break; + } + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapScheduler.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapScheduler.java new file mode 100644 index 0000000000..6338521fda --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapScheduler.java @@ -0,0 +1,557 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.internal.operators.observable; + +import java.util.concurrent.atomic.*; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.*; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.*; +import io.reactivex.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.internal.util.*; +import io.reactivex.observers.SerializedObserver; +import io.reactivex.plugins.RxJavaPlugins; + +public final class ObservableConcatMapScheduler extends AbstractObservableWithUpstream { + + final Function> mapper; + + final int bufferSize; + + final ErrorMode delayErrors; + + final Scheduler scheduler; + + public ObservableConcatMapScheduler(ObservableSource source, Function> mapper, + int bufferSize, ErrorMode delayErrors, Scheduler scheduler) { + super(source); + this.mapper = mapper; + this.delayErrors = delayErrors; + this.bufferSize = Math.max(8, bufferSize); + this.scheduler = scheduler; + } + + @Override + public void subscribeActual(Observer observer) { + if (delayErrors == ErrorMode.IMMEDIATE) { + SerializedObserver serial = new SerializedObserver(observer); + source.subscribe(new ConcatMapObserver(serial, mapper, bufferSize, scheduler.createWorker())); + } else { + source.subscribe(new ConcatMapDelayErrorObserver(observer, mapper, bufferSize, delayErrors == ErrorMode.END, scheduler.createWorker())); + } + } + + static final class ConcatMapObserver extends AtomicInteger implements Observer, Disposable, Runnable { + + private static final long serialVersionUID = 8828587559905699186L; + final Observer downstream; + final Function> mapper; + final InnerObserver inner; + final int bufferSize; + final Scheduler.Worker worker; + + SimpleQueue queue; + + Disposable upstream; + + volatile boolean active; + + volatile boolean disposed; + + volatile boolean done; + + int fusionMode; + + ConcatMapObserver(Observer actual, + Function> mapper, int bufferSize, Scheduler.Worker worker) { + this.downstream = actual; + this.mapper = mapper; + this.bufferSize = bufferSize; + this.inner = new InnerObserver(actual, this); + this.worker = worker; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.upstream, d)) { + this.upstream = d; + if (d instanceof QueueDisposable) { + @SuppressWarnings("unchecked") + QueueDisposable qd = (QueueDisposable) d; + + int m = qd.requestFusion(QueueDisposable.ANY); + if (m == QueueDisposable.SYNC) { + fusionMode = m; + queue = qd; + done = true; + + downstream.onSubscribe(this); + + drain(); + return; + } + + if (m == QueueDisposable.ASYNC) { + fusionMode = m; + queue = qd; + + downstream.onSubscribe(this); + + return; + } + } + + queue = new SpscLinkedArrayQueue(bufferSize); + + downstream.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + if (fusionMode == QueueDisposable.NONE) { + queue.offer(t); + } + drain(); + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + dispose(); + downstream.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + drain(); + } + + void innerComplete() { + active = false; + drain(); + } + + @Override + public boolean isDisposed() { + return disposed; + } + + @Override + public void dispose() { + disposed = true; + inner.dispose(); + upstream.dispose(); + worker.dispose(); + + if (getAndIncrement() == 0) { + queue.clear(); + } + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + worker.schedule(this); + } + + @Override + public void run() { + for (;;) { + if (disposed) { + queue.clear(); + return; + } + if (!active) { + + boolean d = done; + + T t; + + try { + t = queue.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + dispose(); + queue.clear(); + downstream.onError(ex); + worker.dispose(); + return; + } + + boolean empty = t == null; + + if (d && empty) { + disposed = true; + downstream.onComplete(); + worker.dispose(); + return; + } + + if (!empty) { + ObservableSource o; + + try { + o = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + dispose(); + queue.clear(); + downstream.onError(ex); + worker.dispose(); + return; + } + + active = true; + o.subscribe(inner); + } + } + + if (decrementAndGet() == 0) { + break; + } + } + } + + static final class InnerObserver extends AtomicReference implements Observer { + + private static final long serialVersionUID = -7449079488798789337L; + + final Observer downstream; + final ConcatMapObserver parent; + + InnerObserver(Observer actual, ConcatMapObserver parent) { + this.downstream = actual; + this.parent = parent; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.replace(this, d); + } + + @Override + public void onNext(U t) { + downstream.onNext(t); + } + + @Override + public void onError(Throwable t) { + parent.dispose(); + downstream.onError(t); + } + + @Override + public void onComplete() { + parent.innerComplete(); + } + + void dispose() { + DisposableHelper.dispose(this); + } + } + } + + static final class ConcatMapDelayErrorObserver + extends AtomicInteger + implements Observer, Disposable, Runnable { + + private static final long serialVersionUID = -6951100001833242599L; + + final Observer downstream; + + final Function> mapper; + + final int bufferSize; + + final AtomicThrowable error; + + final DelayErrorInnerObserver observer; + + final boolean tillTheEnd; + + final Scheduler.Worker worker; + + SimpleQueue queue; + + Disposable upstream; + + volatile boolean active; + + volatile boolean done; + + volatile boolean cancelled; + + int sourceMode; + + ConcatMapDelayErrorObserver(Observer actual, + Function> mapper, int bufferSize, + boolean tillTheEnd, Scheduler.Worker worker) { + this.downstream = actual; + this.mapper = mapper; + this.bufferSize = bufferSize; + this.tillTheEnd = tillTheEnd; + this.error = new AtomicThrowable(); + this.observer = new DelayErrorInnerObserver(actual, this); + this.worker = worker; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.upstream, d)) { + this.upstream = d; + + if (d instanceof QueueDisposable) { + @SuppressWarnings("unchecked") + QueueDisposable qd = (QueueDisposable) d; + + int m = qd.requestFusion(QueueDisposable.ANY); + if (m == QueueDisposable.SYNC) { + sourceMode = m; + queue = qd; + done = true; + + downstream.onSubscribe(this); + + drain(); + return; + } + if (m == QueueDisposable.ASYNC) { + sourceMode = m; + queue = qd; + + downstream.onSubscribe(this); + + return; + } + } + + queue = new SpscLinkedArrayQueue(bufferSize); + + downstream.onSubscribe(this); + } + } + + @Override + public void onNext(T value) { + if (sourceMode == QueueDisposable.NONE) { + queue.offer(value); + } + drain(); + } + + @Override + public void onError(Throwable e) { + if (error.addThrowable(e)) { + done = true; + drain(); + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + @Override + public boolean isDisposed() { + return cancelled; + } + + @Override + public void dispose() { + cancelled = true; + upstream.dispose(); + observer.dispose(); + worker.dispose(); + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + worker.schedule(this); + } + + @SuppressWarnings("unchecked") + @Override + public void run() { + Observer actual = this.downstream; + SimpleQueue queue = this.queue; + AtomicThrowable error = this.error; + + for (;;) { + + if (!active) { + + if (cancelled) { + queue.clear(); + return; + } + + if (!tillTheEnd) { + Throwable ex = error.get(); + if (ex != null) { + queue.clear(); + cancelled = true; + actual.onError(error.terminate()); + worker.dispose(); + return; + } + } + + boolean d = done; + + T v; + + try { + v = queue.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + cancelled = true; + this.upstream.dispose(); + error.addThrowable(ex); + actual.onError(error.terminate()); + worker.dispose(); + return; + } + + boolean empty = v == null; + + if (d && empty) { + cancelled = true; + Throwable ex = error.terminate(); + if (ex != null) { + actual.onError(ex); + } else { + actual.onComplete(); + } + worker.dispose(); + return; + } + + if (!empty) { + + ObservableSource o; + + try { + o = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null ObservableSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + cancelled = true; + this.upstream.dispose(); + queue.clear(); + error.addThrowable(ex); + actual.onError(error.terminate()); + worker.dispose(); + return; + } + + if (o instanceof Supplier) { + R w; + + try { + w = ((Supplier)o).get(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + error.addThrowable(ex); + continue; + } + + if (w != null && !cancelled) { + actual.onNext(w); + } + continue; + } else { + active = true; + o.subscribe(observer); + } + } + } + + if (decrementAndGet() == 0) { + break; + } + } + } + + static final class DelayErrorInnerObserver extends AtomicReference implements Observer { + + private static final long serialVersionUID = 2620149119579502636L; + + final Observer downstream; + + final ConcatMapDelayErrorObserver parent; + + DelayErrorInnerObserver(Observer actual, ConcatMapDelayErrorObserver parent) { + this.downstream = actual; + this.parent = parent; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.replace(this, d); + } + + @Override + public void onNext(R value) { + downstream.onNext(value); + } + + @Override + public void onError(Throwable e) { + ConcatMapDelayErrorObserver p = parent; + if (p.error.addThrowable(e)) { + if (!p.tillTheEnd) { + p.upstream.dispose(); + } + p.active = false; + p.drain(); + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void onComplete() { + ConcatMapDelayErrorObserver p = parent; + p.active = false; + p.drain(); + } + + void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapSchedulerTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapSchedulerTest.java new file mode 100644 index 0000000000..2777261473 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapSchedulerTest.java @@ -0,0 +1,1078 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import static org.junit.Assert.*; + +import java.lang.reflect.Method; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.*; +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.schedulers.ImmediateThinScheduler; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.*; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subscribers.*; +import io.reactivex.testsupport.*; + +public class FlowableConcatMapSchedulerTest { + + @Test + public void boundaryFusion() { + Flowable.range(1, 10000) + .observeOn(Schedulers.single()) + .map(new Function() { + @Override + public String apply(Integer t) throws Exception { + String name = Thread.currentThread().getName(); + if (name.contains("RxSingleScheduler")) { + return "RxSingleScheduler"; + } + return name; + } + }) + .concatMap(new Function>() { + @Override + public Publisher apply(String v) + throws Exception { + return Flowable.just(v); + } + }, 2, ImmediateThinScheduler.INSTANCE) + .observeOn(Schedulers.computation()) + .distinct() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult("RxSingleScheduler"); + } + + @Test + public void boundaryFusionDelayError() { + Flowable.range(1, 10000) + .observeOn(Schedulers.single()) + .map(new Function() { + @Override + public String apply(Integer t) throws Exception { + String name = Thread.currentThread().getName(); + if (name.contains("RxSingleScheduler")) { + return "RxSingleScheduler"; + } + return name; + } + }) + .concatMapDelayError(new Function>() { + @Override + public Publisher apply(String v) + throws Exception { + return Flowable.just(v); + } + }, 2, true, ImmediateThinScheduler.INSTANCE) + .observeOn(Schedulers.computation()) + .distinct() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult("RxSingleScheduler"); + } + + @Test + public void pollThrows() { + Flowable.just(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .compose(TestHelper.flowableStripBoundary()) + .concatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.just(v); + } + }, 2, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void pollThrowsDelayError() { + Flowable.just(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .compose(TestHelper.flowableStripBoundary()) + .concatMapDelayError(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.just(v); + } + }, 2, true, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void noCancelPrevious() { + final AtomicInteger counter = new AtomicInteger(); + + Flowable.range(1, 5) + .concatMap(new Function>() { + @Override + public Flowable apply(Integer v) throws Exception { + return Flowable.just(v).doOnCancel(new Action() { + @Override + public void run() throws Exception { + counter.getAndIncrement(); + } + }); + } + }, 2, ImmediateThinScheduler.INSTANCE) + .test() + .assertResult(1, 2, 3, 4, 5); + + assertEquals(0, counter.get()); + } + + @Test + public void delayErrorCallableTillTheEnd() { + Flowable.just(1, 2, 3, 101, 102, 23, 890, 120, 32) + .concatMapDelayError(new Function>() { + @Override public Flowable apply(final Integer integer) throws Exception { + return Flowable.fromCallable(new Callable() { + @Override public Integer call() throws Exception { + if (integer >= 100) { + throw new NullPointerException("test null exp"); + } + return integer; + } + }); + } + }, 2, true, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(CompositeException.class, 1, 2, 3, 23, 32); + } + + @Test + public void delayErrorCallableEager() { + Flowable.just(1, 2, 3, 101, 102, 23, 890, 120, 32) + .concatMapDelayError(new Function>() { + @Override public Flowable apply(final Integer integer) throws Exception { + return Flowable.fromCallable(new Callable() { + @Override public Integer call() throws Exception { + if (integer >= 100) { + throw new NullPointerException("test null exp"); + } + return integer; + } + }); + } + }, 2, false, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(NullPointerException.class, 1, 2, 3); + } + + @Test + public void mapperScheduled() { + TestSubscriber ts = Flowable.just(1) + .concatMap(new Function>() { + @Override + public Flowable apply(Integer t) throws Throwable { + return Flowable.just(Thread.currentThread().getName()); + } + }, 2, Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(ts.values().toString(), ts.values().get(0).startsWith("RxSingleScheduler-")); + } + + @Test + public void mapperScheduledHidden() { + TestSubscriber ts = Flowable.just(1) + .concatMap(new Function>() { + @Override + public Flowable apply(Integer t) throws Throwable { + return Flowable.just(Thread.currentThread().getName()).hide(); + } + }, 2, Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(ts.values().toString(), ts.values().get(0).startsWith("RxSingleScheduler-")); + } + + @Test + public void mapperDelayErrorScheduled() { + TestSubscriber ts = Flowable.just(1) + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer t) throws Throwable { + return Flowable.just(Thread.currentThread().getName()); + } + }, 2, false, Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(ts.values().toString(), ts.values().get(0).startsWith("RxSingleScheduler-")); + } + + @Test + public void mapperDelayErrorScheduledHidden() { + TestSubscriber ts = Flowable.just(1) + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer t) throws Throwable { + return Flowable.just(Thread.currentThread().getName()).hide(); + } + }, 2, false, Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(ts.values().toString(), ts.values().get(0).startsWith("RxSingleScheduler-")); + } + + @Test + public void mapperDelayError2Scheduled() { + TestSubscriber ts = Flowable.just(1) + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer t) throws Throwable { + return Flowable.just(Thread.currentThread().getName()); + } + }, 2, true, Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(ts.values().toString(), ts.values().get(0).startsWith("RxSingleScheduler-")); + } + + @Test + public void mapperDelayError2ScheduledHidden() { + TestSubscriber ts = Flowable.just(1) + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer t) throws Throwable { + return Flowable.just(Thread.currentThread().getName()).hide(); + } + }, 2, true, Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(ts.values().toString(), ts.values().get(0).startsWith("RxSingleScheduler-")); + } + + @Test(timeout = 30000) + public void issue2890NoStackoverflow() throws InterruptedException { + final ExecutorService executor = Executors.newFixedThreadPool(2); + final Scheduler sch = Schedulers.from(executor); + + Function> func = new Function>() { + @Override + public Flowable apply(Integer t) { + Flowable flowable = Flowable.just(t) + .subscribeOn(sch) + ; + FlowableProcessor processor = UnicastProcessor.create(); + flowable.subscribe(processor); + return processor; + } + }; + + int n = 5000; + final AtomicInteger counter = new AtomicInteger(); + + Flowable.range(1, n).concatMap(func, 2, ImmediateThinScheduler.INSTANCE).subscribe(new DefaultSubscriber() { + @Override + public void onNext(Integer t) { + // Consume after sleep for 1 ms + try { + Thread.sleep(1); + } catch (InterruptedException e) { + // ignored + } + if (counter.getAndIncrement() % 100 == 0) { + System.out.print("testIssue2890NoStackoverflow -> "); + System.out.println(counter.get()); + }; + } + + @Override + public void onComplete() { + executor.shutdown(); + } + + @Override + public void onError(Throwable e) { + executor.shutdown(); + } + }); + + executor.awaitTermination(20000, TimeUnit.MILLISECONDS); + + assertEquals(n, counter.get()); + } + + @Test//(timeout = 100000) + public void concatMapRangeAsyncLoopIssue2876() { + final long durationSeconds = 2; + final long startTime = System.currentTimeMillis(); + for (int i = 0;; i++) { + //only run this for a max of ten seconds + if (System.currentTimeMillis() - startTime > TimeUnit.SECONDS.toMillis(durationSeconds)) { + return; + } + if (i % 1000 == 0) { + System.out.println("concatMapRangeAsyncLoop > " + i); + } + TestSubscriberEx ts = new TestSubscriberEx(); + Flowable.range(0, 1000) + .concatMap(new Function>() { + @Override + public Flowable apply(Integer t) { + return Flowable.fromIterable(Arrays.asList(t)); + } + }, 2, ImmediateThinScheduler.INSTANCE) + .observeOn(Schedulers.computation()).subscribe(ts); + + ts.awaitDone(2500, TimeUnit.MILLISECONDS); + ts.assertTerminated(); + ts.assertNoErrors(); + assertEquals(1000, ts.values().size()); + assertEquals((Integer)999, ts.values().get(999)); + } + } + + @SuppressWarnings("unchecked") + @Test + @Ignore("concat(a, b, ...) replaced by concatArray(T...)") + public void concatMany() throws Exception { + for (int i = 2; i < 10; i++) { + Class[] clazz = new Class[i]; + Arrays.fill(clazz, Flowable.class); + + Flowable[] obs = new Flowable[i]; + Arrays.fill(obs, Flowable.just(1)); + + Integer[] expected = new Integer[i]; + Arrays.fill(expected, 1); + + Method m = Flowable.class.getMethod("concat", clazz); + + TestSubscriber ts = TestSubscriber.create(); + + ((Flowable)m.invoke(null, (Object[])obs)).subscribe(ts); + + ts.assertValues(expected); + ts.assertNoErrors(); + ts.assertComplete(); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void concatMapJustJust() { + TestSubscriber ts = TestSubscriber.create(); + + Flowable.just(Flowable.just(1)).concatMap((Function)Functions.identity(), 2, ImmediateThinScheduler.INSTANCE).subscribe(ts); + + ts.assertValue(1); + ts.assertNoErrors(); + ts.assertComplete(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void concatMapJustRange() { + TestSubscriber ts = TestSubscriber.create(); + + Flowable.just(Flowable.range(1, 5)).concatMap((Function)Functions.identity(), 2, ImmediateThinScheduler.INSTANCE).subscribe(ts); + + ts.assertValues(1, 2, 3, 4, 5); + ts.assertNoErrors(); + ts.assertComplete(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void concatMapDelayErrorJustJust() { + TestSubscriber ts = TestSubscriber.create(); + + Flowable.just(Flowable.just(1)).concatMapDelayError((Function)Functions.identity(), 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + + ts.assertValue(1); + ts.assertNoErrors(); + ts.assertComplete(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void concatMapDelayErrorJustRange() { + TestSubscriber ts = TestSubscriber.create(); + + Flowable.just(Flowable.range(1, 5)).concatMapDelayError((Function)Functions.identity(), 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + + ts.assertValues(1, 2, 3, 4, 5); + ts.assertNoErrors(); + ts.assertComplete(); + } + + @SuppressWarnings("unchecked") + @Test + @Ignore("startWith(a, b, ...) replaced by startWithArray(T...)") + public void startWith() throws Exception { + for (int i = 2; i < 10; i++) { + Class[] clazz = new Class[i]; + Arrays.fill(clazz, Object.class); + + Object[] obs = new Object[i]; + Arrays.fill(obs, 1); + + Integer[] expected = new Integer[i]; + Arrays.fill(expected, 1); + + Method m = Flowable.class.getMethod("startWith", clazz); + + TestSubscriber ts = TestSubscriber.create(); + + ((Flowable)m.invoke(Flowable.empty(), obs)).subscribe(ts); + + ts.assertValues(expected); + ts.assertNoErrors(); + ts.assertComplete(); + } + } + + static final class InfiniteIterator implements Iterator, Iterable { + + int count; + + @Override + public boolean hasNext() { + return true; + } + + @Override + public Integer next() { + return count++; + } + + @Override + public void remove() { + } + + @Override + public Iterator iterator() { + return this; + } + } + + @Test + public void concatMapDelayError() { + Flowable.just(Flowable.just(1), Flowable.just(2)) + .concatMapDelayError(Functions.>identity(), 2, true, ImmediateThinScheduler.INSTANCE) + .test() + .assertResult(1, 2); + } + + @Test + public void concatMapDelayErrorJustSource() { + Flowable.just(0) + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Object v) throws Exception { + return Flowable.just(1); + } + }, 16, true, ImmediateThinScheduler.INSTANCE) + .test() + .assertResult(1); + + } + + @Test + public void concatMapJustSource() { + Flowable.just(0).hide() + .concatMap(new Function>() { + @Override + public Flowable apply(Object v) throws Exception { + return Flowable.just(1); + } + }, 16, ImmediateThinScheduler.INSTANCE) + .test() + .assertResult(1); + } + + @Test + public void concatMapJustSourceDelayError() { + Flowable.just(0).hide() + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Object v) throws Exception { + return Flowable.just(1); + } + }, 16, false, ImmediateThinScheduler.INSTANCE) + .test() + .assertResult(1); + } + + @Test + public void concatMapScalarBackpressured() { + Flowable.just(1).hide() + .concatMap(Functions.justFunction(Flowable.just(2)), 2, ImmediateThinScheduler.INSTANCE) + .test(1L) + .assertResult(2); + } + + @Test + public void concatMapScalarBackpressuredDelayError() { + Flowable.just(1).hide() + .concatMapDelayError(Functions.justFunction(Flowable.just(2)), 2, true, ImmediateThinScheduler.INSTANCE) + .test(1L) + .assertResult(2); + } + + @Test + public void concatMapEmpty() { + Flowable.just(1).hide() + .concatMap(Functions.justFunction(Flowable.empty()), 2, ImmediateThinScheduler.INSTANCE) + .test() + .assertResult(); + } + + @Test + public void concatMapEmptyDelayError() { + Flowable.just(1).hide() + .concatMapDelayError(Functions.justFunction(Flowable.empty()), 2, true, ImmediateThinScheduler.INSTANCE) + .test() + .assertResult(); + } + + @Test + public void ignoreBackpressure() { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + for (int i = 0; i < 10; i++) { + s.onNext(i); + } + } + } + .concatMap(Functions.justFunction(Flowable.just(2)), 8, ImmediateThinScheduler.INSTANCE) + .test(0L) + .assertFailure(IllegalStateException.class); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + return f.concatMap(Functions.justFunction(Flowable.just(2)), 2, ImmediateThinScheduler.INSTANCE); + } + }); + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + return f.concatMapDelayError(Functions.justFunction(Flowable.just(2)), 2, true, ImmediateThinScheduler.INSTANCE); + } + }); + } + + @Test + public void immediateInnerNextOuterError() { + final PublishProcessor pp = PublishProcessor.create(); + + final TestSubscriberEx ts = new TestSubscriberEx() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + pp.onError(new TestException("First")); + } + } + }; + + pp.concatMap(Functions.justFunction(Flowable.just(1)), 2, ImmediateThinScheduler.INSTANCE) + .subscribe(ts); + + pp.onNext(1); + + assertFalse(pp.hasSubscribers()); + + ts.assertFailureAndMessage(TestException.class, "First", 1); + } + + @Test + public void immediateInnerNextOuterError2() { + final PublishProcessor pp = PublishProcessor.create(); + + final TestSubscriberEx ts = new TestSubscriberEx() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + pp.onError(new TestException("First")); + } + } + }; + + pp.concatMap(Functions.justFunction(Flowable.just(1).hide()), 2, ImmediateThinScheduler.INSTANCE) + .subscribe(ts); + + pp.onNext(1); + + assertFalse(pp.hasSubscribers()); + + ts.assertFailureAndMessage(TestException.class, "First", 1); + } + + @Test + public void concatMapInnerError() { + Flowable.just(1).hide() + .concatMap(Functions.justFunction(Flowable.error(new TestException())), 2, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void concatMapInnerErrorDelayError() { + Flowable.just(1).hide() + .concatMapDelayError(Functions.justFunction(Flowable.error(new TestException())), 2, true, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.concatMap(Functions.justFunction(Flowable.just(1).hide()), 2, ImmediateThinScheduler.INSTANCE); + } + }, true, 1, 1, 1); + } + + @Test + public void badInnerSource() { + @SuppressWarnings("rawtypes") + final Subscriber[] ts0 = { null }; + TestSubscriberEx ts = Flowable.just(1).hide().concatMap(Functions.justFunction(new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + ts0[0] = s; + s.onSubscribe(new BooleanSubscription()); + s.onError(new TestException("First")); + } + }), 2, ImmediateThinScheduler.INSTANCE) + .to(TestHelper.testConsumer()); + + ts.assertFailureAndMessage(TestException.class, "First"); + + List errors = TestHelper.trackPluginErrors(); + try { + ts0[0].onError(new TestException("Second")); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void badInnerSourceDelayError() { + @SuppressWarnings("rawtypes") + final Subscriber[] ts0 = { null }; + TestSubscriberEx ts = Flowable.just(1).hide().concatMapDelayError(Functions.justFunction(new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + ts0[0] = s; + s.onSubscribe(new BooleanSubscription()); + s.onError(new TestException("First")); + } + }), 2, true, ImmediateThinScheduler.INSTANCE) + .to(TestHelper.testConsumer()); + + ts.assertFailureAndMessage(TestException.class, "First"); + + List errors = TestHelper.trackPluginErrors(); + try { + ts0[0].onError(new TestException("Second")); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void badSourceDelayError() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.concatMapDelayError(Functions.justFunction(Flowable.just(1).hide()), 2, true, ImmediateThinScheduler.INSTANCE); + } + }, true, 1, 1, 1); + } + + @Test + public void fusedCrash() { + Flowable.range(1, 2) + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { throw new TestException(); } + }) + .concatMap(Functions.justFunction(Flowable.just(1)), 2, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void fusedCrashDelayError() { + Flowable.range(1, 2) + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { throw new TestException(); } + }) + .concatMapDelayError(Functions.justFunction(Flowable.just(1)), 2, true, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void callableCrash() { + Flowable.just(1).hide() + .concatMap(Functions.justFunction(Flowable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + throw new TestException(); + } + })), 2, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void callableCrashDelayError() { + Flowable.just(1).hide() + .concatMapDelayError(Functions.justFunction(Flowable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + throw new TestException(); + } + })), 2, true, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Flowable.range(1, 2) + .concatMap(Functions.justFunction(Flowable.just(1)), 2, ImmediateThinScheduler.INSTANCE)); + + TestHelper.checkDisposed(Flowable.range(1, 2) + .concatMapDelayError(Functions.justFunction(Flowable.just(1)), 2, true, ImmediateThinScheduler.INSTANCE)); + } + + @Test + public void notVeryEnd() { + Flowable.range(1, 2) + .concatMapDelayError(Functions.justFunction(Flowable.error(new TestException())), 16, false, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void error() { + Flowable.error(new TestException()) + .concatMapDelayError(Functions.justFunction(Flowable.just(2)), 16, false, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapperThrows() { + Flowable.range(1, 2) + .concatMap(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + throw new TestException(); + } + }, 2, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mainErrors() { + PublishProcessor source = PublishProcessor.create(); + + TestSubscriber ts = TestSubscriber.create(); + + source.concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer v) { + return Flowable.range(v, 2); + } + }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + + source.onNext(1); + source.onNext(2); + source.onError(new TestException()); + + ts.assertValues(1, 2, 2, 3); + ts.assertError(TestException.class); + ts.assertNotComplete(); + } + + @Test + public void innerErrors() { + final Flowable inner = Flowable.range(1, 2) + .concatWith(Flowable.error(new TestException())); + + TestSubscriber ts = TestSubscriber.create(); + + Flowable.range(1, 3).concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer v) { + return inner; + } + }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + + ts.assertValues(1, 2, 1, 2, 1, 2); + ts.assertError(CompositeException.class); + ts.assertNotComplete(); + } + + @Test + public void singleInnerErrors() { + final Flowable inner = Flowable.range(1, 2).concatWith(Flowable.error(new TestException())); + + TestSubscriber ts = TestSubscriber.create(); + + Flowable.just(1) + .hide() // prevent scalar optimization + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer v) { + return inner; + } + }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + + ts.assertValues(1, 2); + ts.assertError(TestException.class); + ts.assertNotComplete(); + } + + @Test + public void innerNull() { + TestSubscriber ts = TestSubscriber.create(); + + Flowable.just(1) + .hide() // prevent scalar optimization + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer v) { + return null; + } + }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + + ts.assertNoValues(); + ts.assertError(NullPointerException.class); + ts.assertNotComplete(); + } + + @Test + public void innerThrows() { + TestSubscriber ts = TestSubscriber.create(); + + Flowable.just(1) + .hide() // prevent scalar optimization + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer v) { + throw new TestException(); + } + }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + + ts.assertNoValues(); + ts.assertError(TestException.class); + ts.assertNotComplete(); + } + + @Test + public void innerWithEmpty() { + TestSubscriber ts = TestSubscriber.create(); + + Flowable.range(1, 3) + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer v) { + return v == 2 ? Flowable.empty() : Flowable.range(1, 2); + } + }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + + ts.assertValues(1, 2, 1, 2); + ts.assertNoErrors(); + ts.assertComplete(); + } + + @Test + public void innerWithScalar() { + TestSubscriber ts = TestSubscriber.create(); + + Flowable.range(1, 3) + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer v) { + return v == 2 ? Flowable.just(3) : Flowable.range(1, 2); + } + }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + + ts.assertValues(1, 2, 3, 1, 2); + ts.assertNoErrors(); + ts.assertComplete(); + } + + @Test + public void backpressure() { + TestSubscriber ts = TestSubscriber.create(0); + + Flowable.range(1, 3).concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer v) { + return Flowable.range(v, 2); + } + }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotComplete(); + + ts.request(1); + ts.assertValues(1); + ts.assertNoErrors(); + ts.assertNotComplete(); + + ts.request(3); + ts.assertValues(1, 2, 2, 3); + ts.assertNoErrors(); + ts.assertNotComplete(); + + ts.request(2); + + ts.assertValues(1, 2, 2, 3, 3, 4); + ts.assertNoErrors(); + ts.assertComplete(); + } + + @Test + public void mapperScheduledLong() { + TestSubscriber ts = Flowable.range(1, 1000) + .hide() + .observeOn(Schedulers.computation()) + .concatMap(new Function>() { + @Override + public Flowable apply(Integer t) throws Throwable { + return Flowable.just(Thread.currentThread().getName()) + .repeat(1000) + .observeOn(Schedulers.io()); + } + }, 2, Schedulers.single()) + .distinct() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(ts.values().toString(), ts.values().get(0).startsWith("RxSingleScheduler-")); + } + + @Test + public void mapperDelayErrorScheduledLong() { + TestSubscriber ts = Flowable.range(1, 1000) + .hide() + .observeOn(Schedulers.computation()) + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer t) throws Throwable { + return Flowable.just(Thread.currentThread().getName()) + .repeat(1000) + .observeOn(Schedulers.io()); + } + }, 2, false, Schedulers.single()) + .distinct() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(ts.values().toString(), ts.values().get(0).startsWith("RxSingleScheduler-")); + } + + @Test + public void mapperDelayError2ScheduledLong() { + TestSubscriber ts = Flowable.range(1, 1000) + .hide() + .observeOn(Schedulers.computation()) + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer t) throws Throwable { + return Flowable.just(Thread.currentThread().getName()) + .repeat(1000) + .observeOn(Schedulers.io()); + } + }, 2, true, Schedulers.single()) + .distinct() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(ts.values().toString(), ts.values().get(0).startsWith("RxSingleScheduler-")); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatTest.java index 168422f127..df911ed52b 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatTest.java @@ -1452,7 +1452,7 @@ public void badSourceDelayError() { TestHelper.checkBadSourceFlowable(new Function, Object>() { @Override public Object apply(Flowable f) throws Exception { - return f.concatMap(Functions.justFunction(Flowable.just(1).hide())); + return f.concatMapDelayError(Functions.justFunction(Flowable.just(1).hide())); } }, true, 1, 1, 1); } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapSchedulerTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapSchedulerTest.java new file mode 100644 index 0000000000..b19eab49e2 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapSchedulerTest.java @@ -0,0 +1,1015 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import static org.junit.Assert.*; + +import java.lang.reflect.Method; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.*; + +import io.reactivex.*; +import io.reactivex.Observable; +import io.reactivex.Observer; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.schedulers.ImmediateThinScheduler; +import io.reactivex.observers.*; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.*; +import io.reactivex.testsupport.*; + +public class ObservableConcatMapSchedulerTest { + + @Test + public void boundaryFusion() { + Observable.range(1, 10000) + .observeOn(Schedulers.single()) + .map(new Function() { + @Override + public String apply(Integer t) throws Exception { + String name = Thread.currentThread().getName(); + if (name.contains("RxSingleScheduler")) { + return "RxSingleScheduler"; + } + return name; + } + }) + .concatMap(new Function>() { + @Override + public ObservableSource apply(String v) + throws Exception { + return Observable.just(v); + } + }, 2, ImmediateThinScheduler.INSTANCE) + .observeOn(Schedulers.computation()) + .distinct() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult("RxSingleScheduler"); + } + + @Test + public void boundaryFusionDelayError() { + Observable.range(1, 10000) + .observeOn(Schedulers.single()) + .map(new Function() { + @Override + public String apply(Integer t) throws Exception { + String name = Thread.currentThread().getName(); + if (name.contains("RxSingleScheduler")) { + return "RxSingleScheduler"; + } + return name; + } + }) + .concatMapDelayError(new Function>() { + @Override + public ObservableSource apply(String v) + throws Exception { + return Observable.just(v); + } + }, 2, true, ImmediateThinScheduler.INSTANCE) + .observeOn(Schedulers.computation()) + .distinct() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult("RxSingleScheduler"); + } + + @Test + public void pollThrows() { + Observable.just(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .compose(TestHelper.observableStripBoundary()) + .concatMap(new Function>() { + @Override + public ObservableSource apply(Integer v) + throws Exception { + return Observable.just(v); + } + }, 2, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void pollThrowsDelayError() { + Observable.just(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .compose(TestHelper.observableStripBoundary()) + .concatMapDelayError(new Function>() { + @Override + public ObservableSource apply(Integer v) + throws Exception { + return Observable.just(v); + } + }, 2, true, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void noCancelPrevious() { + final AtomicInteger counter = new AtomicInteger(); + + Observable.range(1, 5) + .concatMap(new Function>() { + @Override + public Observable apply(Integer v) throws Exception { + return Observable.just(v).doOnDispose(new Action() { + @Override + public void run() throws Exception { + counter.getAndIncrement(); + } + }); + } + }, 2, ImmediateThinScheduler.INSTANCE) + .test() + .assertResult(1, 2, 3, 4, 5); + + assertEquals(0, counter.get()); + } + + @Test + public void delayErrorCallableTillTheEnd() { + Observable.just(1, 2, 3, 101, 102, 23, 890, 120, 32) + .concatMapDelayError(new Function>() { + @Override public Observable apply(final Integer integer) throws Exception { + return Observable.fromCallable(new Callable() { + @Override public Integer call() throws Exception { + if (integer >= 100) { + throw new NullPointerException("test null exp"); + } + return integer; + } + }); + } + }, 2, true, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(CompositeException.class, 1, 2, 3, 23, 32); + } + + @Test + public void delayErrorCallableEager() { + Observable.just(1, 2, 3, 101, 102, 23, 890, 120, 32) + .concatMapDelayError(new Function>() { + @Override public Observable apply(final Integer integer) throws Exception { + return Observable.fromCallable(new Callable() { + @Override public Integer call() throws Exception { + if (integer >= 100) { + throw new NullPointerException("test null exp"); + } + return integer; + } + }); + } + }, 2, false, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(NullPointerException.class, 1, 2, 3); + } + + @Test + public void mapperScheduled() { + TestObserver to = Observable.just(1) + .concatMap(new Function>() { + @Override + public Observable apply(Integer t) throws Throwable { + return Observable.just(Thread.currentThread().getName()); + } + }, 2, Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(to.values().toString(), to.values().get(0).startsWith("RxSingleScheduler-")); + } + + @Test + public void mapperScheduledHidden() { + TestObserver to = Observable.just(1) + .concatMap(new Function>() { + @Override + public Observable apply(Integer t) throws Throwable { + return Observable.just(Thread.currentThread().getName()).hide(); + } + }, 2, Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(to.values().toString(), to.values().get(0).startsWith("RxSingleScheduler-")); + } + + @Test + public void mapperDelayErrorScheduled() { + TestObserver to = Observable.just(1) + .concatMapDelayError(new Function>() { + @Override + public Observable apply(Integer t) throws Throwable { + return Observable.just(Thread.currentThread().getName()); + } + }, 2, false, Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(to.values().toString(), to.values().get(0).startsWith("RxSingleScheduler-")); + } + + @Test + public void mapperDelayErrorScheduledHidden() { + TestObserver to = Observable.just(1) + .concatMapDelayError(new Function>() { + @Override + public Observable apply(Integer t) throws Throwable { + return Observable.just(Thread.currentThread().getName()).hide(); + } + }, 2, false, Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(to.values().toString(), to.values().get(0).startsWith("RxSingleScheduler-")); + } + + @Test + public void mapperDelayError2Scheduled() { + TestObserver to = Observable.just(1) + .concatMapDelayError(new Function>() { + @Override + public Observable apply(Integer t) throws Throwable { + return Observable.just(Thread.currentThread().getName()); + } + }, 2, true, Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(to.values().toString(), to.values().get(0).startsWith("RxSingleScheduler-")); + } + + @Test + public void mapperDelayError2ScheduledHidden() { + TestObserver to = Observable.just(1) + .concatMapDelayError(new Function>() { + @Override + public Observable apply(Integer t) throws Throwable { + return Observable.just(Thread.currentThread().getName()).hide(); + } + }, 2, true, Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(to.values().toString(), to.values().get(0).startsWith("RxSingleScheduler-")); + } + + @Test(timeout = 30000) + public void issue2890NoStackoverflow() throws InterruptedException { + final ExecutorService executor = Executors.newFixedThreadPool(2); + final Scheduler sch = Schedulers.from(executor); + + Function> func = new Function>() { + @Override + public Observable apply(Integer t) { + Observable flowable = Observable.just(t) + .subscribeOn(sch) + ; + Subject processor = UnicastSubject.create(); + flowable.subscribe(processor); + return processor; + } + }; + + int n = 5000; + final AtomicInteger counter = new AtomicInteger(); + + Observable.range(1, n).concatMap(func, 2, ImmediateThinScheduler.INSTANCE).subscribe(new DefaultObserver() { + @Override + public void onNext(Integer t) { + // Consume after sleep for 1 ms + try { + Thread.sleep(1); + } catch (InterruptedException e) { + // ignored + } + if (counter.getAndIncrement() % 100 == 0) { + System.out.print("testIssue2890NoStackoverflow -> "); + System.out.println(counter.get()); + }; + } + + @Override + public void onComplete() { + executor.shutdown(); + } + + @Override + public void onError(Throwable e) { + executor.shutdown(); + } + }); + + executor.awaitTermination(20000, TimeUnit.MILLISECONDS); + + assertEquals(n, counter.get()); + } + + @Test//(timeout = 100000) + public void concatMapRangeAsyncLoopIssue2876() { + final long durationSeconds = 2; + final long startTime = System.currentTimeMillis(); + for (int i = 0;; i++) { + //only run this for a max of ten seconds + if (System.currentTimeMillis() - startTime > TimeUnit.SECONDS.toMillis(durationSeconds)) { + return; + } + if (i % 1000 == 0) { + System.out.println("concatMapRangeAsyncLoop > " + i); + } + TestObserverEx to = new TestObserverEx(); + Observable.range(0, 1000) + .concatMap(new Function>() { + @Override + public Observable apply(Integer t) { + return Observable.fromIterable(Arrays.asList(t)); + } + }, 2, ImmediateThinScheduler.INSTANCE) + .observeOn(Schedulers.computation()).subscribe(to); + + to.awaitDone(2500, TimeUnit.MILLISECONDS); + to.assertTerminated(); + to.assertNoErrors(); + assertEquals(1000, to.values().size()); + assertEquals((Integer)999, to.values().get(999)); + } + } + + @SuppressWarnings("unchecked") + @Test + @Ignore("concat(a, b, ...) replaced by concatArray(T...)") + public void concatMany() throws Exception { + for (int i = 2; i < 10; i++) { + Class[] clazz = new Class[i]; + Arrays.fill(clazz, Observable.class); + + Observable[] obs = new Observable[i]; + Arrays.fill(obs, Observable.just(1)); + + Integer[] expected = new Integer[i]; + Arrays.fill(expected, 1); + + Method m = Observable.class.getMethod("concat", clazz); + + TestObserver to = TestObserver.create(); + + ((Observable)m.invoke(null, (Object[])obs)).subscribe(to); + + to.assertValues(expected); + to.assertNoErrors(); + to.assertComplete(); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void concatMapJustJust() { + TestObserver to = TestObserver.create(); + + Observable.just(Observable.just(1)).concatMap((Function)Functions.identity(), 2, ImmediateThinScheduler.INSTANCE).subscribe(to); + + to.assertValue(1); + to.assertNoErrors(); + to.assertComplete(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void concatMapJustRange() { + TestObserver to = TestObserver.create(); + + Observable.just(Observable.range(1, 5)).concatMap((Function)Functions.identity(), 2, ImmediateThinScheduler.INSTANCE).subscribe(to); + + to.assertValues(1, 2, 3, 4, 5); + to.assertNoErrors(); + to.assertComplete(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void concatMapDelayErrorJustJust() { + TestObserver to = TestObserver.create(); + + Observable.just(Observable.just(1)).concatMapDelayError((Function)Functions.identity(), 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + + to.assertValue(1); + to.assertNoErrors(); + to.assertComplete(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void concatMapDelayErrorJustRange() { + TestObserver to = TestObserver.create(); + + Observable.just(Observable.range(1, 5)).concatMapDelayError((Function)Functions.identity(), 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + + to.assertValues(1, 2, 3, 4, 5); + to.assertNoErrors(); + to.assertComplete(); + } + + @SuppressWarnings("unchecked") + @Test + @Ignore("startWith(a, b, ...) replaced by startWithArray(T...)") + public void startWith() throws Exception { + for (int i = 2; i < 10; i++) { + Class[] clazz = new Class[i]; + Arrays.fill(clazz, Object.class); + + Object[] obs = new Object[i]; + Arrays.fill(obs, 1); + + Integer[] expected = new Integer[i]; + Arrays.fill(expected, 1); + + Method m = Observable.class.getMethod("startWith", clazz); + + TestObserver to = TestObserver.create(); + + ((Observable)m.invoke(Observable.empty(), obs)).subscribe(to); + + to.assertValues(expected); + to.assertNoErrors(); + to.assertComplete(); + } + } + + static final class InfiniteIterator implements Iterator, Iterable { + + int count; + + @Override + public boolean hasNext() { + return true; + } + + @Override + public Integer next() { + return count++; + } + + @Override + public void remove() { + } + + @Override + public Iterator iterator() { + return this; + } + } + + @Test + public void concatMapDelayError() { + Observable.just(Observable.just(1), Observable.just(2)) + .concatMapDelayError(Functions.>identity(), 2, true, ImmediateThinScheduler.INSTANCE) + .test() + .assertResult(1, 2); + } + + @Test + public void concatMapDelayErrorJustSource() { + Observable.just(0) + .concatMapDelayError(new Function>() { + @Override + public Observable apply(Object v) throws Exception { + return Observable.just(1); + } + }, 16, true, ImmediateThinScheduler.INSTANCE) + .test() + .assertResult(1); + + } + + @Test + public void concatMapJustSource() { + Observable.just(0).hide() + .concatMap(new Function>() { + @Override + public Observable apply(Object v) throws Exception { + return Observable.just(1); + } + }, 16, ImmediateThinScheduler.INSTANCE) + .test() + .assertResult(1); + } + + @Test + public void concatMapJustSourceDelayError() { + Observable.just(0).hide() + .concatMapDelayError(new Function>() { + @Override + public Observable apply(Object v) throws Exception { + return Observable.just(1); + } + }, 16, false, ImmediateThinScheduler.INSTANCE) + .test() + .assertResult(1); + } + + @Test + public void concatMapEmpty() { + Observable.just(1).hide() + .concatMap(Functions.justFunction(Observable.empty()), 2, ImmediateThinScheduler.INSTANCE) + .test() + .assertResult(); + } + + @Test + public void concatMapEmptyDelayError() { + Observable.just(1).hide() + .concatMapDelayError(Functions.justFunction(Observable.empty()), 2, true, ImmediateThinScheduler.INSTANCE) + .test() + .assertResult(); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable f) throws Exception { + return f.concatMap(Functions.justFunction(Observable.just(2)), 2, ImmediateThinScheduler.INSTANCE); + } + }); + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable f) throws Exception { + return f.concatMapDelayError(Functions.justFunction(Observable.just(2)), 2, true, ImmediateThinScheduler.INSTANCE); + } + }); + } + + @Test + public void immediateInnerNextOuterError() { + final PublishSubject ps = PublishSubject.create(); + + final TestObserverEx to = new TestObserverEx() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + ps.onError(new TestException("First")); + } + } + }; + + ps.concatMap(Functions.justFunction(Observable.just(1)), 2, ImmediateThinScheduler.INSTANCE) + .subscribe(to); + + ps.onNext(1); + + assertFalse(ps.hasObservers()); + + to.assertFailureAndMessage(TestException.class, "First", 1); + } + + @Test + public void immediateInnerNextOuterError2() { + final PublishSubject ps = PublishSubject.create(); + + final TestObserverEx to = new TestObserverEx() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + ps.onError(new TestException("First")); + } + } + }; + + ps.concatMap(Functions.justFunction(Observable.just(1).hide()), 2, ImmediateThinScheduler.INSTANCE) + .subscribe(to); + + ps.onNext(1); + + assertFalse(ps.hasObservers()); + + to.assertFailureAndMessage(TestException.class, "First", 1); + } + + @Test + public void concatMapInnerError() { + Observable.just(1).hide() + .concatMap(Functions.justFunction(Observable.error(new TestException())), 2, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void concatMapInnerErrorDelayError() { + Observable.just(1).hide() + .concatMapDelayError(Functions.justFunction(Observable.error(new TestException())), 2, true, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void badSource() { + TestHelper.checkBadSourceObservable(new Function, Object>() { + @Override + public Object apply(Observable f) throws Exception { + return f.concatMap(Functions.justFunction(Observable.just(1).hide()), 2, ImmediateThinScheduler.INSTANCE); + } + }, true, 1, 1, 1); + } + + @Test + public void badInnerSource() { + @SuppressWarnings("rawtypes") + final Observer[] ts0 = { null }; + TestObserverEx to = Observable.just(1).hide().concatMap(Functions.justFunction(new Observable() { + @Override + protected void subscribeActual(Observer o) { + ts0[0] = o; + o.onSubscribe(Disposables.empty()); + o.onError(new TestException("First")); + } + }), 2, ImmediateThinScheduler.INSTANCE) + .to(TestHelper.testConsumer()); + + to.assertFailureAndMessage(TestException.class, "First"); + + List errors = TestHelper.trackPluginErrors(); + try { + ts0[0].onError(new TestException("Second")); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void badInnerSourceDelayError() { + @SuppressWarnings("rawtypes") + final Observer[] ts0 = { null }; + TestObserverEx to = Observable.just(1).hide().concatMapDelayError(Functions.justFunction(new Observable() { + @Override + protected void subscribeActual(Observer o) { + ts0[0] = o; + o.onSubscribe(Disposables.empty()); + o.onError(new TestException("First")); + } + }), 2, true, ImmediateThinScheduler.INSTANCE) + .to(TestHelper.testConsumer()); + + to.assertFailureAndMessage(TestException.class, "First"); + + List errors = TestHelper.trackPluginErrors(); + try { + ts0[0].onError(new TestException("Second")); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void badSourceDelayError() { + TestHelper.checkBadSourceObservable(new Function, Object>() { + @Override + public Object apply(Observable f) throws Exception { + return f.concatMapDelayError(Functions.justFunction(Observable.just(1).hide()), 2, true, ImmediateThinScheduler.INSTANCE); + } + }, true, 1, 1, 1); + } + + @Test + public void fusedCrash() { + Observable.range(1, 2) + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { throw new TestException(); } + }) + .concatMap(Functions.justFunction(Observable.just(1)), 2, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void fusedCrashDelayError() { + Observable.range(1, 2) + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { throw new TestException(); } + }) + .concatMapDelayError(Functions.justFunction(Observable.just(1)), 2, true, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void callableCrash() { + Observable.just(1).hide() + .concatMap(Functions.justFunction(Observable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + throw new TestException(); + } + })), 2, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void callableCrashDelayError() { + Observable.just(1).hide() + .concatMapDelayError(Functions.justFunction(Observable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + throw new TestException(); + } + })), 2, true, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.range(1, 2) + .concatMap(Functions.justFunction(Observable.just(1)), 2, ImmediateThinScheduler.INSTANCE)); + + TestHelper.checkDisposed(Observable.range(1, 2) + .concatMapDelayError(Functions.justFunction(Observable.just(1)), 2, true, ImmediateThinScheduler.INSTANCE)); + } + + @Test + public void notVeryEnd() { + Observable.range(1, 2) + .concatMapDelayError(Functions.justFunction(Observable.error(new TestException())), 16, false, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void error() { + Observable.error(new TestException()) + .concatMapDelayError(Functions.justFunction(Observable.just(2)), 16, false, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapperThrows() { + Observable.range(1, 2) + .concatMap(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + throw new TestException(); + } + }, 2, ImmediateThinScheduler.INSTANCE) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mainErrors() { + PublishSubject source = PublishSubject.create(); + + TestObserver to = TestObserver.create(); + + source.concatMapDelayError(new Function>() { + @Override + public Observable apply(Integer v) { + return Observable.range(v, 2); + } + }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + + source.onNext(1); + source.onNext(2); + source.onError(new TestException()); + + to.assertValues(1, 2, 2, 3); + to.assertError(TestException.class); + to.assertNotComplete(); + } + + @Test + public void innerErrors() { + final Observable inner = Observable.range(1, 2) + .concatWith(Observable.error(new TestException())); + + TestObserver to = TestObserver.create(); + + Observable.range(1, 3).concatMapDelayError(new Function>() { + @Override + public Observable apply(Integer v) { + return inner; + } + }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + + to.assertValues(1, 2, 1, 2, 1, 2); + to.assertError(CompositeException.class); + to.assertNotComplete(); + } + + @Test + public void singleInnerErrors() { + final Observable inner = Observable.range(1, 2).concatWith(Observable.error(new TestException())); + + TestObserver to = TestObserver.create(); + + Observable.just(1) + .hide() // prevent scalar optimization + .concatMapDelayError(new Function>() { + @Override + public Observable apply(Integer v) { + return inner; + } + }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + + to.assertValues(1, 2); + to.assertError(TestException.class); + to.assertNotComplete(); + } + + @Test + public void innerNull() { + TestObserver to = TestObserver.create(); + + Observable.just(1) + .hide() // prevent scalar optimization + .concatMapDelayError(new Function>() { + @Override + public Observable apply(Integer v) { + return null; + } + }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + + to.assertNoValues(); + to.assertError(NullPointerException.class); + to.assertNotComplete(); + } + + @Test + public void innerThrows() { + TestObserver to = TestObserver.create(); + + Observable.just(1) + .hide() // prevent scalar optimization + .concatMapDelayError(new Function>() { + @Override + public Observable apply(Integer v) { + throw new TestException(); + } + }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + + to.assertNoValues(); + to.assertError(TestException.class); + to.assertNotComplete(); + } + + @Test + public void innerWithEmpty() { + TestObserver to = TestObserver.create(); + + Observable.range(1, 3) + .concatMapDelayError(new Function>() { + @Override + public Observable apply(Integer v) { + return v == 2 ? Observable.empty() : Observable.range(1, 2); + } + }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + + to.assertValues(1, 2, 1, 2); + to.assertNoErrors(); + to.assertComplete(); + } + + @Test + public void innerWithScalar() { + TestObserver to = TestObserver.create(); + + Observable.range(1, 3) + .concatMapDelayError(new Function>() { + @Override + public Observable apply(Integer v) { + return v == 2 ? Observable.just(3) : Observable.range(1, 2); + } + }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + + to.assertValues(1, 2, 3, 1, 2); + to.assertNoErrors(); + to.assertComplete(); + } + + @Test + public void mapperScheduledLong() { + TestObserver to = Observable.range(1, 1000) + .hide() + .observeOn(Schedulers.computation()) + .concatMap(new Function>() { + @Override + public Observable apply(Integer t) throws Throwable { + return Observable.just(Thread.currentThread().getName()) + .repeat(1000) + .observeOn(Schedulers.io()); + } + }, 2, Schedulers.single()) + .distinct() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(to.values().toString(), to.values().get(0).startsWith("RxSingleScheduler-")); + } + + @Test + public void mapperDelayErrorScheduledLong() { + TestObserver to = Observable.range(1, 1000) + .hide() + .observeOn(Schedulers.computation()) + .concatMapDelayError(new Function>() { + @Override + public Observable apply(Integer t) throws Throwable { + return Observable.just(Thread.currentThread().getName()) + .repeat(1000) + .observeOn(Schedulers.io()); + } + }, 2, false, Schedulers.single()) + .distinct() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(to.values().toString(), to.values().get(0).startsWith("RxSingleScheduler-")); + } + + @Test + public void mapperDelayError2ScheduledLong() { + TestObserver to = Observable.range(1, 1000) + .hide() + .observeOn(Schedulers.computation()) + .concatMapDelayError(new Function>() { + @Override + public Observable apply(Integer t) throws Throwable { + return Observable.just(Thread.currentThread().getName()) + .repeat(1000) + .observeOn(Schedulers.io()); + } + }, 2, true, Schedulers.single()) + .distinct() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertTrue(to.values().toString(), to.values().get(0).startsWith("RxSingleScheduler-")); + } +}