Skip to content

Commit cec5dcc

Browse files
authored
2.x: Functional interfaces now throw (#4278)
* 1.x: use throwing functional interfaces + changes * Fix remaining classes * Fix mistakes.
1 parent a4fb7da commit cec5dcc

File tree

202 files changed

+2299
-2783
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

202 files changed

+2299
-2783
lines changed

src/main/java/io/reactivex/Completable.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@
1616

1717
import org.reactivestreams.*;
1818

19-
import io.reactivex.annotations.*;
19+
import io.reactivex.annotations.SchedulerSupport;
2020
import io.reactivex.disposables.Disposable;
2121
import io.reactivex.functions.*;
2222
import io.reactivex.internal.functions.*;
2323
import io.reactivex.internal.operators.completable.*;
2424
import io.reactivex.internal.subscribers.completable.*;
25+
import io.reactivex.internal.util.Exceptions;
2526
import io.reactivex.plugins.RxJavaPlugins;
2627
import io.reactivex.schedulers.Schedulers;
2728

@@ -200,7 +201,7 @@ public static Completable create(CompletableConsumable onSubscribe) {
200201
* @return the Completable instance
201202
*/
202203
@SchedulerSupport(SchedulerSupport.NONE)
203-
public static Completable defer(final Supplier<? extends CompletableConsumable> completableSupplier) {
204+
public static Completable defer(final Callable<? extends CompletableConsumable> completableSupplier) {
204205
Objects.requireNonNull(completableSupplier, "completableSupplier");
205206
return new CompletableDefer(completableSupplier);
206207
}
@@ -216,7 +217,7 @@ public static Completable defer(final Supplier<? extends CompletableConsumable>
216217
* @throws NullPointerException if errorSupplier is null
217218
*/
218219
@SchedulerSupport(SchedulerSupport.NONE)
219-
public static Completable error(final Supplier<? extends Throwable> errorSupplier) {
220+
public static Completable error(final Callable<? extends Throwable> errorSupplier) {
220221
Objects.requireNonNull(errorSupplier, "errorSupplier is null");
221222
return new CompletableErrorSupplier(errorSupplier);
222223
}
@@ -499,7 +500,7 @@ private static NullPointerException toNpe(Throwable ex) {
499500
* @param disposer the consumer that disposes the resource created by the resource supplier
500501
* @return the new Completable instance
501502
*/
502-
public static <R> Completable using(Supplier<R> resourceSupplier,
503+
public static <R> Completable using(Callable<R> resourceSupplier,
503504
Function<? super R, ? extends CompletableConsumable> completableFunction,
504505
Consumer<? super R> disposer) {
505506
return using(resourceSupplier, completableFunction, disposer, true);
@@ -523,7 +524,7 @@ public static <R> Completable using(Supplier<R> resourceSupplier,
523524
* @return the new Completable instance
524525
*/
525526
public static <R> Completable using(
526-
final Supplier<R> resourceSupplier,
527+
final Callable<R> resourceSupplier,
527528
final Function<? super R, ? extends CompletableConsumable> completableFunction,
528529
final Consumer<? super R> disposer,
529530
final boolean eager) {
@@ -1315,7 +1316,11 @@ private Completable timeout0(long timeout, TimeUnit unit, Scheduler scheduler, C
13151316
*/
13161317
@SchedulerSupport(SchedulerSupport.NONE)
13171318
public final <U> U to(Function<? super Completable, U> converter) {
1318-
return converter.apply(this);
1319+
try {
1320+
return converter.apply(this);
1321+
} catch (Throwable ex) {
1322+
throw Exceptions.propagate(ex);
1323+
}
13191324
}
13201325

13211326
/**
@@ -1349,7 +1354,7 @@ public final <T> Observable<T> toObservable() {
13491354
* @throws NullPointerException if completionValueSupplier is null
13501355
*/
13511356
@SchedulerSupport(SchedulerSupport.NONE)
1352-
public final <T> Single<T> toSingle(final Supplier<? extends T> completionValueSupplier) {
1357+
public final <T> Single<T> toSingle(final Callable<? extends T> completionValueSupplier) {
13531358
Objects.requireNonNull(completionValueSupplier, "completionValueSupplier is null");
13541359
return new CompletableToSingle<T>(this, completionValueSupplier, null);
13551360
}

src/main/java/io/reactivex/Flowable.java

+108-143
Large diffs are not rendered by default.

src/main/java/io/reactivex/Observable.java

+111-145
Large diffs are not rendered by default.

src/main/java/io/reactivex/Single.java

+15-10
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.reactivex.internal.functions.Objects;
2525
import io.reactivex.internal.operators.single.*;
2626
import io.reactivex.internal.subscribers.single.*;
27+
import io.reactivex.internal.util.Exceptions;
2728
import io.reactivex.plugins.RxJavaPlugins;
2829
import io.reactivex.schedulers.Schedulers;
2930

@@ -60,9 +61,9 @@ public static <T> Single<T> amb(final Iterable<? extends SingleConsumable<? exte
6061
@SuppressWarnings("unchecked")
6162
public static <T> Single<T> amb(final SingleConsumable<? extends T>... sources) {
6263
if (sources.length == 0) {
63-
return error(new Supplier<Throwable>() {
64+
return error(new Callable<Throwable>() {
6465
@Override
65-
public Throwable get() {
66+
public Throwable call() {
6667
return new NoSuchElementException();
6768
}
6869
});
@@ -208,21 +209,21 @@ public static <T> Single<T> create(SingleConsumable<T> onSubscribe) {
208209
return new SingleWrapper<T>(onSubscribe);
209210
}
210211

211-
public static <T> Single<T> defer(final Supplier<? extends SingleConsumable<? extends T>> singleSupplier) {
212+
public static <T> Single<T> defer(final Callable<? extends SingleConsumable<? extends T>> singleSupplier) {
212213
Objects.requireNonNull(singleSupplier, "singleSupplier is null");
213214
return new SingleDefer<T>(singleSupplier);
214215
}
215216

216-
public static <T> Single<T> error(final Supplier<? extends Throwable> errorSupplier) {
217+
public static <T> Single<T> error(final Callable<? extends Throwable> errorSupplier) {
217218
Objects.requireNonNull(errorSupplier, "errorSupplier is null");
218219
return new SingleError<T>(errorSupplier);
219220
}
220221

221222
public static <T> Single<T> error(final Throwable error) {
222223
Objects.requireNonNull(error, "error is null");
223-
return error(new Supplier<Throwable>() {
224+
return error(new Callable<Throwable>() {
224225
@Override
225-
public Throwable get() {
226+
public Throwable call() {
226227
return error;
227228
}
228229
});
@@ -415,13 +416,13 @@ public static <T> Single<Boolean> equals(final SingleConsumable<? extends T> fir
415416
return new SingleEquals<T>(first, second);
416417
}
417418

418-
public static <T, U> Single<T> using(Supplier<U> resourceSupplier,
419+
public static <T, U> Single<T> using(Callable<U> resourceSupplier,
419420
Function<? super U, ? extends SingleConsumable<? extends T>> singleFunction, Consumer<? super U> disposer) {
420421
return using(resourceSupplier, singleFunction, disposer, true);
421422
}
422423

423424
public static <T, U> Single<T> using(
424-
final Supplier<U> resourceSupplier,
425+
final Callable<U> resourceSupplier,
425426
final Function<? super U, ? extends SingleConsumable<? extends T>> singleFunction,
426427
final Consumer<? super U> disposer,
427428
final boolean eager) {
@@ -730,7 +731,7 @@ public final Single<T> observeOn(final Scheduler scheduler) {
730731
return new SingleObserveOn<T>(this, scheduler);
731732
}
732733

733-
public final Single<T> onErrorReturn(final Supplier<? extends T> valueSupplier) {
734+
public final Single<T> onErrorReturn(final Callable<? extends T> valueSupplier) {
734735
Objects.requireNonNull(valueSupplier, "valueSupplier is null");
735736
return new SingleOnErrorReturn<T>(this, valueSupplier, null);
736737
}
@@ -858,7 +859,11 @@ private Single<T> timeout0(final long timeout, final TimeUnit unit, final Schedu
858859
}
859860

860861
public final <R> R to(Function<? super Single<T>, R> convert) {
861-
return convert.apply(this);
862+
try {
863+
return convert.apply(this);
864+
} catch (Throwable ex) {
865+
throw Exceptions.propagate(ex);
866+
}
862867
}
863868

864869
public final Flowable<T> toFlowable() {

src/main/java/io/reactivex/flowables/BlockingFlowable.java

+32-3
Original file line numberDiff line numberDiff line change
@@ -373,19 +373,48 @@ public void subscribe(final Consumer<? super T> onNext, final Consumer<? super T
373373
*/
374374
public void subscribe(final Consumer<? super T> onNext, final Consumer<? super Throwable> onError, final Runnable onComplete) {
375375
subscribe(new DefaultObserver<T>() {
376+
boolean done;
376377
@Override
377378
public void onNext(T t) {
378-
onNext.accept(t);
379+
if (done) {
380+
return;
381+
}
382+
try {
383+
onNext.accept(t);
384+
} catch (Throwable ex) {
385+
Exceptions.throwIfFatal(ex);
386+
cancel();
387+
onError(ex);
388+
}
379389
}
380390

381391
@Override
382392
public void onError(Throwable e) {
383-
onError.accept(e);
393+
if (done) {
394+
RxJavaPlugins.onError(e);
395+
return;
396+
}
397+
done = true;
398+
try {
399+
onError.accept(e);
400+
} catch (Throwable ex) {
401+
Exceptions.throwIfFatal(ex);
402+
RxJavaPlugins.onError(ex);
403+
}
384404
}
385405

386406
@Override
387407
public void onComplete() {
388-
onComplete.run();
408+
if (done) {
409+
return;
410+
}
411+
done = true;
412+
try {
413+
onComplete.run();
414+
} catch (Throwable ex) {
415+
Exceptions.throwIfFatal(ex);
416+
RxJavaPlugins.onError(ex);
417+
}
389418
}
390419
});
391420
}

src/main/java/io/reactivex/functions/BiConsumer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,5 @@
1515

1616
public interface BiConsumer<T1, T2> {
1717

18-
void accept(T1 t1, T2 t2);
18+
void accept(T1 t1, T2 t2) throws Exception;
1919
}

src/main/java/io/reactivex/functions/BiFunction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,5 @@
1515

1616
public interface BiFunction<T1, T2, R> {
1717

18-
R apply(T1 t1, T2 t2);
18+
R apply(T1 t1, T2 t2) throws Exception;
1919
}

src/main/java/io/reactivex/functions/BiPredicate.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,5 @@
1515

1616
public interface BiPredicate<T1, T2> {
1717

18-
boolean test(T1 t1, T2 t2);
18+
boolean test(T1 t1, T2 t2) throws Exception;
1919
}

src/main/java/io/reactivex/functions/BooleanSupplier.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@
1414
package io.reactivex.functions;
1515

1616
public interface BooleanSupplier {
17-
boolean getAsBoolean(); // NOPMD
17+
boolean getAsBoolean() throws Exception; // NOPMD
1818
}

src/main/java/io/reactivex/functions/Consumer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@
1414
package io.reactivex.functions;
1515

1616
public interface Consumer<T> {
17-
void accept(T t);
17+
void accept(T t) throws Exception;
1818
}

src/main/java/io/reactivex/functions/Function.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@
1414
package io.reactivex.functions;
1515

1616
public interface Function<T, R> {
17-
R apply(T t);
17+
R apply(T t) throws Exception;
1818
}

src/main/java/io/reactivex/functions/Function3.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@
1414
package io.reactivex.functions;
1515

1616
public interface Function3<T1, T2, T3, R> {
17-
R apply(T1 t1, T2 t2, T3 t3);
17+
R apply(T1 t1, T2 t2, T3 t3) throws Exception;
1818
}

src/main/java/io/reactivex/functions/Function4.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@
1414
package io.reactivex.functions;
1515

1616
public interface Function4<T1, T2, T3, T4, R> {
17-
R apply(T1 t1, T2 t2, T3 t3, T4 t4);
17+
R apply(T1 t1, T2 t2, T3 t3, T4 t4) throws Exception;
1818
}

src/main/java/io/reactivex/functions/Function5.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@
1414
package io.reactivex.functions;
1515

1616
public interface Function5<T1, T2, T3, T4, T5, R> {
17-
R apply(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5);
17+
R apply(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5) throws Exception;
1818
}

src/main/java/io/reactivex/functions/Function6.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@
1414
package io.reactivex.functions;
1515

1616
public interface Function6<T1, T2, T3, T4, T5, T6, R> {
17-
R apply(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6);
17+
R apply(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6) throws Exception;
1818
}

src/main/java/io/reactivex/functions/Function7.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@
1414
package io.reactivex.functions;
1515

1616
public interface Function7<T1, T2, T3, T4, T5, T6, T7, R> {
17-
R apply(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7);
17+
R apply(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7) throws Exception;
1818
}

src/main/java/io/reactivex/functions/Function8.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@
1414
package io.reactivex.functions;
1515

1616
public interface Function8<T1, T2, T3, T4, T5, T6, T7, T8, R> {
17-
R apply(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8);
17+
R apply(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8) throws Exception;
1818
}

src/main/java/io/reactivex/functions/Function9.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@
1414
package io.reactivex.functions;
1515

1616
public interface Function9<T1, T2, T3, T4, T5, T6, T7, T8, T9, R> {
17-
R apply(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9);
17+
R apply(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9) throws Exception;
1818
}

src/main/java/io/reactivex/functions/IntFunction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@
1313
package io.reactivex.functions;
1414

1515
public interface IntFunction<T> {
16-
T apply(int i);
16+
T apply(int i) throws Exception;
1717
}

src/main/java/io/reactivex/functions/LongConsumer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@
1313
package io.reactivex.functions;
1414

1515
public interface LongConsumer {
16-
void accept(long t);
16+
void accept(long t) throws Exception;
1717
}

src/main/java/io/reactivex/functions/Predicate.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@
1414
package io.reactivex.functions;
1515

1616
public interface Predicate<T> {
17-
boolean test(T t);
17+
boolean test(T t) throws Exception;
1818
}

src/main/java/io/reactivex/internal/disposables/NbpFullArbiter.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,11 @@ void drain() {
112112
for (;;) {
113113

114114
for (;;) {
115-
Object o = q.peek();
116-
115+
Object o = q.poll();
117116
if (o == null) {
118117
break;
119118
}
120119

121-
q.poll();
122120
Object v = q.poll();
123121

124122
if (o != s) {

0 commit comments

Comments
 (0)