Skip to content

Commit 1ea1e2a

Browse files
authored
2.x: Cleanup Observable.flatMap drain logic (#6232)
1 parent fd48d56 commit 1ea1e2a

File tree

2 files changed

+55
-28
lines changed

2 files changed

+55
-28
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java

+14-26
Original file line numberDiff line numberDiff line change
@@ -338,23 +338,17 @@ void drainLoop() {
338338

339339
if (svq != null) {
340340
for (;;) {
341-
U o;
342-
for (;;) {
343-
if (checkTerminate()) {
344-
return;
345-
}
346-
347-
o = svq.poll();
341+
if (checkTerminate()) {
342+
return;
343+
}
348344

349-
if (o == null) {
350-
break;
351-
}
345+
U o = svq.poll();
352346

353-
child.onNext(o);
354-
}
355347
if (o == null) {
356348
break;
357349
}
350+
351+
child.onNext(o);
358352
}
359353
}
360354

@@ -415,17 +409,10 @@ void drainLoop() {
415409

416410
@SuppressWarnings("unchecked")
417411
InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
418-
419-
for (;;) {
420-
if (checkTerminate()) {
421-
return;
422-
}
423-
SimpleQueue<U> q = is.queue;
424-
if (q == null) {
425-
break;
426-
}
427-
U o;
412+
SimpleQueue<U> q = is.queue;
413+
if (q != null) {
428414
for (;;) {
415+
U o;
429416
try {
430417
o = q.poll();
431418
} catch (Throwable ex) {
@@ -437,7 +424,10 @@ void drainLoop() {
437424
}
438425
removeInner(is);
439426
innerCompleted = true;
440-
i++;
427+
j++;
428+
if (j == n) {
429+
j = 0;
430+
}
441431
continue sourceLoop;
442432
}
443433
if (o == null) {
@@ -450,10 +440,8 @@ void drainLoop() {
450440
return;
451441
}
452442
}
453-
if (o == null) {
454-
break;
455-
}
456443
}
444+
457445
boolean innerDone = is.done;
458446
SimpleQueue<U> innerQueue = is.queue;
459447
if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {

src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java

+41-2
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@
2626
import io.reactivex.*;
2727
import io.reactivex.Observable;
2828
import io.reactivex.Observer;
29-
import io.reactivex.disposables.Disposable;
29+
import io.reactivex.disposables.*;
3030
import io.reactivex.exceptions.*;
3131
import io.reactivex.functions.*;
3232
import io.reactivex.internal.functions.Functions;
3333
import io.reactivex.observers.TestObserver;
3434
import io.reactivex.plugins.RxJavaPlugins;
3535
import io.reactivex.schedulers.Schedulers;
36-
import io.reactivex.subjects.PublishSubject;
36+
import io.reactivex.subjects.*;
3737

3838
public class ObservableFlatMapTest {
3939
@Test
@@ -1006,4 +1006,43 @@ public void onNext(Integer t) {
10061006

10071007
to.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
10081008
}
1009+
1010+
@Test
1011+
public void fusedSourceCrashResumeWithNextSource() {
1012+
final UnicastSubject<Integer> fusedSource = UnicastSubject.create();
1013+
TestObserver<Integer> to = new TestObserver<Integer>();
1014+
1015+
ObservableFlatMap.MergeObserver<Integer, Integer> merger =
1016+
new ObservableFlatMap.MergeObserver<Integer, Integer>(to, new Function<Integer, Observable<Integer>>() {
1017+
@Override
1018+
public Observable<Integer> apply(Integer t)
1019+
throws Exception {
1020+
if (t == 0) {
1021+
return fusedSource
1022+
.map(new Function<Integer, Integer>() {
1023+
@Override
1024+
public Integer apply(Integer v)
1025+
throws Exception { throw new TestException(); }
1026+
})
1027+
.compose(TestHelper.<Integer>observableStripBoundary());
1028+
}
1029+
return Observable.range(10 * t, 5);
1030+
}
1031+
}, true, Integer.MAX_VALUE, 128);
1032+
1033+
merger.onSubscribe(Disposables.empty());
1034+
merger.getAndIncrement();
1035+
1036+
merger.onNext(0);
1037+
merger.onNext(1);
1038+
merger.onNext(2);
1039+
1040+
assertTrue(fusedSource.hasObservers());
1041+
1042+
fusedSource.onNext(-1);
1043+
1044+
merger.drainLoop();
1045+
1046+
to.assertValuesOnly(10, 11, 12, 13, 14, 20, 21, 22, 23, 24);
1047+
}
10091048
}

0 commit comments

Comments
 (0)