Skip to content

Commit 1ea1291

Browse files
authored
2.x: add parallel hooks to RxJavaPlugins, add missing params validation (#5043)
1 parent ceded86 commit 1ea1291

File tree

3 files changed

+154
-32
lines changed

3 files changed

+154
-32
lines changed

src/main/java/io/reactivex/parallel/ParallelFlowable.java

+50-27
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public static <T> ParallelFlowable<T> from(Publisher<? extends T> source,
118118
ObjectHelper.verifyPositive(parallelism, "parallelism");
119119
ObjectHelper.verifyPositive(prefetch, "prefetch");
120120

121-
return new ParallelFromPublisher<T>(source, parallelism, prefetch);
121+
return RxJavaPlugins.onAssembly(new ParallelFromPublisher<T>(source, parallelism, prefetch));
122122
}
123123

124124
/**
@@ -132,7 +132,7 @@ public static <T> ParallelFlowable<T> from(Publisher<? extends T> source,
132132
@CheckReturnValue
133133
public final <R> ParallelFlowable<R> map(Function<? super T, ? extends R> mapper) {
134134
ObjectHelper.requireNonNull(mapper, "mapper");
135-
return new ParallelMap<T, R>(this, mapper);
135+
return RxJavaPlugins.onAssembly(new ParallelMap<T, R>(this, mapper));
136136
}
137137

138138
/**
@@ -145,7 +145,7 @@ public final <R> ParallelFlowable<R> map(Function<? super T, ? extends R> mapper
145145
@CheckReturnValue
146146
public final ParallelFlowable<T> filter(Predicate<? super T> predicate) {
147147
ObjectHelper.requireNonNull(predicate, "predicate");
148-
return new ParallelFilter<T>(this, predicate);
148+
return RxJavaPlugins.onAssembly(new ParallelFilter<T>(this, predicate));
149149
}
150150

151151
/**
@@ -197,7 +197,7 @@ public final ParallelFlowable<T> runOn(Scheduler scheduler) {
197197
public final ParallelFlowable<T> runOn(Scheduler scheduler, int prefetch) {
198198
ObjectHelper.requireNonNull(scheduler, "scheduler");
199199
ObjectHelper.verifyPositive(prefetch, "prefetch");
200-
return new ParallelRunOn<T>(this, scheduler, prefetch);
200+
return RxJavaPlugins.onAssembly(new ParallelRunOn<T>(this, scheduler, prefetch));
201201
}
202202

203203
/**
@@ -229,7 +229,7 @@ public final Flowable<T> reduce(BiFunction<T, T, T> reducer) {
229229
public final <R> ParallelFlowable<R> reduce(Callable<R> initialSupplier, BiFunction<R, ? super T, R> reducer) {
230230
ObjectHelper.requireNonNull(initialSupplier, "initialSupplier");
231231
ObjectHelper.requireNonNull(reducer, "reducer");
232-
return new ParallelReduce<T, R>(this, initialSupplier, reducer);
232+
return RxJavaPlugins.onAssembly(new ParallelReduce<T, R>(this, initialSupplier, reducer));
233233
}
234234

235235
/**
@@ -304,6 +304,8 @@ public final Flowable<T> sorted(Comparator<? super T> comparator) {
304304
*/
305305
@CheckReturnValue
306306
public final Flowable<T> sorted(Comparator<? super T> comparator, int capacityHint) {
307+
ObjectHelper.requireNonNull(comparator, "comparator is null");
308+
ObjectHelper.verifyPositive(capacityHint, "capacityHint");
307309
int ch = capacityHint / parallelism() + 1;
308310
ParallelFlowable<List<T>> railReduced = reduce(Functions.<T>createArrayList(ch), ListAddBiConsumer.<T>instance());
309311
ParallelFlowable<List<T>> railSorted = railReduced.map(new SorterFunction<T>(comparator));
@@ -334,6 +336,9 @@ public final Flowable<List<T>> toSortedList(Comparator<? super T> comparator) {
334336
*/
335337
@CheckReturnValue
336338
public final Flowable<List<T>> toSortedList(Comparator<? super T> comparator, int capacityHint) {
339+
ObjectHelper.requireNonNull(comparator, "comparator is null");
340+
ObjectHelper.verifyPositive(capacityHint, "capacityHint");
341+
337342
int ch = capacityHint / parallelism() + 1;
338343
ParallelFlowable<List<T>> railReduced = reduce(Functions.<T>createArrayList(ch), ListAddBiConsumer.<T>instance());
339344
ParallelFlowable<List<T>> railSorted = railReduced.map(new SorterFunction<T>(comparator));
@@ -351,7 +356,8 @@ public final Flowable<List<T>> toSortedList(Comparator<? super T> comparator, in
351356
*/
352357
@CheckReturnValue
353358
public final ParallelFlowable<T> doOnNext(Consumer<? super T> onNext) {
354-
return new ParallelPeek<T>(this,
359+
ObjectHelper.requireNonNull(onNext, "onNext is null");
360+
return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
355361
onNext,
356362
Functions.emptyConsumer(),
357363
Functions.emptyConsumer(),
@@ -360,7 +366,7 @@ public final ParallelFlowable<T> doOnNext(Consumer<? super T> onNext) {
360366
Functions.emptyConsumer(),
361367
Functions.EMPTY_LONG_CONSUMER,
362368
Functions.EMPTY_ACTION
363-
);
369+
));
364370
}
365371

366372
/**
@@ -372,7 +378,8 @@ public final ParallelFlowable<T> doOnNext(Consumer<? super T> onNext) {
372378
*/
373379
@CheckReturnValue
374380
public final ParallelFlowable<T> doAfterNext(Consumer<? super T> onAfterNext) {
375-
return new ParallelPeek<T>(this,
381+
ObjectHelper.requireNonNull(onAfterNext, "onAfterNext is null");
382+
return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
376383
Functions.emptyConsumer(),
377384
onAfterNext,
378385
Functions.emptyConsumer(),
@@ -381,7 +388,7 @@ public final ParallelFlowable<T> doAfterNext(Consumer<? super T> onAfterNext) {
381388
Functions.emptyConsumer(),
382389
Functions.EMPTY_LONG_CONSUMER,
383390
Functions.EMPTY_ACTION
384-
);
391+
));
385392
}
386393

387394
/**
@@ -392,7 +399,8 @@ public final ParallelFlowable<T> doAfterNext(Consumer<? super T> onAfterNext) {
392399
*/
393400
@CheckReturnValue
394401
public final ParallelFlowable<T> doOnError(Consumer<Throwable> onError) {
395-
return new ParallelPeek<T>(this,
402+
ObjectHelper.requireNonNull(onError, "onError is null");
403+
return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
396404
Functions.emptyConsumer(),
397405
Functions.emptyConsumer(),
398406
onError,
@@ -401,7 +409,7 @@ public final ParallelFlowable<T> doOnError(Consumer<Throwable> onError) {
401409
Functions.emptyConsumer(),
402410
Functions.EMPTY_LONG_CONSUMER,
403411
Functions.EMPTY_ACTION
404-
);
412+
));
405413
}
406414

407415
/**
@@ -412,7 +420,8 @@ public final ParallelFlowable<T> doOnError(Consumer<Throwable> onError) {
412420
*/
413421
@CheckReturnValue
414422
public final ParallelFlowable<T> doOnComplete(Action onComplete) {
415-
return new ParallelPeek<T>(this,
423+
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
424+
return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
416425
Functions.emptyConsumer(),
417426
Functions.emptyConsumer(),
418427
Functions.emptyConsumer(),
@@ -421,7 +430,7 @@ public final ParallelFlowable<T> doOnComplete(Action onComplete) {
421430
Functions.emptyConsumer(),
422431
Functions.EMPTY_LONG_CONSUMER,
423432
Functions.EMPTY_ACTION
424-
);
433+
));
425434
}
426435

427436
/**
@@ -432,7 +441,8 @@ public final ParallelFlowable<T> doOnComplete(Action onComplete) {
432441
*/
433442
@CheckReturnValue
434443
public final ParallelFlowable<T> doAfterTerminated(Action onAfterTerminate) {
435-
return new ParallelPeek<T>(this,
444+
ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
445+
return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
436446
Functions.emptyConsumer(),
437447
Functions.emptyConsumer(),
438448
Functions.emptyConsumer(),
@@ -441,7 +451,7 @@ public final ParallelFlowable<T> doAfterTerminated(Action onAfterTerminate) {
441451
Functions.emptyConsumer(),
442452
Functions.EMPTY_LONG_CONSUMER,
443453
Functions.EMPTY_ACTION
444-
);
454+
));
445455
}
446456

447457
/**
@@ -452,7 +462,8 @@ public final ParallelFlowable<T> doAfterTerminated(Action onAfterTerminate) {
452462
*/
453463
@CheckReturnValue
454464
public final ParallelFlowable<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe) {
455-
return new ParallelPeek<T>(this,
465+
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
466+
return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
456467
Functions.emptyConsumer(),
457468
Functions.emptyConsumer(),
458469
Functions.emptyConsumer(),
@@ -461,7 +472,7 @@ public final ParallelFlowable<T> doOnSubscribe(Consumer<? super Subscription> on
461472
onSubscribe,
462473
Functions.EMPTY_LONG_CONSUMER,
463474
Functions.EMPTY_ACTION
464-
);
475+
));
465476
}
466477

467478
/**
@@ -472,7 +483,8 @@ public final ParallelFlowable<T> doOnSubscribe(Consumer<? super Subscription> on
472483
*/
473484
@CheckReturnValue
474485
public final ParallelFlowable<T> doOnRequest(LongConsumer onRequest) {
475-
return new ParallelPeek<T>(this,
486+
ObjectHelper.requireNonNull(onRequest, "onRequest is null");
487+
return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
476488
Functions.emptyConsumer(),
477489
Functions.emptyConsumer(),
478490
Functions.emptyConsumer(),
@@ -481,7 +493,7 @@ public final ParallelFlowable<T> doOnRequest(LongConsumer onRequest) {
481493
Functions.emptyConsumer(),
482494
onRequest,
483495
Functions.EMPTY_ACTION
484-
);
496+
));
485497
}
486498

487499
/**
@@ -492,7 +504,8 @@ public final ParallelFlowable<T> doOnRequest(LongConsumer onRequest) {
492504
*/
493505
@CheckReturnValue
494506
public final ParallelFlowable<T> doOnCancel(Action onCancel) {
495-
return new ParallelPeek<T>(this,
507+
ObjectHelper.requireNonNull(onCancel, "onCancel is null");
508+
return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
496509
Functions.emptyConsumer(),
497510
Functions.emptyConsumer(),
498511
Functions.emptyConsumer(),
@@ -501,7 +514,7 @@ public final ParallelFlowable<T> doOnCancel(Action onCancel) {
501514
Functions.emptyConsumer(),
502515
Functions.EMPTY_LONG_CONSUMER,
503516
onCancel
504-
);
517+
));
505518
}
506519

507520
/**
@@ -515,7 +528,9 @@ public final ParallelFlowable<T> doOnCancel(Action onCancel) {
515528
*/
516529
@CheckReturnValue
517530
public final <C> ParallelFlowable<C> collect(Callable<? extends C> collectionSupplier, BiConsumer<? super C, ? super T> collector) {
518-
return new ParallelCollect<T, C>(this, collectionSupplier, collector);
531+
ObjectHelper.requireNonNull(collectionSupplier, "collectionSupplier is null");
532+
ObjectHelper.requireNonNull(collector, "collector is null");
533+
return RxJavaPlugins.onAssembly(new ParallelCollect<T, C>(this, collectionSupplier, collector));
519534
}
520535

521536
/**
@@ -531,7 +546,7 @@ public static <T> ParallelFlowable<T> fromArray(Publisher<T>... publishers) {
531546
if (publishers.length == 0) {
532547
throw new IllegalArgumentException("Zero publishers not supported");
533548
}
534-
return new ParallelFromArray<T>(publishers);
549+
return RxJavaPlugins.onAssembly(new ParallelFromArray<T>(publishers));
535550
}
536551

537552
/**
@@ -562,7 +577,7 @@ public final <U> U to(Function<? super ParallelFlowable<T>, U> converter) {
562577
*/
563578
@CheckReturnValue
564579
public final <U> ParallelFlowable<U> compose(Function<? super ParallelFlowable<T>, ParallelFlowable<U>> composer) {
565-
return to(composer);
580+
return RxJavaPlugins.onAssembly(to(composer));
566581
}
567582

568583
/**
@@ -629,7 +644,10 @@ public final <R> ParallelFlowable<R> flatMap(
629644
public final <R> ParallelFlowable<R> flatMap(
630645
Function<? super T, ? extends Publisher<? extends R>> mapper,
631646
boolean delayError, int maxConcurrency, int prefetch) {
632-
return new ParallelFlatMap<T, R>(this, mapper, delayError, maxConcurrency, prefetch);
647+
ObjectHelper.requireNonNull(mapper, "mapper is null");
648+
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
649+
ObjectHelper.verifyPositive(prefetch, "prefetch");
650+
return RxJavaPlugins.onAssembly(new ParallelFlatMap<T, R>(this, mapper, delayError, maxConcurrency, prefetch));
633651
}
634652

635653
/**
@@ -661,7 +679,9 @@ public final <R> ParallelFlowable<R> concatMap(
661679
public final <R> ParallelFlowable<R> concatMap(
662680
Function<? super T, ? extends Publisher<? extends R>> mapper,
663681
int prefetch) {
664-
return new ParallelConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE);
682+
ObjectHelper.requireNonNull(mapper, "mapper is null");
683+
ObjectHelper.verifyPositive(prefetch, "prefetch");
684+
return RxJavaPlugins.onAssembly(new ParallelConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE));
665685
}
666686

667687
/**
@@ -697,6 +717,9 @@ public final <R> ParallelFlowable<R> concatMapDelayError(
697717
public final <R> ParallelFlowable<R> concatMapDelayError(
698718
Function<? super T, ? extends Publisher<? extends R>> mapper,
699719
int prefetch, boolean tillTheEnd) {
700-
return new ParallelConcatMap<T, R>(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY);
720+
ObjectHelper.requireNonNull(mapper, "mapper is null");
721+
ObjectHelper.verifyPositive(prefetch, "prefetch");
722+
return RxJavaPlugins.onAssembly(new ParallelConcatMap<T, R>(
723+
this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY));
701724
}
702725
}

src/main/java/io/reactivex/plugins/RxJavaPlugins.java

+54-5
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,21 @@
1212
*/
1313
package io.reactivex.plugins;
1414

15+
import java.lang.Thread.UncaughtExceptionHandler;
16+
import java.util.concurrent.*;
17+
18+
import org.reactivestreams.Subscriber;
19+
1520
import io.reactivex.*;
16-
import io.reactivex.annotations.*;
21+
import io.reactivex.annotations.Experimental;
1722
import io.reactivex.flowables.ConnectableFlowable;
1823
import io.reactivex.functions.*;
1924
import io.reactivex.internal.functions.ObjectHelper;
2025
import io.reactivex.internal.schedulers.*;
2126
import io.reactivex.internal.util.ExceptionHelper;
2227
import io.reactivex.observables.ConnectableObservable;
28+
import io.reactivex.parallel.ParallelFlowable;
2329
import io.reactivex.schedulers.Schedulers;
24-
import org.reactivestreams.Subscriber;
25-
26-
import java.lang.Thread.UncaughtExceptionHandler;
27-
import java.util.concurrent.*;
2830

2931
/**
3032
* Utility class to inject handlers to certain standard RxJava operations.
@@ -71,6 +73,9 @@ public final class RxJavaPlugins {
7173

7274
static volatile Function<Completable, Completable> onCompletableAssembly;
7375

76+
@SuppressWarnings("rawtypes")
77+
static volatile Function<ParallelFlowable, ParallelFlowable> onParallelAssembly;
78+
7479
@SuppressWarnings("rawtypes")
7580
static volatile BiFunction<Flowable, Subscriber, Subscriber> onFlowableSubscribe;
7681

@@ -414,6 +419,8 @@ public static void reset() {
414419
setOnMaybeAssembly(null);
415420
setOnMaybeSubscribe(null);
416421

422+
setOnParallelAssembly(null);
423+
417424
setFailOnNonBlockingScheduler(false);
418425
setOnBeforeBlocking(null);
419426
}
@@ -966,6 +973,48 @@ public static Completable onAssembly(Completable source) {
966973
return source;
967974
}
968975

976+
/**
977+
* Sets the specific hook function.
978+
* @param handler the hook function to set, null allowed
979+
* @since 2.0.6 - experimental
980+
*/
981+
@Experimental
982+
@SuppressWarnings("rawtypes")
983+
public static void setOnParallelAssembly(Function<ParallelFlowable, ParallelFlowable> handler) {
984+
if (lockdown) {
985+
throw new IllegalStateException("Plugins can't be changed anymore");
986+
}
987+
onParallelAssembly = handler;
988+
}
989+
990+
/**
991+
* Returns the current hook function.
992+
* @return the hook function, may be null
993+
* @since 2.0.6 - experimental
994+
*/
995+
@Experimental
996+
@SuppressWarnings("rawtypes")
997+
public static Function<ParallelFlowable, ParallelFlowable> getOnParallelAssembly() {
998+
return onParallelAssembly;
999+
}
1000+
1001+
/**
1002+
* Calls the associated hook function.
1003+
* @param <T> the value type of the source
1004+
* @param source the hook's input value
1005+
* @return the value returned by the hook
1006+
* @since 2.0.6 - experimental
1007+
*/
1008+
@Experimental
1009+
@SuppressWarnings({ "rawtypes", "unchecked" })
1010+
public static <T> ParallelFlowable<T> onAssembly(ParallelFlowable<T> source) {
1011+
Function<ParallelFlowable, ParallelFlowable> f = onParallelAssembly;
1012+
if (f != null) {
1013+
return apply(f, source);
1014+
}
1015+
return source;
1016+
}
1017+
9691018
/**
9701019
* Called before an operator attempts a blocking operation
9711020
* such as awaiting a condition or signal

0 commit comments

Comments
 (0)