Skip to content

Commit 98561ce

Browse files
authored
3.x: Add X.fromSupplier() (#6529)
* 3.x: Add X.fromSupplier() * Correct some missing callable-supplier name changes
1 parent f854c17 commit 98561ce

20 files changed

+1855
-6
lines changed

src/main/java/io/reactivex/Completable.java

+32
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,8 @@ public static Completable fromAction(final Action run) {
444444
* </dl>
445445
* @param callable the callable instance to execute for each subscriber
446446
* @return the new Completable instance
447+
* @see #defer(Supplier)
448+
* @see #fromSupplier(Supplier)
447449
*/
448450
@CheckReturnValue
449451
@NonNull
@@ -609,6 +611,36 @@ public static <T> Completable fromSingle(final SingleSource<T> single) {
609611
return RxJavaPlugins.onAssembly(new CompletableFromSingle<T>(single));
610612
}
611613

614+
/**
615+
* Returns a Completable which when subscribed, executes the supplier function, ignores its
616+
* normal result and emits onError or onComplete only.
617+
* <p>
618+
* <img width="640" height="286" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromCallable.png" alt="">
619+
* <dl>
620+
* <dt><b>Scheduler:</b></dt>
621+
* <dd>{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.</dd>
622+
* <dt><b>Error handling:</b></dt>
623+
* <dd> If the {@link Supplier} throws an exception, the respective {@link Throwable} is
624+
* delivered to the downstream via {@link CompletableObserver#onError(Throwable)},
625+
* except when the downstream has disposed this {@code Completable} source.
626+
* In this latter case, the {@code Throwable} is delivered to the global error handler via
627+
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
628+
* </dd>
629+
* </dl>
630+
* @param supplier the Supplier instance to execute for each subscriber
631+
* @return the new Completable instance
632+
* @see #defer(Supplier)
633+
* @see #fromCallable(Callable)
634+
* @since 3.0.0
635+
*/
636+
@CheckReturnValue
637+
@NonNull
638+
@SchedulerSupport(SchedulerSupport.NONE)
639+
public static Completable fromSupplier(final Supplier<?> supplier) {
640+
ObjectHelper.requireNonNull(supplier, "supplier is null");
641+
return RxJavaPlugins.onAssembly(new CompletableFromSupplier(supplier));
642+
}
643+
612644
/**
613645
* Returns a Completable instance that subscribes to all sources at once and
614646
* completes only when all source Completables complete or one of them emits an error.

src/main/java/io/reactivex/Flowable.java

+42
Original file line numberDiff line numberDiff line change
@@ -2087,6 +2087,7 @@ public static <T> Flowable<T> fromArray(T... items) {
20872087
* the type of the item emitted by the Publisher
20882088
* @return a Flowable whose {@link Subscriber}s' subscriptions trigger an invocation of the given function
20892089
* @see #defer(Supplier)
2090+
* @see #fromSupplier(Supplier)
20902091
* @since 2.0
20912092
*/
20922093
@CheckReturnValue
@@ -2331,6 +2332,47 @@ public static <T> Flowable<T> fromPublisher(final Publisher<? extends T> source)
23312332
return RxJavaPlugins.onAssembly(new FlowableFromPublisher<T>(source));
23322333
}
23332334

2335+
/**
2336+
* Returns a Flowable that, when a Subscriber subscribes to it, invokes a supplier function you specify and then
2337+
* emits the value returned from that function.
2338+
* <p>
2339+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/fromCallable.png" alt="">
2340+
* <p>
2341+
* This allows you to defer the execution of the function you specify until a Subscriber subscribes to the
2342+
* Publisher. That is to say, it makes the function "lazy."
2343+
* <dl>
2344+
* <dt><b>Backpressure:</b></dt>
2345+
* <dd>The operator honors backpressure from downstream.</dd>
2346+
* <dt><b>Scheduler:</b></dt>
2347+
* <dd>{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.</dd>
2348+
* <dt><b>Error handling:</b></dt>
2349+
* <dd> If the {@link Supplier} throws an exception, the respective {@link Throwable} is
2350+
* delivered to the downstream via {@link Subscriber#onError(Throwable)},
2351+
* except when the downstream has canceled this {@code Flowable} source.
2352+
* In this latter case, the {@code Throwable} is delivered to the global error handler via
2353+
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
2354+
* </dd>
2355+
* </dl>
2356+
*
2357+
* @param supplier
2358+
* a function, the execution of which should be deferred; {@code fromSupplier} will invoke this
2359+
* function only when a Subscriber subscribes to the Publisher that {@code fromSupplier} returns
2360+
* @param <T>
2361+
* the type of the item emitted by the Publisher
2362+
* @return a Flowable whose {@link Subscriber}s' subscriptions trigger an invocation of the given function
2363+
* @see #defer(Supplier)
2364+
* @see #fromCallable(Callable)
2365+
* @since 3.0.0
2366+
*/
2367+
@CheckReturnValue
2368+
@NonNull
2369+
@BackpressureSupport(BackpressureKind.FULL)
2370+
@SchedulerSupport(SchedulerSupport.NONE)
2371+
public static <T> Flowable<T> fromSupplier(Supplier<? extends T> supplier) {
2372+
ObjectHelper.requireNonNull(supplier, "supplier is null");
2373+
return RxJavaPlugins.onAssembly(new FlowableFromSupplier<T>(supplier));
2374+
}
2375+
23342376
/**
23352377
* Returns a cold, synchronous, stateless and backpressure-aware generator of values.
23362378
* <p>

src/main/java/io/reactivex/Maybe.java

+47
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,8 @@ public static <T> Maybe<T> fromSingle(SingleSource<T> singleSource) {
763763
* @param <T>
764764
* the type of the item emitted by the {@link Maybe}.
765765
* @return a new Maybe instance
766+
* @see #defer(Supplier)
767+
* @see #fromSupplier(Supplier)
766768
*/
767769
@CheckReturnValue
768770
@NonNull
@@ -865,6 +867,51 @@ public static <T> Maybe<T> fromRunnable(final Runnable run) {
865867
return RxJavaPlugins.onAssembly(new MaybeFromRunnable<T>(run));
866868
}
867869

870+
/**
871+
* Returns a {@link Maybe} that invokes the given {@link Supplier} for each individual {@link MaybeObserver} that
872+
* subscribes and emits the resulting non-null item via {@code onSuccess} while
873+
* considering a {@code null} result from the {@code Supplier} as indication for valueless completion
874+
* via {@code onComplete}.
875+
* <p>
876+
* This operator allows you to defer the execution of the given {@code Supplier} until a {@code MaybeObserver}
877+
* subscribes to the returned {@link Maybe}. In other terms, this source operator evaluates the given
878+
* {@code Supplier} "lazily".
879+
* <p>
880+
* Note that the {@code null} handling of this operator differs from the similar source operators in the other
881+
* {@link io.reactivex base reactive classes}. Those operators signal a {@code NullPointerException} if the value returned by their
882+
* {@code Supplier} is {@code null} while this {@code fromSupplier} considers it to indicate the
883+
* returned {@code Maybe} is empty.
884+
* <dl>
885+
* <dt><b>Scheduler:</b></dt>
886+
* <dd>{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.</dd>
887+
* <dt><b>Error handling:</b></dt>
888+
* <dd>Any non-fatal exception thrown by {@link Supplier#get()} will be forwarded to {@code onError},
889+
* except if the {@code MaybeObserver} disposed the subscription in the meantime. In this latter case,
890+
* the exception is forwarded to the global error handler via
891+
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} wrapped into a
892+
* {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
893+
* Fatal exceptions are rethrown and usually will end up in the executing thread's
894+
* {@link java.lang.Thread.UncaughtExceptionHandler#uncaughtException(Thread, Throwable)} handler.</dd>
895+
* </dl>
896+
*
897+
* @param supplier
898+
* a {@link Supplier} instance whose execution should be deferred and performed for each individual
899+
* {@code MaybeObserver} that subscribes to the returned {@link Maybe}.
900+
* @param <T>
901+
* the type of the item emitted by the {@link Maybe}.
902+
* @return a new Maybe instance
903+
* @see #defer(Supplier)
904+
* @see #fromCallable(Callable)
905+
* @since 3.0.0
906+
*/
907+
@CheckReturnValue
908+
@NonNull
909+
@SchedulerSupport(SchedulerSupport.NONE)
910+
public static <T> Maybe<T> fromSupplier(@NonNull final Supplier<? extends T> supplier) {
911+
ObjectHelper.requireNonNull(supplier, "supplier is null");
912+
return RxJavaPlugins.onAssembly(new MaybeFromSupplier<T>(supplier));
913+
}
914+
868915
/**
869916
* Returns a {@code Maybe} that emits a specified item.
870917
* <p>

src/main/java/io/reactivex/Observable.java

+38
Original file line numberDiff line numberDiff line change
@@ -1798,6 +1798,7 @@ public static <T> Observable<T> fromArray(T... items) {
17981798
* the type of the item emitted by the ObservableSource
17991799
* @return an Observable whose {@link Observer}s' subscriptions trigger an invocation of the given function
18001800
* @see #defer(Supplier)
1801+
* @see #fromSupplier(Supplier)
18011802
* @since 2.0
18021803
*/
18031804
@CheckReturnValue
@@ -2021,6 +2022,43 @@ public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
20212022
return RxJavaPlugins.onAssembly(new ObservableFromPublisher<T>(publisher));
20222023
}
20232024

2025+
/**
2026+
* Returns an Observable that, when an observer subscribes to it, invokes a supplier function you specify and then
2027+
* emits the value returned from that function.
2028+
* <p>
2029+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/fromCallable.png" alt="">
2030+
* <p>
2031+
* This allows you to defer the execution of the function you specify until an observer subscribes to the
2032+
* ObservableSource. That is to say, it makes the function "lazy."
2033+
* <dl>
2034+
* <dt><b>Scheduler:</b></dt>
2035+
* <dd>{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.</dd>
2036+
* <dt><b>Error handling:</b></dt>
2037+
* <dd> If the {@link Supplier} throws an exception, the respective {@link Throwable} is
2038+
* delivered to the downstream via {@link Observer#onError(Throwable)},
2039+
* except when the downstream has disposed this {@code Observable} source.
2040+
* In this latter case, the {@code Throwable} is delivered to the global error handler via
2041+
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
2042+
* </dd>
2043+
* </dl>
2044+
* @param supplier
2045+
* a function, the execution of which should be deferred; {@code fromSupplier} will invoke this
2046+
* function only when an observer subscribes to the ObservableSource that {@code fromSupplier} returns
2047+
* @param <T>
2048+
* the type of the item emitted by the ObservableSource
2049+
* @return an Observable whose {@link Observer}s' subscriptions trigger an invocation of the given function
2050+
* @see #defer(Supplier)
2051+
* @see #fromCallable(Callable)
2052+
* @since 3.0.0
2053+
*/
2054+
@CheckReturnValue
2055+
@NonNull
2056+
@SchedulerSupport(SchedulerSupport.NONE)
2057+
public static <T> Observable<T> fromSupplier(Supplier<? extends T> supplier) {
2058+
ObjectHelper.requireNonNull(supplier, "supplier is null");
2059+
return RxJavaPlugins.onAssembly(new ObservableFromSupplier<T>(supplier));
2060+
}
2061+
20242062
/**
20252063
* Returns a cold, synchronous and stateless generator of values.
20262064
* <p>

src/main/java/io/reactivex/Single.java

+40
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,8 @@ public static <T> Single<T> error(final Throwable exception) {
613613
* @param <T>
614614
* the type of the item emitted by the {@link Single}.
615615
* @return a {@link Single} whose {@link SingleObserver}s' subscriptions trigger an invocation of the given function.
616+
* @see #defer(Supplier)
617+
* @see #fromSupplier(Supplier)
616618
*/
617619
@CheckReturnValue
618620
@NonNull
@@ -811,6 +813,44 @@ public static <T> Single<T> fromObservable(ObservableSource<? extends T> observa
811813
return RxJavaPlugins.onAssembly(new ObservableSingleSingle<T>(observableSource, null));
812814
}
813815

816+
/**
817+
* Returns a {@link Single} that invokes passed supplierfunction and emits its result
818+
* for each new SingleObserver that subscribes.
819+
* <p>
820+
* Allows you to defer execution of passed function until SingleObserver subscribes to the {@link Single}.
821+
* It makes passed function "lazy".
822+
* Result of the function invocation will be emitted by the {@link Single}.
823+
* <p>
824+
* <img width="640" height="467" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.fromCallable.png" alt="">
825+
* <dl>
826+
* <dt><b>Scheduler:</b></dt>
827+
* <dd>{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.</dd>
828+
* <dt><b>Error handling:</b></dt>
829+
* <dd> If the {@link Supplier} throws an exception, the respective {@link Throwable} is
830+
* delivered to the downstream via {@link SingleObserver#onError(Throwable)},
831+
* except when the downstream has disposed this {@code Single} source.
832+
* In this latter case, the {@code Throwable} is delivered to the global error handler via
833+
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
834+
* </dd>
835+
* </dl>
836+
*
837+
* @param supplier
838+
* function which execution should be deferred, it will be invoked when SingleObserver will subscribe to the {@link Single}.
839+
* @param <T>
840+
* the type of the item emitted by the {@link Single}.
841+
* @return a {@link Single} whose {@link SingleObserver}s' subscriptions trigger an invocation of the given function.
842+
* @see #defer(Supplier)
843+
* @see #fromCallable(Callable)
844+
* @since 3.0.0
845+
*/
846+
@CheckReturnValue
847+
@NonNull
848+
@SchedulerSupport(SchedulerSupport.NONE)
849+
public static <T> Single<T> fromSupplier(final Supplier<? extends T> supplier) {
850+
ObjectHelper.requireNonNull(supplier, "supplier is null");
851+
return RxJavaPlugins.onAssembly(new SingleFromSupplier<T>(supplier));
852+
}
853+
814854
/**
815855
* Returns a {@code Single} that emits a specified item.
816856
* <p>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import io.reactivex.*;
17+
import io.reactivex.disposables.*;
18+
import io.reactivex.exceptions.Exceptions;
19+
import io.reactivex.functions.Supplier;
20+
import io.reactivex.plugins.RxJavaPlugins;
21+
22+
/**
23+
* Call a Supplier for each incoming CompletableObserver and signal completion or the thrown exception.
24+
* @since 3.0.0
25+
*/
26+
public final class CompletableFromSupplier extends Completable {
27+
28+
final Supplier<?> supplier;
29+
30+
public CompletableFromSupplier(Supplier<?> supplier) {
31+
this.supplier = supplier;
32+
}
33+
34+
@Override
35+
protected void subscribeActual(CompletableObserver observer) {
36+
Disposable d = Disposables.empty();
37+
observer.onSubscribe(d);
38+
try {
39+
supplier.get();
40+
} catch (Throwable e) {
41+
Exceptions.throwIfFatal(e);
42+
if (!d.isDisposed()) {
43+
observer.onError(e);
44+
} else {
45+
RxJavaPlugins.onError(e);
46+
}
47+
return;
48+
}
49+
if (!d.isDisposed()) {
50+
observer.onComplete();
51+
}
52+
}
53+
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableFromCallable.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@
1919

2020
import io.reactivex.Flowable;
2121
import io.reactivex.exceptions.Exceptions;
22+
import io.reactivex.functions.Supplier;
2223
import io.reactivex.internal.functions.ObjectHelper;
2324
import io.reactivex.internal.subscriptions.DeferredScalarSubscription;
2425
import io.reactivex.plugins.RxJavaPlugins;
2526

26-
public final class FlowableFromCallable<T> extends Flowable<T> implements Callable<T> {
27+
public final class FlowableFromCallable<T> extends Flowable<T> implements Supplier<T> {
2728
final Callable<? extends T> callable;
2829
public FlowableFromCallable(Callable<? extends T> callable) {
2930
this.callable = callable;
@@ -51,7 +52,7 @@ public void subscribeActual(Subscriber<? super T> s) {
5152
}
5253

5354
@Override
54-
public T call() throws Exception {
55+
public T get() throws Throwable {
5556
return ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value");
5657
}
5758
}

0 commit comments

Comments
 (0)