Skip to content

Commit db75e89

Browse files
authored
2.x: fix flatMap emitting the terminal exception indicator on cancel (#5188)
1 parent ce1f2d0 commit db75e89

File tree

4 files changed

+166
-11
lines changed

4 files changed

+166
-11
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java

+10-5
Original file line numberDiff line numberDiff line change
@@ -421,10 +421,12 @@ void drainLoop() {
421421

422422
if (d && (svq == null || svq.isEmpty()) && n == 0) {
423423
Throwable ex = errs.terminate();
424-
if (ex == null) {
425-
child.onComplete();
426-
} else {
427-
child.onError(ex);
424+
if (ex != ExceptionHelper.TERMINATED) {
425+
if (ex == null) {
426+
child.onComplete();
427+
} else {
428+
child.onError(ex);
429+
}
428430
}
429431
return;
430432
}
@@ -556,7 +558,10 @@ boolean checkTerminate() {
556558
}
557559
if (!delayErrors && errs.get() != null) {
558560
clearScalarQueue();
559-
actual.onError(errs.terminate());
561+
Throwable ex = errs.terminate();
562+
if (ex != ExceptionHelper.TERMINATED) {
563+
actual.onError(ex);
564+
}
560565
return true;
561566
}
562567
return false;

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -361,11 +361,13 @@ void drainLoop() {
361361
int n = inner.length;
362362

363363
if (d && (svq == null || svq.isEmpty()) && n == 0) {
364-
Throwable ex = errors.get();
365-
if (ex == null) {
366-
child.onComplete();
367-
} else {
368-
child.onError(errors.terminate());
364+
Throwable ex = errors.terminate();
365+
if (ex != ExceptionHelper.TERMINATED) {
366+
if (ex == null) {
367+
child.onComplete();
368+
} else {
369+
child.onError(ex);
370+
}
369371
}
370372
return;
371373
}
@@ -488,7 +490,10 @@ boolean checkTerminate() {
488490
Throwable e = errors.get();
489491
if (!delayErrors && (e != null)) {
490492
disposeAll();
491-
actual.onError(errors.terminate());
493+
e = errors.terminate();
494+
if (e != ExceptionHelper.TERMINATED) {
495+
actual.onError(e);
496+
}
492497
return true;
493498
}
494499
return false;

src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java

+71
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.reactivex.exceptions.*;
2929
import io.reactivex.functions.*;
3030
import io.reactivex.internal.functions.Functions;
31+
import io.reactivex.plugins.RxJavaPlugins;
3132
import io.reactivex.processors.PublishProcessor;
3233
import io.reactivex.schedulers.Schedulers;
3334
import io.reactivex.subscribers.TestSubscriber;
@@ -926,4 +927,74 @@ public Object apply(Integer v) throws Exception {
926927
assertTrue(list.toString(), list.contains("RxCo"));
927928
}
928929
}
930+
931+
@Test
932+
public void cancelScalarDrainRace() {
933+
for (int i = 0; i < 1000; i++) {
934+
List<Throwable> errors = TestHelper.trackPluginErrors();
935+
try {
936+
937+
final PublishProcessor<Flowable<Integer>> pp = PublishProcessor.create();
938+
939+
final TestSubscriber<Integer> ts = pp.flatMap(Functions.<Flowable<Integer>>identity()).test(0);
940+
941+
Runnable r1 = new Runnable() {
942+
@Override
943+
public void run() {
944+
ts.cancel();
945+
}
946+
};
947+
Runnable r2 = new Runnable() {
948+
@Override
949+
public void run() {
950+
pp.onComplete();
951+
}
952+
};
953+
954+
TestHelper.race(r1, r2);
955+
956+
assertTrue(errors.toString(), errors.isEmpty());
957+
} finally {
958+
RxJavaPlugins.reset();
959+
}
960+
}
961+
}
962+
963+
@Test
964+
public void cancelDrainRace() {
965+
for (int i = 0; i < 1000; i++) {
966+
for (int j = 1; j < 50; j += 5) {
967+
List<Throwable> errors = TestHelper.trackPluginErrors();
968+
try {
969+
970+
final PublishProcessor<Flowable<Integer>> pp = PublishProcessor.create();
971+
972+
final TestSubscriber<Integer> ts = pp.flatMap(Functions.<Flowable<Integer>>identity()).test(0);
973+
974+
final PublishProcessor<Integer> just = PublishProcessor.create();
975+
pp.onNext(just);
976+
977+
Runnable r1 = new Runnable() {
978+
@Override
979+
public void run() {
980+
ts.request(1);
981+
ts.cancel();
982+
}
983+
};
984+
Runnable r2 = new Runnable() {
985+
@Override
986+
public void run() {
987+
just.onNext(1);
988+
}
989+
};
990+
991+
TestHelper.race(r1, r2);
992+
993+
assertTrue(errors.toString(), errors.isEmpty());
994+
} finally {
995+
RxJavaPlugins.reset();
996+
}
997+
}
998+
}
999+
}
9291000
}

src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java

+74
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import io.reactivex.disposables.Disposable;
3030
import io.reactivex.exceptions.*;
3131
import io.reactivex.functions.*;
32+
import io.reactivex.internal.functions.Functions;
3233
import io.reactivex.observers.TestObserver;
34+
import io.reactivex.plugins.RxJavaPlugins;
3335
import io.reactivex.schedulers.Schedulers;
3436
import io.reactivex.subjects.PublishSubject;
3537

@@ -784,4 +786,76 @@ public Object apply(Integer v) throws Exception {
784786
assertTrue(list.toString(), list.contains("RxCo"));
785787
}
786788
}
789+
790+
@Test
791+
public void cancelScalarDrainRace() {
792+
for (int i = 0; i < 1000; i++) {
793+
List<Throwable> errors = TestHelper.trackPluginErrors();
794+
try {
795+
796+
final PublishSubject<Observable<Integer>> pp = PublishSubject.create();
797+
798+
final TestObserver<Integer> ts = pp.flatMap(Functions.<Observable<Integer>>identity()).test();
799+
800+
Runnable r1 = new Runnable() {
801+
@Override
802+
public void run() {
803+
ts.cancel();
804+
}
805+
};
806+
Runnable r2 = new Runnable() {
807+
@Override
808+
public void run() {
809+
pp.onComplete();
810+
}
811+
};
812+
813+
TestHelper.race(r1, r2);
814+
815+
assertTrue(errors.toString(), errors.isEmpty());
816+
} finally {
817+
RxJavaPlugins.reset();
818+
}
819+
}
820+
}
821+
822+
@Test
823+
public void cancelDrainRace() {
824+
for (int i = 0; i < 1000; i++) {
825+
for (int j = 1; j < 50; j += 5) {
826+
List<Throwable> errors = TestHelper.trackPluginErrors();
827+
try {
828+
829+
final PublishSubject<Observable<Integer>> pp = PublishSubject.create();
830+
831+
final TestObserver<Integer> ts = pp.flatMap(Functions.<Observable<Integer>>identity()).test();
832+
833+
final PublishSubject<Integer> just = PublishSubject.create();
834+
final PublishSubject<Integer> just2 = PublishSubject.create();
835+
pp.onNext(just);
836+
pp.onNext(just2);
837+
838+
Runnable r1 = new Runnable() {
839+
@Override
840+
public void run() {
841+
just2.onNext(1);
842+
ts.cancel();
843+
}
844+
};
845+
Runnable r2 = new Runnable() {
846+
@Override
847+
public void run() {
848+
just.onNext(1);
849+
}
850+
};
851+
852+
TestHelper.race(r1, r2);
853+
854+
assertTrue(errors.toString(), errors.isEmpty());
855+
} finally {
856+
RxJavaPlugins.reset();
857+
}
858+
}
859+
}
860+
}
787861
}

0 commit comments

Comments
 (0)