Skip to content

Commit e76a476

Browse files
authored
2.x: Fix Observable.switchMap main onError not disposing the current inner source (#5833)
* 2.x: Fix Obs.switchMap main onError not disposing the current inner src * Fix error-error race test
1 parent 1fbc44f commit e76a476

File tree

3 files changed

+119
-7
lines changed

3 files changed

+119
-7
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,15 @@ public void onNext(T t) {
131131

132132
@Override
133133
public void onError(Throwable t) {
134-
if (done || !errors.addThrowable(t)) {
134+
if (!done && errors.addThrowable(t)) {
135135
if (!delayErrors) {
136136
disposeInner();
137137
}
138+
done = true;
139+
drain();
140+
} else {
138141
RxJavaPlugins.onError(t);
139-
return;
140142
}
141-
done = true;
142-
drain();
143143
}
144144

145145
@Override

src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java

+21
Original file line numberDiff line numberDiff line change
@@ -1153,4 +1153,25 @@ public Object apply(Integer v) throws Exception {
11531153
.test()
11541154
.assertFailure(TestException.class);
11551155
}
1156+
1157+
@Test
1158+
public void innerCancelledOnMainError() {
1159+
final PublishProcessor<Integer> main = PublishProcessor.create();
1160+
final PublishProcessor<Integer> inner = PublishProcessor.create();
1161+
1162+
TestSubscriber<Integer> to = main.switchMap(Functions.justFunction(inner))
1163+
.test();
1164+
1165+
assertTrue(main.hasSubscribers());
1166+
1167+
main.onNext(1);
1168+
1169+
assertTrue(inner.hasSubscribers());
1170+
1171+
main.onError(new TestException());
1172+
1173+
assertFalse(inner.hasSubscribers());
1174+
1175+
to.assertFailure(TestException.class);
1176+
}
11561177
}

src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java

+94-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import java.util.List;
2020
import java.util.concurrent.TimeUnit;
21-
import java.util.concurrent.atomic.AtomicBoolean;
21+
import java.util.concurrent.atomic.*;
2222

2323
import org.junit.*;
2424
import org.mockito.InOrder;
@@ -768,7 +768,7 @@ public void run() {
768768

769769
@Test
770770
public void outerInnerErrorRace() {
771-
for (int i = 0; i < 500; i++) {
771+
for (int i = 0; i < 5000; i++) {
772772
List<Throwable> errors = TestHelper.trackPluginErrors();
773773
try {
774774

@@ -786,6 +786,8 @@ public ObservableSource<Integer> apply(Integer v) throws Exception {
786786
})
787787
.test();
788788

789+
ps1.onNext(1);
790+
789791
final TestException ex1 = new TestException();
790792

791793
Runnable r1 = new Runnable() {
@@ -807,7 +809,7 @@ public void run() {
807809
TestHelper.race(r1, r2);
808810

809811
for (Throwable e : errors) {
810-
assertTrue(e.toString(), e instanceof TestException);
812+
assertTrue(e.getCause().toString(), e.getCause() instanceof TestException);
811813
}
812814
} finally {
813815
RxJavaPlugins.reset();
@@ -963,4 +965,93 @@ public void onNext(Integer t) {
963965

964966
to.assertFailure(TestException.class, 1);
965967
}
968+
969+
@Test
970+
public void innerDisposedOnMainError() {
971+
final PublishSubject<Integer> main = PublishSubject.create();
972+
final PublishSubject<Integer> inner = PublishSubject.create();
973+
974+
TestObserver<Integer> to = main.switchMap(Functions.justFunction(inner))
975+
.test();
976+
977+
assertTrue(main.hasObservers());
978+
979+
main.onNext(1);
980+
981+
assertTrue(inner.hasObservers());
982+
983+
main.onError(new TestException());
984+
985+
assertFalse(inner.hasObservers());
986+
987+
to.assertFailure(TestException.class);
988+
}
989+
990+
@Test
991+
public void outerInnerErrorRaceIgnoreDispose() {
992+
for (int i = 0; i < 5000; i++) {
993+
List<Throwable> errors = TestHelper.trackPluginErrors();
994+
try {
995+
996+
final AtomicReference<Observer<? super Integer>> obs1 = new AtomicReference<Observer<? super Integer>>();
997+
final Observable<Integer> ps1 = new Observable<Integer>() {
998+
@Override
999+
protected void subscribeActual(
1000+
Observer<? super Integer> observer) {
1001+
obs1.set(observer);
1002+
}
1003+
};
1004+
final AtomicReference<Observer<? super Integer>> obs2 = new AtomicReference<Observer<? super Integer>>();
1005+
final Observable<Integer> ps2 = new Observable<Integer>() {
1006+
@Override
1007+
protected void subscribeActual(
1008+
Observer<? super Integer> observer) {
1009+
obs2.set(observer);
1010+
}
1011+
};
1012+
1013+
ps1.switchMap(new Function<Integer, ObservableSource<Integer>>() {
1014+
@Override
1015+
public ObservableSource<Integer> apply(Integer v) throws Exception {
1016+
if (v == 1) {
1017+
return ps2;
1018+
}
1019+
return Observable.never();
1020+
}
1021+
})
1022+
.test();
1023+
1024+
obs1.get().onSubscribe(Disposables.empty());
1025+
obs1.get().onNext(1);
1026+
1027+
obs2.get().onSubscribe(Disposables.empty());
1028+
1029+
final TestException ex1 = new TestException();
1030+
1031+
Runnable r1 = new Runnable() {
1032+
@Override
1033+
public void run() {
1034+
obs1.get().onError(ex1);
1035+
}
1036+
};
1037+
1038+
final TestException ex2 = new TestException();
1039+
1040+
Runnable r2 = new Runnable() {
1041+
@Override
1042+
public void run() {
1043+
obs2.get().onError(ex2);
1044+
}
1045+
};
1046+
1047+
TestHelper.race(r1, r2);
1048+
1049+
for (Throwable e : errors) {
1050+
assertTrue(e.toString(), e.getCause() instanceof TestException);
1051+
}
1052+
} finally {
1053+
RxJavaPlugins.reset();
1054+
}
1055+
}
1056+
}
9661057
}

0 commit comments

Comments
 (0)