From b5df250f9cbd32921516807d29949576cd7d02c1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?D=C3=A1vid=20Karnok?= <akarnokd@gmail.com>
Date: Thu, 20 Jun 2019 20:22:24 +0200
Subject: [PATCH 1/2] 3.x: Add X.fromSupplier()

---
 src/main/java/io/reactivex/Completable.java   |  32 ++
 src/main/java/io/reactivex/Flowable.java      |  42 +++
 src/main/java/io/reactivex/Maybe.java         |  47 +++
 src/main/java/io/reactivex/Observable.java    |  38 +++
 src/main/java/io/reactivex/Single.java        |  40 +++
 .../completable/CompletableFromSupplier.java  |  53 +++
 .../flowable/FlowableFromCallable.java        |   5 +-
 .../flowable/FlowableFromSupplier.java        |  63 ++++
 .../operators/maybe/MaybeFromSupplier.java    |  71 ++++
 .../observable/ObservableFromCallable.java    |   7 +-
 .../observable/ObservableFromSupplier.java    |  62 ++++
 .../operators/single/SingleFromSupplier.java  |  62 ++++
 .../CompletableFromSupplierTest.java          | 185 +++++++++++
 .../flowable/FlowableFromSupplierTest.java    | 269 +++++++++++++++
 .../maybe/MaybeFromSupplierTest.java          | 222 +++++++++++++
 .../ObservableFromSupplierTest.java           | 314 ++++++++++++++++++
 .../single/SingleFromCallableTest.java        |   4 +-
 .../single/SingleFromSupplierTest.java        | 275 +++++++++++++++
 .../io/reactivex/tck/FromCallableTckTest.java |  14 +
 .../io/reactivex/tck/FromSupplierTckTest.java |  56 ++++
 20 files changed, 1855 insertions(+), 6 deletions(-)
 create mode 100644 src/main/java/io/reactivex/internal/operators/completable/CompletableFromSupplier.java
 create mode 100644 src/main/java/io/reactivex/internal/operators/flowable/FlowableFromSupplier.java
 create mode 100644 src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSupplier.java
 create mode 100644 src/main/java/io/reactivex/internal/operators/observable/ObservableFromSupplier.java
 create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleFromSupplier.java
 create mode 100644 src/test/java/io/reactivex/internal/operators/completable/CompletableFromSupplierTest.java
 create mode 100644 src/test/java/io/reactivex/internal/operators/flowable/FlowableFromSupplierTest.java
 create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSupplierTest.java
 create mode 100644 src/test/java/io/reactivex/internal/operators/observable/ObservableFromSupplierTest.java
 create mode 100644 src/test/java/io/reactivex/internal/operators/single/SingleFromSupplierTest.java
 create mode 100644 src/test/java/io/reactivex/tck/FromSupplierTckTest.java

diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java
index 92de75d97b..b618650d5d 100644
--- a/src/main/java/io/reactivex/Completable.java
+++ b/src/main/java/io/reactivex/Completable.java
@@ -444,6 +444,8 @@ public static Completable fromAction(final Action run) {
      * </dl>
      * @param callable the callable instance to execute for each subscriber
      * @return the new Completable instance
+     * @see #defer(Supplier)
+     * @see #fromSupplier(Supplier)
      */
     @CheckReturnValue
     @NonNull
@@ -609,6 +611,36 @@ public static <T> Completable fromSingle(final SingleSource<T> single) {
         return RxJavaPlugins.onAssembly(new CompletableFromSingle<T>(single));
     }
 
+    /**
+     * Returns a Completable which when subscribed, executes the supplier function, ignores its
+     * normal result and emits onError or onComplete only.
+     * <p>
+     * <img width="640" height="286" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromCallable.png" alt="">
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.</dd>
+     *  <dt><b>Error handling:</b></dt>
+     *  <dd> If the {@link Supplier} throws an exception, the respective {@link Throwable} is
+     *  delivered to the downstream via {@link CompletableObserver#onError(Throwable)},
+     *  except when the downstream has disposed this {@code Completable} source.
+     *  In this latter case, the {@code Throwable} is delivered to the global error handler via
+     *  {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
+     *  </dd>
+     * </dl>
+     * @param supplier the Supplier instance to execute for each subscriber
+     * @return the new Completable instance
+     * @see #defer(Supplier)
+     * @see #fromCallable(Callable)
+     * @since 3.0.0
+     */
+    @CheckReturnValue
+    @NonNull
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public static Completable fromSupplier(final Supplier<?> supplier) {
+        ObjectHelper.requireNonNull(supplier, "callable is null");
+        return RxJavaPlugins.onAssembly(new CompletableFromSupplier(supplier));
+    }
+
     /**
      * Returns a Completable instance that subscribes to all sources at once and
      * completes only when all source Completables complete or one of them emits an error.
diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java
index 07b76aa101..50c1fe436a 100644
--- a/src/main/java/io/reactivex/Flowable.java
+++ b/src/main/java/io/reactivex/Flowable.java
@@ -2087,6 +2087,7 @@ public static <T> Flowable<T> fromArray(T... items) {
      *         the type of the item emitted by the Publisher
      * @return a Flowable whose {@link Subscriber}s' subscriptions trigger an invocation of the given function
      * @see #defer(Supplier)
+     * @see #fromSupplier(Supplier)
      * @since 2.0
      */
     @CheckReturnValue
@@ -2331,6 +2332,47 @@ public static <T> Flowable<T> fromPublisher(final Publisher<? extends T> source)
         return RxJavaPlugins.onAssembly(new FlowableFromPublisher<T>(source));
     }
 
+    /**
+     * Returns a Flowable that, when a Subscriber subscribes to it, invokes a supplier function you specify and then
+     * emits the value returned from that function.
+     * <p>
+     * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/fromCallable.png" alt="">
+     * <p>
+     * This allows you to defer the execution of the function you specify until a Subscriber subscribes to the
+     * Publisher. That is to say, it makes the function "lazy."
+     * <dl>
+     *   <dt><b>Backpressure:</b></dt>
+     *   <dd>The operator honors backpressure from downstream.</dd>
+     *   <dt><b>Scheduler:</b></dt>
+     *   <dd>{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.</dd>
+     *   <dt><b>Error handling:</b></dt>
+     *   <dd> If the {@link Supplier} throws an exception, the respective {@link Throwable} is
+     *   delivered to the downstream via {@link Subscriber#onError(Throwable)},
+     *   except when the downstream has canceled this {@code Flowable} source.
+     *   In this latter case, the {@code Throwable} is delivered to the global error handler via
+     *   {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
+     *   </dd>
+     * </dl>
+     *
+     * @param supplier
+     *         a function, the execution of which should be deferred; {@code fromSupplier} will invoke this
+     *         function only when a Subscriber subscribes to the Publisher that {@code fromSupplier} returns
+     * @param <T>
+     *         the type of the item emitted by the Publisher
+     * @return a Flowable whose {@link Subscriber}s' subscriptions trigger an invocation of the given function
+     * @see #defer(Supplier)
+     * @see #fromCallable(Callable)
+     * @since 3.0.0
+     */
+    @CheckReturnValue
+    @NonNull
+    @BackpressureSupport(BackpressureKind.FULL)
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public static <T> Flowable<T> fromSupplier(Supplier<? extends T> supplier) {
+        ObjectHelper.requireNonNull(supplier, "supplier is null");
+        return RxJavaPlugins.onAssembly(new FlowableFromSupplier<T>(supplier));
+    }
+
     /**
      * Returns a cold, synchronous, stateless and backpressure-aware generator of values.
      * <p>
diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java
index 5b03dd8184..72954d89d0 100644
--- a/src/main/java/io/reactivex/Maybe.java
+++ b/src/main/java/io/reactivex/Maybe.java
@@ -763,6 +763,8 @@ public static <T> Maybe<T> fromSingle(SingleSource<T> singleSource) {
      * @param <T>
      *         the type of the item emitted by the {@link Maybe}.
      * @return a new Maybe instance
+     * @see #defer(Supplier)
+     * @see #fromSupplier(Supplier)
      */
     @CheckReturnValue
     @NonNull
@@ -865,6 +867,51 @@ public static <T> Maybe<T> fromRunnable(final Runnable run) {
         return RxJavaPlugins.onAssembly(new MaybeFromRunnable<T>(run));
     }
 
+    /**
+     * Returns a {@link Maybe} that invokes the given {@link Supplier} for each individual {@link MaybeObserver} that
+     * subscribes and emits the resulting non-null item via {@code onSuccess} while
+     * considering a {@code null} result from the {@code Supplier} as indication for valueless completion
+     * via {@code onComplete}.
+     * <p>
+     * This operator allows you to defer the execution of the given {@code Supplier} until a {@code MaybeObserver}
+     * subscribes to the  returned {@link Maybe}. In other terms, this source operator evaluates the given
+     * {@code Callable} "lazily".
+     * <p>
+     * Note that the {@code null} handling of this operator differs from the similar source operators in the other
+     * {@link io.reactivex base reactive classes}. Those operators signal a {@code NullPointerException} if the value returned by their
+     * {@code Supplier} is {@code null} while this {@code fromSupplier} considers it to indicate the
+     * returned {@code Maybe} is empty.
+     * <dl>
+     *   <dt><b>Scheduler:</b></dt>
+     *   <dd>{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.</dd>
+     *   <dt><b>Error handling:</b></dt>
+     *   <dd>Any non-fatal exception thrown by {@link Supplier#get()} will be forwarded to {@code onError},
+     *   except if the {@code MaybeObserver} disposed the subscription in the meantime. In this latter case,
+     *   the exception is forwarded to the global error handler via
+     *   {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} wrapped into a
+     *   {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
+     *   Fatal exceptions are rethrown and usually will end up in the executing thread's
+     *   {@link java.lang.Thread.UncaughtExceptionHandler#uncaughtException(Thread, Throwable)} handler.</dd>
+     * </dl>
+     *
+     * @param supplier
+     *         a {@link Supplier} instance whose execution should be deferred and performed for each individual
+     *         {@code MaybeObserver} that subscribes to the returned {@link Maybe}.
+     * @param <T>
+     *         the type of the item emitted by the {@link Maybe}.
+     * @return a new Maybe instance
+     * @see #defer(Supplier)
+     * @see #fromCallable(Callable)
+     * @since 3.0.0
+     */
+    @CheckReturnValue
+    @NonNull
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public static <T> Maybe<T> fromSupplier(@NonNull final Supplier<? extends T> supplier) {
+        ObjectHelper.requireNonNull(supplier, "supplier is null");
+        return RxJavaPlugins.onAssembly(new MaybeFromSupplier<T>(supplier));
+    }
+
     /**
      * Returns a {@code Maybe} that emits a specified item.
      * <p>
diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java
index 6123ce455d..a258106c64 100644
--- a/src/main/java/io/reactivex/Observable.java
+++ b/src/main/java/io/reactivex/Observable.java
@@ -1798,6 +1798,7 @@ public static <T> Observable<T> fromArray(T... items) {
      *         the type of the item emitted by the ObservableSource
      * @return an Observable whose {@link Observer}s' subscriptions trigger an invocation of the given function
      * @see #defer(Supplier)
+     * @see #fromSupplier(Supplier)
      * @since 2.0
      */
     @CheckReturnValue
@@ -2021,6 +2022,43 @@ public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
         return RxJavaPlugins.onAssembly(new ObservableFromPublisher<T>(publisher));
     }
 
+    /**
+     * Returns an Observable that, when an observer subscribes to it, invokes a supplier function you specify and then
+     * emits the value returned from that function.
+     * <p>
+     * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/fromCallable.png" alt="">
+     * <p>
+     * This allows you to defer the execution of the function you specify until an observer subscribes to the
+     * ObservableSource. That is to say, it makes the function "lazy."
+     * <dl>
+     *   <dt><b>Scheduler:</b></dt>
+     *   <dd>{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.</dd>
+     *   <dt><b>Error handling:</b></dt>
+     *   <dd> If the {@link Supplier} throws an exception, the respective {@link Throwable} is
+     *   delivered to the downstream via {@link Observer#onError(Throwable)},
+     *   except when the downstream has disposed this {@code Observable} source.
+     *   In this latter case, the {@code Throwable} is delivered to the global error handler via
+     *   {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
+     *   </dd>
+     * </dl>
+     * @param supplier
+     *         a function, the execution of which should be deferred; {@code fromSupplier} will invoke this
+     *         function only when an observer subscribes to the ObservableSource that {@code fromSupplier} returns
+     * @param <T>
+     *         the type of the item emitted by the ObservableSource
+     * @return an Observable whose {@link Observer}s' subscriptions trigger an invocation of the given function
+     * @see #defer(Supplier)
+     * @see #fromCallable(Callable)
+     * @since 3.0.0
+     */
+    @CheckReturnValue
+    @NonNull
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public static <T> Observable<T> fromSupplier(Supplier<? extends T> supplier) {
+        ObjectHelper.requireNonNull(supplier, "supplier is null");
+        return RxJavaPlugins.onAssembly(new ObservableFromSupplier<T>(supplier));
+    }
+
     /**
      * Returns a cold, synchronous and stateless generator of values.
      * <p>
diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java
index 7fa1f5b905..da446cfafd 100644
--- a/src/main/java/io/reactivex/Single.java
+++ b/src/main/java/io/reactivex/Single.java
@@ -613,6 +613,8 @@ public static <T> Single<T> error(final Throwable exception) {
      * @param <T>
      *         the type of the item emitted by the {@link Single}.
      * @return a {@link Single} whose {@link SingleObserver}s' subscriptions trigger an invocation of the given function.
+     * @see #defer(Supplier)
+     * @see #fromSupplier(Supplier)
      */
     @CheckReturnValue
     @NonNull
@@ -811,6 +813,44 @@ public static <T> Single<T> fromObservable(ObservableSource<? extends T> observa
         return RxJavaPlugins.onAssembly(new ObservableSingleSingle<T>(observableSource, null));
     }
 
+    /**
+     * Returns a {@link Single} that invokes passed supplierfunction and emits its result
+     * for each new SingleObserver that subscribes.
+     * <p>
+     * Allows you to defer execution of passed function until SingleObserver subscribes to the {@link Single}.
+     * It makes passed function "lazy".
+     * Result of the function invocation will be emitted by the {@link Single}.
+     * <p>
+     * <img width="640" height="467" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.fromCallable.png" alt="">
+     * <dl>
+     *   <dt><b>Scheduler:</b></dt>
+     *   <dd>{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.</dd>
+     *   <dt><b>Error handling:</b></dt>
+     *   <dd> If the {@link Supplier} throws an exception, the respective {@link Throwable} is
+     *   delivered to the downstream via {@link SingleObserver#onError(Throwable)},
+     *   except when the downstream has disposed this {@code Single} source.
+     *   In this latter case, the {@code Throwable} is delivered to the global error handler via
+     *   {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
+     *   </dd>
+     * </dl>
+     *
+     * @param supplier
+     *         function which execution should be deferred, it will be invoked when SingleObserver will subscribe to the {@link Single}.
+     * @param <T>
+     *         the type of the item emitted by the {@link Single}.
+     * @return a {@link Single} whose {@link SingleObserver}s' subscriptions trigger an invocation of the given function.
+     * @see #defer(Supplier)
+     * @see #fromCallable(Callable)
+     * @since 3.0.0
+     */
+    @CheckReturnValue
+    @NonNull
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public static <T> Single<T> fromSupplier(final Supplier<? extends T> supplier) {
+        ObjectHelper.requireNonNull(supplier, "supplier is null");
+        return RxJavaPlugins.onAssembly(new SingleFromSupplier<T>(supplier));
+    }
+
     /**
      * Returns a {@code Single} that emits a specified item.
      * <p>
diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableFromSupplier.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromSupplier.java
new file mode 100644
index 0000000000..d676437b96
--- /dev/null
+++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromSupplier.java
@@ -0,0 +1,53 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.completable;
+
+import io.reactivex.*;
+import io.reactivex.disposables.*;
+import io.reactivex.exceptions.Exceptions;
+import io.reactivex.functions.Supplier;
+import io.reactivex.plugins.RxJavaPlugins;
+
+/**
+ * Call a Supplier for each incoming CompletableObserver and signal completion or the thrown exception.
+ * @since 3.0.0
+ */
+public final class CompletableFromSupplier extends Completable {
+
+    final Supplier<?> supplier;
+
+    public CompletableFromSupplier(Supplier<?> supplier) {
+        this.supplier = supplier;
+    }
+
+    @Override
+    protected void subscribeActual(CompletableObserver observer) {
+        Disposable d = Disposables.empty();
+        observer.onSubscribe(d);
+        try {
+            supplier.get();
+        } catch (Throwable e) {
+            Exceptions.throwIfFatal(e);
+            if (!d.isDisposed()) {
+                observer.onError(e);
+            } else {
+                RxJavaPlugins.onError(e);
+            }
+            return;
+        }
+        if (!d.isDisposed()) {
+            observer.onComplete();
+        }
+    }
+}
diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromCallable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromCallable.java
index 6dcb226daa..e54feffecf 100644
--- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromCallable.java
+++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromCallable.java
@@ -19,11 +19,12 @@
 
 import io.reactivex.Flowable;
 import io.reactivex.exceptions.Exceptions;
+import io.reactivex.functions.Supplier;
 import io.reactivex.internal.functions.ObjectHelper;
 import io.reactivex.internal.subscriptions.DeferredScalarSubscription;
 import io.reactivex.plugins.RxJavaPlugins;
 
-public final class FlowableFromCallable<T> extends Flowable<T> implements Callable<T> {
+public final class FlowableFromCallable<T> extends Flowable<T> implements Supplier<T> {
     final Callable<? extends T> callable;
     public FlowableFromCallable(Callable<? extends T> callable) {
         this.callable = callable;
@@ -51,7 +52,7 @@ public void subscribeActual(Subscriber<? super T> s) {
     }
 
     @Override
-    public T call() throws Exception {
+    public T get() throws Throwable {
         return ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value");
     }
 }
diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromSupplier.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromSupplier.java
new file mode 100644
index 0000000000..e9a5bf9d53
--- /dev/null
+++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromSupplier.java
@@ -0,0 +1,63 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.flowable;
+
+import org.reactivestreams.Subscriber;
+
+import io.reactivex.Flowable;
+import io.reactivex.exceptions.Exceptions;
+import io.reactivex.functions.Supplier;
+import io.reactivex.internal.functions.ObjectHelper;
+import io.reactivex.internal.subscriptions.DeferredScalarSubscription;
+import io.reactivex.plugins.RxJavaPlugins;
+
+/**
+ * Call a Supplier for each incoming Subscriber and signal the returned value or the thrown exception.
+ * @param <T> the value type and element type returned by the supplier and the flow
+ * @since 3.0.0
+ */
+public final class FlowableFromSupplier<T> extends Flowable<T> implements Supplier<T> {
+
+    final Supplier<? extends T> supplier;
+
+    public FlowableFromSupplier(Supplier<? extends T> supplier) {
+        this.supplier = supplier;
+    }
+
+    @Override
+    public void subscribeActual(Subscriber<? super T> s) {
+        DeferredScalarSubscription<T> deferred = new DeferredScalarSubscription<T>(s);
+        s.onSubscribe(deferred);
+
+        T t;
+        try {
+            t = ObjectHelper.requireNonNull(supplier.get(), "The supplier returned a null value");
+        } catch (Throwable ex) {
+            Exceptions.throwIfFatal(ex);
+            if (deferred.isCancelled()) {
+                RxJavaPlugins.onError(ex);
+            } else {
+                s.onError(ex);
+            }
+            return;
+        }
+
+        deferred.complete(t);
+    }
+
+    @Override
+    public T get() throws Throwable {
+        return ObjectHelper.requireNonNull(supplier.get(), "The supplier returned a null value");
+    }
+}
diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSupplier.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSupplier.java
new file mode 100644
index 0000000000..fa2e9c1cd4
--- /dev/null
+++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSupplier.java
@@ -0,0 +1,71 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.maybe;
+
+import io.reactivex.*;
+import io.reactivex.disposables.*;
+import io.reactivex.exceptions.Exceptions;
+import io.reactivex.functions.Supplier;
+import io.reactivex.plugins.RxJavaPlugins;
+
+/**
+ * Executes a callable and signals its value as success or signals an exception.
+ *
+ * @param <T> the value type
+ * @since 3.0.0
+ */
+public final class MaybeFromSupplier<T> extends Maybe<T> implements Supplier<T> {
+
+    final Supplier<? extends T> supplier;
+
+    public MaybeFromSupplier(Supplier<? extends T> supplier) {
+        this.supplier = supplier;
+    }
+
+    @Override
+    protected void subscribeActual(MaybeObserver<? super T> observer) {
+        Disposable d = Disposables.empty();
+        observer.onSubscribe(d);
+
+        if (!d.isDisposed()) {
+
+            T v;
+
+            try {
+                v = supplier.get();
+            } catch (Throwable ex) {
+                Exceptions.throwIfFatal(ex);
+                if (!d.isDisposed()) {
+                    observer.onError(ex);
+                } else {
+                    RxJavaPlugins.onError(ex);
+                }
+                return;
+            }
+
+            if (!d.isDisposed()) {
+                if (v == null) {
+                    observer.onComplete();
+                } else {
+                    observer.onSuccess(v);
+                }
+            }
+        }
+    }
+
+    @Override
+    public T get() throws Throwable {
+        return supplier.get();
+    }
+}
diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java
index fe3c364793..554d504675 100644
--- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java
+++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java
@@ -17,6 +17,7 @@
 
 import io.reactivex.*;
 import io.reactivex.exceptions.Exceptions;
+import io.reactivex.functions.Supplier;
 import io.reactivex.internal.functions.ObjectHelper;
 import io.reactivex.internal.observers.DeferredScalarDisposable;
 import io.reactivex.plugins.RxJavaPlugins;
@@ -25,8 +26,10 @@
  * Calls a Callable and emits its resulting single value or signals its exception.
  * @param <T> the value type
  */
-public final class ObservableFromCallable<T> extends Observable<T> implements Callable<T> {
+public final class ObservableFromCallable<T> extends Observable<T> implements Supplier<T> {
+
     final Callable<? extends T> callable;
+
     public ObservableFromCallable(Callable<? extends T> callable) {
         this.callable = callable;
     }
@@ -54,7 +57,7 @@ public void subscribeActual(Observer<? super T> observer) {
     }
 
     @Override
-    public T call() throws Exception {
+    public T get() throws Throwable {
         return ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value");
     }
 }
diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromSupplier.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromSupplier.java
new file mode 100644
index 0000000000..2ba55b6f36
--- /dev/null
+++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromSupplier.java
@@ -0,0 +1,62 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.observable;
+
+import io.reactivex.*;
+import io.reactivex.exceptions.Exceptions;
+import io.reactivex.functions.Supplier;
+import io.reactivex.internal.functions.ObjectHelper;
+import io.reactivex.internal.observers.DeferredScalarDisposable;
+import io.reactivex.plugins.RxJavaPlugins;
+
+/**
+ * Calls a Callable and emits its resulting single value or signals its exception.
+ * @param <T> the value type
+ * @since 3.0.0
+ */
+public final class ObservableFromSupplier<T> extends Observable<T> implements Supplier<T> {
+
+    final Supplier<? extends T> supplier;
+
+    public ObservableFromSupplier(Supplier<? extends T> supplier) {
+        this.supplier = supplier;
+    }
+
+    @Override
+    public void subscribeActual(Observer<? super T> observer) {
+        DeferredScalarDisposable<T> d = new DeferredScalarDisposable<T>(observer);
+        observer.onSubscribe(d);
+        if (d.isDisposed()) {
+            return;
+        }
+        T value;
+        try {
+            value = ObjectHelper.requireNonNull(supplier.get(), "Supplier returned null");
+        } catch (Throwable e) {
+            Exceptions.throwIfFatal(e);
+            if (!d.isDisposed()) {
+                observer.onError(e);
+            } else {
+                RxJavaPlugins.onError(e);
+            }
+            return;
+        }
+        d.complete(value);
+    }
+
+    @Override
+    public T get() throws Throwable {
+        return ObjectHelper.requireNonNull(supplier.get(), "The callable returned a null value");
+    }
+}
diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleFromSupplier.java b/src/main/java/io/reactivex/internal/operators/single/SingleFromSupplier.java
new file mode 100644
index 0000000000..99766aed42
--- /dev/null
+++ b/src/main/java/io/reactivex/internal/operators/single/SingleFromSupplier.java
@@ -0,0 +1,62 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.single;
+
+import io.reactivex.*;
+import io.reactivex.disposables.*;
+import io.reactivex.exceptions.Exceptions;
+import io.reactivex.functions.Supplier;
+import io.reactivex.internal.functions.ObjectHelper;
+import io.reactivex.plugins.RxJavaPlugins;
+
+/**
+ * Calls a supplier and emits its value or exception to the incoming SingleObserver.
+ * @param <T> the value type returned
+ * @since 3.0.0
+ */
+public final class SingleFromSupplier<T> extends Single<T> {
+
+    final Supplier<? extends T> supplier;
+
+    public SingleFromSupplier(Supplier<? extends T> supplier) {
+        this.supplier = supplier;
+    }
+
+    @Override
+    protected void subscribeActual(SingleObserver<? super T> observer) {
+        Disposable d = Disposables.empty();
+        observer.onSubscribe(d);
+
+        if (d.isDisposed()) {
+            return;
+        }
+        T value;
+
+        try {
+            value = ObjectHelper.requireNonNull(supplier.get(), "The supplier returned a null value");
+        } catch (Throwable ex) {
+            Exceptions.throwIfFatal(ex);
+            if (!d.isDisposed()) {
+                observer.onError(ex);
+            } else {
+                RxJavaPlugins.onError(ex);
+            }
+            return;
+        }
+
+        if (!d.isDisposed()) {
+            observer.onSuccess(value);
+        }
+    }
+}
diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableFromSupplierTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableFromSupplierTest.java
new file mode 100644
index 0000000000..5d13b36d82
--- /dev/null
+++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableFromSupplierTest.java
@@ -0,0 +1,185 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.completable;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import io.reactivex.*;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.exceptions.TestException;
+import io.reactivex.functions.Supplier;
+import io.reactivex.observers.TestObserver;
+import io.reactivex.schedulers.Schedulers;
+import io.reactivex.testsupport.TestHelper;
+
+public class CompletableFromSupplierTest {
+
+    @Test(expected = NullPointerException.class)
+    public void fromSupplierNull() {
+        Completable.fromSupplier(null);
+    }
+
+    @Test
+    public void fromSupplier() {
+        final AtomicInteger atomicInteger = new AtomicInteger();
+
+        Completable.fromSupplier(new Supplier<Object>() {
+            @Override
+            public Object get() throws Exception {
+                atomicInteger.incrementAndGet();
+                return null;
+            }
+        })
+            .test()
+            .assertResult();
+
+        assertEquals(1, atomicInteger.get());
+    }
+
+    @Test
+    public void fromSupplierTwice() {
+        final AtomicInteger atomicInteger = new AtomicInteger();
+
+        Supplier<Object> supplier = new Supplier<Object>() {
+            @Override
+            public Object get() throws Exception {
+                atomicInteger.incrementAndGet();
+                return null;
+            }
+        };
+
+        Completable.fromSupplier(supplier)
+            .test()
+            .assertResult();
+
+        assertEquals(1, atomicInteger.get());
+
+        Completable.fromSupplier(supplier)
+            .test()
+            .assertResult();
+
+        assertEquals(2, atomicInteger.get());
+    }
+
+    @Test
+    public void fromSupplierInvokesLazy() {
+        final AtomicInteger atomicInteger = new AtomicInteger();
+
+        Completable completable = Completable.fromSupplier(new Supplier<Object>() {
+            @Override
+            public Object get() throws Exception {
+                atomicInteger.incrementAndGet();
+                return null;
+            }
+        });
+
+        assertEquals(0, atomicInteger.get());
+
+        completable
+            .test()
+            .assertResult();
+
+        assertEquals(1, atomicInteger.get());
+    }
+
+    @Test
+    public void fromSupplierThrows() {
+        Completable.fromSupplier(new Supplier<Object>() {
+            @Override
+            public Object get() throws Exception {
+                throw new UnsupportedOperationException();
+            }
+        })
+        .test()
+        .assertFailure(UnsupportedOperationException.class);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Throwable {
+        Supplier<String> func = mock(Supplier.class);
+
+        final CountDownLatch funcLatch = new CountDownLatch(1);
+        final CountDownLatch observerLatch = new CountDownLatch(1);
+
+        when(func.get()).thenAnswer(new Answer<String>() {
+            @Override
+            public String answer(InvocationOnMock invocation) throws Throwable {
+                observerLatch.countDown();
+
+                try {
+                    funcLatch.await();
+                } catch (InterruptedException e) {
+                    // It's okay, unsubscription causes Thread interruption
+
+                    // Restoring interruption status of the Thread
+                    Thread.currentThread().interrupt();
+                }
+
+                return "should_not_be_delivered";
+            }
+        });
+
+        Completable fromSupplierObservable = Completable.fromSupplier(func);
+
+        Observer<Object> observer = TestHelper.mockObserver();
+
+        TestObserver<String> outer = new TestObserver<String>(observer);
+
+        fromSupplierObservable
+                .subscribeOn(Schedulers.computation())
+                .subscribe(outer);
+
+        // Wait until func will be invoked
+        observerLatch.await();
+
+        // Unsubscribing before emission
+        outer.dispose();
+
+        // Emitting result
+        funcLatch.countDown();
+
+        // func must be invoked
+        verify(func).get();
+
+        // Observer must not be notified at all
+        verify(observer).onSubscribe(any(Disposable.class));
+        verifyNoMoreInteractions(observer);
+    }
+
+    @Test
+    public void fromActionErrorsDisposed() {
+        final AtomicInteger calls = new AtomicInteger();
+        Completable.fromSupplier(new Supplier<Object>() {
+            @Override
+            public Object get() throws Exception {
+                calls.incrementAndGet();
+                throw new TestException();
+            }
+        })
+        .test(true)
+        .assertEmpty();
+
+        assertEquals(1, calls.get());
+    }
+}
diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromSupplierTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromSupplierTest.java
new file mode 100644
index 0000000000..d909b30c7a
--- /dev/null
+++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromSupplierTest.java
@@ -0,0 +1,269 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.reactivex.internal.operators.flowable;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.reactivestreams.*;
+
+import io.reactivex.Flowable;
+import io.reactivex.exceptions.TestException;
+import io.reactivex.functions.*;
+import io.reactivex.plugins.RxJavaPlugins;
+import io.reactivex.schedulers.Schedulers;
+import io.reactivex.subscribers.TestSubscriber;
+import io.reactivex.testsupport.TestHelper;
+
+public class FlowableFromSupplierTest {
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldNotInvokeFuncUntilSubscription() throws Throwable {
+        Supplier<Object> func = mock(Supplier.class);
+
+        when(func.get()).thenReturn(new Object());
+
+        Flowable<Object> fromSupplierFlowable = Flowable.fromSupplier(func);
+
+        verifyZeroInteractions(func);
+
+        fromSupplierFlowable.subscribe();
+
+        verify(func).get();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCallOnNextAndOnCompleted() throws Throwable {
+        Supplier<String> func = mock(Supplier.class);
+
+        when(func.get()).thenReturn("test_value");
+
+        Flowable<String> fromSupplierFlowable = Flowable.fromSupplier(func);
+
+        Subscriber<String> subscriber = TestHelper.mockSubscriber();
+
+        fromSupplierFlowable.subscribe(subscriber);
+
+        verify(subscriber).onNext("test_value");
+        verify(subscriber).onComplete();
+        verify(subscriber, never()).onError(any(Throwable.class));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCallOnError() throws Throwable {
+        Supplier<Object> func = mock(Supplier.class);
+
+        Throwable throwable = new IllegalStateException("Test exception");
+        when(func.get()).thenThrow(throwable);
+
+        Flowable<Object> fromSupplierFlowable = Flowable.fromSupplier(func);
+
+        Subscriber<Object> subscriber = TestHelper.mockSubscriber();
+
+        fromSupplierFlowable.subscribe(subscriber);
+
+        verify(subscriber, never()).onNext(any());
+        verify(subscriber, never()).onComplete();
+        verify(subscriber).onError(throwable);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Throwable {
+        Supplier<String> func = mock(Supplier.class);
+
+        final CountDownLatch funcLatch = new CountDownLatch(1);
+        final CountDownLatch observerLatch = new CountDownLatch(1);
+
+        when(func.get()).thenAnswer(new Answer<String>() {
+            @Override
+            public String answer(InvocationOnMock invocation) throws Throwable {
+                observerLatch.countDown();
+
+                try {
+                    funcLatch.await();
+                } catch (InterruptedException e) {
+                    // It's okay, unsubscription causes Thread interruption
+
+                    // Restoring interruption status of the Thread
+                    Thread.currentThread().interrupt();
+                }
+
+                return "should_not_be_delivered";
+            }
+        });
+
+        Flowable<String> fromSupplierFlowable = Flowable.fromSupplier(func);
+
+        Subscriber<String> subscriber = TestHelper.mockSubscriber();
+
+        TestSubscriber<String> outer = new TestSubscriber<String>(subscriber);
+
+        fromSupplierFlowable
+                .subscribeOn(Schedulers.computation())
+                .subscribe(outer);
+
+        // Wait until func will be invoked
+        observerLatch.await();
+
+        // Unsubscribing before emission
+        outer.cancel();
+
+        // Emitting result
+        funcLatch.countDown();
+
+        // func must be invoked
+        verify(func).get();
+
+        // Observer must not be notified at all
+        verify(subscriber).onSubscribe(any(Subscription.class));
+        verifyNoMoreInteractions(subscriber);
+    }
+
+    @Test
+    public void shouldAllowToThrowCheckedException() {
+        final Exception checkedException = new Exception("test exception");
+
+        Flowable<Object> fromSupplierFlowable = Flowable.fromSupplier(new Supplier<Object>() {
+            @Override
+            public Object get() throws Exception {
+                throw checkedException;
+            }
+        });
+
+        Subscriber<Object> subscriber = TestHelper.mockSubscriber();
+
+        fromSupplierFlowable.subscribe(subscriber);
+
+        verify(subscriber).onSubscribe(any(Subscription.class));
+        verify(subscriber).onError(checkedException);
+        verifyNoMoreInteractions(subscriber);
+    }
+
+    @Test
+    public void fusedFlatMapExecution() {
+        final int[] calls = { 0 };
+
+        Flowable.just(1).flatMap(new Function<Integer, Publisher<? extends Object>>() {
+            @Override
+            public Publisher<? extends Object> apply(Integer v)
+                    throws Exception {
+                return Flowable.fromSupplier(new Supplier<Object>() {
+                    @Override
+                    public Object get() throws Exception {
+                        return ++calls[0];
+                    }
+                });
+            }
+        })
+        .test()
+        .assertResult(1);
+
+        assertEquals(1, calls[0]);
+    }
+
+    @Test
+    public void fusedFlatMapExecutionHidden() {
+        final int[] calls = { 0 };
+
+        Flowable.just(1).hide().flatMap(new Function<Integer, Publisher<? extends Object>>() {
+            @Override
+            public Publisher<? extends Object> apply(Integer v)
+                    throws Exception {
+                return Flowable.fromSupplier(new Supplier<Object>() {
+                    @Override
+                    public Object get() throws Exception {
+                        return ++calls[0];
+                    }
+                });
+            }
+        })
+        .test()
+        .assertResult(1);
+
+        assertEquals(1, calls[0]);
+    }
+
+    @Test
+    public void fusedFlatMapNull() {
+        Flowable.just(1).flatMap(new Function<Integer, Publisher<? extends Object>>() {
+            @Override
+            public Publisher<? extends Object> apply(Integer v)
+                    throws Exception {
+                return Flowable.fromSupplier(new Supplier<Object>() {
+                    @Override
+                    public Object get() throws Exception {
+                        return null;
+                    }
+                });
+            }
+        })
+        .test()
+        .assertFailure(NullPointerException.class);
+    }
+
+    @Test
+    public void fusedFlatMapNullHidden() {
+        Flowable.just(1).hide().flatMap(new Function<Integer, Publisher<? extends Object>>() {
+            @Override
+            public Publisher<? extends Object> apply(Integer v)
+                    throws Exception {
+                return Flowable.fromSupplier(new Supplier<Object>() {
+                    @Override
+                    public Object get() throws Exception {
+                        return null;
+                    }
+                });
+            }
+        })
+        .test()
+        .assertFailure(NullPointerException.class);
+    }
+
+    @Test(timeout = 5000)
+    public void undeliverableUponCancellation() throws Exception {
+        List<Throwable> errors = TestHelper.trackPluginErrors();
+        try {
+            final TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
+
+            Flowable.fromSupplier(new Supplier<Integer>() {
+                @Override
+                public Integer get() throws Exception {
+                    ts.cancel();
+                    throw new TestException();
+                }
+            })
+            .subscribe(ts);
+
+            ts.assertEmpty();
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+        } finally {
+            RxJavaPlugins.reset();
+        }
+    }
+}
diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSupplierTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSupplierTest.java
new file mode 100644
index 0000000000..a281af9f4b
--- /dev/null
+++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSupplierTest.java
@@ -0,0 +1,222 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.maybe;
+
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import io.reactivex.*;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.functions.Supplier;
+import io.reactivex.observers.TestObserver;
+import io.reactivex.plugins.RxJavaPlugins;
+import io.reactivex.schedulers.Schedulers;
+import io.reactivex.testsupport.TestHelper;
+
+public class MaybeFromSupplierTest {
+
+    @Test(expected = NullPointerException.class)
+    public void fromSupplierNull() {
+        Maybe.fromSupplier(null);
+    }
+
+    @Test
+    public void fromSupplier() {
+        final AtomicInteger atomicInteger = new AtomicInteger();
+
+        Maybe.fromSupplier(new Supplier<Object>() {
+            @Override
+            public Object get() throws Exception {
+                atomicInteger.incrementAndGet();
+                return null;
+            }
+        })
+            .test()
+            .assertResult();
+
+        assertEquals(1, atomicInteger.get());
+    }
+
+    @Test
+    public void fromSupplierTwice() {
+        final AtomicInteger atomicInteger = new AtomicInteger();
+
+        Supplier<Object> callable = new Supplier<Object>() {
+            @Override
+            public Object get() throws Exception {
+                atomicInteger.incrementAndGet();
+                return null;
+            }
+        };
+
+        Maybe.fromSupplier(callable)
+            .test()
+            .assertResult();
+
+        assertEquals(1, atomicInteger.get());
+
+        Maybe.fromSupplier(callable)
+            .test()
+            .assertResult();
+
+        assertEquals(2, atomicInteger.get());
+    }
+
+    @Test
+    public void fromSupplierInvokesLazy() {
+        final AtomicInteger atomicInteger = new AtomicInteger();
+
+        Maybe<Object> completable = Maybe.fromSupplier(new Supplier<Object>() {
+            @Override
+            public Object get() throws Exception {
+                atomicInteger.incrementAndGet();
+                return null;
+            }
+        });
+
+        assertEquals(0, atomicInteger.get());
+
+        completable
+            .test()
+            .assertResult();
+
+        assertEquals(1, atomicInteger.get());
+    }
+
+    @Test
+    public void fromSupplierThrows() {
+        Maybe.fromSupplier(new Supplier<Object>() {
+            @Override
+            public Object get() throws Exception {
+                throw new UnsupportedOperationException();
+            }
+        })
+            .test()
+            .assertFailure(UnsupportedOperationException.class);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void callable() throws Throwable {
+        final int[] counter = { 0 };
+
+        Maybe<Integer> m = Maybe.fromSupplier(new Supplier<Integer>() {
+            @Override
+            public Integer get() throws Exception {
+                counter[0]++;
+                return 0;
+            }
+        });
+
+        assertTrue(m.getClass().toString(), m instanceof Supplier);
+
+        assertEquals(0, ((Supplier<Void>)m).get());
+
+        assertEquals(1, counter[0]);
+    }
+
+    @Test
+    public void noErrorLoss() throws Exception {
+        List<Throwable> errors = TestHelper.trackPluginErrors();
+        try {
+            final CountDownLatch cdl1 = new CountDownLatch(1);
+            final CountDownLatch cdl2 = new CountDownLatch(1);
+
+            TestObserver<Integer> to = Maybe.fromSupplier(new Supplier<Integer>() {
+                @Override
+                public Integer get() throws Exception {
+                    cdl1.countDown();
+                    cdl2.await(5, TimeUnit.SECONDS);
+                    return 1;
+                }
+            }).subscribeOn(Schedulers.single()).test();
+
+            assertTrue(cdl1.await(5, TimeUnit.SECONDS));
+
+            to.dispose();
+
+            int timeout = 10;
+
+            while (timeout-- > 0 && errors.isEmpty()) {
+                Thread.sleep(100);
+            }
+
+            TestHelper.assertUndeliverable(errors, 0, InterruptedException.class);
+        } finally {
+            RxJavaPlugins.reset();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Throwable {
+        Supplier<String> func = mock(Supplier.class);
+
+        final CountDownLatch funcLatch = new CountDownLatch(1);
+        final CountDownLatch observerLatch = new CountDownLatch(1);
+
+        when(func.get()).thenAnswer(new Answer<String>() {
+            @Override
+            public String answer(InvocationOnMock invocation) throws Throwable {
+                observerLatch.countDown();
+
+                try {
+                    funcLatch.await();
+                } catch (InterruptedException e) {
+                    // It's okay, unsubscription causes Thread interruption
+
+                    // Restoring interruption status of the Thread
+                    Thread.currentThread().interrupt();
+                }
+
+                return "should_not_be_delivered";
+            }
+        });
+
+        Maybe<String> fromSupplierObservable = Maybe.fromSupplier(func);
+
+        Observer<Object> observer = TestHelper.mockObserver();
+
+        TestObserver<String> outer = new TestObserver<String>(observer);
+
+        fromSupplierObservable
+                .subscribeOn(Schedulers.computation())
+                .subscribe(outer);
+
+        // Wait until func will be invoked
+        observerLatch.await();
+
+        // Unsubscribing before emission
+        outer.dispose();
+
+        // Emitting result
+        funcLatch.countDown();
+
+        // func must be invoked
+        verify(func).get();
+
+        // Observer must not be notified at all
+        verify(observer).onSubscribe(any(Disposable.class));
+        verifyNoMoreInteractions(observer);
+    }
+}
diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFromSupplierTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFromSupplierTest.java
new file mode 100644
index 0000000000..e3df29e1c5
--- /dev/null
+++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFromSupplierTest.java
@@ -0,0 +1,314 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.reactivex.internal.operators.observable;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import io.reactivex.*;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.exceptions.TestException;
+import io.reactivex.functions.*;
+import io.reactivex.observers.TestObserver;
+import io.reactivex.plugins.RxJavaPlugins;
+import io.reactivex.schedulers.Schedulers;
+import io.reactivex.testsupport.TestHelper;
+
+public class ObservableFromSupplierTest {
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldNotInvokeFuncUntilSubscription() throws Throwable {
+        Supplier<Object> func = mock(Supplier.class);
+
+        when(func.get()).thenReturn(new Object());
+
+        Observable<Object> fromSupplierObservable = Observable.fromSupplier(func);
+
+        verifyZeroInteractions(func);
+
+        fromSupplierObservable.subscribe();
+
+        verify(func).get();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCallOnNextAndOnCompleted() throws Throwable {
+        Supplier<String> func = mock(Supplier.class);
+
+        when(func.get()).thenReturn("test_value");
+
+        Observable<String> fromSupplierObservable = Observable.fromSupplier(func);
+
+        Observer<Object> observer = TestHelper.mockObserver();
+
+        fromSupplierObservable.subscribe(observer);
+
+        verify(observer).onNext("test_value");
+        verify(observer).onComplete();
+        verify(observer, never()).onError(any(Throwable.class));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCallOnError() throws Throwable {
+        Supplier<Object> func = mock(Supplier.class);
+
+        Throwable throwable = new IllegalStateException("Test exception");
+        when(func.get()).thenThrow(throwable);
+
+        Observable<Object> fromSupplierObservable = Observable.fromSupplier(func);
+
+        Observer<Object> observer = TestHelper.mockObserver();
+
+        fromSupplierObservable.subscribe(observer);
+
+        verify(observer, never()).onNext(any());
+        verify(observer, never()).onComplete();
+        verify(observer).onError(throwable);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Throwable {
+        Supplier<String> func = mock(Supplier.class);
+
+        final CountDownLatch funcLatch = new CountDownLatch(1);
+        final CountDownLatch observerLatch = new CountDownLatch(1);
+
+        when(func.get()).thenAnswer(new Answer<String>() {
+            @Override
+            public String answer(InvocationOnMock invocation) throws Throwable {
+                observerLatch.countDown();
+
+                try {
+                    funcLatch.await();
+                } catch (InterruptedException e) {
+                    // It's okay, unsubscription causes Thread interruption
+
+                    // Restoring interruption status of the Thread
+                    Thread.currentThread().interrupt();
+                }
+
+                return "should_not_be_delivered";
+            }
+        });
+
+        Observable<String> fromSupplierObservable = Observable.fromSupplier(func);
+
+        Observer<Object> observer = TestHelper.mockObserver();
+
+        TestObserver<String> outer = new TestObserver<String>(observer);
+
+        fromSupplierObservable
+                .subscribeOn(Schedulers.computation())
+                .subscribe(outer);
+
+        // Wait until func will be invoked
+        observerLatch.await();
+
+        // Unsubscribing before emission
+        outer.dispose();
+
+        // Emitting result
+        funcLatch.countDown();
+
+        // func must be invoked
+        verify(func).get();
+
+        // Observer must not be notified at all
+        verify(observer).onSubscribe(any(Disposable.class));
+        verifyNoMoreInteractions(observer);
+    }
+
+    @Test
+    public void shouldAllowToThrowCheckedException() {
+        final Exception checkedException = new Exception("test exception");
+
+        Observable<Object> fromSupplierObservable = Observable.fromSupplier(new Supplier<Object>() {
+            @Override
+            public Object get() throws Exception {
+                throw checkedException;
+            }
+        });
+
+        Observer<Object> observer = TestHelper.mockObserver();
+
+        fromSupplierObservable.subscribe(observer);
+
+        verify(observer).onSubscribe(any(Disposable.class));
+        verify(observer).onError(checkedException);
+        verifyNoMoreInteractions(observer);
+    }
+
+    @Test
+    public void fusedFlatMapExecution() {
+        final int[] calls = { 0 };
+
+        Observable.just(1).flatMap(new Function<Integer, ObservableSource<? extends Object>>() {
+            @Override
+            public ObservableSource<? extends Object> apply(Integer v)
+                    throws Exception {
+                return Observable.fromSupplier(new Supplier<Object>() {
+                    @Override
+                    public Object get() throws Exception {
+                        return ++calls[0];
+                    }
+                });
+            }
+        })
+        .test()
+        .assertResult(1);
+
+        assertEquals(1, calls[0]);
+    }
+
+    @Test
+    public void fusedFlatMapExecutionHidden() {
+        final int[] calls = { 0 };
+
+        Observable.just(1).hide().flatMap(new Function<Integer, ObservableSource<? extends Object>>() {
+            @Override
+            public ObservableSource<? extends Object> apply(Integer v)
+                    throws Exception {
+                return Observable.fromSupplier(new Supplier<Object>() {
+                    @Override
+                    public Object get() throws Exception {
+                        return ++calls[0];
+                    }
+                });
+            }
+        })
+        .test()
+        .assertResult(1);
+
+        assertEquals(1, calls[0]);
+    }
+
+    @Test
+    public void fusedFlatMapNull() {
+        Observable.just(1).flatMap(new Function<Integer, ObservableSource<? extends Object>>() {
+            @Override
+            public ObservableSource<? extends Object> apply(Integer v)
+                    throws Exception {
+                return Observable.fromSupplier(new Supplier<Object>() {
+                    @Override
+                    public Object get() throws Exception {
+                        return null;
+                    }
+                });
+            }
+        })
+        .test()
+        .assertFailure(NullPointerException.class);
+    }
+
+    @Test
+    public void fusedFlatMapNullHidden() {
+        Observable.just(1).hide().flatMap(new Function<Integer, ObservableSource<? extends Object>>() {
+            @Override
+            public ObservableSource<? extends Object> apply(Integer v)
+                    throws Exception {
+                return Observable.fromSupplier(new Supplier<Object>() {
+                    @Override
+                    public Object get() throws Exception {
+                        return null;
+                    }
+                });
+            }
+        })
+        .test()
+        .assertFailure(NullPointerException.class);
+    }
+
+    @Test
+    public void disposedOnArrival() {
+        final int[] count = { 0 };
+        Observable.fromSupplier(new Supplier<Object>() {
+            @Override
+            public Object get() throws Exception {
+                count[0]++;
+                return 1;
+            }
+        })
+        .test(true)
+        .assertEmpty();
+
+        assertEquals(0, count[0]);
+    }
+
+    @Test
+    public void disposedOnCall() {
+        final TestObserver<Integer> to = new TestObserver<Integer>();
+
+        Observable.fromSupplier(new Supplier<Integer>() {
+            @Override
+            public Integer get() throws Exception {
+                to.dispose();
+                return 1;
+            }
+        })
+                .subscribe(to);
+
+        to.assertEmpty();
+    }
+
+    @Test
+    public void disposedOnCallThrows() {
+        List<Throwable> errors = TestHelper.trackPluginErrors();
+        try {
+            final TestObserver<Integer> to = new TestObserver<Integer>();
+
+            Observable.fromSupplier(new Supplier<Integer>() {
+                @Override
+                public Integer get() throws Exception {
+                    to.dispose();
+                    throw new TestException();
+                }
+            })
+            .subscribe(to);
+
+            to.assertEmpty();
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+        } finally {
+            RxJavaPlugins.reset();
+        }
+    }
+
+    @Test
+    public void take() {
+        Observable.fromSupplier(new Supplier<Object>() {
+            @Override
+            public Object get() throws Exception {
+                return 1;
+            }
+        })
+        .take(1)
+        .test()
+        .assertResult(1);
+    }
+}
diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFromCallableTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFromCallableTest.java
index 41b9ce523e..7f54a9e594 100644
--- a/src/test/java/io/reactivex/internal/operators/single/SingleFromCallableTest.java
+++ b/src/test/java/io/reactivex/internal/operators/single/SingleFromCallableTest.java
@@ -44,8 +44,8 @@ public void fromCallableValue() {
                 return 5;
             }
         })
-            .test()
-            .assertResult(5);
+        .test()
+        .assertResult(5);
     }
 
     @Test
diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFromSupplierTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFromSupplierTest.java
new file mode 100644
index 0000000000..b0cda6668e
--- /dev/null
+++ b/src/test/java/io/reactivex/internal/operators/single/SingleFromSupplierTest.java
@@ -0,0 +1,275 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.single;
+
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import io.reactivex.*;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.functions.Supplier;
+import io.reactivex.observers.TestObserver;
+import io.reactivex.plugins.RxJavaPlugins;
+import io.reactivex.schedulers.Schedulers;
+import io.reactivex.testsupport.TestHelper;
+
+public class SingleFromSupplierTest {
+
+    @Test
+    public void fromCallableValue() {
+        Single.fromSupplier(new Supplier<Integer>() {
+            @Override public Integer get() throws Exception {
+                return 5;
+            }
+        })
+        .test()
+        .assertResult(5);
+    }
+
+    @Test
+    public void fromSupplierError() {
+        Single.fromSupplier(new Supplier<Integer>() {
+            @Override public Integer get() throws Exception {
+                throw new UnsupportedOperationException();
+            }
+        })
+            .test()
+            .assertFailure(UnsupportedOperationException.class);
+    }
+
+    @Test
+    public void fromSupplierNull() {
+        Single.fromSupplier(new Supplier<Integer>() {
+            @Override public Integer get() throws Exception {
+                return null;
+            }
+        })
+        .to(TestHelper.<Integer>testConsumer())
+        .assertFailureAndMessage(NullPointerException.class, "The supplier returned a null value");
+    }
+
+    @Test
+    public void fromSupplierTwice() {
+        final AtomicInteger atomicInteger = new AtomicInteger();
+
+        Supplier<Integer> callable = new Supplier<Integer>() {
+            @Override
+            public Integer get() throws Exception {
+                return atomicInteger.incrementAndGet();
+            }
+        };
+
+        Single.fromSupplier(callable)
+                .test()
+                .assertResult(1);
+
+        assertEquals(1, atomicInteger.get());
+
+        Single.fromSupplier(callable)
+                .test()
+                .assertResult(2);
+
+        assertEquals(2, atomicInteger.get());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldNotInvokeFuncUntilSubscription() throws Throwable {
+        Supplier<Object> func = mock(Supplier.class);
+
+        when(func.get()).thenReturn(new Object());
+
+        Single<Object> fromSupplierSingle = Single.fromSupplier(func);
+
+        verifyZeroInteractions(func);
+
+        fromSupplierSingle.subscribe();
+
+        verify(func).get();
+    }
+
+    @Test
+    public void noErrorLoss() throws Exception {
+        List<Throwable> errors = TestHelper.trackPluginErrors();
+        try {
+            final CountDownLatch cdl1 = new CountDownLatch(1);
+            final CountDownLatch cdl2 = new CountDownLatch(1);
+
+            TestObserver<Integer> to = Single.fromSupplier(new Supplier<Integer>() {
+                @Override
+                public Integer get() throws Exception {
+                    cdl1.countDown();
+                    cdl2.await(5, TimeUnit.SECONDS);
+                    return 1;
+                }
+            }).subscribeOn(Schedulers.single()).test();
+
+            assertTrue(cdl1.await(5, TimeUnit.SECONDS));
+
+            to.dispose();
+
+            int timeout = 10;
+
+            while (timeout-- > 0 && errors.isEmpty()) {
+                Thread.sleep(100);
+            }
+
+            TestHelper.assertUndeliverable(errors, 0, InterruptedException.class);
+        } finally {
+            RxJavaPlugins.reset();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Throwable {
+        Supplier<String> func = mock(Supplier.class);
+
+        final CountDownLatch funcLatch = new CountDownLatch(1);
+        final CountDownLatch observerLatch = new CountDownLatch(1);
+
+        when(func.get()).thenAnswer(new Answer<String>() {
+            @Override
+            public String answer(InvocationOnMock invocation) throws Throwable {
+                observerLatch.countDown();
+
+                try {
+                    funcLatch.await();
+                } catch (InterruptedException e) {
+                    // It's okay, unsubscription causes Thread interruption
+
+                    // Restoring interruption status of the Thread
+                    Thread.currentThread().interrupt();
+                }
+
+                return "should_not_be_delivered";
+            }
+        });
+
+        Single<String> fromSupplierObservable = Single.fromSupplier(func);
+
+        Observer<Object> observer = TestHelper.mockObserver();
+
+        TestObserver<String> outer = new TestObserver<String>(observer);
+
+        fromSupplierObservable
+                .subscribeOn(Schedulers.computation())
+                .subscribe(outer);
+
+        // Wait until func will be invoked
+        observerLatch.await();
+
+        // Unsubscribing before emission
+        outer.dispose();
+
+        // Emitting result
+        funcLatch.countDown();
+
+        // func must be invoked
+        verify(func).get();
+
+        // Observer must not be notified at all
+        verify(observer).onSubscribe(any(Disposable.class));
+        verifyNoMoreInteractions(observer);
+    }
+
+    @Test
+    public void shouldAllowToThrowCheckedException() {
+        final Exception checkedException = new Exception("test exception");
+
+        Single<Object> fromSupplierObservable = Single.fromSupplier(new Supplier<Object>() {
+            @Override
+            public Object get() throws Exception {
+                throw checkedException;
+            }
+        });
+
+        SingleObserver<Object> observer = TestHelper.mockSingleObserver();
+
+        fromSupplierObservable.subscribe(observer);
+
+        verify(observer).onSubscribe(any(Disposable.class));
+        verify(observer).onError(checkedException);
+        verifyNoMoreInteractions(observer);
+    }
+
+    @Test
+    public void disposedOnArrival() {
+        final int[] count = { 0 };
+        Single.fromSupplier(new Supplier<Object>() {
+            @Override
+            public Object get() throws Exception {
+                count[0]++;
+                return 1;
+            }
+        })
+                .test(true)
+                .assertEmpty();
+
+        assertEquals(0, count[0]);
+    }
+
+    @Test
+    public void disposedOnCall() {
+        final TestObserver<Integer> to = new TestObserver<Integer>();
+
+        Single.fromSupplier(new Supplier<Integer>() {
+            @Override
+            public Integer get() throws Exception {
+                to.dispose();
+                return 1;
+            }
+        })
+                .subscribe(to);
+
+        to.assertEmpty();
+    }
+
+    @Test
+    public void toObservableTake() {
+        Single.fromSupplier(new Supplier<Object>() {
+            @Override
+            public Object get() throws Exception {
+                return 1;
+            }
+        })
+        .toObservable()
+        .take(1)
+        .test()
+        .assertResult(1);
+    }
+
+    @Test
+    public void toObservableAndBack() {
+        Single.fromSupplier(new Supplier<Integer>() {
+            @Override
+            public Integer get() throws Exception {
+                return 1;
+            }
+        })
+        .toObservable()
+        .singleOrError()
+        .test()
+        .assertResult(1);
+    }
+}
diff --git a/src/test/java/io/reactivex/tck/FromCallableTckTest.java b/src/test/java/io/reactivex/tck/FromCallableTckTest.java
index 7131e0f88c..12e46a06c5 100644
--- a/src/test/java/io/reactivex/tck/FromCallableTckTest.java
+++ b/src/test/java/io/reactivex/tck/FromCallableTckTest.java
@@ -19,6 +19,7 @@
 import org.testng.annotations.Test;
 
 import io.reactivex.Flowable;
+import io.reactivex.exceptions.TestException;
 
 @Test
 public class FromCallableTckTest extends BaseTck<Long> {
@@ -36,6 +37,19 @@ public Long call() throws Exception {
             ;
     }
 
+    @Override
+    public Publisher<Long> createFailedPublisher() {
+        return
+                Flowable.fromCallable(new Callable<Long>() {
+                    @Override
+                    public Long call() throws Exception {
+                        throw new TestException();
+                    }
+                }
+                )
+            ;
+    }
+
     @Override
     public long maxElementsFromPublisher() {
         return 1;
diff --git a/src/test/java/io/reactivex/tck/FromSupplierTckTest.java b/src/test/java/io/reactivex/tck/FromSupplierTckTest.java
new file mode 100644
index 0000000000..0878adc619
--- /dev/null
+++ b/src/test/java/io/reactivex/tck/FromSupplierTckTest.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.tck;
+
+import org.reactivestreams.Publisher;
+import org.testng.annotations.Test;
+
+import io.reactivex.Flowable;
+import io.reactivex.exceptions.TestException;
+import io.reactivex.functions.Supplier;
+
+@Test
+public class FromSupplierTckTest extends BaseTck<Long> {
+
+    @Override
+    public Publisher<Long> createPublisher(final long elements) {
+        return
+                Flowable.fromSupplier(new Supplier<Long>() {
+                    @Override
+                    public Long get() throws Throwable {
+                        return 1L;
+                    }
+                }
+                )
+            ;
+    }
+
+    @Override
+    public Publisher<Long> createFailedPublisher() {
+        return
+                Flowable.fromSupplier(new Supplier<Long>() {
+                    @Override
+                    public Long get() throws Throwable {
+                        throw new TestException();
+                    }
+                }
+                )
+            ;
+    }
+
+    @Override
+    public long maxElementsFromPublisher() {
+        return 1;
+    }
+}

From 59d1151ceebafd0647932246c2db32bd69bfe38d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?D=C3=A1vid=20Karnok?= <akarnokd@gmail.com>
Date: Thu, 20 Jun 2019 20:51:31 +0200
Subject: [PATCH 2/2] Correct some missing callable-supplier name changes

---
 src/main/java/io/reactivex/Completable.java               | 2 +-
 src/main/java/io/reactivex/Maybe.java                     | 2 +-
 .../internal/operators/maybe/MaybeFromSupplier.java       | 2 +-
 .../operators/observable/ObservableFromSupplier.java      | 4 ++--
 .../internal/operators/maybe/MaybeFromSupplierTest.java   | 8 ++++----
 .../internal/operators/single/SingleFromSupplierTest.java | 8 ++++----
 6 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java
index b618650d5d..da9a6d4253 100644
--- a/src/main/java/io/reactivex/Completable.java
+++ b/src/main/java/io/reactivex/Completable.java
@@ -637,7 +637,7 @@ public static <T> Completable fromSingle(final SingleSource<T> single) {
     @NonNull
     @SchedulerSupport(SchedulerSupport.NONE)
     public static Completable fromSupplier(final Supplier<?> supplier) {
-        ObjectHelper.requireNonNull(supplier, "callable is null");
+        ObjectHelper.requireNonNull(supplier, "supplier is null");
         return RxJavaPlugins.onAssembly(new CompletableFromSupplier(supplier));
     }
 
diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java
index 72954d89d0..ffc042e178 100644
--- a/src/main/java/io/reactivex/Maybe.java
+++ b/src/main/java/io/reactivex/Maybe.java
@@ -875,7 +875,7 @@ public static <T> Maybe<T> fromRunnable(final Runnable run) {
      * <p>
      * This operator allows you to defer the execution of the given {@code Supplier} until a {@code MaybeObserver}
      * subscribes to the  returned {@link Maybe}. In other terms, this source operator evaluates the given
-     * {@code Callable} "lazily".
+     * {@code Supplier} "lazily".
      * <p>
      * Note that the {@code null} handling of this operator differs from the similar source operators in the other
      * {@link io.reactivex base reactive classes}. Those operators signal a {@code NullPointerException} if the value returned by their
diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSupplier.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSupplier.java
index fa2e9c1cd4..2f33813898 100644
--- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSupplier.java
+++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSupplier.java
@@ -20,7 +20,7 @@
 import io.reactivex.plugins.RxJavaPlugins;
 
 /**
- * Executes a callable and signals its value as success or signals an exception.
+ * Executes a supplier and signals its value as success or signals an exception.
  *
  * @param <T> the value type
  * @since 3.0.0
diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromSupplier.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromSupplier.java
index 2ba55b6f36..88791276c8 100644
--- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromSupplier.java
+++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromSupplier.java
@@ -21,7 +21,7 @@
 import io.reactivex.plugins.RxJavaPlugins;
 
 /**
- * Calls a Callable and emits its resulting single value or signals its exception.
+ * Calls a Supplier and emits its resulting single value or signals its exception.
  * @param <T> the value type
  * @since 3.0.0
  */
@@ -57,6 +57,6 @@ public void subscribeActual(Observer<? super T> observer) {
 
     @Override
     public T get() throws Throwable {
-        return ObjectHelper.requireNonNull(supplier.get(), "The callable returned a null value");
+        return ObjectHelper.requireNonNull(supplier.get(), "The supplier returned a null value");
     }
 }
diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSupplierTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSupplierTest.java
index a281af9f4b..fdb91888b3 100644
--- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSupplierTest.java
+++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSupplierTest.java
@@ -61,7 +61,7 @@ public Object get() throws Exception {
     public void fromSupplierTwice() {
         final AtomicInteger atomicInteger = new AtomicInteger();
 
-        Supplier<Object> callable = new Supplier<Object>() {
+        Supplier<Object> supplier = new Supplier<Object>() {
             @Override
             public Object get() throws Exception {
                 atomicInteger.incrementAndGet();
@@ -69,13 +69,13 @@ public Object get() throws Exception {
             }
         };
 
-        Maybe.fromSupplier(callable)
+        Maybe.fromSupplier(supplier)
             .test()
             .assertResult();
 
         assertEquals(1, atomicInteger.get());
 
-        Maybe.fromSupplier(callable)
+        Maybe.fromSupplier(supplier)
             .test()
             .assertResult();
 
@@ -117,7 +117,7 @@ public Object get() throws Exception {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void callable() throws Throwable {
+    public void supplier() throws Throwable {
         final int[] counter = { 0 };
 
         Maybe<Integer> m = Maybe.fromSupplier(new Supplier<Integer>() {
diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFromSupplierTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFromSupplierTest.java
index b0cda6668e..6c823a44bb 100644
--- a/src/test/java/io/reactivex/internal/operators/single/SingleFromSupplierTest.java
+++ b/src/test/java/io/reactivex/internal/operators/single/SingleFromSupplierTest.java
@@ -36,7 +36,7 @@
 public class SingleFromSupplierTest {
 
     @Test
-    public void fromCallableValue() {
+    public void fromSupplierValue() {
         Single.fromSupplier(new Supplier<Integer>() {
             @Override public Integer get() throws Exception {
                 return 5;
@@ -72,20 +72,20 @@ public void fromSupplierNull() {
     public void fromSupplierTwice() {
         final AtomicInteger atomicInteger = new AtomicInteger();
 
-        Supplier<Integer> callable = new Supplier<Integer>() {
+        Supplier<Integer> supplier = new Supplier<Integer>() {
             @Override
             public Integer get() throws Exception {
                 return atomicInteger.incrementAndGet();
             }
         };
 
-        Single.fromSupplier(callable)
+        Single.fromSupplier(supplier)
                 .test()
                 .assertResult(1);
 
         assertEquals(1, atomicInteger.get());
 
-        Single.fromSupplier(callable)
+        Single.fromSupplier(supplier)
                 .test()
                 .assertResult(2);