Skip to content

Commit b5c0fb8

Browse files
jschneiderakarnokd
authored andcommitted
2.x: Feature/#4876 more null checks (#5055)
* add NotNull annotations add assertion to help static code analysis * avoid false positive * add annotations and assert statement to help static code analysis * remove redundant check * mark parameter as nullable * add test to reproduce npe * add null check for avoid npe * parameter time unit marked as @nonnull * add annotations and assert to help static code analysis * remove assert statements * add missing annotation * add comment for test case
1 parent 71cda3e commit b5c0fb8

File tree

7 files changed

+46
-13
lines changed

7 files changed

+46
-13
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.Iterator;
1717
import java.util.concurrent.atomic.*;
1818

19+
import io.reactivex.annotations.NonNull;
1920
import io.reactivex.annotations.Nullable;
2021
import org.reactivestreams.*;
2122

@@ -37,8 +38,10 @@
3738
public final class FlowableCombineLatest<T, R>
3839
extends Flowable<R> {
3940

41+
@Nullable
4042
final Publisher<? extends T>[] array;
4143

44+
@Nullable
4245
final Iterable<? extends Publisher<? extends T>> iterable;
4346

4447
final Function<? super Object[], ? extends R> combiner;
@@ -47,8 +50,8 @@ public final class FlowableCombineLatest<T, R>
4750

4851
final boolean delayErrors;
4952

50-
public FlowableCombineLatest(Publisher<? extends T>[] array,
51-
Function<? super Object[], ? extends R> combiner,
53+
public FlowableCombineLatest(@NonNull Publisher<? extends T>[] array,
54+
@NonNull Function<? super Object[], ? extends R> combiner,
5255
int bufferSize, boolean delayErrors) {
5356
this.array = array;
5457
this.iterable = null;
@@ -57,8 +60,8 @@ public FlowableCombineLatest(Publisher<? extends T>[] array,
5760
this.delayErrors = delayErrors;
5861
}
5962

60-
public FlowableCombineLatest(Iterable<? extends Publisher<? extends T>> iterable,
61-
Function<? super Object[], ? extends R> combiner,
63+
public FlowableCombineLatest(@NonNull Iterable<? extends Publisher<? extends T>> iterable,
64+
@NonNull Function<? super Object[], ? extends R> combiner,
6265
int bufferSize, boolean delayErrors) {
6366
this.array = null;
6467
this.iterable = iterable;

src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.util.Arrays;
1616
import java.util.concurrent.atomic.*;
1717

18+
import io.reactivex.annotations.NonNull;
19+
import io.reactivex.annotations.Nullable;
1820
import org.reactivestreams.*;
1921

2022
import io.reactivex.disposables.Disposable;
@@ -33,21 +35,22 @@
3335
* @param <R> the output type
3436
*/
3537
public final class FlowableWithLatestFromMany<T, R> extends AbstractFlowableWithUpstream<T, R> {
36-
38+
@Nullable
3739
final Publisher<?>[] otherArray;
3840

41+
@Nullable
3942
final Iterable<? extends Publisher<?>> otherIterable;
4043

4144
final Function<? super Object[], R> combiner;
4245

43-
public FlowableWithLatestFromMany(Publisher<T> source, Publisher<?>[] otherArray, Function<? super Object[], R> combiner) {
46+
public FlowableWithLatestFromMany(@NonNull Publisher<T> source, @NonNull Publisher<?>[] otherArray, Function<? super Object[], R> combiner) {
4447
super(source);
4548
this.otherArray = otherArray;
4649
this.otherIterable = null;
4750
this.combiner = combiner;
4851
}
4952

50-
public FlowableWithLatestFromMany(Publisher<T> source, Iterable<? extends Publisher<?>> otherIterable, Function<? super Object[], R> combiner) {
53+
public FlowableWithLatestFromMany(@NonNull Publisher<T> source, @NonNull Iterable<? extends Publisher<?>> otherIterable, @NonNull Function<? super Object[], R> combiner) {
5154
super(source);
5255
this.otherArray = null;
5356
this.otherIterable = otherIterable;

src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void onSuccess(T value) {
101101

102102
this.it = iter;
103103

104-
if (outputFused && iter != null) {
104+
if (outputFused) {
105105
a.onNext(null);
106106
a.onComplete();
107107
return;

src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import java.util.concurrent.atomic.*;
1717

1818
import io.reactivex.*;
19+
import io.reactivex.annotations.NonNull;
20+
import io.reactivex.annotations.Nullable;
1921
import io.reactivex.disposables.Disposable;
2022
import io.reactivex.exceptions.Exceptions;
2123
import io.reactivex.functions.Function;
@@ -33,20 +35,23 @@
3335
*/
3436
public final class ObservableWithLatestFromMany<T, R> extends AbstractObservableWithUpstream<T, R> {
3537

38+
@Nullable
3639
final ObservableSource<?>[] otherArray;
3740

41+
@Nullable
3842
final Iterable<? extends ObservableSource<?>> otherIterable;
3943

44+
@NonNull
4045
final Function<? super Object[], R> combiner;
4146

42-
public ObservableWithLatestFromMany(ObservableSource<T> source, ObservableSource<?>[] otherArray, Function<? super Object[], R> combiner) {
47+
public ObservableWithLatestFromMany(@NonNull ObservableSource<T> source, @NonNull ObservableSource<?>[] otherArray, @NonNull Function<? super Object[], R> combiner) {
4348
super(source);
4449
this.otherArray = otherArray;
4550
this.otherIterable = null;
4651
this.combiner = combiner;
4752
}
4853

49-
public ObservableWithLatestFromMany(ObservableSource<T> source, Iterable<? extends ObservableSource<?>> otherIterable, Function<? super Object[], R> combiner) {
54+
public ObservableWithLatestFromMany(@NonNull ObservableSource<T> source, @NonNull Iterable<? extends ObservableSource<?>> otherIterable, @NonNull Function<? super Object[], R> combiner) {
5055
super(source);
5156
this.otherArray = null;
5257
this.otherIterable = otherIterable;

src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ public Disposable schedule(@NonNull Runnable action) {
199199
return EmptyDisposable.INSTANCE;
200200
}
201201

202-
return poolWorker.scheduleActual(action, 0, null, serial);
202+
return poolWorker.scheduleActual(action, 0, TimeUnit.MILLISECONDS, serial);
203203
}
204204
@NonNull
205205
@Override

src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.reactivex.Scheduler;
1919
import io.reactivex.annotations.NonNull;
20+
import io.reactivex.annotations.Nullable;
2021
import io.reactivex.disposables.*;
2122
import io.reactivex.internal.disposables.*;
2223
import io.reactivex.plugins.RxJavaPlugins;
@@ -106,7 +107,8 @@ public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDel
106107
* @param parent the optional tracker parent to add the created ScheduledRunnable instance to before it gets scheduled
107108
* @return the ScheduledRunnable instance
108109
*/
109-
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
110+
@NonNull
111+
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
110112
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
111113

112114
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
@@ -126,7 +128,9 @@ public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, Time
126128
}
127129
sr.setFuture(f);
128130
} catch (RejectedExecutionException ex) {
129-
parent.remove(sr);
131+
if (parent != null) {
132+
parent.remove(sr);
133+
}
130134
RxJavaPlugins.onError(ex);
131135
}
132136

src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java

+18
Original file line numberDiff line numberDiff line change
@@ -115,4 +115,22 @@ public void run() {
115115

116116
assertEquals(0, calls[0]);
117117
}
118+
119+
/**
120+
* Regression test to ensure there is no NPE when the worker has been disposed
121+
*/
122+
@Test
123+
public void npeRegression() throws Exception {
124+
Scheduler s = getScheduler();
125+
NewThreadWorker w = (NewThreadWorker) s.createWorker();
126+
w.dispose();
127+
128+
//This method used to throw a NPE when the worker has been disposed and the parent is null
129+
w.scheduleActual(new Runnable() {
130+
@Override
131+
public void run() {
132+
}
133+
}, 0, TimeUnit.MILLISECONDS, null);
134+
135+
}
118136
}

0 commit comments

Comments
 (0)