diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
index f8cbdcdbe7..20e52f2f5f 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
@@ -3653,7 +3653,7 @@ public final <R> Maybe<R> flatMap(@NonNull Function<? super T, ? extends MaybeSo
-     * Maps the {@code onSuccess}, {@code onError} or {@code onComplete} signals of this {@code Maybe} into {@link MaybeSource} and emits that
+     * Maps the {@code onSuccess}, {@code onError} or {@code onComplete} signals of the current {@code Maybe} into a {@link MaybeSource} and emits that
      * {@code MaybeSource}'s signals.
      * <p>
      * <img width="640" height="354" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMap.mmm.png" alt="">
@@ -3691,7 +3691,7 @@ public final <R> Maybe<R> flatMap(
      * Returns a {@code Maybe} that emits the results of a specified function to the pair of values emitted by the
      * current {@code Maybe} and a specified mapped {@link MaybeSource}.
      * <p>
-     * <img width="640" height="390" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMap.r.png" alt="">
+     * <img width="640" height="268" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMap.combiner.png" alt="">
      * <dl>
      *  <dt><b>Scheduler:</b></dt>
      *  <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java
index cab4d3c073..667125b08b 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Single.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Single.java
@@ -3196,6 +3196,72 @@ public final <R> Single<R> flatMap(@NonNull Function<? super T, ? extends Single
         return RxJavaPlugins.onAssembly(new SingleFlatMap<>(this, mapper));
+    /**
+     * Returns a {@code Single} that emits the results of a specified function to the pair of values emitted by the
+     * current {@code Single} and a specified mapped {@link SingleSource}.
+     * <p>
+     * <img width="640" height="268" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMap.combiner.png" alt="">
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
+     * </dl>
+     *
+     * @param <U>
+     *            the type of items emitted by the {@code SingleSource} returned by the {@code mapper} function
+     * @param <R>
+     *            the type of items emitted by the resulting {@code Single}
+     * @param mapper
+     *            a function that returns a {@code SingleSource} for the item emitted by the current {@code Single}
+     * @param combiner
+     *            a function that combines one item emitted by each of the source and collection {@code SingleSource} and
+     *            returns an item to be emitted by the resulting {@code SingleSource}
+     * @return the new {@code Single} instance
+     * @throws NullPointerException if {@code mapper} or {@code combiner} is {@code null}
+     * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
+     * @since 3.0.0
+     */
+    @CheckReturnValue
+    @NonNull
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final <U, R> Single<R> flatMap(@NonNull Function<? super T, ? extends SingleSource<? extends U>> mapper,
+            @NonNull BiFunction<? super T, ? super U, ? extends R> combiner) {
+        Objects.requireNonNull(mapper, "mapper is null");
+        Objects.requireNonNull(combiner, "combiner is null");
+        return RxJavaPlugins.onAssembly(new SingleFlatMapBiSelector<>(this, mapper, combiner));
+    }
+    /**
+     * Maps the {@code onSuccess} or {@code onError} signals of the current {@code Single} into a {@link SingleSource} and emits that
+     * {@code SingleSource}'s signals.
+     * <p>
+     * <img width="640" height="449" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMap.notification.png" alt="">
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
+     * </dl>
+     *
+     * @param <R>
+     *            the result type
+     * @param onSuccessMapper
+     *            a function that returns a {@code SingleSource} to merge for the {@code onSuccess} item emitted by this {@code Single}
+     * @param onErrorMapper
+     *            a function that returns a {@code SingleSource} to merge for an {@code onError} notification from this {@code Single}
+     * @return the new {@code Single} instance
+     * @throws NullPointerException if {@code onSuccessMapper} or {@code onErrorMapper} is {@code null}
+     * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
+     * @since 3.0.0
+     */
+    @CheckReturnValue
+    @NonNull
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final <R> Single<R> flatMap(
+            @NonNull Function<? super T, ? extends SingleSource<? extends R>> onSuccessMapper,
+            @NonNull Function<? super Throwable, ? extends SingleSource<? extends R>> onErrorMapper) {
+        Objects.requireNonNull(onSuccessMapper, "onSuccessMapper is null");
+        Objects.requireNonNull(onErrorMapper, "onErrorMapper is null");
+        return RxJavaPlugins.onAssembly(new SingleFlatMapNotification<>(this, onSuccessMapper, onErrorMapper));
+    }
      * Returns a {@link Maybe} that is based on applying a specified function to the item emitted by the current {@code Single},
      * where that function returns a {@link MaybeSource}.
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapNotification.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapNotification.java
index e357938db2..ffe6ee5e20 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapNotification.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapNotification.java
@@ -109,7 +109,9 @@ public void onSuccess(T value) {
-            source.subscribe(new InnerObserver());
+            if (!isDisposed()) {
+                source.subscribe(new InnerObserver());
+            }
@@ -124,7 +126,9 @@ public void onError(Throwable e) {
-            source.subscribe(new InnerObserver());
+            if (!isDisposed()) {
+                source.subscribe(new InnerObserver());
+            }
@@ -139,7 +143,9 @@ public void onComplete() {
-            source.subscribe(new InnerObserver());
+            if (!isDisposed()) {
+                source.subscribe(new InnerObserver());
+            }
         final class InnerObserver implements MaybeObserver<R> {
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapSingle.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapSingle.java
index aed71dc855..999b22f836 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapSingle.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapSingle.java
@@ -89,7 +89,9 @@ public void onSuccess(T value) {
-            ss.subscribe(new FlatMapSingleObserver<R>(this, downstream));
+            if (!isDisposed()) {
+                ss.subscribe(new FlatMapSingleObserver<R>(this, downstream));
+            }
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapObservable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapObservable.java
index ae42405f74..ef1dc5f023 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapObservable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapObservable.java
@@ -106,7 +106,9 @@ public void onSuccess(T t) {
-            o.subscribe(this);
+            if (!isDisposed()) {
+                o.subscribe(this);
+            }
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapPublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapPublisher.java
index c590755704..82a6d5da7d 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapPublisher.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapPublisher.java
@@ -116,7 +116,9 @@ public void onSuccess(T t) {
-            p.subscribe(this);
+            if (get() != SubscriptionHelper.CANCELLED) {
+                p.subscribe(this);
+            }
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/SingleFlatMapObservable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/SingleFlatMapObservable.java
index c3dd9b59c1..b2562d8dba 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/SingleFlatMapObservable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/SingleFlatMapObservable.java
@@ -106,7 +106,9 @@ public void onSuccess(T t) {
-            o.subscribe(this);
+            if (!isDisposed()) {
+                o.subscribe(this);
+            }
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapBiSelector.java b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapBiSelector.java
new file mode 100644
index 0000000000..d1051f3357
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapBiSelector.java
@@ -0,0 +1,156 @@
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+package io.reactivex.rxjava3.internal.operators.single;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.exceptions.Exceptions;
+import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
+ * Maps a source item to another SingleSource then calls a BiFunction with the
+ * original item and the secondary item to generate the final result.
+ *
+ * @param <T> the main value type
+ * @param <U> the second value type
+ * @param <R> the result value type
+ * @since 3.0.0
+ */
+public final class SingleFlatMapBiSelector<T, U, R> extends Single<R> {
+    final SingleSource<T> source;
+    final Function<? super T, ? extends SingleSource<? extends U>> mapper;
+    final BiFunction<? super T, ? super U, ? extends R> resultSelector;
+    public SingleFlatMapBiSelector(SingleSource<T> source,
+            Function<? super T, ? extends SingleSource<? extends U>> mapper,
+            BiFunction<? super T, ? super U, ? extends R> resultSelector) {
+        this.source = source;
+        this.mapper = mapper;
+        this.resultSelector = resultSelector;
+    }
+    @Override
+    protected void subscribeActual(SingleObserver<? super R> observer) {
+        source.subscribe(new FlatMapBiMainObserver<T, U, R>(observer, mapper, resultSelector));
+    }
+    static final class FlatMapBiMainObserver<T, U, R>
+    implements SingleObserver<T>, Disposable {
+        final Function<? super T, ? extends SingleSource<? extends U>> mapper;
+        final InnerObserver<T, U, R> inner;
+        FlatMapBiMainObserver(SingleObserver<? super R> actual,
+                Function<? super T, ? extends SingleSource<? extends U>> mapper,
+                BiFunction<? super T, ? super U, ? extends R> resultSelector) {
+            this.inner = new InnerObserver<>(actual, resultSelector);
+            this.mapper = mapper;
+        }
+        @Override
+        public void dispose() {
+            DisposableHelper.dispose(inner);
+        }
+        @Override
+        public boolean isDisposed() {
+            return DisposableHelper.isDisposed(inner.get());
+        }
+        @Override
+        public void onSubscribe(Disposable d) {
+            if (DisposableHelper.setOnce(inner, d)) {
+                inner.downstream.onSubscribe(this);
+            }
+        }
+        @Override
+        public void onSuccess(T value) {
+            SingleSource<? extends U> next;
+            try {
+                next = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null MaybeSource");
+            } catch (Throwable ex) {
+                Exceptions.throwIfFatal(ex);
+                inner.downstream.onError(ex);
+                return;
+            }
+            if (DisposableHelper.replace(inner, null)) {
+                inner.value = value;
+                next.subscribe(inner);
+            }
+        }
+        @Override
+        public void onError(Throwable e) {
+            inner.downstream.onError(e);
+        }
+        static final class InnerObserver<T, U, R>
+        extends AtomicReference<Disposable>
+        implements SingleObserver<U> {
+            private static final long serialVersionUID = -2897979525538174559L;
+            final SingleObserver<? super R> downstream;
+            final BiFunction<? super T, ? super U, ? extends R> resultSelector;
+            T value;
+            InnerObserver(SingleObserver<? super R> actual,
+                    BiFunction<? super T, ? super U, ? extends R> resultSelector) {
+                this.downstream = actual;
+                this.resultSelector = resultSelector;
+            }
+            @Override
+            public void onSubscribe(Disposable d) {
+                DisposableHelper.setOnce(this, d);
+            }
+            @Override
+            public void onSuccess(U value) {
+                T t = this.value;
+                this.value = null;
+                R r;
+                try {
+                    r = Objects.requireNonNull(resultSelector.apply(t, value), "The resultSelector returned a null value");
+                } catch (Throwable ex) {
+                    Exceptions.throwIfFatal(ex);
+                    downstream.onError(ex);
+                    return;
+                }
+                downstream.onSuccess(r);
+            }
+            @Override
+            public void onError(Throwable e) {
+                downstream.onError(e);
+            }
+        }
+    }
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapNotification.java b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapNotification.java
new file mode 100644
index 0000000000..1ba9796b7c
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapNotification.java
@@ -0,0 +1,147 @@
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+package io.reactivex.rxjava3.internal.operators.single;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.exceptions.*;
+import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
+ * Maps a value into a SingleSource and relays its signal.
+ *
+ * @param <T> the source value type
+ * @param <R> the result value type
+ * @since 3.0.0
+ */
+public final class SingleFlatMapNotification<T, R> extends Single<R> {
+    final SingleSource<T> source;
+    final Function<? super T, ? extends SingleSource<? extends R>> onSuccessMapper;
+    final Function<? super Throwable, ? extends SingleSource<? extends R>> onErrorMapper;
+    public SingleFlatMapNotification(SingleSource<T> source,
+            Function<? super T, ? extends SingleSource<? extends R>> onSuccessMapper,
+            Function<? super Throwable, ? extends SingleSource<? extends R>> onErrorMapper) {
+        this.source = source;
+        this.onSuccessMapper = onSuccessMapper;
+        this.onErrorMapper = onErrorMapper;
+    }
+    @Override
+    protected void subscribeActual(SingleObserver<? super R> observer) {
+        source.subscribe(new FlatMapSingleObserver<>(observer, onSuccessMapper, onErrorMapper));
+    }
+    static final class FlatMapSingleObserver<T, R>
+    extends AtomicReference<Disposable>
+    implements SingleObserver<T>, Disposable {
+        private static final long serialVersionUID = 4375739915521278546L;
+        final SingleObserver<? super R> downstream;
+        final Function<? super T, ? extends SingleSource<? extends R>> onSuccessMapper;
+        final Function<? super Throwable, ? extends SingleSource<? extends R>> onErrorMapper;
+        Disposable upstream;
+        FlatMapSingleObserver(SingleObserver<? super R> actual,
+                Function<? super T, ? extends SingleSource<? extends R>> onSuccessMapper,
+                Function<? super Throwable, ? extends SingleSource<? extends R>> onErrorMapper) {
+            this.downstream = actual;
+            this.onSuccessMapper = onSuccessMapper;
+            this.onErrorMapper = onErrorMapper;
+        }
+        @Override
+        public void dispose() {
+            DisposableHelper.dispose(this);
+            upstream.dispose();
+        }
+        @Override
+        public boolean isDisposed() {
+            return DisposableHelper.isDisposed(get());
+        }
+        @Override
+        public void onSubscribe(Disposable d) {
+            if (DisposableHelper.validate(this.upstream, d)) {
+                this.upstream = d;
+                downstream.onSubscribe(this);
+            }
+        }
+        @Override
+        public void onSuccess(T value) {
+            SingleSource<? extends R> source;
+            try {
+                source = Objects.requireNonNull(onSuccessMapper.apply(value), "The onSuccessMapper returned a null SingleSource");
+            } catch (Throwable ex) {
+                Exceptions.throwIfFatal(ex);
+                downstream.onError(ex);
+                return;
+            }
+            if (!isDisposed()) {
+                source.subscribe(new InnerObserver());
+            }
+        }
+        @Override
+        public void onError(Throwable e) {
+            SingleSource<? extends R> source;
+            try {
+                source = Objects.requireNonNull(onErrorMapper.apply(e), "The onErrorMapper returned a null SingleSource");
+            } catch (Throwable ex) {
+                Exceptions.throwIfFatal(ex);
+                downstream.onError(new CompositeException(e, ex));
+                return;
+            }
+            if (!isDisposed()) {
+                source.subscribe(new InnerObserver());
+            }
+        }
+        final class InnerObserver implements SingleObserver<R> {
+            @Override
+            public void onSubscribe(Disposable d) {
+                DisposableHelper.setOnce(FlatMapSingleObserver.this, d);
+            }
+            @Override
+            public void onSuccess(R value) {
+                downstream.onSuccess(value);
+            }
+            @Override
+            public void onError(Throwable e) {
+                downstream.onError(e);
+            }
+        }
+    }
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapPublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapPublisher.java
index f474830e40..c89913791d 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapPublisher.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapPublisher.java
@@ -92,7 +92,9 @@ public void onSuccess(S value) {
-            f.subscribe(this);
+            if (parent.get() != SubscriptionHelper.CANCELLED) {
+                f.subscribe(this);
+            }
diff --git a/src/test/java/io/reactivex/rxjava3/core/XFlatMapTest.java b/src/test/java/io/reactivex/rxjava3/core/XFlatMapTest.java
index 6e80b57904..06394de7ab 100644
--- a/src/test/java/io/reactivex/rxjava3/core/XFlatMapTest.java
+++ b/src/test/java/io/reactivex/rxjava3/core/XFlatMapTest.java
@@ -22,7 +22,7 @@
 import org.reactivestreams.Publisher;
 import io.reactivex.rxjava3.exceptions.TestException;
-import io.reactivex.rxjava3.functions.Function;
+import io.reactivex.rxjava3.functions.*;
 import io.reactivex.rxjava3.observers.TestObserver;
 import io.reactivex.rxjava3.plugins.RxJavaPlugins;
 import io.reactivex.rxjava3.schedulers.Schedulers;
@@ -224,7 +224,7 @@ public Completable apply(Integer v) throws Exception {
-    public void observableFlowable() throws Exception {
+    public void observableObservable() throws Exception {
         List<Throwable> errors = TestHelper.trackPluginErrors();
         try {
             TestObserver<Integer> to = Observable.just(1)
@@ -505,7 +505,179 @@ public Completable apply(Integer v) throws Exception {
-    @Ignore
+    public void singlePublisher() throws Exception {
+        List<Throwable> errors = TestHelper.trackPluginErrors();
+        try {
+            TestSubscriber<Integer> ts = Single.just(1)
+            .subscribeOn(Schedulers.io())
+            .flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
+                @Override
+                public Publisher<Integer> apply(Integer v) throws Exception {
+                    sleep();
+                    return Flowable.<Integer>error(new TestException());
+                }
+            })
+            .test();
+            cb.await();
+            beforeCancelSleep(ts);
+            ts.cancel();
+            Thread.sleep(SLEEP_AFTER_CANCEL);
+            ts.assertEmpty();
+            assertTrue(errors.toString(), errors.isEmpty());
+        } finally {
+            RxJavaPlugins.reset();
+        }
+    }
+    @Test
+    public void singleCombiner() throws Exception {
+        List<Throwable> errors = TestHelper.trackPluginErrors();
+        try {
+            TestObserver<Integer> to = Single.just(1)
+            .subscribeOn(Schedulers.io())
+            .flatMap(new Function<Integer, Single<Integer>>() {
+                @Override
+                public Single<Integer> apply(Integer v) throws Exception {
+                    sleep();
+                    return Single.<Integer>error(new TestException());
+                }
+            }, (a, b) -> a + b)
+            .test();
+            cb.await();
+            beforeCancelSleep(to);
+            to.dispose();
+            Thread.sleep(SLEEP_AFTER_CANCEL);
+            to.assertEmpty();
+            assertTrue(errors.toString(), errors.isEmpty());
+        } finally {
+            RxJavaPlugins.reset();
+        }
+    }
+    @Test
+    public void singleObservable() throws Exception {
+        List<Throwable> errors = TestHelper.trackPluginErrors();
+        try {
+            TestObserver<Integer> to = Single.just(1)
+            .subscribeOn(Schedulers.io())
+            .flatMapObservable(new Function<Integer, Observable<Integer>>() {
+                @Override
+                public Observable<Integer> apply(Integer v) throws Exception {
+                    sleep();
+                    return Observable.<Integer>error(new TestException());
+                }
+            })
+            .test();
+            cb.await();
+            beforeCancelSleep(to);
+            to.dispose();
+            Thread.sleep(SLEEP_AFTER_CANCEL);
+            to.assertEmpty();
+            assertTrue(errors.toString(), errors.isEmpty());
+        } finally {
+            RxJavaPlugins.reset();
+        }
+    }
+    @Test
+    public void singleNotificationSuccess() throws Exception {
+        List<Throwable> errors = TestHelper.trackPluginErrors();
+        try {
+            TestObserver<Integer> to = Single.just(1)
+            .subscribeOn(Schedulers.io())
+            .flatMap(
+                new Function<Integer, Single<Integer>>() {
+                    @Override
+                    public Single<Integer> apply(Integer v) throws Exception {
+                        sleep();
+                        return Single.<Integer>error(new TestException());
+                    }
+                },
+                new Function<Throwable, Single<Integer>>() {
+                    @Override
+                    public Single<Integer> apply(Throwable v) throws Exception {
+                        sleep();
+                        return Single.<Integer>error(new TestException());
+                    }
+                }
+            )
+            .test();
+            cb.await();
+            beforeCancelSleep(to);
+            to.dispose();
+            Thread.sleep(SLEEP_AFTER_CANCEL);
+            to.assertEmpty();
+            assertTrue(errors.toString(), errors.isEmpty());
+        } finally {
+            RxJavaPlugins.reset();
+        }
+    }
+    @Test
+    public void singleNotificationError() throws Exception {
+        List<Throwable> errors = TestHelper.trackPluginErrors();
+        try {
+            TestObserver<Integer> to = Single.<Integer>error(new TestException())
+            .subscribeOn(Schedulers.io())
+            .flatMap(
+                new Function<Integer, Single<Integer>>() {
+                    @Override
+                    public Single<Integer> apply(Integer v) throws Exception {
+                        sleep();
+                        return Single.<Integer>error(new TestException());
+                    }
+                },
+                new Function<Throwable, Single<Integer>>() {
+                    @Override
+                    public Single<Integer> apply(Throwable v) throws Exception {
+                        sleep();
+                        return Single.<Integer>error(new TestException());
+                    }
+                }
+            )
+            .test();
+            cb.await();
+            beforeCancelSleep(to);
+            to.dispose();
+            Thread.sleep(SLEEP_AFTER_CANCEL);
+            to.assertEmpty();
+            assertTrue(errors.toString(), errors.isEmpty());
+        } finally {
+            RxJavaPlugins.reset();
+        }
+    }
+    @Test
     public void maybeSingle() throws Exception {
         List<Throwable> errors = TestHelper.trackPluginErrors();
         try {
@@ -537,6 +709,37 @@ public Single<Integer> apply(Integer v) throws Exception {
+    @Test
+    public void maybeSingle2() throws Exception {
+        List<Throwable> errors = TestHelper.trackPluginErrors();
+        try {
+            TestObserver<Integer> to = Maybe.just(1)
+            .subscribeOn(Schedulers.io())
+            .flatMapSingle(new Function<Integer, Single<Integer>>() {
+                @Override
+                public Single<Integer> apply(Integer v) throws Exception {
+                    sleep();
+                    return Single.<Integer>error(new TestException());
+                }
+            })
+            .test();
+            cb.await();
+            beforeCancelSleep(to);
+            to.dispose();
+            Thread.sleep(SLEEP_AFTER_CANCEL);
+            to.assertEmpty();
+            assertTrue(errors.toString(), errors.isEmpty());
+        } finally {
+            RxJavaPlugins.reset();
+        }
+    }
     public void maybeMaybe() throws Exception {
         List<Throwable> errors = TestHelper.trackPluginErrors();
@@ -568,6 +771,240 @@ public Maybe<Integer> apply(Integer v) throws Exception {
+    @Test
+    public void maybePublisher() throws Exception {
+        List<Throwable> errors = TestHelper.trackPluginErrors();
+        try {
+            TestSubscriber<Integer> ts = Maybe.just(1)
+            .subscribeOn(Schedulers.io())
+            .flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
+                @Override
+                public Publisher<Integer> apply(Integer v) throws Exception {
+                    sleep();
+                    return Flowable.<Integer>error(new TestException());
+                }
+            })
+            .test();
+            cb.await();
+            beforeCancelSleep(ts);
+            ts.cancel();
+            Thread.sleep(SLEEP_AFTER_CANCEL);
+            ts.assertEmpty();
+            assertTrue(errors.toString(), errors.isEmpty());
+        } finally {
+            RxJavaPlugins.reset();
+        }
+    }
+    @Test
+    public void maybeObservable() throws Exception {
+        List<Throwable> errors = TestHelper.trackPluginErrors();
+        try {
+            TestObserver<Integer> to = Maybe.just(1)
+            .subscribeOn(Schedulers.io())
+            .flatMapObservable(new Function<Integer, Observable<Integer>>() {
+                @Override
+                public Observable<Integer> apply(Integer v) throws Exception {
+                    sleep();
+                    return Observable.<Integer>error(new TestException());
+                }
+            })
+            .test();
+            cb.await();
+            beforeCancelSleep(to);
+            to.dispose();
+            Thread.sleep(SLEEP_AFTER_CANCEL);
+            to.assertEmpty();
+            assertTrue(errors.toString(), errors.isEmpty());
+        } finally {
+            RxJavaPlugins.reset();
+        }
+    }
+    @Test
+    public void maybeNotificationSuccess() throws Exception {
+        List<Throwable> errors = TestHelper.trackPluginErrors();
+        try {
+            TestObserver<Integer> to = Maybe.just(1)
+            .subscribeOn(Schedulers.io())
+            .flatMap(
+                new Function<Integer, Maybe<Integer>>() {
+                    @Override
+                    public Maybe<Integer> apply(Integer v) throws Exception {
+                        sleep();
+                        return Maybe.<Integer>error(new TestException());
+                    }
+                },
+                new Function<Throwable, Maybe<Integer>>() {
+                    @Override
+                    public Maybe<Integer> apply(Throwable v) throws Exception {
+                        sleep();
+                        return Maybe.<Integer>error(new TestException());
+                    }
+                },
+                new Supplier<Maybe<Integer>>() {
+                    @Override
+                    public Maybe<Integer> get() throws Exception {
+                        sleep();
+                        return Maybe.<Integer>error(new TestException());
+                    }
+                }
+            )
+            .test();
+            cb.await();
+            beforeCancelSleep(to);
+            to.dispose();
+            Thread.sleep(SLEEP_AFTER_CANCEL);
+            to.assertEmpty();
+            assertTrue(errors.toString(), errors.isEmpty());
+        } finally {
+            RxJavaPlugins.reset();
+        }
+    }
+    @Test
+    public void maybeNotificationError() throws Exception {
+        List<Throwable> errors = TestHelper.trackPluginErrors();
+        try {
+            TestObserver<Integer> to = Maybe.<Integer>error(new TestException())
+            .subscribeOn(Schedulers.io())
+            .flatMap(
+                new Function<Integer, Maybe<Integer>>() {
+                    @Override
+                    public Maybe<Integer> apply(Integer v) throws Exception {
+                        sleep();
+                        return Maybe.<Integer>error(new TestException());
+                    }
+                },
+                new Function<Throwable, Maybe<Integer>>() {
+                    @Override
+                    public Maybe<Integer> apply(Throwable v) throws Exception {
+                        sleep();
+                        return Maybe.<Integer>error(new TestException());
+                    }
+                },
+                new Supplier<Maybe<Integer>>() {
+                    @Override
+                    public Maybe<Integer> get() throws Exception {
+                        sleep();
+                        return Maybe.<Integer>error(new TestException());
+                    }
+                }
+            )
+            .test();
+            cb.await();
+            beforeCancelSleep(to);
+            to.dispose();
+            Thread.sleep(SLEEP_AFTER_CANCEL);
+            to.assertEmpty();
+            assertTrue(errors.toString(), errors.isEmpty());
+        } finally {
+            RxJavaPlugins.reset();
+        }
+    }
+    @Test
+    public void maybeNotificationEmpty() throws Exception {
+        List<Throwable> errors = TestHelper.trackPluginErrors();
+        try {
+            TestObserver<Integer> to = Maybe.<Integer>empty()
+            .subscribeOn(Schedulers.io())
+            .flatMap(
+                new Function<Integer, Maybe<Integer>>() {
+                    @Override
+                    public Maybe<Integer> apply(Integer v) throws Exception {
+                        sleep();
+                        return Maybe.<Integer>error(new TestException());
+                    }
+                },
+                new Function<Throwable, Maybe<Integer>>() {
+                    @Override
+                    public Maybe<Integer> apply(Throwable v) throws Exception {
+                        sleep();
+                        return Maybe.<Integer>error(new TestException());
+                    }
+                },
+                new Supplier<Maybe<Integer>>() {
+                    @Override
+                    public Maybe<Integer> get() throws Exception {
+                        sleep();
+                        return Maybe.<Integer>error(new TestException());
+                    }
+                }
+            )
+            .test();
+            cb.await();
+            beforeCancelSleep(to);
+            to.dispose();
+            Thread.sleep(SLEEP_AFTER_CANCEL);
+            to.assertEmpty();
+            assertTrue(errors.toString(), errors.isEmpty());
+        } finally {
+            RxJavaPlugins.reset();
+        }
+    }
+    @Test
+    public void maybeCombiner() throws Exception {
+        List<Throwable> errors = TestHelper.trackPluginErrors();
+        try {
+            TestObserver<Integer> to = Maybe.just(1)
+            .subscribeOn(Schedulers.io())
+            .flatMap(new Function<Integer, Maybe<Integer>>() {
+                @Override
+                public Maybe<Integer> apply(Integer v) throws Exception {
+                    sleep();
+                    return Maybe.<Integer>error(new TestException());
+                }
+            }, (a, b) -> a + b)
+            .test();
+            cb.await();
+            beforeCancelSleep(to);
+            to.dispose();
+            Thread.sleep(SLEEP_AFTER_CANCEL);
+            to.assertEmpty();
+            assertTrue(errors.toString(), errors.isEmpty());
+        } finally {
+            RxJavaPlugins.reset();
+        }
+    }
     public void maybeCompletable() throws Exception {
         List<Throwable> errors = TestHelper.trackPluginErrors();
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapBiSelectorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapBiSelectorTest.java
new file mode 100644
index 0000000000..84a2a650c4
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapBiSelectorTest.java
@@ -0,0 +1,205 @@
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+package io.reactivex.rxjava3.internal.operators.single;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.exceptions.TestException;
+import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.observers.TestObserver;
+import io.reactivex.rxjava3.subjects.SingleSubject;
+import io.reactivex.rxjava3.testsupport.TestHelper;
+public class SingleFlatMapBiSelectorTest extends RxJavaTest {
+    BiFunction<Integer, Integer, String> stringCombine() {
+        return new BiFunction<Integer, Integer, String>() {
+            @Override
+            public String apply(Integer a, Integer b) throws Exception {
+                return a + ":" + b;
+            }
+        };
+    }
+    @Test
+    public void normal() {
+        Single.just(1)
+        .flatMap(new Function<Integer, SingleSource<Integer>>() {
+            @Override
+            public SingleSource<Integer> apply(Integer v) throws Exception {
+                return Single.just(2);
+            }
+        }, stringCombine())
+        .test()
+        .assertResult("1:2");
+    }
+    @Test
+    public void errorWithJust() {
+        final int[] call = { 0 };
+        Single.<Integer>error(new TestException())
+        .flatMap(new Function<Integer, SingleSource<Integer>>() {
+            @Override
+            public SingleSource<Integer> apply(Integer v) throws Exception {
+                call[0]++;
+                return Single.just(1);
+            }
+        }, stringCombine())
+        .test()
+        .assertFailure(TestException.class);
+        assertEquals(0, call[0]);
+    }
+    @Test
+    public void justWithError() {
+        final int[] call = { 0 };
+        Single.just(1)
+        .flatMap(new Function<Integer, SingleSource<Integer>>() {
+            @Override
+            public SingleSource<Integer> apply(Integer v) throws Exception {
+                call[0]++;
+                return Single.<Integer>error(new TestException());
+            }
+        }, stringCombine())
+        .test()
+        .assertFailure(TestException.class);
+        assertEquals(1, call[0]);
+    }
+    @Test
+    public void dispose() {
+        TestHelper.checkDisposed(SingleSubject.create()
+                .flatMap(new Function<Object, SingleSource<Integer>>() {
+            @Override
+            public SingleSource<Integer> apply(Object v) throws Exception {
+                return Single.just(1);
+            }
+        }, new BiFunction<Object, Integer, Object>() {
+            @Override
+            public Object apply(Object a, Integer b) throws Exception {
+                return b;
+            }
+        }));
+    }
+    @Test
+    public void doubleOnSubscribe() {
+        TestHelper.checkDoubleOnSubscribeSingle(new Function<Single<Object>, SingleSource<Object>>() {
+            @Override
+            public SingleSource<Object> apply(Single<Object> v) throws Exception {
+                return v.flatMap(new Function<Object, SingleSource<Integer>>() {
+                    @Override
+                    public SingleSource<Integer> apply(Object v) throws Exception {
+                        return Single.just(1);
+                    }
+                }, new BiFunction<Object, Integer, Object>() {
+                    @Override
+                    public Object apply(Object a, Integer b) throws Exception {
+                        return b;
+                    }
+                });
+            }
+        });
+    }
+    @Test
+    public void mapperThrows() {
+        Single.just(1)
+        .flatMap(new Function<Integer, SingleSource<Integer>>() {
+            @Override
+            public SingleSource<Integer> apply(Integer v) throws Exception {
+                throw new TestException();
+            }
+        }, stringCombine())
+        .test()
+        .assertFailure(TestException.class);
+    }
+    @Test
+    public void mapperReturnsNull() {
+        Single.just(1)
+        .flatMap(new Function<Integer, SingleSource<Integer>>() {
+            @Override
+            public SingleSource<Integer> apply(Integer v) throws Exception {
+                return null;
+            }
+        }, stringCombine())
+        .test()
+        .assertFailure(NullPointerException.class);
+    }
+    @Test
+    public void resultSelectorThrows() {
+        Single.just(1)
+        .flatMap(new Function<Integer, SingleSource<Integer>>() {
+            @Override
+            public SingleSource<Integer> apply(Integer v) throws Exception {
+                return Single.just(2);
+            }
+        }, new BiFunction<Integer, Integer, Object>() {
+            @Override
+            public Object apply(Integer a, Integer b) throws Exception {
+                throw new TestException();
+            }
+        })
+        .test()
+        .assertFailure(TestException.class);
+    }
+    @Test
+    public void resultSelectorReturnsNull() {
+        Single.just(1)
+        .flatMap(new Function<Integer, SingleSource<Integer>>() {
+            @Override
+            public SingleSource<Integer> apply(Integer v) throws Exception {
+                return Single.just(2);
+            }
+        }, new BiFunction<Integer, Integer, Object>() {
+            @Override
+            public Object apply(Integer a, Integer b) throws Exception {
+                return null;
+            }
+        })
+        .test()
+        .assertFailure(NullPointerException.class);
+    }
+    @Test
+    public void mapperCancels() {
+        final TestObserver<Integer> to = new TestObserver<>();
+        Single.just(1)
+        .flatMap(new Function<Integer, SingleSource<Integer>>() {
+            @Override
+            public SingleSource<Integer> apply(Integer v) throws Exception {
+                to.dispose();
+                return Single.just(2);
+            }
+        }, new BiFunction<Integer, Integer, Integer>() {
+            @Override
+            public Integer apply(Integer a, Integer b) throws Exception {
+                throw new IllegalStateException();
+            }
+        })
+        .subscribeWith(to)
+        .assertEmpty();
+    }
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapNotificationTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapNotificationTest.java
new file mode 100644
index 0000000000..266acc93f2
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapNotificationTest.java
@@ -0,0 +1,113 @@
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+package io.reactivex.rxjava3.internal.operators.single;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.exceptions.*;
+import io.reactivex.rxjava3.functions.Function;
+import io.reactivex.rxjava3.internal.functions.Functions;
+import io.reactivex.rxjava3.testsupport.*;
+public class SingleFlatMapNotificationTest extends RxJavaTest {
+    @Test
+    public void dispose() {
+        TestHelper.checkDisposed(Single.just(1)
+                .flatMap(Functions.justFunction(Single.just(1)),
+                        Functions.justFunction(Single.just(1))));
+    }
+    @Test
+    public void doubleOnSubscribe() {
+        TestHelper.checkDoubleOnSubscribeSingle(new Function<Single<Integer>, SingleSource<Integer>>() {
+            @Override
+            public SingleSource<Integer> apply(Single<Integer> m) throws Exception {
+                return m
+                        .flatMap(Functions.justFunction(Single.just(1)),
+                                Functions.justFunction(Single.just(1)));
+            }
+        });
+    }
+    @Test
+    public void onSuccessNull() {
+        Single.just(1)
+        .flatMap(Functions.justFunction((Single<Integer>)null),
+                Functions.justFunction(Single.just(1)))
+        .test()
+        .assertFailure(NullPointerException.class);
+    }
+    @Test
+    public void onErrorNull() {
+        TestObserverEx<Integer> to = Single.<Integer>error(new TestException())
+        .flatMap(Functions.justFunction(Single.just(1)),
+                Functions.justFunction((Single<Integer>)null))
+        .to(TestHelper.<Integer>testConsumer())
+        .assertFailure(CompositeException.class);
+        List<Throwable> ce = TestHelper.compositeList(to.errors().get(0));
+        TestHelper.assertError(ce, 0, TestException.class);
+        TestHelper.assertError(ce, 1, NullPointerException.class);
+    }
+    @Test
+    public void onSuccessError() {
+        Single.just(1)
+        .flatMap(Functions.justFunction(Single.<Integer>error(new TestException())),
+                Functions.justFunction((Single<Integer>)null))
+        .test()
+        .assertFailure(TestException.class);
+    }
+    @Test
+    public void onSucccessSuccess() {
+        Single.just(1)
+        .flatMap(v -> Single.just(2), e -> Single.just(3))
+        .test()
+        .assertResult(2);
+    }
+    @Test
+    public void onErrorSuccess() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Single.error(new TestException())
+            .flatMap(v -> Single.just(2), e -> Single.just(3))
+            .test()
+            .assertResult(3);
+            assertTrue("" + errors, errors.isEmpty());
+        });
+    }
+    @Test
+    public void onErrorError() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Single.error(new TestException())
+            .flatMap(v -> Single.just(2), e -> Single.<Integer>error(new IOException()))
+            .test()
+            .assertFailure(IOException.class);
+            assertTrue("" + errors, errors.isEmpty());
+        });
+    }