diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeUntil.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeUntil.java index 64639abc0a..35e04300e0 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeUntil.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeUntil.java @@ -13,103 +13,120 @@ package io.reactivex.internal.operators.observable; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.*; import io.reactivex.*; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.*; -import io.reactivex.observers.SerializedObserver; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.util.*; public final class ObservableTakeUntil extends AbstractObservableWithUpstream { + final ObservableSource other; + public ObservableTakeUntil(ObservableSource source, ObservableSource other) { super(source); this.other = other; } @Override public void subscribeActual(Observer child) { - final SerializedObserver serial = new SerializedObserver(child); + TakeUntilMainObserver parent = new TakeUntilMainObserver(child); + child.onSubscribe(parent); - final ArrayCompositeDisposable frc = new ArrayCompositeDisposable(2); + other.subscribe(parent.otherObserver); + source.subscribe(parent); + } - final TakeUntilObserver tus = new TakeUntilObserver(serial, frc); + static final class TakeUntilMainObserver extends AtomicInteger + implements Observer, Disposable { - child.onSubscribe(frc); + private static final long serialVersionUID = 1418547743690811973L; - other.subscribe(new TakeUntil(frc, serial)); + final Observer downstream; - source.subscribe(tus); - } + final AtomicReference upstream; - static final class TakeUntilObserver extends AtomicBoolean implements Observer { + final OtherObserver otherObserver; - private static final long serialVersionUID = 3451719290311127173L; - final Observer actual; - final ArrayCompositeDisposable frc; + final AtomicThrowable error; - Disposable s; - - TakeUntilObserver(Observer actual, ArrayCompositeDisposable frc) { - this.actual = actual; - this.frc = frc; + TakeUntilMainObserver(Observer downstream) { + this.downstream = downstream; + this.upstream = new AtomicReference(); + this.otherObserver = new OtherObserver(); + this.error = new AtomicThrowable(); } @Override - public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; - frc.setResource(0, s); - } + public void dispose() { + DisposableHelper.dispose(upstream); + DisposableHelper.dispose(otherObserver); } @Override - public void onNext(T t) { - actual.onNext(t); + public boolean isDisposed() { + return DisposableHelper.isDisposed(upstream.get()); } @Override - public void onError(Throwable t) { - frc.dispose(); - actual.onError(t); + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(upstream, d); } @Override - public void onComplete() { - frc.dispose(); - actual.onComplete(); + public void onNext(T t) { + HalfSerializer.onNext(downstream, t, this, error); } - } - - final class TakeUntil implements Observer { - private final ArrayCompositeDisposable frc; - private final SerializedObserver serial; - TakeUntil(ArrayCompositeDisposable frc, SerializedObserver serial) { - this.frc = frc; - this.serial = serial; + @Override + public void onError(Throwable e) { + DisposableHelper.dispose(otherObserver); + HalfSerializer.onError(downstream, e, this, error); } @Override - public void onSubscribe(Disposable s) { - frc.setResource(1, s); + public void onComplete() { + DisposableHelper.dispose(otherObserver); + HalfSerializer.onComplete(downstream, this, error); } - @Override - public void onNext(U t) { - frc.dispose(); - serial.onComplete(); + void otherError(Throwable e) { + DisposableHelper.dispose(upstream); + HalfSerializer.onError(downstream, e, this, error); } - @Override - public void onError(Throwable t) { - frc.dispose(); - serial.onError(t); + void otherComplete() { + DisposableHelper.dispose(upstream); + HalfSerializer.onComplete(downstream, this, error); } - @Override - public void onComplete() { - frc.dispose(); - serial.onComplete(); + final class OtherObserver extends AtomicReference + implements Observer { + + private static final long serialVersionUID = -8693423678067375039L; + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onNext(U t) { + DisposableHelper.dispose(this); + otherComplete(); + } + + @Override + public void onError(Throwable e) { + otherError(e); + } + + @Override + public void onComplete() { + otherComplete(); + } + } } + } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilTest.java index d42e5df389..4251fcc108 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilTest.java @@ -70,7 +70,7 @@ public void testTakeUntilSourceCompleted() { verify(result, times(1)).onNext("one"); verify(result, times(1)).onNext("two"); - verify(sSource, times(1)).dispose(); + verify(sSource, never()).dispose(); // no longer disposing itself on terminal events verify(sOther, times(1)).dispose(); } @@ -95,7 +95,7 @@ public void testTakeUntilSourceError() { verify(result, times(1)).onNext("two"); verify(result, times(0)).onNext("three"); verify(result, times(1)).onError(error); - verify(sSource, times(1)).dispose(); + verify(sSource, never()).dispose(); // no longer disposing itself on terminal events verify(sOther, times(1)).dispose(); } @@ -122,7 +122,7 @@ public void testTakeUntilOtherError() { verify(result, times(1)).onError(error); verify(result, times(0)).onComplete(); verify(sSource, times(1)).dispose(); - verify(sOther, times(1)).dispose(); + verify(sOther, never()).dispose(); // no longer disposing itself on termination } @@ -149,7 +149,7 @@ public void testTakeUntilOtherCompleted() { verify(result, times(0)).onNext("three"); verify(result, times(1)).onComplete(); verify(sSource, times(1)).dispose(); - verify(sOther, times(1)).dispose(); // unsubscribed since SafeSubscriber unsubscribes after onComplete + verify(sOther, never()).dispose(); // no longer disposing itself on terminal events }