Skip to content

Commit 05b0d40

Browse files
authored
2.x: Fix Observable.concatMapSingle dropping upstream items (#5972)
1 parent 63877ae commit 05b0d40

File tree

3 files changed

+37
-3
lines changed

3 files changed

+37
-3
lines changed

src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingle.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@
1818
import io.reactivex.*;
1919
import io.reactivex.annotations.Experimental;
2020
import io.reactivex.disposables.Disposable;
21-
import io.reactivex.exceptions.*;
21+
import io.reactivex.exceptions.Exceptions;
2222
import io.reactivex.functions.Function;
2323
import io.reactivex.internal.disposables.DisposableHelper;
2424
import io.reactivex.internal.functions.ObjectHelper;
2525
import io.reactivex.internal.fuseable.SimplePlainQueue;
26-
import io.reactivex.internal.queue.SpscArrayQueue;
26+
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
2727
import io.reactivex.internal.util.*;
2828
import io.reactivex.plugins.RxJavaPlugins;
2929

@@ -107,7 +107,7 @@ static final class ConcatMapSingleMainObserver<T, R>
107107
this.errorMode = errorMode;
108108
this.errors = new AtomicThrowable();
109109
this.inner = new ConcatMapSingleObserver<R>(this);
110-
this.queue = new SpscArrayQueue<T>(prefetch);
110+
this.queue = new SpscLinkedArrayQueue<T>(prefetch);
111111
}
112112

113113
@Override

src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybeTest.java

+17
Original file line numberDiff line numberDiff line change
@@ -399,4 +399,21 @@ public void cancelNoConcurrentClean() {
399399

400400
assertTrue(operator.queue.isEmpty());
401401
}
402+
403+
@Test
404+
public void checkUnboundedInnerQueue() {
405+
MaybeSubject<Integer> ms = MaybeSubject.create();
406+
407+
@SuppressWarnings("unchecked")
408+
TestObserver<Integer> to = Observable
409+
.fromArray(ms, Maybe.just(2), Maybe.just(3), Maybe.just(4))
410+
.concatMapMaybe(Functions.<Maybe<Integer>>identity(), 2)
411+
.test();
412+
413+
to.assertEmpty();
414+
415+
ms.onSuccess(1);
416+
417+
to.assertResult(1, 2, 3, 4);
418+
}
402419
}

src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingleTest.java

+17
Original file line numberDiff line numberDiff line change
@@ -336,4 +336,21 @@ public void cancelNoConcurrentClean() {
336336

337337
assertTrue(operator.queue.isEmpty());
338338
}
339+
340+
@Test
341+
public void checkUnboundedInnerQueue() {
342+
SingleSubject<Integer> ss = SingleSubject.create();
343+
344+
@SuppressWarnings("unchecked")
345+
TestObserver<Integer> to = Observable
346+
.fromArray(ss, Single.just(2), Single.just(3), Single.just(4))
347+
.concatMapSingle(Functions.<Single<Integer>>identity(), 2)
348+
.test();
349+
350+
to.assertEmpty();
351+
352+
ss.onSuccess(1);
353+
354+
to.assertResult(1, 2, 3, 4);
355+
}
339356
}

0 commit comments

Comments
 (0)