Skip to content

Commit 2cb20bd

Browse files
authored
2.x: Fix Observable.flatMap with maxConcurrency hangs (#6947) (#6960)
1 parent 7ca43c7 commit 2cb20bd

File tree

3 files changed

+73
-12
lines changed

3 files changed

+73
-12
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java

+27-12
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ void drainLoop() {
334334
if (checkTerminate()) {
335335
return;
336336
}
337+
int innerCompleted = 0;
337338
SimplePlainQueue<U> svq = queue;
338339

339340
if (svq != null) {
@@ -349,9 +350,18 @@ void drainLoop() {
349350
}
350351

351352
child.onNext(o);
353+
innerCompleted++;
352354
}
353355
}
354356

357+
if (innerCompleted != 0) {
358+
if (maxConcurrency != Integer.MAX_VALUE) {
359+
subscribeMore(innerCompleted);
360+
innerCompleted = 0;
361+
}
362+
continue;
363+
}
364+
355365
boolean d = done;
356366
svq = queue;
357367
InnerObserver<?, ?>[] inner = observers.get();
@@ -376,7 +386,6 @@ void drainLoop() {
376386
return;
377387
}
378388

379-
int innerCompleted = 0;
380389
if (n != 0) {
381390
long startId = lastId;
382391
int index = lastIndex;
@@ -463,27 +472,33 @@ void drainLoop() {
463472

464473
if (innerCompleted != 0) {
465474
if (maxConcurrency != Integer.MAX_VALUE) {
466-
while (innerCompleted-- != 0) {
467-
ObservableSource<? extends U> p;
468-
synchronized (this) {
469-
p = sources.poll();
470-
if (p == null) {
471-
wip--;
472-
continue;
473-
}
474-
}
475-
subscribeInner(p);
476-
}
475+
subscribeMore(innerCompleted);
476+
innerCompleted = 0;
477477
}
478478
continue;
479479
}
480+
480481
missed = addAndGet(-missed);
481482
if (missed == 0) {
482483
break;
483484
}
484485
}
485486
}
486487

488+
void subscribeMore(int innerCompleted) {
489+
while (innerCompleted-- != 0) {
490+
ObservableSource<? extends U> p;
491+
synchronized (this) {
492+
p = sources.poll();
493+
if (p == null) {
494+
wip--;
495+
continue;
496+
}
497+
}
498+
subscribeInner(p);
499+
}
500+
}
501+
487502
boolean checkTerminate() {
488503
if (cancelled) {
489504
return true;

src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java

+23
Original file line numberDiff line numberDiff line change
@@ -1157,4 +1157,27 @@ public void innerErrorsMainCancelled() {
11571157

11581158
assertFalse("Has subscribers?", pp1.hasSubscribers());
11591159
}
1160+
1161+
@Test(timeout = 5000)
1162+
public void mixedScalarAsync() {
1163+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
1164+
Flowable
1165+
.range(0, 20)
1166+
.flatMap(new Function<Integer, Publisher<?>>() {
1167+
@Override
1168+
public Publisher<?> apply(Integer integer) throws Exception {
1169+
if (integer % 5 != 0) {
1170+
return Flowable
1171+
.just(integer);
1172+
}
1173+
1174+
return Flowable
1175+
.just(-integer)
1176+
.observeOn(Schedulers.computation());
1177+
}
1178+
}, false, 1)
1179+
.ignoreElements()
1180+
.blockingAwait();
1181+
}
1182+
}
11601183
}

src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java

+23
Original file line numberDiff line numberDiff line change
@@ -1118,4 +1118,27 @@ public void innerErrorsMainCancelled() {
11181118

11191119
assertFalse("Has subscribers?", ps1.hasObservers());
11201120
}
1121+
1122+
@Test(timeout = 5000)
1123+
public void mixedScalarAsync() {
1124+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
1125+
Observable
1126+
.range(0, 20)
1127+
.flatMap(new Function<Integer, ObservableSource<?>>() {
1128+
@Override
1129+
public ObservableSource<?> apply(Integer integer) throws Exception {
1130+
if (integer % 5 != 0) {
1131+
return Observable
1132+
.just(integer);
1133+
}
1134+
1135+
return Observable
1136+
.just(-integer)
1137+
.observeOn(Schedulers.computation());
1138+
}
1139+
}, false, 1)
1140+
.ignoreElements()
1141+
.blockingAwait();
1142+
}
1143+
}
11211144
}

0 commit comments

Comments
 (0)