Skip to content

Commit 33a1dcf

Browse files
authored
3.x: Add Maybe/Single/Completable blockingSubscribe (#6862)
* 3.x: Add Maybe/Single/Completable blockingSubscribe * Update marble dimensions
1 parent bb3260e commit 33a1dcf

File tree

8 files changed

+1698
-1
lines changed

8 files changed

+1698
-1
lines changed

src/main/java/io/reactivex/rxjava3/core/Completable.java

+100
Original file line numberDiff line numberDiff line change
@@ -1310,6 +1310,106 @@ public final boolean blockingAwait(long timeout, @NonNull TimeUnit unit) {
13101310
return observer.blockingAwait(timeout, unit);
13111311
}
13121312

1313+
/**
1314+
* Subscribes to the current {@code Completable} and <em>blocks the current thread</em> until it terminates.
1315+
* <p>
1316+
* <img width="640" height="346" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.png" alt="">
1317+
* <dl>
1318+
* <dt><b>Scheduler:</b></dt>
1319+
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
1320+
* <dt><b>Error handling:</b></dt>
1321+
* <dd>If the current {@code Completable} signals an error,
1322+
* the {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
1323+
* If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
1324+
* </dd>
1325+
* </dl>
1326+
* @since 3.0.0
1327+
* @see #blockingSubscribe(Action)
1328+
* @see #blockingSubscribe(Action, Consumer)
1329+
*/
1330+
@SchedulerSupport(SchedulerSupport.NONE)
1331+
public final void blockingSubscribe() {
1332+
blockingSubscribe(Functions.EMPTY_ACTION, Functions.ERROR_CONSUMER);
1333+
}
1334+
1335+
/**
1336+
* Subscribes to the current {@code Completable} and calls given {@code onComplete} callback on the <em>current thread</em>
1337+
* when it completes normally.
1338+
* <p>
1339+
* <img width="640" height="351" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.a.png" alt="">
1340+
* <dl>
1341+
* <dt><b>Scheduler:</b></dt>
1342+
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
1343+
* <dt><b>Error handling:</b></dt>
1344+
* <dd>If either the current {@code Completable} signals an error or {@code onComplete} throws,
1345+
* the respective {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
1346+
* If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
1347+
* </dd>
1348+
* </dl>
1349+
* @param onComplete the {@link Action} to call if the current {@code Completable} completes normally
1350+
* @throws NullPointerException if {@code onComplete} is {@code null}
1351+
* @since 3.0.0
1352+
* @see #blockingSubscribe(Action, Consumer)
1353+
*/
1354+
@SchedulerSupport(SchedulerSupport.NONE)
1355+
public final void blockingSubscribe(@NonNull Action onComplete) {
1356+
blockingSubscribe(onComplete, Functions.ERROR_CONSUMER);
1357+
}
1358+
1359+
/**
1360+
* Subscribes to the current {@code Completable} and calls the appropriate callback on the <em>current thread</em>
1361+
* when it terminates.
1362+
* <p>
1363+
* <img width="640" height="352" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.ac.png" alt="">
1364+
* <dl>
1365+
* <dt><b>Scheduler:</b></dt>
1366+
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
1367+
* <dt><b>Error handling:</b></dt>
1368+
* <dd>If either {@code onComplete} or {@code onError} throw, the {@link Throwable} is routed to the
1369+
* global error handler via {@link RxJavaPlugins#onError(Throwable)}.
1370+
* If the current thread is interrupted, the {@code onError} consumer is called with an {@link InterruptedException}.
1371+
* </dd>
1372+
* </dl>
1373+
* @param onComplete the {@link Action} to call if the current {@code Completable} completes normally
1374+
* @param onError the {@link Consumer} to call if the current {@code Completable} signals an error
1375+
* @throws NullPointerException if {@code onComplete} or {@code onError} is {@code null}
1376+
* @since 3.0.0
1377+
*/
1378+
@SchedulerSupport(SchedulerSupport.NONE)
1379+
public final void blockingSubscribe(@NonNull Action onComplete, @NonNull Consumer<? super Throwable> onError) {
1380+
Objects.requireNonNull(onComplete, "onComplete is null");
1381+
Objects.requireNonNull(onError, "onError is null");
1382+
BlockingMultiObserver<Void> observer = new BlockingMultiObserver<>();
1383+
subscribe(observer);
1384+
observer.blockingConsume(Functions.emptyConsumer(), onError, onComplete);
1385+
}
1386+
1387+
/**
1388+
* Subscribes to the current {@code Completable} and calls the appropriate {@link CompletableObserver} method on the <em>current thread</em>.
1389+
* <p>
1390+
* <img width="640" height="468" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.o.png" alt="">
1391+
* <dl>
1392+
* <dt><b>Scheduler:</b></dt>
1393+
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
1394+
* <dt><b>Error handling:</b></dt>
1395+
* <dd>An {@code onError} signal is delivered to the {@link CompletableObserver#onError(Throwable)} method.
1396+
* If any of the {@code CompletableObserver}'s methods throw, the {@link RuntimeException} is propagated to the caller of this method.
1397+
* If the current thread is interrupted, an {@link InterruptedException} is delivered to {@code observer.onError}.
1398+
* </dd>
1399+
* </dl>
1400+
* @param observer the {@code CompletableObserver} to call methods on the current thread
1401+
* @throws NullPointerException if {@code observer} is {@code null}
1402+
* @since 3.0.0
1403+
*/
1404+
@SchedulerSupport(SchedulerSupport.NONE)
1405+
public final void blockingSubscribe(@NonNull CompletableObserver observer) {
1406+
Objects.requireNonNull(observer, "observer is null");
1407+
BlockingDisposableMultiObserver<Void> blockingObserver = new BlockingDisposableMultiObserver<>();
1408+
observer.onSubscribe(blockingObserver);
1409+
subscribe(blockingObserver);
1410+
blockingObserver.blockingConsume(observer);
1411+
}
1412+
13131413
/**
13141414
* Subscribes to this {@code Completable} only once, when the first {@link CompletableObserver}
13151415
* subscribes to the result {@code Completable}, caches its terminal event

src/main/java/io/reactivex/rxjava3/core/Maybe.java

+130-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import io.reactivex.rxjava3.internal.functions.*;
2727
import io.reactivex.rxjava3.internal.fuseable.*;
2828
import io.reactivex.rxjava3.internal.jdk8.*;
29-
import io.reactivex.rxjava3.internal.observers.BlockingMultiObserver;
29+
import io.reactivex.rxjava3.internal.observers.*;
3030
import io.reactivex.rxjava3.internal.operators.flowable.*;
3131
import io.reactivex.rxjava3.internal.operators.maybe.*;
3232
import io.reactivex.rxjava3.internal.operators.mixed.*;
@@ -2475,6 +2475,135 @@ public final T blockingGet(@NonNull T defaultValue) {
24752475
return observer.blockingGet(defaultValue);
24762476
}
24772477

2478+
/**
2479+
* Subscribes to the current {@code Maybe} and <em>blocks the current thread</em> until it terminates.
2480+
* <p>
2481+
* <img width="640" height="238" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.png" alt="">
2482+
* <dl>
2483+
* <dt><b>Scheduler:</b></dt>
2484+
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
2485+
* <dt><b>Error handling:</b></dt>
2486+
* <dd>If the current {@code Maybe} signals an error,
2487+
* the {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
2488+
* If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
2489+
* </dd>
2490+
* </dl>
2491+
* @since 3.0.0
2492+
* @see #blockingSubscribe(Consumer)
2493+
* @see #blockingSubscribe(Consumer, Consumer)
2494+
* @see #blockingSubscribe(Consumer, Consumer, Action)
2495+
*/
2496+
@SchedulerSupport(SchedulerSupport.NONE)
2497+
public final void blockingSubscribe() {
2498+
blockingSubscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
2499+
}
2500+
2501+
/**
2502+
* Subscribes to the current {@code Maybe} and calls given {@code onSuccess} callback on the <em>current thread</em>
2503+
* when it completes normally.
2504+
* <p>
2505+
* <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.c.png" alt="">
2506+
* <dl>
2507+
* <dt><b>Scheduler:</b></dt>
2508+
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
2509+
* <dt><b>Error handling:</b></dt>
2510+
* <dd>If either the current {@code Maybe} signals an error or {@code onSuccess} throws,
2511+
* the respective {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
2512+
* If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
2513+
* </dd>
2514+
* </dl>
2515+
* @param onSuccess the {@link Consumer} to call if the current {@code Maybe} succeeds
2516+
* @throws NullPointerException if {@code onSuccess} is {@code null}
2517+
* @since 3.0.0
2518+
* @see #blockingSubscribe(Consumer, Consumer)
2519+
* @see #blockingSubscribe(Consumer, Consumer, Action)
2520+
*/
2521+
@SchedulerSupport(SchedulerSupport.NONE)
2522+
public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess) {
2523+
blockingSubscribe(onSuccess, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
2524+
}
2525+
2526+
/**
2527+
* Subscribes to the current {@code Maybe} and calls the appropriate callback on the <em>current thread</em>
2528+
* when it terminates.
2529+
* <p>
2530+
* <img width="640" height="256" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.cc.png" alt="">
2531+
* <dl>
2532+
* <dt><b>Scheduler:</b></dt>
2533+
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
2534+
* <dt><b>Error handling:</b></dt>
2535+
* <dd>If either {@code onSuccess} or {@code onError} throw, the {@link Throwable} is routed to the
2536+
* global error handler via {@link RxJavaPlugins#onError(Throwable)}.
2537+
* If the current thread is interrupted, the {@code onError} consumer is called with an {@link InterruptedException}.
2538+
* </dd>
2539+
* </dl>
2540+
* @param onSuccess the {@link Consumer} to call if the current {@code Maybe} succeeds
2541+
* @param onError the {@code Consumer} to call if the current {@code Maybe} signals an error
2542+
* @throws NullPointerException if {@code onSuccess} or {@code onError} is {@code null}
2543+
* @since 3.0.0
2544+
* @see #blockingSubscribe(Consumer, Consumer, Action)
2545+
*/
2546+
@SchedulerSupport(SchedulerSupport.NONE)
2547+
public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess, @NonNull Consumer<? super Throwable> onError) {
2548+
blockingSubscribe(onSuccess, onError, Functions.EMPTY_ACTION);
2549+
}
2550+
2551+
/**
2552+
* Subscribes to the current {@code Maybe} and calls the appropriate callback on the <em>current thread</em>
2553+
* when it terminates.
2554+
* <p>
2555+
* <img width="640" height="251" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.cca.png" alt="">
2556+
* <dl>
2557+
* <dt><b>Scheduler:</b></dt>
2558+
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
2559+
* <dt><b>Error handling:</b></dt>
2560+
* <dd>If either {@code onSuccess}, {@code onError} or {@code onComplete} throw, the {@link Throwable} is routed to the
2561+
* global error handler via {@link RxJavaPlugins#onError(Throwable)}.
2562+
* If the current thread is interrupted, the {@code onError} consumer is called with an {@link InterruptedException}.
2563+
* </dd>
2564+
* </dl>
2565+
* @param onSuccess the {@link Consumer} to call if the current {@code Maybe} succeeds
2566+
* @param onError the {@code Consumer} to call if the current {@code Maybe} signals an error
2567+
* @param onComplete the {@linnk Action} to call if the current {@code Maybe} completes without a value
2568+
* @throws NullPointerException if {@code onSuccess}, {@code onError} or {@code onComplete} is {@code null}
2569+
* @since 3.0.0
2570+
*/
2571+
@SchedulerSupport(SchedulerSupport.NONE)
2572+
public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess, @NonNull Consumer<? super Throwable> onError, @NonNull Action onComplete) {
2573+
Objects.requireNonNull(onSuccess, "onSuccess is null");
2574+
Objects.requireNonNull(onError, "onError is null");
2575+
Objects.requireNonNull(onComplete, "onComplete is null");
2576+
BlockingMultiObserver<T> observer = new BlockingMultiObserver<>();
2577+
subscribe(observer);
2578+
observer.blockingConsume(onSuccess, onError, onComplete);
2579+
}
2580+
2581+
/**
2582+
* Subscribes to the current {@code Maybe} and calls the appropriate {@link MaybeObserver} method on the <em>current thread</em>.
2583+
* <p>
2584+
* <img width="640" height="398" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.o.png" alt="">
2585+
* <dl>
2586+
* <dt><b>Scheduler:</b></dt>
2587+
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
2588+
* <dt><b>Error handling:</b></dt>
2589+
* <dd>An {@code onError} signal is delivered to the {@link MaybeObserver#onError(Throwable)} method.
2590+
* If any of the {@code MaybeObserver}'s methods throw, the {@link RuntimeException} is propagated to the caller of this method.
2591+
* If the current thread is interrupted, an {@link InterruptedException} is delivered to {@code observer.onError}.
2592+
* </dd>
2593+
* </dl>
2594+
* @param observer the {@code MaybeObserver} to call methods on the current thread
2595+
* @throws NullPointerException if {@code observer} is {@code null}
2596+
* @since 3.0.0
2597+
*/
2598+
@SchedulerSupport(SchedulerSupport.NONE)
2599+
public final void blockingSubscribe(@NonNull MaybeObserver<? super T> observer) {
2600+
Objects.requireNonNull(observer, "observer is null");
2601+
BlockingDisposableMultiObserver<T> blockingObserver = new BlockingDisposableMultiObserver<>();
2602+
observer.onSubscribe(blockingObserver);
2603+
subscribe(blockingObserver);
2604+
blockingObserver.blockingConsume(observer);
2605+
}
2606+
24782607
/**
24792608
* Returns a {@code Maybe} that subscribes to this {@code Maybe} lazily, caches its event
24802609
* and replays it, to all the downstream subscribers.

src/main/java/io/reactivex/rxjava3/core/Single.java

+100
Original file line numberDiff line numberDiff line change
@@ -2947,6 +2947,106 @@ public final T blockingGet() {
29472947
return observer.blockingGet();
29482948
}
29492949

2950+
/**
2951+
* Subscribes to the current {@code Single} and <em>blocks the current thread</em> until it terminates.
2952+
* <p>
2953+
* <img width="640" height="329" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.png" alt="">
2954+
* <dl>
2955+
* <dt><b>Scheduler:</b></dt>
2956+
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
2957+
* <dt><b>Error handling:</b></dt>
2958+
* <dd>If the current {@code Single} signals an error,
2959+
* the {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
2960+
* If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
2961+
* </dd>
2962+
* </dl>
2963+
* @since 3.0.0
2964+
* @see #blockingSubscribe(Consumer)
2965+
* @see #blockingSubscribe(Consumer, Consumer)
2966+
*/
2967+
@SchedulerSupport(SchedulerSupport.NONE)
2968+
public final void blockingSubscribe() {
2969+
blockingSubscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER);
2970+
}
2971+
2972+
/**
2973+
* Subscribes to the current {@code Single} and calls given {@code onSuccess} callback on the <em>current thread</em>
2974+
* when it completes normally.
2975+
* <p>
2976+
* <img width="640" height="351" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.c.png" alt="">
2977+
* <dl>
2978+
* <dt><b>Scheduler:</b></dt>
2979+
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
2980+
* <dt><b>Error handling:</b></dt>
2981+
* <dd>If either the current {@code Single} signals an error or {@code onSuccess} throws,
2982+
* the respective {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
2983+
* If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
2984+
* </dd>
2985+
* </dl>
2986+
* @param onSuccess the {@link Consumer} to call if the current {@code Single} succeeds
2987+
* @throws NullPointerException if {@code onSuccess} is {@code null}
2988+
* @since 3.0.0
2989+
* @see #blockingSubscribe(Consumer, Consumer)
2990+
*/
2991+
@SchedulerSupport(SchedulerSupport.NONE)
2992+
public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess) {
2993+
blockingSubscribe(onSuccess, Functions.ERROR_CONSUMER);
2994+
}
2995+
2996+
/**
2997+
* Subscribes to the current {@code Single} and calls the appropriate callback on the <em>current thread</em>
2998+
* when it terminates.
2999+
* <p>
3000+
* <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.cc.png" alt="">
3001+
* <dl>
3002+
* <dt><b>Scheduler:</b></dt>
3003+
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
3004+
* <dt><b>Error handling:</b></dt>
3005+
* <dd>If either {@code onSuccess} or {@code onError} throw, the {@link Throwable} is routed to the
3006+
* global error handler via {@link RxJavaPlugins#onError(Throwable)}.
3007+
* If the current thread is interrupted, the {@code onError} consumer is called with an {@link InterruptedException}.
3008+
* </dd>
3009+
* </dl>
3010+
* @param onSuccess the {@link Consumer} to call if the current {@code Single} succeeds
3011+
* @param onError the {@code Consumer} to call if the current {@code Single} signals an error
3012+
* @throws NullPointerException if {@code onSuccess} or {@code onError} is {@code null}
3013+
* @since 3.0.0
3014+
*/
3015+
@SchedulerSupport(SchedulerSupport.NONE)
3016+
public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess, @NonNull Consumer<? super Throwable> onError) {
3017+
Objects.requireNonNull(onSuccess, "onSuccess is null");
3018+
Objects.requireNonNull(onError, "onError is null");
3019+
BlockingMultiObserver<T> observer = new BlockingMultiObserver<>();
3020+
subscribe(observer);
3021+
observer.blockingConsume(onSuccess, onError, Functions.EMPTY_ACTION);
3022+
}
3023+
3024+
/**
3025+
* Subscribes to the current {@code Single} and calls the appropriate {@link SingleObserver} method on the <em>current thread</em>.
3026+
* <p>
3027+
* <img width="640" height="479" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.o.png" alt="">
3028+
* <dl>
3029+
* <dt><b>Scheduler:</b></dt>
3030+
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
3031+
* <dt><b>Error handling:</b></dt>
3032+
* <dd>An {@code onError} signal is delivered to the {@link SingleObserver#onError(Throwable)} method.
3033+
* If any of the {@code SingleObserver}'s methods throw, the {@link RuntimeException} is propagated to the caller of this method.
3034+
* If the current thread is interrupted, an {@link InterruptedException} is delivered to {@code observer.onError}.
3035+
* </dd>
3036+
* </dl>
3037+
* @param observer the {@code SingleObserver} to call methods on the current thread
3038+
* @throws NullPointerException if {@code observer} is {@code null}
3039+
* @since 3.0.0
3040+
*/
3041+
@SchedulerSupport(SchedulerSupport.NONE)
3042+
public final void blockingSubscribe(@NonNull SingleObserver<? super T> observer) {
3043+
Objects.requireNonNull(observer, "observer is null");
3044+
BlockingDisposableMultiObserver<T> blockingObserver = new BlockingDisposableMultiObserver<>();
3045+
observer.onSubscribe(blockingObserver);
3046+
subscribe(blockingObserver);
3047+
blockingObserver.blockingConsume(observer);
3048+
}
3049+
29503050
/**
29513051
* <strong>This method requires advanced knowledge about building operators, please consider
29523052
* other standard composition methods first;</strong>

0 commit comments

Comments
 (0)