Skip to content

Commit 212db45

Browse files
authored
2.x: cleanup, coverage, fixes 10/14-2 (#4706)
* 2.x: cleanup, coverage, fixes 10/14-2 * Fix NPEs
1 parent a5df963 commit 212db45

24 files changed

+1407
-261
lines changed

src/main/java/io/reactivex/Flowable.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -5683,7 +5683,7 @@ public final <U extends Collection<? super T>> Flowable<U> buffer(long timespan,
56835683
@BackpressureSupport(BackpressureKind.ERROR)
56845684
@SchedulerSupport(SchedulerSupport.COMPUTATION)
56855685
public final Flowable<List<T>> buffer(long timespan, TimeUnit unit) {
5686-
return buffer(timespan, unit, Integer.MAX_VALUE, Schedulers.computation());
5686+
return buffer(timespan, unit, Schedulers.computation(), Integer.MAX_VALUE);
56875687
}
56885688

56895689
/**
@@ -5717,7 +5717,7 @@ public final Flowable<List<T>> buffer(long timespan, TimeUnit unit) {
57175717
@BackpressureSupport(BackpressureKind.ERROR)
57185718
@SchedulerSupport(SchedulerSupport.COMPUTATION)
57195719
public final Flowable<List<T>> buffer(long timespan, TimeUnit unit, int count) {
5720-
return buffer(timespan, unit, count, Schedulers.computation());
5720+
return buffer(timespan, unit, Schedulers.computation(), count);
57215721
}
57225722

57235723
/**
@@ -5742,19 +5742,19 @@ public final Flowable<List<T>> buffer(long timespan, TimeUnit unit, int count) {
57425742
* buffer
57435743
* @param unit
57445744
* the unit of time which applies to the {@code timespan} argument
5745-
* @param count
5746-
* the maximum size of each buffer before it is emitted
57475745
* @param scheduler
57485746
* the {@link Scheduler} to use when determining the end and start of a buffer
5747+
* @param count
5748+
* the maximum size of each buffer before it is emitted
57495749
* @return a Flowable that emits connected, non-overlapping buffers of items emitted by the source
57505750
* Publisher after a fixed duration or when the buffer reaches maximum capacity (whichever occurs
57515751
* first)
57525752
* @see <a href="http://reactivex.io/documentation/operators/buffer.html">ReactiveX operators documentation: Buffer</a>
57535753
*/
57545754
@BackpressureSupport(BackpressureKind.ERROR)
57555755
@SchedulerSupport(SchedulerSupport.CUSTOM)
5756-
public final Flowable<List<T>> buffer(long timespan, TimeUnit unit, int count, Scheduler scheduler) {
5757-
return buffer(timespan, unit, count, scheduler, ArrayListSupplier.<T>asCallable(), false);
5756+
public final Flowable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler scheduler, int count) {
5757+
return buffer(timespan, unit, scheduler, count, ArrayListSupplier.<T>asCallable(), false);
57585758
}
57595759

57605760
/**
@@ -5780,10 +5780,10 @@ public final Flowable<List<T>> buffer(long timespan, TimeUnit unit, int count, S
57805780
* buffer
57815781
* @param unit
57825782
* the unit of time which applies to the {@code timespan} argument
5783-
* @param count
5784-
* the maximum size of each buffer before it is emitted
57855783
* @param scheduler
57865784
* the {@link Scheduler} to use when determining the end and start of a buffer
5785+
* @param count
5786+
* the maximum size of each buffer before it is emitted
57875787
* @param bufferSupplier
57885788
* a factory function that returns an instance of the collection subclass to be used and returned
57895789
* as the buffer
@@ -5798,7 +5798,7 @@ public final Flowable<List<T>> buffer(long timespan, TimeUnit unit, int count, S
57985798
@SchedulerSupport(SchedulerSupport.CUSTOM)
57995799
public final <U extends Collection<? super T>> Flowable<U> buffer(
58005800
long timespan, TimeUnit unit,
5801-
int count, Scheduler scheduler,
5801+
Scheduler scheduler, int count,
58025802
Callable<U> bufferSupplier,
58035803
boolean restartTimerOnMaxSize) {
58045804
ObjectHelper.requireNonNull(unit, "unit is null");
@@ -5838,7 +5838,7 @@ public final <U extends Collection<? super T>> Flowable<U> buffer(
58385838
@BackpressureSupport(BackpressureKind.ERROR)
58395839
@SchedulerSupport(SchedulerSupport.CUSTOM)
58405840
public final Flowable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler scheduler) {
5841-
return buffer(timespan, unit, Integer.MAX_VALUE, scheduler, ArrayListSupplier.<T>asCallable(), false);
5841+
return buffer(timespan, unit, scheduler, Integer.MAX_VALUE, ArrayListSupplier.<T>asCallable(), false);
58425842
}
58435843

58445844
/**

src/main/java/io/reactivex/Observable.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -4964,7 +4964,7 @@ public final <U extends Collection<? super T>> Observable<U> buffer(long timespa
49644964
*/
49654965
@SchedulerSupport(SchedulerSupport.COMPUTATION)
49664966
public final Observable<List<T>> buffer(long timespan, TimeUnit unit) {
4967-
return buffer(timespan, unit, Integer.MAX_VALUE, Schedulers.computation());
4967+
return buffer(timespan, unit, Schedulers.computation(), Integer.MAX_VALUE);
49684968
}
49694969

49704970
/**
@@ -4994,7 +4994,7 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit) {
49944994
*/
49954995
@SchedulerSupport(SchedulerSupport.COMPUTATION)
49964996
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count) {
4997-
return buffer(timespan, unit, count, Schedulers.computation());
4997+
return buffer(timespan, unit, Schedulers.computation(), count);
49984998
}
49994999

50005000
/**
@@ -5016,18 +5016,18 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count)
50165016
* buffer
50175017
* @param unit
50185018
* the unit of time which applies to the {@code timespan} argument
5019-
* @param count
5020-
* the maximum size of each buffer before it is emitted
50215019
* @param scheduler
50225020
* the {@link Scheduler} to use when determining the end and start of a buffer
5021+
* @param count
5022+
* the maximum size of each buffer before it is emitted
50235023
* @return an Observable that emits connected, non-overlapping buffers of items emitted by the source
50245024
* ObservableSource after a fixed duration or when the buffer reaches maximum capacity (whichever occurs
50255025
* first)
50265026
* @see <a href="http://reactivex.io/documentation/operators/buffer.html">ReactiveX operators documentation: Buffer</a>
50275027
*/
50285028
@SchedulerSupport(SchedulerSupport.CUSTOM)
5029-
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count, Scheduler scheduler) {
5030-
return buffer(timespan, unit, count, scheduler, ArrayListSupplier.<T>asCallable(), false);
5029+
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler scheduler, int count) {
5030+
return buffer(timespan, unit, scheduler, count, ArrayListSupplier.<T>asCallable(), false);
50315031
}
50325032

50335033
/**
@@ -5050,10 +5050,10 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count,
50505050
* buffer
50515051
* @param unit
50525052
* the unit of time which applies to the {@code timespan} argument
5053-
* @param count
5054-
* the maximum size of each buffer before it is emitted
50555053
* @param scheduler
50565054
* the {@link Scheduler} to use when determining the end and start of a buffer
5055+
* @param count
5056+
* the maximum size of each buffer before it is emitted
50575057
* @param bufferSupplier
50585058
* a factory function that returns an instance of the collection subclass to be used and returned
50595059
* as the buffer
@@ -5067,7 +5067,7 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count,
50675067
@SchedulerSupport(SchedulerSupport.CUSTOM)
50685068
public final <U extends Collection<? super T>> Observable<U> buffer(
50695069
long timespan, TimeUnit unit,
5070-
int count, Scheduler scheduler,
5070+
Scheduler scheduler, int count,
50715071
Callable<U> bufferSupplier,
50725072
boolean restartTimerOnMaxSize) {
50735073
ObjectHelper.requireNonNull(unit, "unit is null");
@@ -5103,7 +5103,7 @@ public final <U extends Collection<? super T>> Observable<U> buffer(
51035103
*/
51045104
@SchedulerSupport(SchedulerSupport.CUSTOM)
51055105
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler scheduler) {
5106-
return buffer(timespan, unit, Integer.MAX_VALUE, scheduler, ArrayListSupplier.<T>asCallable(), false);
5106+
return buffer(timespan, unit, scheduler, Integer.MAX_VALUE, ArrayListSupplier.<T>asCallable(), false);
51075107
}
51085108

51095109
/**

src/main/java/io/reactivex/internal/observers/BasicFuseableObserver.java

-21
Original file line numberDiff line numberDiff line change
@@ -119,27 +119,6 @@ public void onComplete() {
119119
actual.onComplete();
120120
}
121121

122-
/**
123-
* Calls the upstream's QueueDisposable.requestFusion with the mode and
124-
* saves the established mode in {@link #sourceMode}.
125-
* <p>
126-
* If the upstream doesn't support fusion ({@link #qs} is null), the method
127-
* returns {@link QueueDisposable#NONE}.
128-
* @param mode the fusion mode requested
129-
* @return the established fusion mode
130-
*/
131-
protected final int transitiveFusion(int mode) {
132-
QueueDisposable<T> qs = this.qs;
133-
if (qs != null) {
134-
int m = qs.requestFusion(mode);
135-
if (m != NONE) {
136-
sourceMode = m;
137-
}
138-
return m;
139-
}
140-
return NONE;
141-
}
142-
143122
/**
144123
* Calls the upstream's QueueDisposable.requestFusion with the mode and
145124
* saves the established mode in {@link #sourceMode} if that mode doesn't

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java

-14
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,6 @@ static final class BufferExactUnboundedObserver<T, U extends Collection<? super
8989

9090
U buffer;
9191

92-
boolean selfCancel;
93-
9492
final AtomicReference<Disposable> timer = new AtomicReference<Disposable>();
9593

9694
BufferExactUnboundedObserver(
@@ -183,23 +181,12 @@ public boolean isDisposed() {
183181

184182
@Override
185183
public void run() {
186-
/*
187-
* If running on a synchronous scheduler, the timer might never
188-
* be set so the periodic timer can't be stopped this loop-back way.
189-
* The last resort is to crash the task so it hopefully won't
190-
* be rescheduled.
191-
*/
192-
if (selfCancel) {
193-
throw new CancellationException();
194-
}
195-
196184
U next;
197185

198186
try {
199187
next = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null buffer");
200188
} catch (Throwable e) {
201189
Exceptions.throwIfFatal(e);
202-
selfCancel = true;
203190
dispose();
204191
actual.onError(e);
205192
return;
@@ -215,7 +202,6 @@ public void run() {
215202
}
216203

217204
if (current == null) {
218-
selfCancel = true;
219205
DisposableHelper.dispose(timer);
220206
return;
221207
}

0 commit comments

Comments
 (0)