Skip to content

Commit 4c7e62c

Browse files
authored
2.x: fix wrong reference check in FlattenIterable (#4165)
1 parent 75c40b5 commit 4c7e62c

File tree

2 files changed

+57
-5
lines changed

2 files changed

+57
-5
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919

2020
import org.reactivestreams.*;
2121

22-
import io.reactivex.functions.*;
22+
import io.reactivex.exceptions.MissingBackpressureException;
23+
import io.reactivex.functions.Function;
2324
import io.reactivex.internal.functions.Objects;
2425
import io.reactivex.internal.fuseable.QueueSubscription;
2526
import io.reactivex.internal.queue.SpscArrayQueue;
@@ -166,7 +167,7 @@ public void onSubscribe(Subscription s) {
166167
@Override
167168
public void onNext(T t) {
168169
if (fusionMode != ASYNC && !queue.offer(t)) {
169-
onError(new IllegalStateException("Queue is full?!"));
170+
onError(new MissingBackpressureException("Queue is full?!"));
170171
return;
171172
}
172173
drain();
@@ -363,13 +364,14 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a, Queue<?> q) {
363364
return true;
364365
}
365366
if (d) {
366-
if (error != null) {
367-
Throwable e = Exceptions.terminate(error);
367+
Throwable ex = error.get();
368+
if (ex != null) {
369+
ex = Exceptions.terminate(error);
368370

369371
current = null;
370372
q.clear();
371373

372-
a.onError(e);
374+
a.onError(ex);
373375
return true;
374376
} else
375377
if (empty) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import java.util.Arrays;
17+
18+
import org.junit.Test;
19+
20+
import io.reactivex.Flowable;
21+
import io.reactivex.functions.*;
22+
import io.reactivex.subscribers.TestSubscriber;
23+
24+
public class FlowableFlattenIterableTest {
25+
26+
@Test
27+
public void normal() {
28+
29+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
30+
31+
Flowable.range(1, 2)
32+
.reduce(new BiFunction<Integer, Integer, Integer>() {
33+
@Override
34+
public Integer apply(Integer a, Integer b) {
35+
return Math.max(a, b);
36+
}
37+
})
38+
.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
39+
@Override
40+
public Iterable<Integer> apply(Integer v) {
41+
return Arrays.asList(v, v + 1);
42+
}
43+
})
44+
.subscribe(ts);
45+
46+
ts.assertValues(2, 3)
47+
.assertNoErrors()
48+
.assertComplete();
49+
}
50+
}

0 commit comments

Comments
 (0)