Skip to content

Commit 6038c02

Browse files
authored
2.x: Improve Observable.takeUntil (#6028)
1 parent d506ddc commit 6038c02

File tree

2 files changed

+75
-58
lines changed

2 files changed

+75
-58
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableTakeUntil.java

+71-54
Original file line numberDiff line numberDiff line change
@@ -13,103 +13,120 @@
1313

1414
package io.reactivex.internal.operators.observable;
1515

16-
import java.util.concurrent.atomic.AtomicBoolean;
16+
import java.util.concurrent.atomic.*;
1717

1818
import io.reactivex.*;
1919
import io.reactivex.disposables.Disposable;
20-
import io.reactivex.internal.disposables.*;
21-
import io.reactivex.observers.SerializedObserver;
20+
import io.reactivex.internal.disposables.DisposableHelper;
21+
import io.reactivex.internal.util.*;
2222

2323
public final class ObservableTakeUntil<T, U> extends AbstractObservableWithUpstream<T, T> {
24+
2425
final ObservableSource<? extends U> other;
26+
2527
public ObservableTakeUntil(ObservableSource<T> source, ObservableSource<? extends U> other) {
2628
super(source);
2729
this.other = other;
2830
}
2931
@Override
3032
public void subscribeActual(Observer<? super T> child) {
31-
final SerializedObserver<T> serial = new SerializedObserver<T>(child);
33+
TakeUntilMainObserver<T, U> parent = new TakeUntilMainObserver<T, U>(child);
34+
child.onSubscribe(parent);
3235

33-
final ArrayCompositeDisposable frc = new ArrayCompositeDisposable(2);
36+
other.subscribe(parent.otherObserver);
37+
source.subscribe(parent);
38+
}
3439

35-
final TakeUntilObserver<T> tus = new TakeUntilObserver<T>(serial, frc);
40+
static final class TakeUntilMainObserver<T, U> extends AtomicInteger
41+
implements Observer<T>, Disposable {
3642

37-
child.onSubscribe(frc);
43+
private static final long serialVersionUID = 1418547743690811973L;
3844

39-
other.subscribe(new TakeUntil(frc, serial));
45+
final Observer<? super T> downstream;
4046

41-
source.subscribe(tus);
42-
}
47+
final AtomicReference<Disposable> upstream;
4348

44-
static final class TakeUntilObserver<T> extends AtomicBoolean implements Observer<T> {
49+
final OtherObserver otherObserver;
4550

46-
private static final long serialVersionUID = 3451719290311127173L;
47-
final Observer<? super T> actual;
48-
final ArrayCompositeDisposable frc;
51+
final AtomicThrowable error;
4952

50-
Disposable s;
51-
52-
TakeUntilObserver(Observer<? super T> actual, ArrayCompositeDisposable frc) {
53-
this.actual = actual;
54-
this.frc = frc;
53+
TakeUntilMainObserver(Observer<? super T> downstream) {
54+
this.downstream = downstream;
55+
this.upstream = new AtomicReference<Disposable>();
56+
this.otherObserver = new OtherObserver();
57+
this.error = new AtomicThrowable();
5558
}
5659

5760
@Override
58-
public void onSubscribe(Disposable s) {
59-
if (DisposableHelper.validate(this.s, s)) {
60-
this.s = s;
61-
frc.setResource(0, s);
62-
}
61+
public void dispose() {
62+
DisposableHelper.dispose(upstream);
63+
DisposableHelper.dispose(otherObserver);
6364
}
6465

6566
@Override
66-
public void onNext(T t) {
67-
actual.onNext(t);
67+
public boolean isDisposed() {
68+
return DisposableHelper.isDisposed(upstream.get());
6869
}
6970

7071
@Override
71-
public void onError(Throwable t) {
72-
frc.dispose();
73-
actual.onError(t);
72+
public void onSubscribe(Disposable d) {
73+
DisposableHelper.setOnce(upstream, d);
7474
}
7575

7676
@Override
77-
public void onComplete() {
78-
frc.dispose();
79-
actual.onComplete();
77+
public void onNext(T t) {
78+
HalfSerializer.onNext(downstream, t, this, error);
8079
}
81-
}
82-
83-
final class TakeUntil implements Observer<U> {
84-
private final ArrayCompositeDisposable frc;
85-
private final SerializedObserver<T> serial;
8680

87-
TakeUntil(ArrayCompositeDisposable frc, SerializedObserver<T> serial) {
88-
this.frc = frc;
89-
this.serial = serial;
81+
@Override
82+
public void onError(Throwable e) {
83+
DisposableHelper.dispose(otherObserver);
84+
HalfSerializer.onError(downstream, e, this, error);
9085
}
9186

9287
@Override
93-
public void onSubscribe(Disposable s) {
94-
frc.setResource(1, s);
88+
public void onComplete() {
89+
DisposableHelper.dispose(otherObserver);
90+
HalfSerializer.onComplete(downstream, this, error);
9591
}
9692

97-
@Override
98-
public void onNext(U t) {
99-
frc.dispose();
100-
serial.onComplete();
93+
void otherError(Throwable e) {
94+
DisposableHelper.dispose(upstream);
95+
HalfSerializer.onError(downstream, e, this, error);
10196
}
10297

103-
@Override
104-
public void onError(Throwable t) {
105-
frc.dispose();
106-
serial.onError(t);
98+
void otherComplete() {
99+
DisposableHelper.dispose(upstream);
100+
HalfSerializer.onComplete(downstream, this, error);
107101
}
108102

109-
@Override
110-
public void onComplete() {
111-
frc.dispose();
112-
serial.onComplete();
103+
final class OtherObserver extends AtomicReference<Disposable>
104+
implements Observer<U> {
105+
106+
private static final long serialVersionUID = -8693423678067375039L;
107+
108+
@Override
109+
public void onSubscribe(Disposable d) {
110+
DisposableHelper.setOnce(this, d);
111+
}
112+
113+
@Override
114+
public void onNext(U t) {
115+
DisposableHelper.dispose(this);
116+
otherComplete();
117+
}
118+
119+
@Override
120+
public void onError(Throwable e) {
121+
otherError(e);
122+
}
123+
124+
@Override
125+
public void onComplete() {
126+
otherComplete();
127+
}
128+
113129
}
114130
}
131+
115132
}

src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void testTakeUntilSourceCompleted() {
7070

7171
verify(result, times(1)).onNext("one");
7272
verify(result, times(1)).onNext("two");
73-
verify(sSource, times(1)).dispose();
73+
verify(sSource, never()).dispose(); // no longer disposing itself on terminal events
7474
verify(sOther, times(1)).dispose();
7575

7676
}
@@ -95,7 +95,7 @@ public void testTakeUntilSourceError() {
9595
verify(result, times(1)).onNext("two");
9696
verify(result, times(0)).onNext("three");
9797
verify(result, times(1)).onError(error);
98-
verify(sSource, times(1)).dispose();
98+
verify(sSource, never()).dispose(); // no longer disposing itself on terminal events
9999
verify(sOther, times(1)).dispose();
100100

101101
}
@@ -122,7 +122,7 @@ public void testTakeUntilOtherError() {
122122
verify(result, times(1)).onError(error);
123123
verify(result, times(0)).onComplete();
124124
verify(sSource, times(1)).dispose();
125-
verify(sOther, times(1)).dispose();
125+
verify(sOther, never()).dispose(); // no longer disposing itself on termination
126126

127127
}
128128

@@ -149,7 +149,7 @@ public void testTakeUntilOtherCompleted() {
149149
verify(result, times(0)).onNext("three");
150150
verify(result, times(1)).onComplete();
151151
verify(sSource, times(1)).dispose();
152-
verify(sOther, times(1)).dispose(); // unsubscribed since SafeSubscriber unsubscribes after onComplete
152+
verify(sOther, never()).dispose(); // no longer disposing itself on terminal events
153153

154154
}
155155

0 commit comments

Comments
 (0)