Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Improve Observable.takeUntil #6028

Merged
merged 1 commit into from
May 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,103 +13,120 @@

package io.reactivex.internal.operators.observable;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.*;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.*;
import io.reactivex.observers.SerializedObserver;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.util.*;

public final class ObservableTakeUntil<T, U> extends AbstractObservableWithUpstream<T, T> {

final ObservableSource<? extends U> other;

public ObservableTakeUntil(ObservableSource<T> source, ObservableSource<? extends U> other) {
super(source);
this.other = other;
}
@Override
public void subscribeActual(Observer<? super T> child) {
final SerializedObserver<T> serial = new SerializedObserver<T>(child);
TakeUntilMainObserver<T, U> parent = new TakeUntilMainObserver<T, U>(child);
child.onSubscribe(parent);

final ArrayCompositeDisposable frc = new ArrayCompositeDisposable(2);
other.subscribe(parent.otherObserver);
source.subscribe(parent);
}

final TakeUntilObserver<T> tus = new TakeUntilObserver<T>(serial, frc);
static final class TakeUntilMainObserver<T, U> extends AtomicInteger
implements Observer<T>, Disposable {

child.onSubscribe(frc);
private static final long serialVersionUID = 1418547743690811973L;

other.subscribe(new TakeUntil(frc, serial));
final Observer<? super T> downstream;

source.subscribe(tus);
}
final AtomicReference<Disposable> upstream;

static final class TakeUntilObserver<T> extends AtomicBoolean implements Observer<T> {
final OtherObserver otherObserver;

private static final long serialVersionUID = 3451719290311127173L;
final Observer<? super T> actual;
final ArrayCompositeDisposable frc;
final AtomicThrowable error;

Disposable s;

TakeUntilObserver(Observer<? super T> actual, ArrayCompositeDisposable frc) {
this.actual = actual;
this.frc = frc;
TakeUntilMainObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
this.otherObserver = new OtherObserver();
this.error = new AtomicThrowable();
}

@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
frc.setResource(0, s);
}
public void dispose() {
DisposableHelper.dispose(upstream);
DisposableHelper.dispose(otherObserver);
}

@Override
public void onNext(T t) {
actual.onNext(t);
public boolean isDisposed() {
return DisposableHelper.isDisposed(upstream.get());
}

@Override
public void onError(Throwable t) {
frc.dispose();
actual.onError(t);
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(upstream, d);
}

@Override
public void onComplete() {
frc.dispose();
actual.onComplete();
public void onNext(T t) {
HalfSerializer.onNext(downstream, t, this, error);
}
}

final class TakeUntil implements Observer<U> {
private final ArrayCompositeDisposable frc;
private final SerializedObserver<T> serial;

TakeUntil(ArrayCompositeDisposable frc, SerializedObserver<T> serial) {
this.frc = frc;
this.serial = serial;
@Override
public void onError(Throwable e) {
DisposableHelper.dispose(otherObserver);
HalfSerializer.onError(downstream, e, this, error);
}

@Override
public void onSubscribe(Disposable s) {
frc.setResource(1, s);
public void onComplete() {
DisposableHelper.dispose(otherObserver);
HalfSerializer.onComplete(downstream, this, error);
}

@Override
public void onNext(U t) {
frc.dispose();
serial.onComplete();
void otherError(Throwable e) {
DisposableHelper.dispose(upstream);
HalfSerializer.onError(downstream, e, this, error);
}

@Override
public void onError(Throwable t) {
frc.dispose();
serial.onError(t);
void otherComplete() {
DisposableHelper.dispose(upstream);
HalfSerializer.onComplete(downstream, this, error);
}

@Override
public void onComplete() {
frc.dispose();
serial.onComplete();
final class OtherObserver extends AtomicReference<Disposable>
implements Observer<U> {

private static final long serialVersionUID = -8693423678067375039L;

@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
}

@Override
public void onNext(U t) {
DisposableHelper.dispose(this);
otherComplete();
}

@Override
public void onError(Throwable e) {
otherError(e);
}

@Override
public void onComplete() {
otherComplete();
}

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void testTakeUntilSourceCompleted() {

verify(result, times(1)).onNext("one");
verify(result, times(1)).onNext("two");
verify(sSource, times(1)).dispose();
verify(sSource, never()).dispose(); // no longer disposing itself on terminal events
verify(sOther, times(1)).dispose();

}
Expand All @@ -95,7 +95,7 @@ public void testTakeUntilSourceError() {
verify(result, times(1)).onNext("two");
verify(result, times(0)).onNext("three");
verify(result, times(1)).onError(error);
verify(sSource, times(1)).dispose();
verify(sSource, never()).dispose(); // no longer disposing itself on terminal events
verify(sOther, times(1)).dispose();

}
Expand All @@ -122,7 +122,7 @@ public void testTakeUntilOtherError() {
verify(result, times(1)).onError(error);
verify(result, times(0)).onComplete();
verify(sSource, times(1)).dispose();
verify(sOther, times(1)).dispose();
verify(sOther, never()).dispose(); // no longer disposing itself on termination

}

Expand All @@ -149,7 +149,7 @@ public void testTakeUntilOtherCompleted() {
verify(result, times(0)).onNext("three");
verify(result, times(1)).onComplete();
verify(sSource, times(1)).dispose();
verify(sOther, times(1)).dispose(); // unsubscribed since SafeSubscriber unsubscribes after onComplete
verify(sOther, never()).dispose(); // no longer disposing itself on terminal events

}

Expand Down