Skip to content

Commit d9d1b7e

Browse files
authored
2.x: fix flatMapX calling SpscLinkedArrayQueue.offer concurrently (#4678)
1 parent 4562e69 commit d9d1b7e

File tree

4 files changed

+18
-6
lines changed

4 files changed

+18
-6
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -191,14 +191,18 @@ void innerSuccess(InnerObserver inner, R value) {
191191
}
192192
} else {
193193
SpscLinkedArrayQueue<R> q = getOrCreateQueue();
194-
q.offer(value);
194+
synchronized (q) {
195+
q.offer(value);
196+
}
195197
}
196198
if (decrementAndGet() == 0) {
197199
return;
198200
}
199201
} else {
200202
SpscLinkedArrayQueue<R> q = getOrCreateQueue();
201-
q.offer(value);
203+
synchronized (q) {
204+
q.offer(value);
205+
}
202206
active.decrementAndGet();
203207
if (getAndIncrement() != 0) {
204208
return;

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -191,14 +191,18 @@ void innerSuccess(InnerObserver inner, R value) {
191191
}
192192
} else {
193193
SpscLinkedArrayQueue<R> q = getOrCreateQueue();
194-
q.offer(value);
194+
synchronized (q) {
195+
q.offer(value);
196+
}
195197
}
196198
if (decrementAndGet() == 0) {
197199
return;
198200
}
199201
} else {
200202
SpscLinkedArrayQueue<R> q = getOrCreateQueue();
201-
q.offer(value);
203+
synchronized (q) {
204+
q.offer(value);
205+
}
202206
active.decrementAndGet();
203207
if (getAndIncrement() != 0) {
204208
return;

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,9 @@ void innerSuccess(InnerObserver inner, R value) {
167167
}
168168
} else {
169169
SpscLinkedArrayQueue<R> q = getOrCreateQueue();
170-
q.offer(value);
170+
synchronized (q) {
171+
q.offer(value);
172+
}
171173
active.decrementAndGet();
172174
if (getAndIncrement() != 0) {
173175
return;

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,9 @@ void innerSuccess(InnerObserver inner, R value) {
167167
}
168168
} else {
169169
SpscLinkedArrayQueue<R> q = getOrCreateQueue();
170-
q.offer(value);
170+
synchronized (q) {
171+
q.offer(value);
172+
}
171173
active.decrementAndGet();
172174
if (getAndIncrement() != 0) {
173175
return;

0 commit comments

Comments
 (0)