Skip to content

Commit 76abb7b

Browse files
authored
2.x: Call the doOn{Dispose|Cancel} handler at most once (#6269)
1 parent 45c0d98 commit 76abb7b

File tree

8 files changed

+69
-17
lines changed

8 files changed

+69
-17
lines changed

src/main/java/io/reactivex/Observable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -8046,7 +8046,7 @@ public final Observable<T> doOnError(Consumer<? super Throwable> onError) {
80468046

80478047
/**
80488048
* Calls the appropriate onXXX method (shared between all Observer) for the lifecycle events of
8049-
* the sequence (subscription, disposal, requesting).
8049+
* the sequence (subscription, disposal).
80508050
* <p>
80518051
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnLifecycle.o.png" alt="">
80528052
* <dl>

src/main/java/io/reactivex/internal/observers/DisposableLambdaObserver.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public void onNext(T t) {
6161
@Override
6262
public void onError(Throwable t) {
6363
if (upstream != DisposableHelper.DISPOSED) {
64+
upstream = DisposableHelper.DISPOSED;
6465
downstream.onError(t);
6566
} else {
6667
RxJavaPlugins.onError(t);
@@ -70,19 +71,24 @@ public void onError(Throwable t) {
7071
@Override
7172
public void onComplete() {
7273
if (upstream != DisposableHelper.DISPOSED) {
74+
upstream = DisposableHelper.DISPOSED;
7375
downstream.onComplete();
7476
}
7577
}
7678

7779
@Override
7880
public void dispose() {
79-
try {
80-
onDispose.run();
81-
} catch (Throwable e) {
82-
Exceptions.throwIfFatal(e);
83-
RxJavaPlugins.onError(e);
81+
Disposable d = upstream;
82+
if (d != DisposableHelper.DISPOSED) {
83+
upstream = DisposableHelper.DISPOSED;
84+
try {
85+
onDispose.run();
86+
} catch (Throwable e) {
87+
Exceptions.throwIfFatal(e);
88+
RxJavaPlugins.onError(e);
89+
}
90+
d.dispose();
8491
}
85-
upstream.dispose();
8692
}
8793

8894
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycle.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,17 @@ public void request(long n) {
108108

109109
@Override
110110
public void cancel() {
111-
try {
112-
onCancel.run();
113-
} catch (Throwable e) {
114-
Exceptions.throwIfFatal(e);
115-
RxJavaPlugins.onError(e);
111+
Subscription s = upstream;
112+
if (s != SubscriptionHelper.CANCELLED) {
113+
upstream = SubscriptionHelper.CANCELLED;
114+
try {
115+
onCancel.run();
116+
} catch (Throwable e) {
117+
Exceptions.throwIfFatal(e);
118+
RxJavaPlugins.onError(e);
119+
}
120+
s.cancel();
116121
}
117-
upstream.cancel();
118122
}
119123
}
120124
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycleTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void run() throws Exception {
8787
);
8888

8989
assertEquals(1, calls[0]);
90-
assertEquals(2, calls[1]);
90+
assertEquals(1, calls[1]);
9191
}
9292

9393
@Test

src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnUnsubscribeTest.java

+21
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.reactivex.Flowable;
2525
import io.reactivex.disposables.Disposable;
2626
import io.reactivex.functions.*;
27+
import io.reactivex.processors.BehaviorProcessor;
2728
import io.reactivex.subscribers.TestSubscriber;
2829

2930
public class FlowableDoOnUnsubscribeTest {
@@ -148,4 +149,24 @@ public void run() {
148149
assertEquals("There should exactly 1 un-subscription events for upper stream", 1, upperCount.get());
149150
assertEquals("There should exactly 1 un-subscription events for lower stream", 1, lowerCount.get());
150151
}
152+
153+
@Test
154+
public void noReentrantDispose() {
155+
156+
final AtomicInteger cancelCalled = new AtomicInteger();
157+
158+
final BehaviorProcessor<Integer> p = BehaviorProcessor.create();
159+
p.doOnCancel(new Action() {
160+
@Override
161+
public void run() throws Exception {
162+
cancelCalled.incrementAndGet();
163+
p.onNext(2);
164+
}
165+
})
166+
.firstOrError()
167+
.subscribe()
168+
.dispose();
169+
170+
assertEquals(1, cancelCalled.get());
171+
}
151172
}

src/test/java/io/reactivex/internal/operators/observable/ObservableCacheTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void testUnsubscribeSource() throws Exception {
113113
o.subscribe();
114114
o.subscribe();
115115
o.subscribe();
116-
verify(unsubscribe, times(1)).run();
116+
verify(unsubscribe, never()).run();
117117
}
118118

119119
@Test

src/test/java/io/reactivex/internal/operators/observable/ObservableDoOnUnsubscribeTest.java

+21
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.reactivex.disposables.Disposable;
2626
import io.reactivex.functions.*;
2727
import io.reactivex.observers.TestObserver;
28+
import io.reactivex.subjects.BehaviorSubject;
2829

2930
public class ObservableDoOnUnsubscribeTest {
3031

@@ -152,4 +153,24 @@ public void run() {
152153
assertEquals("There should exactly 1 un-subscription events for upper stream", 1, upperCount.get());
153154
assertEquals("There should exactly 1 un-subscription events for lower stream", 1, lowerCount.get());
154155
}
156+
157+
@Test
158+
public void noReentrantDispose() {
159+
160+
final AtomicInteger disposeCalled = new AtomicInteger();
161+
162+
final BehaviorSubject<Integer> s = BehaviorSubject.create();
163+
s.doOnDispose(new Action() {
164+
@Override
165+
public void run() throws Exception {
166+
disposeCalled.incrementAndGet();
167+
s.onNext(2);
168+
}
169+
})
170+
.firstOrError()
171+
.subscribe()
172+
.dispose();
173+
174+
assertEquals(1, disposeCalled.get());
175+
}
155176
}

src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -938,11 +938,11 @@ public void accept(String v) {
938938
@Test
939939
public void testUnsubscribeSource() throws Exception {
940940
Action unsubscribe = mock(Action.class);
941-
Observable<Integer> o = Observable.just(1).doOnDispose(unsubscribe).cache();
941+
Observable<Integer> o = Observable.just(1).doOnDispose(unsubscribe).replay().autoConnect();
942942
o.subscribe();
943943
o.subscribe();
944944
o.subscribe();
945-
verify(unsubscribe, times(1)).run();
945+
verify(unsubscribe, never()).run();
946946
}
947947

948948
@Test

0 commit comments

Comments
 (0)