Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: coverage and cleanup 10/17-1 #4717

Merged
merged 1 commit into from
Oct 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -337,11 +337,6 @@ static final class NoneEmitter<T> extends BaseEmitter<T> {

@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}

if (isCancelled()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ public boolean isDisposed() {
@Override
public void onNext(Notification<T> t) {
if (done) {
if (t.isOnError()) {
RxJavaPlugins.onError(t.getError());
}
return;
}
if (t.isOnError()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@

package io.reactivex.internal.operators.observable;

import io.reactivex.internal.functions.ObjectHelper;
import java.util.concurrent.*;

import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.observers.DeferredScalarDisposable;

public final class ObservableFromFuture<T> extends Observable<T> {
final Future<? extends T> future;
Expand All @@ -33,7 +33,7 @@ public ObservableFromFuture(Future<? extends T> future, long timeout, TimeUnit u

@Override
public void subscribeActual(Observer<? super T> s) {
Disposable d = Disposables.empty();
DeferredScalarDisposable<T> d = new DeferredScalarDisposable<T>(s);
s.onSubscribe(d);
if (!d.isDisposed()) {
T v;
Expand All @@ -45,13 +45,8 @@ public void subscribeActual(Observer<? super T> s) {
s.onError(ex);
}
return;
} finally {
future.cancel(true); // TODO ?? not sure about this
}
if (!d.isDisposed()) {
s.onNext(v);
s.onComplete();
}
d.complete(v);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ public void onComplete() {

@Override
public void dispose() {
if (cancelled) {
if (!cancelled) {
cancelled = true;
s.dispose();

if (getAndIncrement() == 0) {
queue.clear();
s.dispose();
}
}
}
Expand All @@ -134,11 +134,12 @@ void drain() {

for (;;) {

if (checkTerminated(done, q.isEmpty(), a, delayError)) {
return;
}

for (;;) {
if (cancelled) {
queue.clear();
return;
}

boolean d = done;

Long ts = (Long)q.peek();
Expand All @@ -151,19 +152,35 @@ void drain() {
empty = true;
}

if (checkTerminated(d, empty, a, delayError)) {
return;
if (d) {
if (delayError) {
if (empty) {
Throwable e = error;
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
return;
}
} else {
Throwable e = error;
if (e != null) {
queue.clear();
a.onError(e);
return;
} else
if (empty) {
a.onComplete();
return;
}
}
}

if (empty) {
break;
}

if (ts > now - time) {
// not old enough
break;
}

q.poll();
@SuppressWarnings("unchecked")
T v = (T)q.poll();
Expand All @@ -177,38 +194,5 @@ void drain() {
}
}
}

boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a, boolean delayError) {
if (cancelled) {
queue.clear();
s.dispose();
return true;
}
if (d) {
if (delayError) {
if (empty) {
Throwable e = error;
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
return true;
}
} else {
Throwable e = error;
if (e != null) {
queue.clear();
a.onError(e);
return true;
} else
if (empty) {
a.onComplete();
return true;
}
}
}
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BiFunction;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.observers.SerializedObserver;
import io.reactivex.plugins.RxJavaPlugins;

public final class ObservableWithLatestFrom<T, U, R> extends AbstractObservableWithUpstream<T, R> {
final BiFunction<? super T, ? super U, ? extends R> combiner;
Expand All @@ -38,6 +37,8 @@ public void subscribeActual(Observer<? super R> t) {
final SerializedObserver<R> serial = new SerializedObserver<R>(t);
final WithLatestFromObserver<T, U, R> wlf = new WithLatestFromObserver<T, U, R>(serial, combiner);

t.onSubscribe(wlf);

other.subscribe(new Observer<U>() {
@Override
public void onSubscribe(Disposable s) {
Expand Down Expand Up @@ -68,6 +69,7 @@ static final class WithLatestFromObserver<T, U, R> extends AtomicReference<U> im
private static final long serialVersionUID = -312246233408980075L;

final Observer<? super R> actual;

final BiFunction<? super T, ? super U, ? extends R> combiner;

final AtomicReference<Disposable> s = new AtomicReference<Disposable>();
Expand All @@ -80,9 +82,7 @@ static final class WithLatestFromObserver<T, U, R> extends AtomicReference<U> im
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.setOnce(this.s, s)) {
actual.onSubscribe(this);
}
DisposableHelper.setOnce(this.s, s);
}

@Override
Expand Down Expand Up @@ -116,43 +116,22 @@ public void onComplete() {

@Override
public void dispose() {
s.get().dispose();
DisposableHelper.dispose(s);
DisposableHelper.dispose(other);
}

@Override public boolean isDisposed() {
return s.get().isDisposed();
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(s.get());
}

public boolean setOther(Disposable o) {
for (;;) {
Disposable current = other.get();
if (current == DisposableHelper.DISPOSED) {
o.dispose();
return false;
}
if (current != null) {
RxJavaPlugins.onError(new IllegalStateException("Other subscription already set!"));
o.dispose();
return false;
}
if (other.compareAndSet(null, o)) {
return true;
}
}
return DisposableHelper.setOnce(other, o);
}

public void otherError(Throwable e) {
if (this.s.compareAndSet(null, DisposableHelper.DISPOSED)) {
EmptyDisposable.error(e, actual);
} else {
if (this.s.get() != DisposableHelper.DISPOSED) {
dispose();
actual.onError(e);
} else {
RxJavaPlugins.onError(e);
}
}
DisposableHelper.dispose(s);
actual.onError(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public boolean isDisposed() {
@Override
public void dispose() {
DisposableHelper.dispose(d);
for (Disposable s : observers) {
for (WithLatestInnerObserver s : observers) {
s.dispose();
}
}
Expand Down Expand Up @@ -240,7 +240,7 @@ void cancelAllBut(int index) {

static final class WithLatestInnerObserver
extends AtomicReference<Disposable>
implements Observer<Object>, Disposable {
implements Observer<Object> {

private static final long serialVersionUID = 3256684027868224024L;

Expand Down Expand Up @@ -278,12 +278,6 @@ public void onComplete() {
parent.innerComplete(index, hasValue);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ boolean checkTerminated(boolean d, boolean empty, Observer<? super R> a, boolean
}
}

static final class ZipObserver<T, R> implements Observer<T>, Disposable {
static final class ZipObserver<T, R> implements Observer<T> {

final ZipCoordinator<T, R> parent;
final SpscLinkedArrayQueue<T> queue;
Expand All @@ -261,16 +261,7 @@ public void onSubscribe(Disposable s) {

@Override
public void onNext(T t) {
if (t == null) {
s.get().dispose();
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!queue.offer(t)) {
s.get().dispose();
onError(new IllegalStateException("Queue full?!"));
return;
}
queue.offer(t);
parent.drain();
}

Expand All @@ -287,14 +278,8 @@ public void onComplete() {
parent.drain();
}

@Override
public void dispose() {
DisposableHelper.dispose(s);
}

@Override
public boolean isDisposed() {
return s.get() == DisposableHelper.DISPOSED;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,12 @@ public boolean test(String v) {
@Test
public void dispose() {
TestHelper.checkDisposed(Observable.just(1).all(Functions.alwaysTrue()).toObservable());

TestHelper.checkDisposed(Observable.just(1).all(Functions.alwaysTrue()));
}

@Test
public void predicateThrows() {
public void predicateThrowsObservable() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
new Observable<Integer>() {
Expand Down Expand Up @@ -333,4 +335,34 @@ public boolean test(Integer v) throws Exception {
RxJavaPlugins.reset();
}
}

@Test
public void predicateThrows() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observer.onSubscribe(Disposables.empty());

observer.onNext(1);
observer.onNext(2);
observer.onError(new TestException());
observer.onComplete();
}
}
.all(new Predicate<Integer>() {
@Override
public boolean test(Integer v) throws Exception {
throw new TestException();
}
})
.test()
.assertFailure(TestException.class);

TestHelper.assertError(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
}
}
Loading