Skip to content

Commit ff282b9

Browse files
authored
2.x: fix Observable.repeatWhen & retryWhen not disposing the inner (#4783)
1 parent 3300d19 commit ff282b9

File tree

6 files changed

+123
-17
lines changed

6 files changed

+123
-17
lines changed

src/main/java/io/reactivex/internal/observers/ToNotificationObserver.java

+20-8
Original file line numberDiff line numberDiff line change
@@ -13,40 +13,42 @@
1313

1414
package io.reactivex.internal.observers;
1515

16+
import java.util.concurrent.atomic.AtomicReference;
17+
1618
import io.reactivex.*;
1719
import io.reactivex.disposables.Disposable;
1820
import io.reactivex.exceptions.*;
1921
import io.reactivex.functions.Consumer;
2022
import io.reactivex.internal.disposables.DisposableHelper;
2123
import io.reactivex.plugins.RxJavaPlugins;
2224

23-
public final class ToNotificationObserver<T> implements Observer<T> {
24-
final Consumer<? super Notification<Object>> consumer;
25+
public final class ToNotificationObserver<T>
26+
extends AtomicReference<Disposable>
27+
implements Observer<T>, Disposable {
28+
private static final long serialVersionUID = -7420197867343208289L;
2529

26-
Disposable s;
30+
final Consumer<? super Notification<Object>> consumer;
2731

2832
public ToNotificationObserver(Consumer<? super Notification<Object>> consumer) {
2933
this.consumer = consumer;
3034
}
3135

3236
@Override
3337
public void onSubscribe(Disposable s) {
34-
if (DisposableHelper.validate(this.s, s)) {
35-
this.s = s;
36-
}
38+
DisposableHelper.setOnce(this, s);
3739
}
3840

3941
@Override
4042
public void onNext(T t) {
4143
if (t == null) {
42-
s.dispose();
44+
get().dispose();
4345
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
4446
} else {
4547
try {
4648
consumer.accept(Notification.<Object>createOnNext(t));
4749
} catch (Throwable ex) {
4850
Exceptions.throwIfFatal(ex);
49-
s.dispose();
51+
get().dispose();
5052
onError(ex);
5153
}
5254
}
@@ -71,4 +73,14 @@ public void onComplete() {
7173
RxJavaPlugins.onError(ex);
7274
}
7375
}
76+
77+
@Override
78+
public void dispose() {
79+
DisposableHelper.dispose(this);
80+
}
81+
82+
@Override
83+
public boolean isDisposed() {
84+
return DisposableHelper.isDisposed(get());
85+
}
7486
}

src/main/java/io/reactivex/internal/operators/observable/ObservableRedo.java

+10-8
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.reactivex.disposables.Disposable;
2020
import io.reactivex.exceptions.Exceptions;
2121
import io.reactivex.functions.*;
22-
import io.reactivex.internal.disposables.SequentialDisposable;
22+
import io.reactivex.internal.disposables.*;
2323
import io.reactivex.internal.functions.ObjectHelper;
2424
import io.reactivex.internal.observers.ToNotificationObserver;
2525
import io.reactivex.subjects.*;
@@ -40,7 +40,14 @@ public void subscribeActual(Observer<? super T> s) {
4040

4141
final RedoObserver<T> parent = new RedoObserver<T>(s, subject, source);
4242

43-
s.onSubscribe(parent.arbiter);
43+
ToNotificationObserver<Object> actionObserver = new ToNotificationObserver<Object>(new Consumer<Notification<Object>>() {
44+
@Override
45+
public void accept(Notification<Object> o) {
46+
parent.handle(o);
47+
}
48+
});
49+
ListCompositeDisposable cd = new ListCompositeDisposable(parent.arbiter, actionObserver);
50+
s.onSubscribe(cd);
4451

4552
ObservableSource<?> action;
4653

@@ -52,12 +59,7 @@ public void subscribeActual(Observer<? super T> s) {
5259
return;
5360
}
5461

55-
action.subscribe(new ToNotificationObserver<Object>(new Consumer<Notification<Object>>() {
56-
@Override
57-
public void accept(Notification<Object> o) {
58-
parent.handle(o);
59-
}
60-
}));
62+
action.subscribe(actionObserver);
6163

6264
// trigger first subscription
6365
parent.handle(Notification.<Object>createOnNext(0));

src/test/java/io/reactivex/internal/operators/flowable/FlowableRepeatTest.java

+23
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525
import org.reactivestreams.*;
2626

2727
import io.reactivex.*;
28+
import io.reactivex.disposables.Disposable;
2829
import io.reactivex.exceptions.TestException;
2930
import io.reactivex.functions.*;
3031
import io.reactivex.internal.subscriptions.BooleanSubscription;
32+
import io.reactivex.processors.PublishProcessor;
3133
import io.reactivex.schedulers.Schedulers;
3234
import io.reactivex.subscribers.TestSubscriber;
3335

@@ -306,4 +308,25 @@ public boolean getAsBoolean() throws Exception {
306308
.assertFailure(TestException.class, 1);
307309
}
308310

311+
@Test
312+
public void shouldDisposeInnerObservable() {
313+
final PublishProcessor<Object> subject = PublishProcessor.create();
314+
final Disposable disposable = Flowable.just("Leak")
315+
.repeatWhen(new Function<Flowable<Object>, Flowable<Object>>() {
316+
@Override
317+
public Flowable<Object> apply(Flowable<Object> completions) throws Exception {
318+
return completions.switchMap(new Function<Object, Flowable<Object>>() {
319+
@Override
320+
public Flowable<Object> apply(Object ignore) throws Exception {
321+
return subject;
322+
}
323+
});
324+
}
325+
})
326+
.subscribe();
327+
328+
assertTrue(subject.hasSubscribers());
329+
disposable.dispose();
330+
assertFalse(subject.hasSubscribers());
331+
}
309332
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableRetryTest.java

+24
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.internal.operators.flowable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.*;
1718
import static org.mockito.Mockito.*;
1819

1920
import java.util.*;
@@ -997,4 +998,27 @@ public boolean getAsBoolean() throws Exception {
997998
.test()
998999
.assertResult(1, 1, 1, 1, 1);
9991000
}
1001+
1002+
1003+
@Test
1004+
public void shouldDisposeInnerObservable() {
1005+
final PublishProcessor<Object> subject = PublishProcessor.create();
1006+
final Disposable disposable = Flowable.error(new RuntimeException("Leak"))
1007+
.retryWhen(new Function<Flowable<Throwable>, Flowable<Object>>() {
1008+
@Override
1009+
public Flowable<Object> apply(Flowable<Throwable> errors) throws Exception {
1010+
return errors.switchMap(new Function<Throwable, Flowable<Object>>() {
1011+
@Override
1012+
public Flowable<Object> apply(Throwable ignore) throws Exception {
1013+
return subject;
1014+
}
1015+
});
1016+
}
1017+
})
1018+
.subscribe();
1019+
1020+
assertTrue(subject.hasSubscribers());
1021+
disposable.dispose();
1022+
assertFalse(subject.hasSubscribers());
1023+
}
10001024
}

src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java

+24-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.internal.operators.observable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.any;
1718
import static org.mockito.Mockito.*;
1819

1920
import java.util.*;
@@ -25,11 +26,12 @@
2526
import io.reactivex.*;
2627
import io.reactivex.Observable;
2728
import io.reactivex.Observer;
28-
import io.reactivex.disposables.Disposables;
29+
import io.reactivex.disposables.*;
2930
import io.reactivex.exceptions.TestException;
3031
import io.reactivex.functions.*;
3132
import io.reactivex.observers.TestObserver;
3233
import io.reactivex.schedulers.Schedulers;
34+
import io.reactivex.subjects.PublishSubject;
3335

3436
public class ObservableRepeatTest {
3537

@@ -257,4 +259,25 @@ public boolean getAsBoolean() throws Exception {
257259
.assertFailure(TestException.class, 1);
258260
}
259261

262+
@Test
263+
public void shouldDisposeInnerObservable() {
264+
final PublishSubject<Object> subject = PublishSubject.create();
265+
final Disposable disposable = Observable.just("Leak")
266+
.repeatWhen(new Function<Observable<Object>, ObservableSource<Object>>() {
267+
@Override
268+
public ObservableSource<Object> apply(Observable<Object> completions) throws Exception {
269+
return completions.switchMap(new Function<Object, ObservableSource<Object>>() {
270+
@Override
271+
public ObservableSource<Object> apply(Object ignore) throws Exception {
272+
return subject;
273+
}
274+
});
275+
}
276+
})
277+
.subscribe();
278+
279+
assertTrue(subject.hasObservers());
280+
disposable.dispose();
281+
assertFalse(subject.hasObservers());
282+
}
260283
}

src/test/java/io/reactivex/internal/operators/observable/ObservableRetryTest.java

+22
Original file line numberDiff line numberDiff line change
@@ -908,4 +908,26 @@ public boolean test(Throwable e) throws Exception {
908908
}
909909
}
910910

911+
@Test
912+
public void shouldDisposeInnerObservable() {
913+
final PublishSubject<Object> subject = PublishSubject.create();
914+
final Disposable disposable = Observable.error(new RuntimeException("Leak"))
915+
.retryWhen(new Function<Observable<Throwable>, ObservableSource<Object>>() {
916+
@Override
917+
public ObservableSource<Object> apply(Observable<Throwable> errors) throws Exception {
918+
return errors.switchMap(new Function<Throwable, ObservableSource<Object>>() {
919+
@Override
920+
public ObservableSource<Object> apply(Throwable ignore) throws Exception {
921+
return subject;
922+
}
923+
});
924+
}
925+
})
926+
.subscribe();
927+
928+
assertTrue(subject.hasObservers());
929+
disposable.dispose();
930+
assertFalse(subject.hasObservers());
931+
}
932+
911933
}

0 commit comments

Comments
 (0)