Skip to content

Commit ebec0b2

Browse files
authored
2.x: add Maybe operators, add annotation and source code checker tests (#4528)
1 parent 15f8054 commit ebec0b2

34 files changed

+3867
-300
lines changed

src/main/java/io/reactivex/Completable.java

+57-9
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@
1616

1717
import org.reactivestreams.Publisher;
1818

19-
import io.reactivex.annotations.SchedulerSupport;
19+
import io.reactivex.annotations.*;
2020
import io.reactivex.disposables.Disposable;
2121
import io.reactivex.exceptions.Exceptions;
2222
import io.reactivex.functions.*;
2323
import io.reactivex.internal.functions.*;
24+
import io.reactivex.internal.fuseable.*;
2425
import io.reactivex.internal.observers.*;
2526
import io.reactivex.internal.operators.completable.*;
2627
import io.reactivex.internal.operators.flowable.FlowableDelaySubscriptionOther;
@@ -43,7 +44,7 @@ public abstract class Completable implements CompletableSource {
4344
* terminates (normally or with an error) and cancels all other Completables.
4445
* <dl>
4546
* <dt><b>Scheduler:</b></dt>
46-
* <dd>{@code amb} does not operate by default on a particular {@link Scheduler}.</dd>
47+
* <dd>{@code ambArray} does not operate by default on a particular {@link Scheduler}.</dd>
4748
* </dl>
4849
* @param sources the array of source Completables
4950
* @return the new Completable instance
@@ -97,7 +98,7 @@ public static Completable complete() {
9798
* Returns a Completable which completes only when all sources complete, one after another.
9899
* <dl>
99100
* <dt><b>Scheduler:</b></dt>
100-
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
101+
* <dd>{@code concatArray} does not operate by default on a particular {@link Scheduler}.</dd>
101102
* </dl>
102103
* @param sources the sources to concatenate
103104
* @return the Completable instance which completes only when all sources complete
@@ -143,6 +144,7 @@ public static Completable concat(Iterable<? extends CompletableSource> sources)
143144
* @throws NullPointerException if sources is null
144145
*/
145146
@SchedulerSupport(SchedulerSupport.NONE)
147+
@BackpressureSupport(BackpressureKind.FULL)
146148
public static Completable concat(Publisher<? extends CompletableSource> sources) {
147149
return concat(sources, 2);
148150
}
@@ -159,6 +161,7 @@ public static Completable concat(Publisher<? extends CompletableSource> sources)
159161
* @throws NullPointerException if sources is null
160162
*/
161163
@SchedulerSupport(SchedulerSupport.NONE)
164+
@BackpressureSupport(BackpressureKind.FULL)
162165
public static Completable concat(Publisher<? extends CompletableSource> sources, int prefetch) {
163166
ObjectHelper.requireNonNull(sources, "sources is null");
164167
ObjectHelper.verifyPositive(prefetch, "prefetch");
@@ -359,6 +362,7 @@ public static <T> Completable fromObservable(final ObservableSource<T> observabl
359362
* @return the new Completable instance
360363
* @throws NullPointerException if publisher is null
361364
*/
365+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
362366
@SchedulerSupport(SchedulerSupport.NONE)
363367
public static <T> Completable fromPublisher(final Publisher<T> publisher) {
364368
ObjectHelper.requireNonNull(publisher, "publisher is null");
@@ -388,7 +392,7 @@ public static <T> Completable fromSingle(final SingleSource<T> single) {
388392
* completes only when all source Completables complete or one of them emits an error.
389393
* <dl>
390394
* <dt><b>Scheduler:</b></dt>
391-
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
395+
* <dd>{@code mergeArray} does not operate by default on a particular {@link Scheduler}.</dd>
392396
* </dl>
393397
* @param sources the iterable sequence of sources.
394398
* @return the new Completable instance
@@ -435,6 +439,7 @@ public static Completable merge(Iterable<? extends CompletableSource> sources) {
435439
* @throws NullPointerException if sources is null
436440
*/
437441
@SchedulerSupport(SchedulerSupport.NONE)
442+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
438443
public static Completable merge(Publisher<? extends CompletableSource> sources) {
439444
return merge0(sources, Integer.MAX_VALUE, false);
440445
}
@@ -453,6 +458,7 @@ public static Completable merge(Publisher<? extends CompletableSource> sources)
453458
* @throws IllegalArgumentException if maxConcurrency is less than 1
454459
*/
455460
@SchedulerSupport(SchedulerSupport.NONE)
461+
@BackpressureSupport(BackpressureKind.FULL)
456462
public static Completable merge(Publisher<? extends CompletableSource> sources, int maxConcurrency) {
457463
return merge0(sources, maxConcurrency, false);
458464
}
@@ -473,6 +479,7 @@ public static Completable merge(Publisher<? extends CompletableSource> sources,
473479
* @throws IllegalArgumentException if maxConcurrency is less than 1
474480
*/
475481
@SchedulerSupport(SchedulerSupport.NONE)
482+
@BackpressureSupport(BackpressureKind.FULL)
476483
private static Completable merge0(Publisher<? extends CompletableSource> sources, int maxConcurrency, boolean delayErrors) {
477484
ObjectHelper.requireNonNull(sources, "sources is null");
478485
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
@@ -485,7 +492,7 @@ private static Completable merge0(Publisher<? extends CompletableSource> sources
485492
* them terminate in a way or another.
486493
* <dl>
487494
* <dt><b>Scheduler:</b></dt>
488-
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
495+
* <dd>{@code mergeArrayDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
489496
* </dl>
490497
* @param sources the array of Completables
491498
* @return the new Completable instance
@@ -529,6 +536,7 @@ public static Completable mergeDelayError(Iterable<? extends CompletableSource>
529536
* @throws NullPointerException if sources is null
530537
*/
531538
@SchedulerSupport(SchedulerSupport.NONE)
539+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
532540
public static Completable mergeDelayError(Publisher<? extends CompletableSource> sources) {
533541
return merge0(sources, Integer.MAX_VALUE, true);
534542
}
@@ -548,6 +556,7 @@ public static Completable mergeDelayError(Publisher<? extends CompletableSource>
548556
* @throws NullPointerException if sources is null
549557
*/
550558
@SchedulerSupport(SchedulerSupport.NONE)
559+
@BackpressureSupport(BackpressureKind.FULL)
551560
public static Completable mergeDelayError(Publisher<? extends CompletableSource> sources, int maxConcurrency) {
552561
return merge0(sources, maxConcurrency, true);
553562
}
@@ -670,7 +679,7 @@ public static <R> Completable using(
670679
* if not already Completable.
671680
* <dl>
672681
* <dt><b>Scheduler:</b></dt>
673-
* <dd>{@code amb} does not operate by default on a particular {@link Scheduler}.</dd>
682+
* <dd>{@code wrap} does not operate by default on a particular {@link Scheduler}.</dd>
674683
* </dl>
675684
* @param source the source to wrap
676685
* @return the source or its wrapper Completable
@@ -736,6 +745,7 @@ public final <T> Observable<T> andThen(ObservableSource<T> next) {
736745
* @return Flowable that composes this Completable and next
737746
* @throws NullPointerException if next is null
738747
*/
748+
@BackpressureSupport(BackpressureKind.FULL)
739749
@SchedulerSupport(SchedulerSupport.NONE)
740750
public final <T> Flowable<T> andThen(Publisher<T> next) {
741751
ObjectHelper.requireNonNull(next, "next is null");
@@ -821,7 +831,7 @@ public final boolean blockingAwait(long timeout, TimeUnit unit) {
821831
* the emitted exception if any.
822832
* <dl>
823833
* <dt><b>Scheduler:</b></dt>
824-
* <dd>{@code doAfterTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
834+
* <dd>{@code blockingGet} does not operate by default on a particular {@link Scheduler}.</dd>
825835
* </dl>
826836
* @return the throwable if this terminated with an error, null otherwise
827837
* @throws RuntimeException that wraps an InterruptedException if the wait is interrupted
@@ -836,6 +846,10 @@ public final Throwable blockingGet() {
836846
/**
837847
* Subscribes to this Completable instance and blocks until it terminates or the specified timeout
838848
* elapses, then returns null for normal termination or the emitted exception if any.
849+
* <dl>
850+
* <dt><b>Scheduler:</b></dt>
851+
* <dd>{@code blockingGet} does not operate by default on a particular {@link Scheduler}.</dd>
852+
* </dl>
839853
* @param timeout the timeout value
840854
* @param unit the time unit
841855
* @return the throwable if this terminated with an error, null otherwise
@@ -1162,7 +1176,7 @@ public final Completable onErrorComplete() {
11621176
* true, it will emit an onComplete and swallow the throwable.
11631177
* <dl>
11641178
* <dt><b>Scheduler:</b></dt>
1165-
* <dd>{@code doErrorComplete} does not operate by default on a particular {@link Scheduler}.</dd>
1179+
* <dd>{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.</dd>
11661180
* </dl>
11671181
* @param predicate the predicate to call when an Throwable is emitted which should return true
11681182
* if the Throwable should be swallowed and replaced with an onComplete.
@@ -1381,6 +1395,7 @@ public final <T> Observable<T> startWith(Observable<T> other) {
13811395
* @return the new Observable instance
13821396
* @throws NullPointerException if other is null
13831397
*/
1398+
@BackpressureSupport(BackpressureKind.FULL)
13841399
@SchedulerSupport(SchedulerSupport.NONE)
13851400
public final <T> Flowable<T> startWith(Publisher<T> other) {
13861401
ObjectHelper.requireNonNull(other, "other is null");
@@ -1442,12 +1457,17 @@ public final void subscribe(CompletableObserver s) {
14421457
*
14431458
* composite.add(source.subscribeWith(new ResourceCompletableObserver()));
14441459
* </code></pre>
1460+
* <dl>
1461+
* <dt><b>Scheduler:</b></dt>
1462+
* <dd>{@code subscribeWith} does not operate by default on a particular {@link Scheduler}.</dd>
1463+
* </dl>
14451464
* @param <E> the type of the CompletableObserver to use and return
14461465
* @param observer the CompletableObserver (subclass) to use and return, not null
14471466
* @return the input {@code observer}
14481467
* @throws NullPointerException if {@code observer} is null
14491468
* @since 2.0
14501469
*/
1470+
@SchedulerSupport(SchedulerSupport.NONE)
14511471
public final <E extends CompletableObserver> E subscribeWith(E observer) {
14521472
subscribe(observer);
14531473
return observer;
@@ -1595,6 +1615,10 @@ public final Completable timeout(long timeout, TimeUnit unit, Scheduler schedule
15951615
* Returns a Completable that runs this Completable and optionally switches to the other Completable
15961616
* in case this Completable doesn't complete within the given time while "waiting" on
15971617
* the specified scheduler.
1618+
* <dl>
1619+
* <dt><b>Scheduler:</b></dt>
1620+
* <dd>you specify the {@link Scheduler} this operator runs on.</dd>
1621+
* </dl>
15981622
* @param timeout the timeout value
15991623
* @param unit the timeout unit
16001624
* @param scheduler the scheduler to use to wait for completion
@@ -1641,8 +1665,13 @@ public final <U> U to(Function<? super Completable, U> converter) {
16411665
* @param <T> the value type
16421666
* @return the new Observable created
16431667
*/
1668+
@SuppressWarnings("unchecked")
1669+
@BackpressureSupport(BackpressureKind.FULL)
16441670
@SchedulerSupport(SchedulerSupport.NONE)
16451671
public final <T> Flowable<T> toFlowable() {
1672+
if (this instanceof FuseToFlowable) {
1673+
return ((FuseToFlowable<T>)this).fuseToFlowable();
1674+
}
16461675
return RxJavaPlugins.onAssembly(new CompletableToFlowable<T>(this));
16471676
}
16481677

@@ -1652,13 +1681,18 @@ public final <T> Flowable<T> toFlowable() {
16521681
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.toObservable.png" alt="">
16531682
* <dl>
16541683
* <dt><b>Scheduler:</b></dt>
1655-
* <dd>{@code toCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
1684+
* <dd>{@code toMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
16561685
* </dl>
16571686
*
16581687
* @param <T> the value type
16591688
* @return an {@link Maybe} that emits a single item T or an error.
16601689
*/
1690+
@SuppressWarnings("unchecked")
1691+
@SchedulerSupport(SchedulerSupport.NONE)
16611692
public final <T> Maybe<T> toMaybe() {
1693+
if (this instanceof FuseToMaybe) {
1694+
return ((FuseToMaybe<T>)this).fuseToMaybe();
1695+
}
16621696
return RxJavaPlugins.onAssembly(new MaybeFromCompletable<T>(this));
16631697
}
16641698

@@ -1672,8 +1706,12 @@ public final <T> Maybe<T> toMaybe() {
16721706
* @param <T> the value type
16731707
* @return the new Observable created
16741708
*/
1709+
@SuppressWarnings("unchecked")
16751710
@SchedulerSupport(SchedulerSupport.NONE)
16761711
public final <T> Observable<T> toObservable() {
1712+
if (this instanceof FuseToObservable) {
1713+
return ((FuseToObservable<T>)this).fuseToObservable();
1714+
}
16771715
return RxJavaPlugins.onAssembly(new CompletableToObservable<T>(this));
16781716
}
16791717

@@ -1736,9 +1774,14 @@ public final Completable unsubscribeOn(final Scheduler scheduler) {
17361774
/**
17371775
* Creates a TestSubscriber and subscribes
17381776
* it to this Completable.
1777+
* <dl>
1778+
* <dt><b>Scheduler:</b></dt>
1779+
* <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
1780+
* </dl>
17391781
* @return the new TestSubscriber instance
17401782
* @since 2.0
17411783
*/
1784+
@SchedulerSupport(SchedulerSupport.NONE)
17421785
public final TestSubscriber<Void> test() {
17431786
TestSubscriber<Void> ts = new TestSubscriber<Void>();
17441787
subscribe(new SubscriberCompletableObserver<Void>(ts));
@@ -1749,9 +1792,14 @@ public final TestSubscriber<Void> test() {
17491792
* Creates a TestSubscriber optionally in cancelled state, then subscribes it to this Completable.
17501793
* @param cancelled if true, the TestSubscriber will be cancelled before subscribing to this
17511794
* Completable.
1795+
* <dl>
1796+
* <dt><b>Scheduler:</b></dt>
1797+
* <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
1798+
* </dl>
17521799
* @return the new TestSubscriber instance
17531800
* @since 2.0
17541801
*/
1802+
@SchedulerSupport(SchedulerSupport.NONE)
17551803
public final TestSubscriber<Void> test(boolean cancelled) {
17561804
TestSubscriber<Void> ts = new TestSubscriber<Void>();
17571805

0 commit comments

Comments
 (0)