Skip to content

Commit 2fafcb0

Browse files
authored
3.x: Add Maybe/Single/Completable switchOnNext & switchOnNextDelayError (#6870)
* 3.x: Add M/S/C switchOnNext[DelayError] * Fix wording of the Completable variant
1 parent 0ed3572 commit 2fafcb0

File tree

9 files changed

+728
-1
lines changed

9 files changed

+728
-1
lines changed

src/main/java/io/reactivex/rxjava3/core/Completable.java

+66
Original file line numberDiff line numberDiff line change
@@ -1016,6 +1016,71 @@ private static NullPointerException toNpe(Throwable ex) {
10161016
return npe;
10171017
}
10181018

1019+
/**
1020+
* Switches between {@link CompletableSource}s emitted by the source {@link Publisher} whenever
1021+
* a new {@code CompletableSource} is emitted, disposing the previously running {@code CompletableSource},
1022+
* exposing the setup as a {@code Completable} sequence.
1023+
* <p>
1024+
* <img width="640" height="518" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.switchOnNext.png" alt="">
1025+
* <dl>
1026+
* <dt><b>Backpressure:</b></dt>
1027+
* <dd>The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}).</dd>
1028+
* <dt><b>Scheduler:</b></dt>
1029+
* <dd>{@code switchOnNext} does not operate by default on a particular {@link Scheduler}.</dd>
1030+
* <dt><b>Error handling:</b></dt>
1031+
* <dd>The returned sequence fails with the first error signaled by the {@code sources} {@code Publisher}
1032+
* or the currently running {@code CompletableSource}, disposing the rest. Late errors are
1033+
* forwarded to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.</dd>
1034+
* </dl>
1035+
* @param sources the {@code Publisher} sequence of inner {@code CompletableSource}s to switch between
1036+
* @return the new {@code Completable} instance
1037+
* @throws NullPointerException if {@code sources} is {@code null}
1038+
* @since 3.0.0
1039+
* @see #switchOnNextDelayError(Publisher)
1040+
* @see <a href="http://reactivex.io/documentation/operators/switch.html">ReactiveX operators documentation: Switch</a>
1041+
*/
1042+
@CheckReturnValue
1043+
@NonNull
1044+
@SchedulerSupport(SchedulerSupport.NONE)
1045+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
1046+
public static Completable switchOnNext(@NonNull Publisher<@NonNull ? extends CompletableSource> sources) {
1047+
Objects.requireNonNull(sources, "sources is null");
1048+
return RxJavaPlugins.onAssembly(new FlowableSwitchMapCompletablePublisher<>(sources, Functions.identity(), false));
1049+
}
1050+
1051+
/**
1052+
* Switches between {@link CompletableSource}s emitted by the source {@link Publisher} whenever
1053+
* a new {@code CompletableSource} is emitted, disposing the previously running {@code CompletableSource},
1054+
* exposing the setup as a {@code Completable} sequence and delaying all errors from
1055+
* all of them until all terminate.
1056+
* <p>
1057+
* <img width="640" height="415" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.switchOnNextDelayError.png" alt="">
1058+
* <dl>
1059+
* <dt><b>Backpressure:</b></dt>
1060+
* <dd>The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}).</dd>
1061+
* <dt><b>Scheduler:</b></dt>
1062+
* <dd>{@code switchOnNextDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
1063+
* <dt><b>Error handling:</b></dt>
1064+
* <dd>The returned {@code Completable} collects all errors emitted by either the {@code sources}
1065+
* {@code Publisher} or any inner {@code CompletableSource} and emits them as a {@link CompositeException}
1066+
* when all sources terminate. If only one source ever failed, its error is emitted as-is at the end.</dd>
1067+
* </dl>
1068+
* @param sources the {@code Publisher} sequence of inner {@code CompletableSource}s to switch between
1069+
* @return the new {@code Completable} instance
1070+
* @throws NullPointerException if {@code sources} is {@code null}
1071+
* @since 3.0.0
1072+
* @see #switchOnNext(Publisher)
1073+
* @see <a href="http://reactivex.io/documentation/operators/switch.html">ReactiveX operators documentation: Switch</a>
1074+
*/
1075+
@CheckReturnValue
1076+
@NonNull
1077+
@SchedulerSupport(SchedulerSupport.NONE)
1078+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
1079+
public static Completable switchOnNextDelayError(@NonNull Publisher<@NonNull ? extends CompletableSource> sources) {
1080+
Objects.requireNonNull(sources, "sources is null");
1081+
return RxJavaPlugins.onAssembly(new FlowableSwitchMapCompletablePublisher<>(sources, Functions.identity(), true));
1082+
}
1083+
10191084
/**
10201085
* Returns a {@code Completable} instance which manages a resource along
10211086
* with a custom {@link CompletableSource} instance while the subscription is active.
@@ -2328,6 +2393,7 @@ public final Completable retry(@NonNull Predicate<? super Throwable> predicate)
23282393
* @param stop the function that should return {@code true} to stop retrying
23292394
* @return the new {@code Completable} instance
23302395
* @throws NullPointerException if {@code stop} is {@code null}
2396+
* @since 3.0.0
23312397
*/
23322398
@CheckReturnValue
23332399
@NonNull

src/main/java/io/reactivex/rxjava3/core/Maybe.java

+71
Original file line numberDiff line numberDiff line change
@@ -1728,6 +1728,75 @@ public static <T> Single<Boolean> sequenceEqual(@NonNull MaybeSource<? extends T
17281728
return RxJavaPlugins.onAssembly(new MaybeEqualSingle<>(source1, source2, isEqual));
17291729
}
17301730

1731+
/**
1732+
* Switches between {@link MaybeSource}s emitted by the source {@link Publisher} whenever
1733+
* a new {@code MaybeSource} is emitted, disposing the previously running {@code MaybeSource},
1734+
* exposing the success items as a {@link Flowable} sequence.
1735+
* <p>
1736+
* <img width="640" height="521" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.switchOnNext.png" alt="">
1737+
* <dl>
1738+
* <dt><b>Backpressure:</b></dt>
1739+
* <dd>The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}).
1740+
* The returned {@code Flowable} respects the backpressure from the downstream.</dd>
1741+
* <dt><b>Scheduler:</b></dt>
1742+
* <dd>{@code switchOnNext} does not operate by default on a particular {@link Scheduler}.</dd>
1743+
* <dt><b>Error handling:</b></dt>
1744+
* <dd>The returned sequence fails with the first error signaled by the {@code sources} {@code Publisher}
1745+
* or the currently running {@code MaybeSource}, disposing the rest. Late errors are
1746+
* forwarded to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.</dd>
1747+
* </dl>
1748+
* @param <T> the element type of the {@code MaybeSource}s
1749+
* @param sources the {@code Publisher} sequence of inner {@code MaybeSource}s to switch between
1750+
* @return the new {@code Flowable} instance
1751+
* @throws NullPointerException if {@code sources} is {@code null}
1752+
* @since 3.0.0
1753+
* @see #switchOnNextDelayError(Publisher)
1754+
* @see <a href="http://reactivex.io/documentation/operators/switch.html">ReactiveX operators documentation: Switch</a>
1755+
*/
1756+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
1757+
@CheckReturnValue
1758+
@NonNull
1759+
@SchedulerSupport(SchedulerSupport.NONE)
1760+
public static <T> Flowable<T> switchOnNext(@NonNull Publisher<@NonNull ? extends MaybeSource<? extends T>> sources) {
1761+
Objects.requireNonNull(sources, "sources is null");
1762+
return RxJavaPlugins.onAssembly(new FlowableSwitchMapMaybePublisher<>(sources, Functions.identity(), false));
1763+
}
1764+
1765+
/**
1766+
* Switches between {@link MaybeSource}s emitted by the source {@link Publisher} whenever
1767+
* a new {@code MaybeSource} is emitted, disposing the previously running {@code MaybeSource},
1768+
* exposing the success items as a {@link Flowable} sequence and delaying all errors from
1769+
* all of them until all terminate.
1770+
* <p>
1771+
* <img width="640" height="423" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.switchOnNextDelayError.png" alt="">
1772+
* <dl>
1773+
* <dt><b>Backpressure:</b></dt>
1774+
* <dd>The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}).
1775+
* The returned {@code Flowable} respects the backpressure from the downstream.</dd>
1776+
* <dt><b>Scheduler:</b></dt>
1777+
* <dd>{@code switchOnNextDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
1778+
* <dt><b>Error handling:</b></dt>
1779+
* <dd>The returned {@code Flowable} collects all errors emitted by either the {@code sources}
1780+
* {@code Publisher} or any inner {@code MaybeSource} and emits them as a {@link CompositeException}
1781+
* when all sources terminate. If only one source ever failed, its error is emitted as-is at the end.</dd>
1782+
* </dl>
1783+
* @param <T> the element type of the {@code MaybeSource}s
1784+
* @param sources the {@code Publisher} sequence of inner {@code MaybeSource}s to switch between
1785+
* @return the new {@code Flowable} instance
1786+
* @throws NullPointerException if {@code sources} is {@code null}
1787+
* @since 3.0.0
1788+
* @see #switchOnNext(Publisher)
1789+
* @see <a href="http://reactivex.io/documentation/operators/switch.html">ReactiveX operators documentation: Switch</a>
1790+
*/
1791+
@BackpressureSupport(BackpressureKind.FULL)
1792+
@CheckReturnValue
1793+
@NonNull
1794+
@SchedulerSupport(SchedulerSupport.NONE)
1795+
public static <T> Flowable<T> switchOnNextDelayError(@NonNull Publisher<@NonNull ? extends MaybeSource<? extends T>> sources) {
1796+
Objects.requireNonNull(sources, "sources is null");
1797+
return RxJavaPlugins.onAssembly(new FlowableSwitchMapMaybePublisher<>(sources, Functions.identity(), true));
1798+
}
1799+
17311800
/**
17321801
* Returns a {@code Maybe} that emits {@code 0L} after a specified delay.
17331802
* <p>
@@ -2868,6 +2937,7 @@ public final Maybe<T> delay(long time, @NonNull TimeUnit unit) {
28682937
* @throws NullPointerException if {@code unit} is {@code null}
28692938
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
28702939
* @see #delay(long, TimeUnit, Scheduler, boolean)
2940+
* @since 3.0.0
28712941
*/
28722942
@CheckReturnValue
28732943
@SchedulerSupport(SchedulerSupport.COMPUTATION)
@@ -2922,6 +2992,7 @@ public final Maybe<T> delay(long time, @NonNull TimeUnit unit, @NonNull Schedule
29222992
* @return the new {@code Maybe} instance
29232993
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null}
29242994
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
2995+
* @since 3.0.0
29252996
*/
29262997
@CheckReturnValue
29272998
@NonNull

src/main/java/io/reactivex/rxjava3/core/Single.java

+71-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import io.reactivex.rxjava3.internal.operators.completable.*;
3131
import io.reactivex.rxjava3.internal.operators.flowable.*;
3232
import io.reactivex.rxjava3.internal.operators.maybe.*;
33-
import io.reactivex.rxjava3.internal.operators.mixed.SingleFlatMapObservable;
33+
import io.reactivex.rxjava3.internal.operators.mixed.*;
3434
import io.reactivex.rxjava3.internal.operators.observable.*;
3535
import io.reactivex.rxjava3.internal.operators.single.*;
3636
import io.reactivex.rxjava3.internal.util.ErrorMode;
@@ -1406,6 +1406,75 @@ public static <T> Single<Boolean> sequenceEqual(@NonNull SingleSource<? extends
14061406
return RxJavaPlugins.onAssembly(new SingleEquals<>(source1, source2));
14071407
}
14081408

1409+
/**
1410+
* Switches between {@link SingleSource}s emitted by the source {@link Publisher} whenever
1411+
* a new {@code SingleSource} is emitted, disposing the previously running {@code SingleSource},
1412+
* exposing the success items as a {@link Flowable} sequence.
1413+
* <p>
1414+
* <img width="640" height="521" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.switchOnNext.png" alt="">
1415+
* <dl>
1416+
* <dt><b>Backpressure:</b></dt>
1417+
* <dd>The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}).
1418+
* The returned {@code Flowable} respects the backpressure from the downstream.</dd>
1419+
* <dt><b>Scheduler:</b></dt>
1420+
* <dd>{@code switchOnNext} does not operate by default on a particular {@link Scheduler}.</dd>
1421+
* <dt><b>Error handling:</b></dt>
1422+
* <dd>The returned sequence fails with the first error signaled by the {@code sources} {@code Publisher}
1423+
* or the currently running {@code SingleSource}, disposing the rest. Late errors are
1424+
* forwarded to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.</dd>
1425+
* </dl>
1426+
* @param <T> the element type of the {@code SingleSource}s
1427+
* @param sources the {@code Publisher} sequence of inner {@code SingleSource}s to switch between
1428+
* @return the new {@code Flowable} instance
1429+
* @throws NullPointerException if {@code sources} is {@code null}
1430+
* @since 3.0.0
1431+
* @see #switchOnNextDelayError(Publisher)
1432+
* @see <a href="http://reactivex.io/documentation/operators/switch.html">ReactiveX operators documentation: Switch</a>
1433+
*/
1434+
@BackpressureSupport(BackpressureKind.FULL)
1435+
@CheckReturnValue
1436+
@NonNull
1437+
@SchedulerSupport(SchedulerSupport.NONE)
1438+
public static <T> Flowable<T> switchOnNext(@NonNull Publisher<@NonNull ? extends SingleSource<? extends T>> sources) {
1439+
Objects.requireNonNull(sources, "sources is null");
1440+
return RxJavaPlugins.onAssembly(new FlowableSwitchMapSinglePublisher<>(sources, Functions.identity(), false));
1441+
}
1442+
1443+
/**
1444+
* Switches between {@link SingleSource}s emitted by the source {@link Publisher} whenever
1445+
* a new {@code SingleSource} is emitted, disposing the previously running {@code SingleSource},
1446+
* exposing the success items as a {@link Flowable} sequence and delaying all errors from
1447+
* all of them until all terminate.
1448+
* <p>
1449+
* <img width="640" height="425" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.switchOnNextDelayError.png" alt="">
1450+
* <dl>
1451+
* <dt><b>Backpressure:</b></dt>
1452+
* <dd>The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}).
1453+
* The returned {@code Flowable} respects the backpressure from the downstream.</dd>
1454+
* <dt><b>Scheduler:</b></dt>
1455+
* <dd>{@code switchOnNextDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
1456+
* <dt><b>Error handling:</b></dt>
1457+
* <dd>The returned {@code Flowable} collects all errors emitted by either the {@code sources}
1458+
* {@code Publisher} or any inner {@code SingleSource} and emits them as a {@link CompositeException}
1459+
* when all sources terminate. If only one source ever failed, its error is emitted as-is at the end.</dd>
1460+
* </dl>
1461+
* @param <T> the element type of the {@code SingleSource}s
1462+
* @param sources the {@code Publisher} sequence of inner {@code SingleSource}s to switch between
1463+
* @return the new {@code Flowable} instance
1464+
* @throws NullPointerException if {@code sources} is {@code null}
1465+
* @since 3.0.0
1466+
* @see #switchOnNext(Publisher)
1467+
* @see <a href="http://reactivex.io/documentation/operators/switch.html">ReactiveX operators documentation: Switch</a>
1468+
*/
1469+
@BackpressureSupport(BackpressureKind.FULL)
1470+
@CheckReturnValue
1471+
@NonNull
1472+
@SchedulerSupport(SchedulerSupport.NONE)
1473+
public static <T> Flowable<T> switchOnNextDelayError(@NonNull Publisher<@NonNull ? extends SingleSource<? extends T>> sources) {
1474+
Objects.requireNonNull(sources, "sources is null");
1475+
return RxJavaPlugins.onAssembly(new FlowableSwitchMapSinglePublisher<>(sources, Functions.identity(), true));
1476+
}
1477+
14091478
/**
14101479
* <strong>Advanced use only:</strong> creates a {@code Single} instance without
14111480
* any safeguards by using a callback that is called with a {@link SingleObserver}.
@@ -3758,6 +3827,7 @@ public final Single<T> retry(@NonNull Predicate<? super Throwable> predicate) {
37583827
* @param stop the function that should return {@code true} to stop retrying
37593828
* @return the new {@code Single} instance
37603829
* @throws NullPointerException if {@code stop} is {@code null}
3830+
* @since 3.0.0
37613831
*/
37623832
@CheckReturnValue
37633833
@NonNull
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.operators.mixed;
15+
16+
import org.reactivestreams.Publisher;
17+
18+
import io.reactivex.rxjava3.core.*;
19+
import io.reactivex.rxjava3.functions.Function;
20+
21+
/**
22+
* Switch between subsequent {@link CompletableSource}s emitted by a {@link Publisher}.
23+
* Reuses {@link FlowableSwitchMapCompletable} internals.
24+
* @param <T> the upstream value type
25+
* @since 3.0.0
26+
*/
27+
public final class FlowableSwitchMapCompletablePublisher<T> extends Completable {
28+
29+
final Publisher<T> source;
30+
31+
final Function<? super T, ? extends CompletableSource> mapper;
32+
33+
final boolean delayErrors;
34+
35+
public FlowableSwitchMapCompletablePublisher(Publisher<T> source,
36+
Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors) {
37+
this.source = source;
38+
this.mapper = mapper;
39+
this.delayErrors = delayErrors;
40+
}
41+
42+
@Override
43+
protected void subscribeActual(CompletableObserver observer) {
44+
source.subscribe(new FlowableSwitchMapCompletable.SwitchMapCompletableObserver<>(observer, mapper, delayErrors));
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.operators.mixed;
15+
16+
import org.reactivestreams.*;
17+
18+
import io.reactivex.rxjava3.core.*;
19+
import io.reactivex.rxjava3.functions.Function;
20+
21+
/**
22+
* Switch between subsequent {@link MaybeSource}s emitted by a {@link Publisher}.
23+
* Reuses {@link FlowableSwitchMapMaybe} internals.
24+
* @param <T> the upstream value type
25+
* @param <R> the downstream value type
26+
* @since 3.0.0
27+
*/
28+
public final class FlowableSwitchMapMaybePublisher<T, R> extends Flowable<R> {
29+
30+
final Publisher<T> source;
31+
32+
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
33+
34+
final boolean delayErrors;
35+
36+
public FlowableSwitchMapMaybePublisher(Publisher<T> source,
37+
Function<? super T, ? extends MaybeSource<? extends R>> mapper,
38+
boolean delayErrors) {
39+
this.source = source;
40+
this.mapper = mapper;
41+
this.delayErrors = delayErrors;
42+
}
43+
44+
@Override
45+
protected void subscribeActual(Subscriber<? super R> s) {
46+
source.subscribe(new FlowableSwitchMapMaybe.SwitchMapMaybeSubscriber<>(s, mapper, delayErrors));
47+
}
48+
}

0 commit comments

Comments
 (0)