Skip to content

Commit db8b733

Browse files
authored
2.x: add more Maybe operators 9/09-1 (#4519)
1 parent 4f878d5 commit db8b733

18 files changed

+2090
-10
lines changed

src/main/java/io/reactivex/Maybe.java

+241-2
Original file line numberDiff line numberDiff line change
@@ -1807,6 +1807,28 @@ public static <T, R> Maybe<R> zipArray(Function<? super Object[], ? extends R> z
18071807
// Instance methods
18081808
// ------------------------------------------------------------------
18091809

1810+
/**
1811+
* Mirrors the MaybeSource (current or provided) that first signals an event.
1812+
* <p>
1813+
* <img width="640" height="385" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/amb.png" alt="">
1814+
* <dl>
1815+
* <dt><b>Scheduler:</b></dt>
1816+
* <dd>{@code amb} does not operate by default on a particular {@link Scheduler}.</dd>
1817+
* </dl>
1818+
*
1819+
* @param other
1820+
* a MaybeSource competing to react first
1821+
* @return a Maybe that emits the same sequence as whichever of the source MaybeSources first
1822+
* signalled
1823+
* @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>
1824+
*/
1825+
@SuppressWarnings("unchecked")
1826+
@SchedulerSupport(SchedulerSupport.NONE)
1827+
public final Maybe<T> ambWith(MaybeSource<? extends T> other) {
1828+
ObjectHelper.requireNonNull(other, "other is null");
1829+
return ambArray(this, other);
1830+
}
1831+
18101832
/**
18111833
* Waits in a blocking fashion until the current Maybe signals a success value (which is returned),
18121834
* null if completed or an exception (which is propagated).
@@ -1816,7 +1838,7 @@ public static <T, R> Maybe<R> zipArray(Function<? super Object[], ? extends R> z
18161838
* </dl>
18171839
* @return the success value
18181840
*/
1819-
public T blockingGet() {
1841+
public final T blockingGet() {
18201842
BlockingObserver<T> observer = new BlockingObserver<T>();
18211843
subscribe(observer);
18221844
return observer.blockingGet();
@@ -1832,13 +1854,38 @@ public T blockingGet() {
18321854
* @param defaultValue the default item to return if this Maybe is empty
18331855
* @return the success value
18341856
*/
1835-
public T blockingGet(T defaultValue) {
1857+
public final T blockingGet(T defaultValue) {
18361858
ObjectHelper.requireNonNull(defaultValue, "defaultValue is null");
18371859
BlockingObserver<T> observer = new BlockingObserver<T>();
18381860
subscribe(observer);
18391861
return observer.blockingGet(defaultValue);
18401862
}
18411863

1864+
/**
1865+
* Returns a Maybe that subscribes to this Maybe lazily, caches its event
1866+
* and replays it, to all the downstream subscribers.
1867+
* <p>
1868+
* <img width="640" height="410" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/cache.png" alt="">
1869+
* <p>
1870+
* The operator subscribes only when the first downstream subscriber subscribes and maintains
1871+
* a single subscription towards this Maybe.
1872+
* <p>
1873+
* <em>Note:</em> You sacrifice the ability to unsubscribe from the origin when you use the {@code cache}.
1874+
* <dl>
1875+
* <dt><b>Scheduler:</b></dt>
1876+
* <dd>{@code cache} does not operate by default on a particular {@link Scheduler}.</dd>
1877+
* </dl>
1878+
*
1879+
* @return a Flowable that, when first subscribed to, caches all of its items and notifications for the
1880+
* benefit of subsequent subscribers
1881+
* @see <a href="http://reactivex.io/documentation/operators/replay.html">ReactiveX operators documentation: Replay</a>
1882+
*/
1883+
@BackpressureSupport(BackpressureKind.FULL)
1884+
@SchedulerSupport(SchedulerSupport.NONE)
1885+
public final Maybe<T> cache() {
1886+
return new MaybeCache<T>(this);
1887+
}
1888+
18421889
/**
18431890
* Casts the success value of the current Maybe into the target type or signals a
18441891
* ClassCastException if not compatible.
@@ -1900,6 +1947,146 @@ public final <R> Maybe<R> concatMap(Function<? super T, ? extends MaybeSource<?
19001947
return RxJavaPlugins.onAssembly(new MaybeFlatten<T, R>(this, mapper));
19011948
}
19021949

1950+
1951+
/**
1952+
* Returns a Flowable that emits the items emitted from the current MaybeSource, then the next, one after
1953+
* the other, without interleaving them.
1954+
* <p>
1955+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concat.png" alt="">
1956+
* <dl>
1957+
* <dt><b>Backpressure:</b></dt>
1958+
* <dd>The operator honors backpressure from downstream.</dd>
1959+
* <dt><b>Scheduler:</b></dt>
1960+
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
1961+
* </dl>
1962+
*
1963+
* @param other
1964+
* a MaybeSource to be concatenated after the current
1965+
* @return a Flowable that emits items emitted by the two source MaybeSources, one after the other,
1966+
* without interleaving them
1967+
* @see <a href="http://reactivex.io/documentation/operators/concat.html">ReactiveX operators documentation: Concat</a>
1968+
*/
1969+
@BackpressureSupport(BackpressureKind.FULL)
1970+
@SchedulerSupport(SchedulerSupport.NONE)
1971+
public final Flowable<T> concatWith(MaybeSource<? extends T> other) {
1972+
ObjectHelper.requireNonNull(other, "other is null");
1973+
return concat(this, other);
1974+
}
1975+
1976+
/**
1977+
* Returns a Single that emits a Boolean that indicates whether the source Publisher emitted a
1978+
* specified item.
1979+
* <p>
1980+
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/contains.png" alt="">
1981+
* <dl>
1982+
* <dt><b>Scheduler:</b></dt>
1983+
* <dd>{@code contains} does not operate by default on a particular {@link Scheduler}.</dd>
1984+
* </dl>
1985+
*
1986+
* @param item
1987+
* the item to search for in the emissions from the source Maybe, not null
1988+
* @return a Single that emits {@code true} if the specified item is emitted by the source Maybe,
1989+
* or {@code false} if the source Maybe completes without emitting that item
1990+
* @see <a href="http://reactivex.io/documentation/operators/contains.html">ReactiveX operators documentation: Contains</a>
1991+
*/
1992+
@SchedulerSupport(SchedulerSupport.NONE)
1993+
public final Single<Boolean> contains(final Object item) {
1994+
ObjectHelper.requireNonNull(item, "item is null");
1995+
return RxJavaPlugins.onAssembly(new MaybeContains<T>(this, item));
1996+
}
1997+
1998+
/**
1999+
* Returns a Maybe that counts the total number of items emitted (0 or 1) by the source Maybe and emits
2000+
* this count as a 64-bit Long.
2001+
* <p>
2002+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/longCount.png" alt="">
2003+
* <dl>
2004+
* <dt><b>Scheduler:</b></dt>
2005+
* <dd>{@code countLong} does not operate by default on a particular {@link Scheduler}.</dd>
2006+
* </dl>
2007+
*
2008+
* @return a Single that emits a single item: the number of items emitted by the source Publisher as a
2009+
* 64-bit Long item
2010+
* @see <a href="http://reactivex.io/documentation/operators/count.html">ReactiveX operators documentation: Count</a>
2011+
* @see #count()
2012+
*/
2013+
@SchedulerSupport(SchedulerSupport.NONE)
2014+
public final Single<Long> count() {
2015+
return RxJavaPlugins.onAssembly(new MaybeCount<T>(this));
2016+
}
2017+
2018+
/**
2019+
* Returns a Maybe that emits the item emitted by the source Maybe or a specified default item
2020+
* if the source Maybe is empty.
2021+
* <p>
2022+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/defaultIfEmpty.png" alt="">
2023+
* <dl>
2024+
* <dt><b>Scheduler:</b></dt>
2025+
* <dd>{@code defaultIfEmpty} does not operate by default on a particular {@link Scheduler}.</dd>
2026+
* </dl>
2027+
*
2028+
* @param defaultItem
2029+
* the item to emit if the source Maybe emits no items
2030+
* @return a Maybe that emits either the specified default item if the source Maybe emits no
2031+
* items, or the items emitted by the source Maybe
2032+
* @see <a href="http://reactivex.io/documentation/operators/defaultifempty.html">ReactiveX operators documentation: DefaultIfEmpty</a>
2033+
*/
2034+
@BackpressureSupport(BackpressureKind.FULL)
2035+
@SchedulerSupport(SchedulerSupport.NONE)
2036+
public final Maybe<T> defaultIfEmpty(T defaultItem) {
2037+
ObjectHelper.requireNonNull(defaultItem, "item is null");
2038+
return switchIfEmpty(just(defaultItem));
2039+
}
2040+
2041+
2042+
/**
2043+
* Returns a Maybe that signals the events emitted by the source Maybe shifted forward in time by a
2044+
* specified delay.
2045+
* <p>
2046+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.png" alt="">
2047+
* <dl>
2048+
* <dt><b>Scheduler:</b></dt>
2049+
* <dd>This version of {@code delay} operates by default on the {@code computation} {@link Scheduler}.</dd>
2050+
* </dl>
2051+
*
2052+
* @param delay
2053+
* the delay to shift the source by
2054+
* @param unit
2055+
* the {@link TimeUnit} in which {@code period} is defined
2056+
* @return the new Maybe instance
2057+
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
2058+
*/
2059+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
2060+
public final Maybe<T> delay(long delay, TimeUnit unit) {
2061+
return delay(delay, unit, Schedulers.computation());
2062+
}
2063+
2064+
/**
2065+
* Returns a Maybe that signals the events emitted by the source Maybe shifted forward in time by a
2066+
* specified delay running on the specified Scheduler.
2067+
* <p>
2068+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.s.png" alt="">
2069+
* <dl>
2070+
* <dt><b>Scheduler:</b></dt>
2071+
* <dd>you specify which {@link Scheduler} this operator will use</dd>
2072+
* </dl>
2073+
*
2074+
* @param delay
2075+
* the delay to shift the source by
2076+
* @param unit
2077+
* the time unit of {@code delay}
2078+
* @param scheduler
2079+
* the {@link Scheduler} to use for delaying
2080+
* @return the new Maybe instance
2081+
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
2082+
*/
2083+
@SchedulerSupport(SchedulerSupport.CUSTOM)
2084+
public final Maybe<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
2085+
ObjectHelper.requireNonNull(unit, "unit is null");
2086+
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
2087+
return RxJavaPlugins.onAssembly(new MaybeDelay<T>(this, Math.max(0L, delay), unit, scheduler));
2088+
}
2089+
19032090
/**
19042091
* Registers an {@link Action} to be called when this Maybe invokes either
19052092
* {@link MaybeObserver#onComplete onSuccess},
@@ -2546,6 +2733,58 @@ public final <E extends MaybeObserver<? super T>> E subscribeWith(E observer) {
25462733
return observer;
25472734
}
25482735

2736+
/**
2737+
* Returns a Maybe that emits the items emitted by the source Maybe or the items of an alternate
2738+
* MaybeSource if the current Maybe is empty.
2739+
* <p/>
2740+
* <dl>
2741+
* <dt><b>Scheduler:</b></dt>
2742+
* <dd>{@code switchIfEmpty} does not operate by default on a particular {@link Scheduler}.</dd>
2743+
* </dl>
2744+
*
2745+
* @param other
2746+
* the alternate MaybeSource to subscribe to if the main does not emit any items
2747+
* @return a Maybe that emits the items emitted by the source Maybe or the items of an
2748+
* alternate MaybeSource if the source Maybe is empty.
2749+
*/
2750+
@SchedulerSupport(SchedulerSupport.NONE)
2751+
public final Maybe<T> switchIfEmpty(MaybeSource<? extends T> other) {
2752+
ObjectHelper.requireNonNull(other, "other is null");
2753+
return RxJavaPlugins.onAssembly(new MaybeSwitchIfEmpty<T>(this, other));
2754+
}
2755+
2756+
2757+
/**
2758+
* Waits until this and the other MaybeSource signal a success value then applies the given BiFunction
2759+
* to those values and emits the BiFunction's resulting value to downstream.
2760+
*
2761+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/zip.png" alt="">
2762+
*
2763+
* <p>If either this or the other MaybeSource is empty or signals an error, the resulting Maybe will
2764+
* terminate immediately and dispose the other source.
2765+
*
2766+
* <dl>
2767+
* <dt><b>Scheduler:</b></dt>
2768+
* <dd>{@code zipWith} does not operate by default on a particular {@link Scheduler}.</dd>
2769+
* </dl>
2770+
*
2771+
* @param <U>
2772+
* the type of items emitted by the {@code other} Publisher
2773+
* @param <R>
2774+
* the type of items emitted by the resulting Publisher
2775+
* @param other
2776+
* the other Publisher
2777+
* @param zipper
2778+
* a function that combines the pairs of items from the two Publishers to generate the items to
2779+
* be emitted by the resulting Publisher
2780+
* @return the new Maybe instance
2781+
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
2782+
*/
2783+
@SchedulerSupport(SchedulerSupport.NONE)
2784+
public final <U, R> Maybe<R> zipWith(MaybeSource<? extends U> other, BiFunction<? super T, ? super U, ? extends R> zipper) {
2785+
ObjectHelper.requireNonNull(other, "other is null");
2786+
return zip(this, other, zipper);
2787+
}
25492788

25502789
// ------------------------------------------------------------------
25512790
// Test helper

src/main/java/io/reactivex/internal/operators/flowable/FlowableDetach.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515

1616
import org.reactivestreams.*;
1717

18-
import io.reactivex.internal.subscribers.flowable.EmptyComponent;
1918
import io.reactivex.internal.subscriptions.SubscriptionHelper;
19+
import io.reactivex.internal.util.EmptyComponent;
2020

2121
public final class FlowableDetach<T> extends AbstractFlowableWithUpstream<T, T> {
2222

0 commit comments

Comments
 (0)