Skip to content

Commit c460dd5

Browse files
authored
2.x: coverage and cleanup 10/17-1 (#4717)
1 parent e255de7 commit c460dd5

15 files changed

+411
-124
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java

-5
Original file line numberDiff line numberDiff line change
@@ -337,11 +337,6 @@ static final class NoneEmitter<T> extends BaseEmitter<T> {
337337

338338
@Override
339339
public void onNext(T t) {
340-
if (t == null) {
341-
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
342-
return;
343-
}
344-
345340
if (isCancelled()) {
346341
return;
347342
}

src/main/java/io/reactivex/internal/operators/observable/ObservableDematerialize.java

+3
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ public boolean isDisposed() {
6464
@Override
6565
public void onNext(Notification<T> t) {
6666
if (done) {
67+
if (t.isOnError()) {
68+
RxJavaPlugins.onError(t.getError());
69+
}
6770
return;
6871
}
6972
if (t.isOnError()) {

src/main/java/io/reactivex/internal/operators/observable/ObservableFromFuture.java

+4-9
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313

1414
package io.reactivex.internal.operators.observable;
1515

16-
import io.reactivex.internal.functions.ObjectHelper;
1716
import java.util.concurrent.*;
1817

1918
import io.reactivex.*;
20-
import io.reactivex.disposables.*;
2119
import io.reactivex.exceptions.Exceptions;
20+
import io.reactivex.internal.functions.ObjectHelper;
21+
import io.reactivex.internal.observers.DeferredScalarDisposable;
2222

2323
public final class ObservableFromFuture<T> extends Observable<T> {
2424
final Future<? extends T> future;
@@ -33,7 +33,7 @@ public ObservableFromFuture(Future<? extends T> future, long timeout, TimeUnit u
3333

3434
@Override
3535
public void subscribeActual(Observer<? super T> s) {
36-
Disposable d = Disposables.empty();
36+
DeferredScalarDisposable<T> d = new DeferredScalarDisposable<T>(s);
3737
s.onSubscribe(d);
3838
if (!d.isDisposed()) {
3939
T v;
@@ -45,13 +45,8 @@ public void subscribeActual(Observer<? super T> s) {
4545
s.onError(ex);
4646
}
4747
return;
48-
} finally {
49-
future.cancel(true); // TODO ?? not sure about this
50-
}
51-
if (!d.isDisposed()) {
52-
s.onNext(v);
53-
s.onComplete();
5448
}
49+
d.complete(v);
5550
}
5651
}
5752
}

src/main/java/io/reactivex/internal/operators/observable/ObservableSkipLastTimed.java

+30-46
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,12 @@ public void onComplete() {
103103

104104
@Override
105105
public void dispose() {
106-
if (cancelled) {
106+
if (!cancelled) {
107107
cancelled = true;
108+
s.dispose();
108109

109110
if (getAndIncrement() == 0) {
110111
queue.clear();
111-
s.dispose();
112112
}
113113
}
114114
}
@@ -134,11 +134,12 @@ void drain() {
134134

135135
for (;;) {
136136

137-
if (checkTerminated(done, q.isEmpty(), a, delayError)) {
138-
return;
139-
}
140-
141137
for (;;) {
138+
if (cancelled) {
139+
queue.clear();
140+
return;
141+
}
142+
142143
boolean d = done;
143144

144145
Long ts = (Long)q.peek();
@@ -151,19 +152,35 @@ void drain() {
151152
empty = true;
152153
}
153154

154-
if (checkTerminated(d, empty, a, delayError)) {
155-
return;
155+
if (d) {
156+
if (delayError) {
157+
if (empty) {
158+
Throwable e = error;
159+
if (e != null) {
160+
a.onError(e);
161+
} else {
162+
a.onComplete();
163+
}
164+
return;
165+
}
166+
} else {
167+
Throwable e = error;
168+
if (e != null) {
169+
queue.clear();
170+
a.onError(e);
171+
return;
172+
} else
173+
if (empty) {
174+
a.onComplete();
175+
return;
176+
}
177+
}
156178
}
157179

158180
if (empty) {
159181
break;
160182
}
161183

162-
if (ts > now - time) {
163-
// not old enough
164-
break;
165-
}
166-
167184
q.poll();
168185
@SuppressWarnings("unchecked")
169186
T v = (T)q.poll();
@@ -177,38 +194,5 @@ void drain() {
177194
}
178195
}
179196
}
180-
181-
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a, boolean delayError) {
182-
if (cancelled) {
183-
queue.clear();
184-
s.dispose();
185-
return true;
186-
}
187-
if (d) {
188-
if (delayError) {
189-
if (empty) {
190-
Throwable e = error;
191-
if (e != null) {
192-
a.onError(e);
193-
} else {
194-
a.onComplete();
195-
}
196-
return true;
197-
}
198-
} else {
199-
Throwable e = error;
200-
if (e != null) {
201-
queue.clear();
202-
a.onError(e);
203-
return true;
204-
} else
205-
if (empty) {
206-
a.onComplete();
207-
return true;
208-
}
209-
}
210-
}
211-
return false;
212-
}
213197
}
214198
}

src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFrom.java

+12-33
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@
1919
import io.reactivex.disposables.Disposable;
2020
import io.reactivex.exceptions.Exceptions;
2121
import io.reactivex.functions.BiFunction;
22-
import io.reactivex.internal.disposables.*;
22+
import io.reactivex.internal.disposables.DisposableHelper;
2323
import io.reactivex.observers.SerializedObserver;
24-
import io.reactivex.plugins.RxJavaPlugins;
2524

2625
public final class ObservableWithLatestFrom<T, U, R> extends AbstractObservableWithUpstream<T, R> {
2726
final BiFunction<? super T, ? super U, ? extends R> combiner;
@@ -38,6 +37,8 @@ public void subscribeActual(Observer<? super R> t) {
3837
final SerializedObserver<R> serial = new SerializedObserver<R>(t);
3938
final WithLatestFromObserver<T, U, R> wlf = new WithLatestFromObserver<T, U, R>(serial, combiner);
4039

40+
t.onSubscribe(wlf);
41+
4142
other.subscribe(new Observer<U>() {
4243
@Override
4344
public void onSubscribe(Disposable s) {
@@ -68,6 +69,7 @@ static final class WithLatestFromObserver<T, U, R> extends AtomicReference<U> im
6869
private static final long serialVersionUID = -312246233408980075L;
6970

7071
final Observer<? super R> actual;
72+
7173
final BiFunction<? super T, ? super U, ? extends R> combiner;
7274

7375
final AtomicReference<Disposable> s = new AtomicReference<Disposable>();
@@ -80,9 +82,7 @@ static final class WithLatestFromObserver<T, U, R> extends AtomicReference<U> im
8082
}
8183
@Override
8284
public void onSubscribe(Disposable s) {
83-
if (DisposableHelper.setOnce(this.s, s)) {
84-
actual.onSubscribe(this);
85-
}
85+
DisposableHelper.setOnce(this.s, s);
8686
}
8787

8888
@Override
@@ -116,43 +116,22 @@ public void onComplete() {
116116

117117
@Override
118118
public void dispose() {
119-
s.get().dispose();
119+
DisposableHelper.dispose(s);
120120
DisposableHelper.dispose(other);
121121
}
122122

123-
@Override public boolean isDisposed() {
124-
return s.get().isDisposed();
123+
@Override
124+
public boolean isDisposed() {
125+
return DisposableHelper.isDisposed(s.get());
125126
}
126127

127128
public boolean setOther(Disposable o) {
128-
for (;;) {
129-
Disposable current = other.get();
130-
if (current == DisposableHelper.DISPOSED) {
131-
o.dispose();
132-
return false;
133-
}
134-
if (current != null) {
135-
RxJavaPlugins.onError(new IllegalStateException("Other subscription already set!"));
136-
o.dispose();
137-
return false;
138-
}
139-
if (other.compareAndSet(null, o)) {
140-
return true;
141-
}
142-
}
129+
return DisposableHelper.setOnce(other, o);
143130
}
144131

145132
public void otherError(Throwable e) {
146-
if (this.s.compareAndSet(null, DisposableHelper.DISPOSED)) {
147-
EmptyDisposable.error(e, actual);
148-
} else {
149-
if (this.s.get() != DisposableHelper.DISPOSED) {
150-
dispose();
151-
actual.onError(e);
152-
} else {
153-
RxJavaPlugins.onError(e);
154-
}
155-
}
133+
DisposableHelper.dispose(s);
134+
actual.onError(e);
156135
}
157136
}
158137
}

src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public boolean isDisposed() {
204204
@Override
205205
public void dispose() {
206206
DisposableHelper.dispose(d);
207-
for (Disposable s : observers) {
207+
for (WithLatestInnerObserver s : observers) {
208208
s.dispose();
209209
}
210210
}
@@ -240,7 +240,7 @@ void cancelAllBut(int index) {
240240

241241
static final class WithLatestInnerObserver
242242
extends AtomicReference<Disposable>
243-
implements Observer<Object>, Disposable {
243+
implements Observer<Object> {
244244

245245
private static final long serialVersionUID = 3256684027868224024L;
246246

@@ -278,12 +278,6 @@ public void onComplete() {
278278
parent.innerComplete(index, hasValue);
279279
}
280280

281-
@Override
282-
public boolean isDisposed() {
283-
return DisposableHelper.isDisposed(get());
284-
}
285-
286-
@Override
287281
public void dispose() {
288282
DisposableHelper.dispose(this);
289283
}

src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java

+2-17
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ boolean checkTerminated(boolean d, boolean empty, Observer<? super R> a, boolean
240240
}
241241
}
242242

243-
static final class ZipObserver<T, R> implements Observer<T>, Disposable {
243+
static final class ZipObserver<T, R> implements Observer<T> {
244244

245245
final ZipCoordinator<T, R> parent;
246246
final SpscLinkedArrayQueue<T> queue;
@@ -261,16 +261,7 @@ public void onSubscribe(Disposable s) {
261261

262262
@Override
263263
public void onNext(T t) {
264-
if (t == null) {
265-
s.get().dispose();
266-
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
267-
return;
268-
}
269-
if (!queue.offer(t)) {
270-
s.get().dispose();
271-
onError(new IllegalStateException("Queue full?!"));
272-
return;
273-
}
264+
queue.offer(t);
274265
parent.drain();
275266
}
276267

@@ -287,14 +278,8 @@ public void onComplete() {
287278
parent.drain();
288279
}
289280

290-
@Override
291281
public void dispose() {
292282
DisposableHelper.dispose(s);
293283
}
294-
295-
@Override
296-
public boolean isDisposed() {
297-
return s.get() == DisposableHelper.DISPOSED;
298-
}
299284
}
300285
}

src/test/java/io/reactivex/internal/operators/observable/ObservableAllTest.java

+33-1
Original file line numberDiff line numberDiff line change
@@ -301,10 +301,12 @@ public boolean test(String v) {
301301
@Test
302302
public void dispose() {
303303
TestHelper.checkDisposed(Observable.just(1).all(Functions.alwaysTrue()).toObservable());
304+
305+
TestHelper.checkDisposed(Observable.just(1).all(Functions.alwaysTrue()));
304306
}
305307

306308
@Test
307-
public void predicateThrows() {
309+
public void predicateThrowsObservable() {
308310
List<Throwable> errors = TestHelper.trackPluginErrors();
309311
try {
310312
new Observable<Integer>() {
@@ -333,4 +335,34 @@ public boolean test(Integer v) throws Exception {
333335
RxJavaPlugins.reset();
334336
}
335337
}
338+
339+
@Test
340+
public void predicateThrows() {
341+
List<Throwable> errors = TestHelper.trackPluginErrors();
342+
try {
343+
new Observable<Integer>() {
344+
@Override
345+
protected void subscribeActual(Observer<? super Integer> observer) {
346+
observer.onSubscribe(Disposables.empty());
347+
348+
observer.onNext(1);
349+
observer.onNext(2);
350+
observer.onError(new TestException());
351+
observer.onComplete();
352+
}
353+
}
354+
.all(new Predicate<Integer>() {
355+
@Override
356+
public boolean test(Integer v) throws Exception {
357+
throw new TestException();
358+
}
359+
})
360+
.test()
361+
.assertFailure(TestException.class);
362+
363+
TestHelper.assertError(errors, 0, TestException.class);
364+
} finally {
365+
RxJavaPlugins.reset();
366+
}
367+
}
336368
}

0 commit comments

Comments
 (0)