Skip to content

Commit 9099f90

Browse files
authored
2.x: test sync and bugfixes (#4268)
1 parent 3f324c6 commit 9099f90

26 files changed

+2323
-276
lines changed

src/main/java/io/reactivex/Flowable.java

+28-25
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public static <T, R> Flowable<R> combineLatest(Function<Object[], ? extends R> c
110110
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<Object[], ? extends R> combiner, int bufferSize) {
111111
Objects.requireNonNull(sources, "sources is null");
112112
Objects.requireNonNull(combiner, "combiner is null");
113-
validateBufferSize(bufferSize);
113+
validateBufferSize(bufferSize, "bufferSize");
114114
if (sources.length == 0) {
115115
return empty();
116116
}
@@ -128,7 +128,7 @@ public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? ex
128128
public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<Object[], ? extends R> combiner, int bufferSize) {
129129
Objects.requireNonNull(sources, "sources is null");
130130
Objects.requireNonNull(combiner, "combiner is null");
131-
validateBufferSize(bufferSize);
131+
validateBufferSize(bufferSize, "bufferSize");
132132
return new FlowableCombineLatest<T, R>(sources, combiner, bufferSize, false);
133133
}
134134

@@ -149,7 +149,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Function<Object[], ? ex
149149
public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[] sources, Function<Object[], ? extends R> combiner, int bufferSize) {
150150
Objects.requireNonNull(sources, "sources is null");
151151
Objects.requireNonNull(combiner, "combiner is null");
152-
validateBufferSize(bufferSize);
152+
validateBufferSize(bufferSize, "bufferSize");
153153
if (sources.length == 0) {
154154
return empty();
155155
}
@@ -167,7 +167,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publ
167167
public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publisher<? extends T>> sources, Function<Object[], ? extends R> combiner, int bufferSize) {
168168
Objects.requireNonNull(sources, "sources is null");
169169
Objects.requireNonNull(combiner, "combiner is null");
170-
validateBufferSize(bufferSize);
170+
validateBufferSize(bufferSize, "bufferSize");
171171
return new FlowableCombineLatest<T, R>(sources, combiner, bufferSize, true);
172172
}
173173

@@ -447,7 +447,7 @@ public static <T> Flowable<T> concatEager(Iterable<? extends Publisher<? extends
447447
public static <T> Flowable<T> concatEager(Iterable<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch) {
448448
return new FlowableConcatMapEager(new FlowableFromIterable(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.IMMEDIATE);
449449
}
450-
450+
451451
@BackpressureSupport(BackpressureKind.FULL)
452452
@SchedulerSupport(SchedulerSupport.NONE)
453453
public static <T> Flowable<T> concatDelayError(Iterable<? extends Publisher<? extends T>> sources) {
@@ -1037,7 +1037,7 @@ public static <T> Flowable<T> never() {
10371037
@SchedulerSupport(SchedulerSupport.NONE)
10381038
public static Flowable<Integer> range(int start, int count) {
10391039
if (count < 0) {
1040-
throw new IllegalArgumentException("count >= required but it was " + count);
1040+
throw new IllegalArgumentException("count >= 0 required but it was " + count);
10411041
} else
10421042
if (count == 0) {
10431043
return empty();
@@ -1069,7 +1069,7 @@ public static <T> Flowable<Boolean> sequenceEqual(Publisher<? extends T> p1, Pub
10691069
Objects.requireNonNull(p1, "p1 is null");
10701070
Objects.requireNonNull(p2, "p2 is null");
10711071
Objects.requireNonNull(isEqual, "isEqual is null");
1072-
validateBufferSize(bufferSize);
1072+
validateBufferSize(bufferSize, "bufferSize");
10731073
return new FlowableSequenceEqual<T>(p1, p2, isEqual, bufferSize);
10741074
}
10751075

@@ -1126,9 +1126,9 @@ public static <T, D> Flowable<T> using(Supplier<? extends D> resourceSupplier, F
11261126
return new FlowableUsing<T, D>(resourceSupplier, sourceSupplier, disposer, eager);
11271127
}
11281128

1129-
private static void validateBufferSize(int bufferSize) {
1129+
private static void validateBufferSize(int bufferSize, String paramName) {
11301130
if (bufferSize <= 0) {
1131-
throw new IllegalArgumentException("bufferSize > 0 required but it was " + bufferSize);
1131+
throw new IllegalArgumentException(paramName + " > 0 required but it was " + bufferSize);
11321132
}
11331133
}
11341134

@@ -1260,7 +1260,7 @@ public static <T, R> Flowable<R> zipArray(Function<? super Object[], ? extends R
12601260
return empty();
12611261
}
12621262
Objects.requireNonNull(zipper, "zipper is null");
1263-
validateBufferSize(bufferSize);
1263+
validateBufferSize(bufferSize, "bufferSize");
12641264
return new FlowableZip<T, R>(sources, null, zipper, bufferSize, delayError);
12651265
}
12661266

@@ -1271,7 +1271,7 @@ public static <T, R> Flowable<R> zipIterable(Function<? super Object[], ? extend
12711271
Iterable<? extends Publisher<? extends T>> sources) {
12721272
Objects.requireNonNull(zipper, "zipper is null");
12731273
Objects.requireNonNull(sources, "sources is null");
1274-
validateBufferSize(bufferSize);
1274+
validateBufferSize(bufferSize, "bufferSize");
12751275
return new FlowableZip<T, R>(null, sources, zipper, bufferSize, delayError);
12761276
}
12771277

@@ -1344,7 +1344,6 @@ public List<T> get() {
13441344
@BackpressureSupport(BackpressureKind.FULL)
13451345
@SchedulerSupport(SchedulerSupport.NONE)
13461346
public final <U extends Collection<? super T>> Flowable<U> buffer(int count, int skip, Supplier<U> bufferSupplier) {
1347-
Objects.requireNonNull(bufferSupplier, "bufferSupplier is null");
13481347
return new FlowableBuffer<T, U>(this, count, skip, bufferSupplier);
13491348
}
13501349

@@ -1622,6 +1621,10 @@ public final <R> Flowable<R> concatMapEager(Function<? super T, ? extends Publis
16221621

16231622
public final <R> Flowable<R> concatMapEager(Function<? super T, ? extends Publisher<? extends R>> mapper,
16241623
int maxConcurrency, int prefetch) {
1624+
if (maxConcurrency <= 0) {
1625+
throw new IllegalArgumentException("maxConcurrency > 0 required but it was " + maxConcurrency);
1626+
}
1627+
validateBufferSize(prefetch, "prefetch");
16251628
return new FlowableConcatMapEager<T, R>(this, mapper, maxConcurrency, prefetch, ErrorMode.IMMEDIATE);
16261629
}
16271630

@@ -2063,7 +2066,7 @@ public final Flowable<T> filter(Predicate<? super T> predicate) {
20632066

20642067
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
20652068
@SchedulerSupport(SchedulerSupport.NONE)
2066-
public final Flowable<T> finallyDo(Runnable onFinally) {
2069+
public final Flowable<T> doAfterTerminate(Runnable onFinally) {
20672070
return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.emptyRunnable(), onFinally);
20682071
}
20692072

@@ -2113,7 +2116,7 @@ public final <R> Flowable<R> flatMap(Function<? super T, ? extends Publisher<? e
21132116
if (maxConcurrency <= 0) {
21142117
throw new IllegalArgumentException("maxConcurrency > 0 required but it was " + maxConcurrency);
21152118
}
2116-
validateBufferSize(bufferSize);
2119+
validateBufferSize(bufferSize, "bufferSize");
21172120
return new FlowableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize);
21182121
}
21192122

@@ -2308,7 +2311,7 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
23082311
boolean delayError, int bufferSize) {
23092312
Objects.requireNonNull(keySelector, "keySelector is null");
23102313
Objects.requireNonNull(valueSelector, "valueSelector is null");
2311-
validateBufferSize(bufferSize);
2314+
validateBufferSize(bufferSize, "bufferSize");
23122315

23132316
return new FlowableGroupBy<T, K, V>(this, keySelector, valueSelector, bufferSize, delayError);
23142317
}
@@ -2418,7 +2421,7 @@ public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError) {
24182421
@SchedulerSupport(SchedulerSupport.CUSTOM)
24192422
public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
24202423
Objects.requireNonNull(scheduler, "scheduler is null");
2421-
validateBufferSize(bufferSize);
2424+
validateBufferSize(bufferSize, "bufferSize");
24222425
return new FlowableObserveOn<T>(this, scheduler, delayError, bufferSize);
24232426
}
24242427

@@ -2461,7 +2464,7 @@ public final Flowable<T> onBackpressureBuffer(int bufferSize, boolean delayError
24612464
@BackpressureSupport(BackpressureKind.SPECIAL)
24622465
@SchedulerSupport(SchedulerSupport.NONE)
24632466
public final Flowable<T> onBackpressureBuffer(int bufferSize, boolean delayError, boolean unbounded) {
2464-
validateBufferSize(bufferSize);
2467+
validateBufferSize(bufferSize, "bufferSize");
24652468
return new FlowableOnBackpressureBuffer<T>(this, bufferSize, unbounded, delayError, Functions.emptyRunnable());
24662469
}
24672470

@@ -2569,15 +2572,15 @@ public final <R> Flowable<R> publish(Function<? super Flowable<T>, ? extends Pub
25692572
@BackpressureSupport(BackpressureKind.FULL)
25702573
@SchedulerSupport(SchedulerSupport.NONE)
25712574
public final <R> Flowable<R> publish(Function<? super Flowable<T>, ? extends Publisher<R>> selector, int bufferSize) {
2572-
validateBufferSize(bufferSize);
2575+
validateBufferSize(bufferSize, "bufferSize");
25732576
Objects.requireNonNull(selector, "selector is null");
25742577
return FlowablePublish.create(this, selector, bufferSize);
25752578
}
25762579

25772580
@BackpressureSupport(BackpressureKind.FULL)
25782581
@SchedulerSupport(SchedulerSupport.NONE)
25792582
public final ConnectableFlowable<T> publish(int bufferSize) {
2580-
validateBufferSize(bufferSize);
2583+
validateBufferSize(bufferSize, "bufferSize");
25812584
return FlowablePublish.create(this, bufferSize);
25822585
}
25832586

@@ -3041,7 +3044,7 @@ public final Flowable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler,
30413044
public final Flowable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) {
30423045
Objects.requireNonNull(unit, "unit is null");
30433046
Objects.requireNonNull(scheduler, "scheduler is null");
3044-
validateBufferSize(bufferSize);
3047+
validateBufferSize(bufferSize, "bufferSize");
30453048
// the internal buffer holds pairs of (timestamp, value) so double the default buffer size
30463049
int s = bufferSize << 1;
30473050
return new FlowableSkipLastTimed<T>(this, time, unit, scheduler, s, delayError);
@@ -3218,7 +3221,7 @@ public final <R> Flowable<R> switchMap(Function<? super T, ? extends Publisher<?
32183221
}
32193222
return ScalarXMap.scalarXMap(v, mapper);
32203223
}
3221-
validateBufferSize(bufferSize);
3224+
validateBufferSize(bufferSize, "bufferSize");
32223225
return new FlowableSwitchMap<T, R>(this, mapper, bufferSize);
32233226
}
32243227

@@ -3288,7 +3291,7 @@ public final Flowable<T> takeLast(long count, long time, TimeUnit unit, Schedule
32883291
public final Flowable<T> takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) {
32893292
Objects.requireNonNull(unit, "unit is null");
32903293
Objects.requireNonNull(scheduler, "scheduler is null");
3291-
validateBufferSize(bufferSize);
3294+
validateBufferSize(bufferSize, "bufferSize");
32923295
if (count < 0) {
32933296
throw new IndexOutOfBoundsException("count >= 0 required but it was " + count);
32943297
}
@@ -3823,7 +3826,7 @@ public final Flowable<Flowable<T>> window(long count, long skip, int bufferSize)
38233826
if (count <= 0) {
38243827
throw new IllegalArgumentException("count > 0 required but it was " + count);
38253828
}
3826-
validateBufferSize(bufferSize);
3829+
validateBufferSize(bufferSize, "bufferSize");
38273830
return new FlowableWindow<T>(this, count, skip, bufferSize);
38283831
}
38293832

@@ -3842,7 +3845,7 @@ public final Flowable<Flowable<T>> window(long timespan, long timeskip, TimeUnit
38423845
@BackpressureSupport(BackpressureKind.ERROR)
38433846
@SchedulerSupport(SchedulerSupport.CUSTOM)
38443847
public final Flowable<Flowable<T>> window(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, int bufferSize) {
3845-
validateBufferSize(bufferSize);
3848+
validateBufferSize(bufferSize, "bufferSize");
38463849
Objects.requireNonNull(scheduler, "scheduler is null");
38473850
Objects.requireNonNull(unit, "unit is null");
38483851
return new FlowableWindowTimed<T>(this, timespan, timeskip, unit, scheduler, Long.MAX_VALUE, bufferSize, false);
@@ -3894,7 +3897,7 @@ public final Flowable<Flowable<T>> window(long timespan, TimeUnit unit,
38943897
public final Flowable<Flowable<T>> window(
38953898
long timespan, TimeUnit unit, Scheduler scheduler,
38963899
long count, boolean restart, int bufferSize) {
3897-
validateBufferSize(bufferSize);
3900+
validateBufferSize(bufferSize, "bufferSize");
38983901
Objects.requireNonNull(scheduler, "scheduler is null");
38993902
Objects.requireNonNull(unit, "unit is null");
39003903
if (count <= 0) {

0 commit comments

Comments
 (0)