Skip to content

Commit 747f59e

Browse files
authored
3.x: [Java 8] Add fromOpt/Stage, mapOptional, toCompletionStage to M/S/C (#6783)
1 parent 166c529 commit 747f59e

19 files changed

+1779
-5
lines changed

src/main/java/io/reactivex/rxjava3/core/Completable.java

+69-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
*/
1313
package io.reactivex.rxjava3.core;
1414

15-
import java.util.Objects;
15+
import java.util.*;
1616
import java.util.concurrent.*;
1717

1818
import org.reactivestreams.Publisher;
@@ -23,6 +23,7 @@
2323
import io.reactivex.rxjava3.functions.*;
2424
import io.reactivex.rxjava3.internal.functions.*;
2525
import io.reactivex.rxjava3.internal.fuseable.*;
26+
import io.reactivex.rxjava3.internal.jdk8.*;
2627
import io.reactivex.rxjava3.internal.observers.*;
2728
import io.reactivex.rxjava3.internal.operators.completable.*;
2829
import io.reactivex.rxjava3.internal.operators.maybe.*;
@@ -2753,4 +2754,71 @@ public final TestObserver<Void> test(boolean dispose) {
27532754
subscribe(to);
27542755
return to;
27552756
}
2757+
2758+
// -------------------------------------------------------------------------
2759+
// JDK 8 Support
2760+
// -------------------------------------------------------------------------
2761+
2762+
/**
2763+
* Signals completion (or error) when the {@link CompletionStage} terminates.
2764+
* <p>
2765+
* <img width="640" height="262" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/fromCompletionStage.c.png" alt="">
2766+
* <p>
2767+
* Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}.
2768+
* If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)}
2769+
* around {@code fromCompletionStage}:
2770+
* <pre><code>
2771+
* Maybe.defer(() -&gt; Completable.fromCompletionStage(createCompletionStage()));
2772+
* </code></pre>
2773+
* <p>
2774+
* Canceling the flow can't cancel the execution of the {@code CompletionStage} because {@code CompletionStage}
2775+
* itself doesn't support cancellation. Instead, the operator detaches from the {@code CompletionStage}.
2776+
* <dl>
2777+
* <dt><b>Scheduler:</b></dt>
2778+
* <dd>{@code fromCompletionStage} does not operate by default on a particular {@link Scheduler}.</dd>
2779+
* </dl>
2780+
* @param stage the CompletionStage to convert to Maybe and signal its terminal value or error
2781+
* @return the new Completable instance
2782+
* @since 3.0.0
2783+
*/
2784+
@CheckReturnValue
2785+
@SchedulerSupport(SchedulerSupport.NONE)
2786+
@NonNull
2787+
public static Completable fromCompletionStage(@NonNull CompletionStage<?> stage) {
2788+
Objects.requireNonNull(stage, "stage is null");
2789+
return RxJavaPlugins.onAssembly(new CompletableFromCompletionStage<>(stage));
2790+
}
2791+
2792+
/**
2793+
* Signals the given default item when the upstream completes or signals the upstream error via
2794+
* a {@link CompletionStage}.
2795+
* <p>
2796+
* <img width="640" height="323" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toCompletionStage.c.png" alt="">
2797+
* <p>
2798+
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
2799+
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
2800+
* calling {@link CompletableFuture#cancel(boolean)} on it.
2801+
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
2802+
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
2803+
* <p>
2804+
* {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use
2805+
* a {@code defaultItem} of {@code null} or turn the flow into a sequence of {@link Optional}s and default to {@link Optional#empty()}:
2806+
* <pre><code>
2807+
* CompletionStage&lt;Optional&lt;T&gt;&gt; stage = source.map(Optional::of).toCompletionStage(Optional.empty());
2808+
* </code></pre>
2809+
* <dl>
2810+
* <dt><b>Scheduler:</b></dt>
2811+
* <dd>{@code toCompletionStage} does not operate by default on a particular {@link Scheduler}.</dd>
2812+
* </dl>
2813+
* @param <T> the type of the default item to signal upon completion
2814+
* @param defaultItem the item to signal if the upstream is empty
2815+
* @return the new CompletionStage instance
2816+
* @since 3.0.0
2817+
*/
2818+
@CheckReturnValue
2819+
@SchedulerSupport(SchedulerSupport.NONE)
2820+
@NonNull
2821+
public final <T> CompletionStage<T> toCompletionStage(@Nullable T defaultItem) {
2822+
return subscribeWith(new CompletionStageConsumer<>(true, defaultItem));
2823+
}
27562824
}

src/main/java/io/reactivex/rxjava3/core/Maybe.java

+160-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@
1313

1414
package io.reactivex.rxjava3.core;
1515

16-
import java.util.NoSuchElementException;
17-
import java.util.Objects;
16+
import java.util.*;
1817
import java.util.concurrent.*;
1918

2019
import org.reactivestreams.*;
@@ -25,6 +24,7 @@
2524
import io.reactivex.rxjava3.functions.*;
2625
import io.reactivex.rxjava3.internal.functions.*;
2726
import io.reactivex.rxjava3.internal.fuseable.*;
27+
import io.reactivex.rxjava3.internal.jdk8.*;
2828
import io.reactivex.rxjava3.internal.observers.BlockingMultiObserver;
2929
import io.reactivex.rxjava3.internal.operators.flowable.*;
3030
import io.reactivex.rxjava3.internal.operators.maybe.*;
@@ -4794,4 +4794,162 @@ public final TestObserver<T> test(boolean dispose) {
47944794
subscribe(to);
47954795
return to;
47964796
}
4797+
4798+
// -------------------------------------------------------------------------
4799+
// JDK 8 Support
4800+
// -------------------------------------------------------------------------
4801+
4802+
/**
4803+
* Converts the existing value of the provided optional into a {@link #just(Object)}
4804+
* or an empty optional into an {@link #empty()} {@code Maybe} instance.
4805+
* <p>
4806+
* <img width="640" height="335" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/fromOptional.m.png" alt="">
4807+
* <p>
4808+
* Note that the operator takes an already instantiated optional reference and does not
4809+
* by any means create this original optional. If the optional is to be created per
4810+
* consumer upon subscription, use {@link #defer(Supplier)} around {@code fromOptional}:
4811+
* <pre><code>
4812+
* Maybe.defer(() -&gt; Maybe.fromOptional(createOptional()));
4813+
* </code></pre>
4814+
* <dl>
4815+
* <dt><b>Scheduler:</b></dt>
4816+
* <dd>{@code fromOptional} does not operate by default on a particular {@link Scheduler}.</dd>
4817+
* </dl>
4818+
* @param <T> the element type of the optional value
4819+
* @param optional the optional value to convert into a {@code Maybe}
4820+
* @return the new Maybe instance
4821+
* @see #just(Object)
4822+
* @see #empty()
4823+
* @since 3.0.0
4824+
*/
4825+
@CheckReturnValue
4826+
@SchedulerSupport(SchedulerSupport.NONE)
4827+
@NonNull
4828+
public static <T> Maybe<@NonNull T> fromOptional(@NonNull Optional<T> optional) {
4829+
Objects.requireNonNull(optional, "optional is null");
4830+
return optional.map(Maybe::just).orElseGet(Maybe::empty);
4831+
}
4832+
4833+
/**
4834+
* Signals the completion value or error of the given (hot) {@link CompletionStage}-based asynchronous calculation.
4835+
* <p>
4836+
* <img width="640" height="262" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/fromCompletionStage.s.png" alt="">
4837+
* <p>
4838+
* Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}.
4839+
* If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)}
4840+
* around {@code fromCompletionStage}:
4841+
* <pre><code>
4842+
* Maybe.defer(() -&gt; Maybe.fromCompletionStage(createCompletionStage()));
4843+
* </code></pre>
4844+
* <p>
4845+
* If the {@code CompletionStage} completes with {@code null}, the resulting {@code Maybe} is completed via {@code onComplete}.
4846+
* <p>
4847+
* Canceling the flow can't cancel the execution of the {@code CompletionStage} because {@code CompletionStage}
4848+
* itself doesn't support cancellation. Instead, the operator detaches from the {@code CompletionStage}.
4849+
* <dl>
4850+
* <dt><b>Scheduler:</b></dt>
4851+
* <dd>{@code fromCompletionStage} does not operate by default on a particular {@link Scheduler}.</dd>
4852+
* </dl>
4853+
* @param <T> the element type of the CompletionStage
4854+
* @param stage the CompletionStage to convert to Maybe and signal its terminal value or error
4855+
* @return the new Maybe instance
4856+
* @since 3.0.0
4857+
*/
4858+
@CheckReturnValue
4859+
@SchedulerSupport(SchedulerSupport.NONE)
4860+
@NonNull
4861+
public static <T> Maybe<@NonNull T> fromCompletionStage(@NonNull CompletionStage<T> stage) {
4862+
Objects.requireNonNull(stage, "stage is null");
4863+
return RxJavaPlugins.onAssembly(new MaybeFromCompletionStage<>(stage));
4864+
}
4865+
4866+
/**
4867+
* Maps the upstream success value into an {@link Optional} and emits the contained item if not empty.
4868+
* <p>
4869+
* <img width="640" height="323" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mapOptional.m.png" alt="">
4870+
*
4871+
* <dl>
4872+
* <dt><b>Scheduler:</b></dt>
4873+
* <dd>{@code mapOptional} does not operate by default on a particular {@link Scheduler}.</dd>
4874+
* </dl>
4875+
* @param <R> the non-null output type
4876+
* @param mapper the function that receives the upstream success iteem and should return a <em>non-empty</em> {@code Optional}
4877+
* to emit as the success output or an <em>empty</em> {@code Optional} to complete the {@code Maybe}
4878+
* @return the new Maybe instance
4879+
* @since 3.0.0
4880+
* @see #map(Function)
4881+
* @see #filter(Predicate)
4882+
*/
4883+
@CheckReturnValue
4884+
@SchedulerSupport(SchedulerSupport.NONE)
4885+
@NonNull
4886+
public final <@NonNull R> Maybe<R> mapOptional(@NonNull Function<? super T, @NonNull Optional<? extends R>> mapper) {
4887+
Objects.requireNonNull(mapper, "mapper is null");
4888+
return RxJavaPlugins.onAssembly(new MaybeMapOptional<>(this, mapper));
4889+
}
4890+
4891+
/**
4892+
* Signals the upstream success item (or a {@link NoSuchElementException} if the upstream is empty) via
4893+
* a {@link CompletionStage}.
4894+
* <p>
4895+
* <img width="640" height="349" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toCompletionStage.m.png" alt="">
4896+
* <p>
4897+
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
4898+
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
4899+
* calling {@link CompletableFuture#cancel(boolean)} on it.
4900+
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
4901+
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
4902+
* <p>
4903+
* {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use
4904+
* {@link #toCompletionStage(Object)} with {@code null} or turn the upstrea into a sequence of {@link Optional}s and
4905+
* default to {@link Optional#empty()}:
4906+
* <pre><code>
4907+
* CompletionStage&lt;Optional&lt;T&gt;&gt; stage = source.map(Optional::of).toCompletionStage(Optional.empty());
4908+
* </code></pre>
4909+
* <dl>
4910+
* <dt><b>Scheduler:</b></dt>
4911+
* <dd>{@code toCompletionStage} does not operate by default on a particular {@link Scheduler}.</dd>
4912+
* </dl>
4913+
* @return the new CompletionStage instance
4914+
* @since 3.0.0
4915+
* @see #toCompletionStage(Object)
4916+
*/
4917+
@CheckReturnValue
4918+
@SchedulerSupport(SchedulerSupport.NONE)
4919+
@NonNull
4920+
public final CompletionStage<T> toCompletionStage() {
4921+
return subscribeWith(new CompletionStageConsumer<>(false, null));
4922+
}
4923+
4924+
/**
4925+
* Signals the upstream success item (or the default item if the upstream is empty) via
4926+
* a {@link CompletionStage}.
4927+
* <p>
4928+
* <img width="640" height="323" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toCompletionStage.mv.png" alt="">
4929+
* <p>
4930+
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
4931+
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
4932+
* calling {@link CompletableFuture#cancel(boolean)} on it.
4933+
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
4934+
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
4935+
* <p>
4936+
* {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use
4937+
* a {@code defaultItem} of {@code null} or turn the flow into a sequence of {@link Optional}s and default to {@link Optional#empty()}:
4938+
* <pre><code>
4939+
* CompletionStage&lt;Optional&lt;T&gt;&gt; stage = source.map(Optional::of).toCompletionStage(Optional.empty());
4940+
* </code></pre>
4941+
* <dl>
4942+
* <dt><b>Scheduler:</b></dt>
4943+
* <dd>{@code toCompletionStage} does not operate by default on a particular {@link Scheduler}.</dd>
4944+
* </dl>
4945+
* @param defaultItem the item to signal if the upstream is empty
4946+
* @return the new CompletionStage instance
4947+
* @since 3.0.0
4948+
*/
4949+
@CheckReturnValue
4950+
@SchedulerSupport(SchedulerSupport.NONE)
4951+
@NonNull
4952+
public final CompletionStage<T> toCompletionStage(@Nullable T defaultItem) {
4953+
return subscribeWith(new CompletionStageConsumer<>(true, defaultItem));
4954+
}
47974955
}

src/main/java/io/reactivex/rxjava3/core/Single.java

+89-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@
1313

1414
package io.reactivex.rxjava3.core;
1515

16-
import java.util.NoSuchElementException;
17-
import java.util.Objects;
16+
import java.util.*;
1817
import java.util.concurrent.*;
1918

2019
import org.reactivestreams.Publisher;
@@ -25,6 +24,7 @@
2524
import io.reactivex.rxjava3.functions.*;
2625
import io.reactivex.rxjava3.internal.functions.*;
2726
import io.reactivex.rxjava3.internal.fuseable.*;
27+
import io.reactivex.rxjava3.internal.jdk8.*;
2828
import io.reactivex.rxjava3.internal.observers.*;
2929
import io.reactivex.rxjava3.internal.operators.completable.*;
3030
import io.reactivex.rxjava3.internal.operators.flowable.*;
@@ -4181,4 +4181,91 @@ public final TestObserver<T> test(boolean dispose) {
41814181
private static <T> Single<T> toSingle(Flowable<T> source) {
41824182
return RxJavaPlugins.onAssembly(new FlowableSingleSingle<T>(source, null));
41834183
}
4184+
4185+
// -------------------------------------------------------------------------
4186+
// JDK 8 Support
4187+
// -------------------------------------------------------------------------
4188+
4189+
/**
4190+
* Signals the completion value or error of the given (hot) {@link CompletionStage}-based asynchronous calculation.
4191+
* <p>
4192+
* <img width="640" height="262" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/fromCompletionStage.s.png" alt="">
4193+
* <p>
4194+
* Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}.
4195+
* If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)}
4196+
* around {@code fromCompletionStage}:
4197+
* <pre><code>
4198+
* Single.defer(() -&gt; Single.fromCompletionStage(createCompletionStage()));
4199+
* </code></pre>
4200+
* <p>
4201+
* If the {@code CompletionStage} completes with {@code null}, the resulting {@code Single} is terminated with
4202+
* a {@link NullPointerException}.
4203+
* <p>
4204+
* Canceling the flow can't cancel the execution of the {@code CompletionStage} because {@code CompletionStage}
4205+
* itself doesn't support cancellation. Instead, the operator detaches from the {@code CompletionStage}.
4206+
* <dl>
4207+
* <dt><b>Scheduler:</b></dt>
4208+
* <dd>{@code fromCompletionStage} does not operate by default on a particular {@link Scheduler}.</dd>
4209+
* </dl>
4210+
* @param <T> the element type of the CompletionStage
4211+
* @param stage the CompletionStage to convert to Single and signal its success value or error
4212+
* @return the new Single instance
4213+
* @since 3.0.0
4214+
*/
4215+
@CheckReturnValue
4216+
@SchedulerSupport(SchedulerSupport.NONE)
4217+
@NonNull
4218+
public static <T> Single<@NonNull T> fromCompletionStage(@NonNull CompletionStage<T> stage) {
4219+
Objects.requireNonNull(stage, "stage is null");
4220+
return RxJavaPlugins.onAssembly(new SingleFromCompletionStage<>(stage));
4221+
}
4222+
4223+
/**
4224+
* Maps the upstream success value into an {@link Optional} and emits the contained item if not empty.
4225+
* <p>
4226+
* <img width="640" height="323" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mapOptional.s.png" alt="">
4227+
*
4228+
* <dl>
4229+
* <dt><b>Scheduler:</b></dt>
4230+
* <dd>{@code mapOptional} does not operate by default on a particular {@link Scheduler}.</dd>
4231+
* </dl>
4232+
* @param <R> the non-null output type
4233+
* @param mapper the function that receives the upstream success iteem and should return a <em>non-empty</em> {@code Optional}
4234+
* to emit as the success output or an <em>empty</em> {@code Optional} to complete the {@code Maybe}
4235+
* @return the new Maybe instance
4236+
* @since 3.0.0
4237+
* @see #map(Function)
4238+
* @see #filter(Predicate)
4239+
*/
4240+
@CheckReturnValue
4241+
@SchedulerSupport(SchedulerSupport.NONE)
4242+
@NonNull
4243+
public final <@NonNull R> Maybe<R> mapOptional(@NonNull Function<? super T, @NonNull Optional<? extends R>> mapper) {
4244+
Objects.requireNonNull(mapper, "mapper is null");
4245+
return RxJavaPlugins.onAssembly(new SingleMapOptional<>(this, mapper));
4246+
}
4247+
4248+
/**
4249+
* Signals the upstream success item (or error) via a {@link CompletionStage}.
4250+
* <p>
4251+
* <img width="640" height="321" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toCompletionStage.s.png" alt="">
4252+
* <p>
4253+
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
4254+
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
4255+
* calling {@link CompletableFuture#cancel(boolean)} on it.
4256+
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
4257+
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
4258+
* <dl>
4259+
* <dt><b>Scheduler:</b></dt>
4260+
* <dd>{@code toCompletionStage} does not operate by default on a particular {@link Scheduler}.</dd>
4261+
* </dl>
4262+
* @return the new CompletionStage instance
4263+
* @since 3.0.0
4264+
*/
4265+
@CheckReturnValue
4266+
@SchedulerSupport(SchedulerSupport.NONE)
4267+
@NonNull
4268+
public final CompletionStage<T> toCompletionStage() {
4269+
return subscribeWith(new CompletionStageConsumer<>(false, null));
4270+
}
41844271
}

0 commit comments

Comments
 (0)