Skip to content

Commit f79625a

Browse files
authored
2.x: fix Flowable.concatMapEager hang due to bad request management (#4751)
* 2.x: fix Flowable.concatMapEager hang due to bad request management * Missed negation of check
1 parent 792d1cf commit f79625a

File tree

4 files changed

+52
-37
lines changed

4 files changed

+52
-37
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEager.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -233,12 +233,11 @@ public void drain() {
233233
int missed = 1;
234234
InnerQueuedSubscriber<R> inner = current;
235235
Subscriber<? super R> a = actual;
236-
long r = requested.get();
237-
long e = 0L;
238236
ErrorMode em = errorMode;
239237

240-
outer:
241238
for (;;) {
239+
long r = requested.get();
240+
long e = 0L;
242241

243242
if (inner == null) {
244243

@@ -271,6 +270,8 @@ public void drain() {
271270
}
272271
}
273272

273+
boolean continueNextSource = false;
274+
274275
if (inner != null) {
275276
SimpleQueue<R> q = inner.queue();
276277
if (q != null) {
@@ -313,7 +314,8 @@ public void drain() {
313314
inner = null;
314315
current = null;
315316
s.request(1);
316-
continue outer;
317+
continueNextSource = true;
318+
break;
317319
}
318320

319321
if (empty) {
@@ -353,15 +355,18 @@ public void drain() {
353355
inner = null;
354356
current = null;
355357
s.request(1);
356-
continue;
358+
continueNextSource = true;
357359
}
358360
}
359361
}
360362
}
361363

362364
if (e != 0L && r != Long.MAX_VALUE) {
363-
r = requested.addAndGet(-e);
364-
e = 0L;
365+
requested.addAndGet(-e);
366+
}
367+
368+
if (continueNextSource) {
369+
continue;
365370
}
366371

367372
missed = addAndGet(-missed);

src/main/java/io/reactivex/internal/subscribers/InnerQueuedSubscriber.java

+18-14
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,14 @@ public void onSubscribe(Subscription s) {
7272
if (m == QueueSubscription.ASYNC) {
7373
fusionMode = m;
7474
queue = qs;
75-
QueueDrainHelper.request(get(), prefetch);
75+
QueueDrainHelper.request(s, prefetch);
7676
return;
7777
}
7878
}
7979

8080
queue = QueueDrainHelper.createQueue(prefetch);
8181

82-
QueueDrainHelper.request(get(), prefetch);
82+
QueueDrainHelper.request(s, prefetch);
8383
}
8484
}
8585

@@ -104,22 +104,26 @@ public void onComplete() {
104104

105105
@Override
106106
public void request(long n) {
107-
long p = produced + n;
108-
if (p >= limit) {
109-
produced = 0L;
110-
get().request(p);
111-
} else {
112-
produced = p;
107+
if (fusionMode != QueueSubscription.SYNC) {
108+
long p = produced + n;
109+
if (p >= limit) {
110+
produced = 0L;
111+
get().request(p);
112+
} else {
113+
produced = p;
114+
}
113115
}
114116
}
115117

116118
public void requestOne() {
117-
long p = produced + 1;
118-
if (p == limit) {
119-
produced = 0L;
120-
get().request(p);
121-
} else {
122-
produced = p;
119+
if (fusionMode != QueueSubscription.SYNC) {
120+
long p = produced + 1;
121+
if (p == limit) {
122+
produced = 0L;
123+
get().request(p);
124+
} else {
125+
produced = p;
126+
}
123127
}
124128
}
125129

src/test/java/io/reactivex/TestHelper.java

+16-11
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.reactivex.disposables.*;
3131
import io.reactivex.exceptions.*;
3232
import io.reactivex.functions.*;
33+
import io.reactivex.internal.functions.ObjectHelper;
3334
import io.reactivex.internal.fuseable.*;
3435
import io.reactivex.internal.operators.maybe.MaybeToFlowable;
3536
import io.reactivex.internal.operators.single.SingleToFlowable;
@@ -144,21 +145,25 @@ public void accept(Throwable t) {
144145
}
145146

146147
public static void assertError(List<Throwable> list, int index, Class<? extends Throwable> clazz) {
147-
try {
148-
assertTrue(list.get(index).toString(), clazz.isInstance(list.get(index)));
149-
} catch (AssertionError e) {
150-
list.get(index).printStackTrace();
151-
throw e;
148+
Throwable ex = list.get(index);
149+
if (!clazz.isInstance(ex)) {
150+
AssertionError err = new AssertionError(clazz + " expected but got " + list.get(index));
151+
err.initCause(list.get(index));
152+
throw err;
152153
}
153154
}
154155

155156
public static void assertError(List<Throwable> list, int index, Class<? extends Throwable> clazz, String message) {
156-
try {
157-
assertTrue(list.get(index).toString(), clazz.isInstance(list.get(index)));
158-
assertEquals(message, list.get(index).getMessage());
159-
} catch (AssertionError e) {
160-
list.get(index).printStackTrace();
161-
throw e;
157+
Throwable ex = list.get(index);
158+
if (!clazz.isInstance(ex)) {
159+
AssertionError err = new AssertionError("Type " + clazz + " expected but got " + ex);
160+
err.initCause(ex);
161+
throw err;
162+
}
163+
if (!ObjectHelper.equals(message, ex.getMessage())) {
164+
AssertionError err = new AssertionError("Message " + message + " expected but got " + ex.getMessage());
165+
err.initCause(ex);
166+
throw err;
162167
}
163168
}
164169

src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -577,11 +577,12 @@ public void testAsynchronousRun() {
577577
public Flowable<Integer> apply(Integer t) {
578578
return Flowable.range(1, 1000).subscribeOn(Schedulers.computation());
579579
}
580-
}).observeOn(Schedulers.newThread()).subscribe(ts);
581-
582-
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
583-
ts.assertNoErrors();
584-
ts.assertValueCount(2000);
580+
}).observeOn(Schedulers.single())
581+
.test()
582+
.awaitDone(5, TimeUnit.SECONDS)
583+
.assertNoErrors()
584+
.assertValueCount(2000)
585+
.assertComplete();
585586
}
586587

587588
@Test

0 commit comments

Comments
 (0)