Skip to content

Commit 90203b6

Browse files
authored
2.x: Fix switchMap to indicate boundary fusion (#5991)
1 parent 214181f commit 90203b6

File tree

5 files changed

+281
-13
lines changed

5 files changed

+281
-13
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ public void onSubscribe(Subscription s) {
359359
@SuppressWarnings("unchecked")
360360
QueueSubscription<R> qs = (QueueSubscription<R>) s;
361361

362-
int m = qs.requestFusion(QueueSubscription.ANY);
362+
int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);
363363
if (m == QueueSubscription.SYNC) {
364364
fusionMode = m;
365365
queue = qs;

src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ public void onSubscribe(Disposable s) {
348348
@SuppressWarnings("unchecked")
349349
QueueDisposable<R> qd = (QueueDisposable<R>) s;
350350

351-
int m = qd.requestFusion(QueueDisposable.ANY);
351+
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
352352
if (m == QueueDisposable.SYNC) {
353353
queue = qd;
354354
done = true;

src/test/java/io/reactivex/TestHelper.java

+210
Original file line numberDiff line numberDiff line change
@@ -2922,4 +2922,214 @@ public void request(long n) {
29222922
}
29232923
};
29242924
}
2925+
2926+
static final class FlowableStripBoundary<T> extends Flowable<T> implements FlowableTransformer<T, T> {
2927+
2928+
final Flowable<T> source;
2929+
2930+
FlowableStripBoundary(Flowable<T> source) {
2931+
this.source = source;
2932+
}
2933+
2934+
@Override
2935+
public Flowable<T> apply(Flowable<T> upstream) {
2936+
return new FlowableStripBoundary<T>(upstream);
2937+
}
2938+
2939+
@Override
2940+
protected void subscribeActual(Subscriber<? super T> s) {
2941+
source.subscribe(new StripBoundarySubscriber<T>(s));
2942+
}
2943+
2944+
static final class StripBoundarySubscriber<T> implements FlowableSubscriber<T>, QueueSubscription<T> {
2945+
2946+
final Subscriber<? super T> actual;
2947+
2948+
Subscription upstream;
2949+
2950+
QueueSubscription<T> qs;
2951+
2952+
StripBoundarySubscriber(Subscriber<? super T> actual) {
2953+
this.actual = actual;
2954+
}
2955+
2956+
@SuppressWarnings("unchecked")
2957+
@Override
2958+
public void onSubscribe(Subscription subscription) {
2959+
this.upstream = subscription;
2960+
if (subscription instanceof QueueSubscription) {
2961+
qs = (QueueSubscription<T>)subscription;
2962+
}
2963+
actual.onSubscribe(this);
2964+
}
2965+
2966+
@Override
2967+
public void onNext(T t) {
2968+
actual.onNext(t);
2969+
}
2970+
2971+
@Override
2972+
public void onError(Throwable throwable) {
2973+
actual.onError(throwable);
2974+
}
2975+
2976+
@Override
2977+
public void onComplete() {
2978+
actual.onComplete();
2979+
}
2980+
2981+
@Override
2982+
public int requestFusion(int mode) {
2983+
QueueSubscription<T> fs = qs;
2984+
if (fs != null) {
2985+
return fs.requestFusion(mode & ~BOUNDARY);
2986+
}
2987+
return NONE;
2988+
}
2989+
2990+
@Override
2991+
public boolean offer(T value) {
2992+
throw new UnsupportedOperationException("Should not be called");
2993+
}
2994+
2995+
@Override
2996+
public boolean offer(T v1, T v2) {
2997+
throw new UnsupportedOperationException("Should not be called");
2998+
}
2999+
3000+
@Override
3001+
public T poll() throws Exception {
3002+
return qs.poll();
3003+
}
3004+
3005+
@Override
3006+
public void clear() {
3007+
qs.clear();
3008+
}
3009+
3010+
@Override
3011+
public boolean isEmpty() {
3012+
return qs.isEmpty();
3013+
}
3014+
3015+
@Override
3016+
public void request(long n) {
3017+
upstream.request(n);
3018+
}
3019+
3020+
@Override
3021+
public void cancel() {
3022+
upstream.cancel();
3023+
}
3024+
}
3025+
}
3026+
3027+
public static <T> FlowableTransformer<T, T> flowableStripBoundary() {
3028+
return new FlowableStripBoundary<T>(null);
3029+
}
3030+
3031+
static final class ObservableStripBoundary<T> extends Observable<T> implements ObservableTransformer<T, T> {
3032+
3033+
final Observable<T> source;
3034+
3035+
ObservableStripBoundary(Observable<T> source) {
3036+
this.source = source;
3037+
}
3038+
3039+
@Override
3040+
public Observable<T> apply(Observable<T> upstream) {
3041+
return new ObservableStripBoundary<T>(upstream);
3042+
}
3043+
3044+
@Override
3045+
protected void subscribeActual(Observer<? super T> s) {
3046+
source.subscribe(new StripBoundaryObserver<T>(s));
3047+
}
3048+
3049+
static final class StripBoundaryObserver<T> implements Observer<T>, QueueDisposable<T> {
3050+
3051+
final Observer<? super T> actual;
3052+
3053+
Disposable upstream;
3054+
3055+
QueueDisposable<T> qd;
3056+
3057+
StripBoundaryObserver(Observer<? super T> actual) {
3058+
this.actual = actual;
3059+
}
3060+
3061+
@SuppressWarnings("unchecked")
3062+
@Override
3063+
public void onSubscribe(Disposable d) {
3064+
this.upstream = d;
3065+
if (d instanceof QueueDisposable) {
3066+
qd = (QueueDisposable<T>)d;
3067+
}
3068+
actual.onSubscribe(this);
3069+
}
3070+
3071+
@Override
3072+
public void onNext(T t) {
3073+
actual.onNext(t);
3074+
}
3075+
3076+
@Override
3077+
public void onError(Throwable throwable) {
3078+
actual.onError(throwable);
3079+
}
3080+
3081+
@Override
3082+
public void onComplete() {
3083+
actual.onComplete();
3084+
}
3085+
3086+
@Override
3087+
public int requestFusion(int mode) {
3088+
QueueDisposable<T> fs = qd;
3089+
if (fs != null) {
3090+
return fs.requestFusion(mode & ~BOUNDARY);
3091+
}
3092+
return NONE;
3093+
}
3094+
3095+
@Override
3096+
public boolean offer(T value) {
3097+
throw new UnsupportedOperationException("Should not be called");
3098+
}
3099+
3100+
@Override
3101+
public boolean offer(T v1, T v2) {
3102+
throw new UnsupportedOperationException("Should not be called");
3103+
}
3104+
3105+
@Override
3106+
public T poll() throws Exception {
3107+
return qd.poll();
3108+
}
3109+
3110+
@Override
3111+
public void clear() {
3112+
qd.clear();
3113+
}
3114+
3115+
@Override
3116+
public boolean isEmpty() {
3117+
return qd.isEmpty();
3118+
}
3119+
3120+
@Override
3121+
public void dispose() {
3122+
upstream.dispose();
3123+
}
3124+
3125+
@Override
3126+
public boolean isDisposed() {
3127+
return upstream.isDisposed();
3128+
}
3129+
}
3130+
}
3131+
3132+
public static <T> ObservableTransformer<T, T> observableStripBoundary() {
3133+
return new ObservableStripBoundary<T>(null);
3134+
}
29253135
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java

+37-7
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import io.reactivex.internal.util.ExceptionHelper;
3434
import io.reactivex.plugins.RxJavaPlugins;
3535
import io.reactivex.processors.PublishProcessor;
36-
import io.reactivex.schedulers.TestScheduler;
36+
import io.reactivex.schedulers.*;
3737
import io.reactivex.subscribers.*;
3838

3939
public class FlowableSwitchTest {
@@ -1144,12 +1144,16 @@ public void run() {
11441144
@Test
11451145
public void fusedInnerCrash() {
11461146
Flowable.just(1).hide()
1147-
.switchMap(Functions.justFunction(Flowable.just(1).map(new Function<Integer, Object>() {
1148-
@Override
1149-
public Object apply(Integer v) throws Exception {
1150-
throw new TestException();
1151-
}
1152-
})))
1147+
.switchMap(Functions.justFunction(Flowable.just(1)
1148+
.map(new Function<Integer, Object>() {
1149+
@Override
1150+
public Object apply(Integer v) throws Exception {
1151+
throw new TestException();
1152+
}
1153+
})
1154+
.compose(TestHelper.<Object>flowableStripBoundary())
1155+
)
1156+
)
11531157
.test()
11541158
.assertFailure(TestException.class);
11551159
}
@@ -1174,4 +1178,30 @@ public void innerCancelledOnMainError() {
11741178

11751179
ts.assertFailure(TestException.class);
11761180
}
1181+
1182+
@Test
1183+
public void fusedBoundary() {
1184+
String thread = Thread.currentThread().getName();
1185+
1186+
Flowable.range(1, 10000)
1187+
.switchMap(new Function<Integer, Flowable<? extends Object>>() {
1188+
@Override
1189+
public Flowable<? extends Object> apply(Integer v)
1190+
throws Exception {
1191+
return Flowable.just(2).hide()
1192+
.observeOn(Schedulers.single())
1193+
.map(new Function<Integer, Object>() {
1194+
@Override
1195+
public Object apply(Integer w) throws Exception {
1196+
return Thread.currentThread().getName();
1197+
}
1198+
});
1199+
}
1200+
})
1201+
.test()
1202+
.awaitDone(5, TimeUnit.SECONDS)
1203+
.assertNever(thread)
1204+
.assertNoErrors()
1205+
.assertComplete();
1206+
}
11771207
}

src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java

+32-4
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.internal.operators.observable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.*;
1718
import static org.mockito.Mockito.*;
1819

1920
import java.util.List;
@@ -26,15 +27,14 @@
2627
import io.reactivex.*;
2728
import io.reactivex.disposables.*;
2829
import io.reactivex.exceptions.*;
29-
import io.reactivex.functions.Consumer;
30-
import io.reactivex.functions.Function;
30+
import io.reactivex.functions.*;
3131
import io.reactivex.internal.functions.Functions;
3232
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
3333
import io.reactivex.internal.util.ExceptionHelper;
3434
import io.reactivex.observers.TestObserver;
3535
import io.reactivex.plugins.RxJavaPlugins;
36-
import io.reactivex.schedulers.TestScheduler;
37-
import io.reactivex.subjects.PublishSubject;
36+
import io.reactivex.schedulers.*;
37+
import io.reactivex.subjects.*;
3838

3939
public class ObservableSwitchTest {
4040

@@ -1121,6 +1121,7 @@ public Integer apply(Integer v) throws Exception {
11211121
throw new TestException();
11221122
}
11231123
})
1124+
.compose(TestHelper.<Integer>observableStripBoundary())
11241125
))
11251126
.test();
11261127

@@ -1148,6 +1149,7 @@ public Integer apply(Integer v) throws Exception {
11481149
throw new TestException();
11491150
}
11501151
})
1152+
.compose(TestHelper.<Integer>observableStripBoundary())
11511153
))
11521154
.test();
11531155

@@ -1166,4 +1168,30 @@ public Integer apply(Integer v) throws Exception {
11661168

11671169
assertFalse(ps.hasObservers());
11681170
}
1171+
1172+
@Test
1173+
public void fusedBoundary() {
1174+
String thread = Thread.currentThread().getName();
1175+
1176+
Observable.range(1, 10000)
1177+
.switchMap(new Function<Integer, ObservableSource<? extends Object>>() {
1178+
@Override
1179+
public ObservableSource<? extends Object> apply(Integer v)
1180+
throws Exception {
1181+
return Observable.just(2).hide()
1182+
.observeOn(Schedulers.single())
1183+
.map(new Function<Integer, Object>() {
1184+
@Override
1185+
public Object apply(Integer w) throws Exception {
1186+
return Thread.currentThread().getName();
1187+
}
1188+
});
1189+
}
1190+
})
1191+
.test()
1192+
.awaitDone(5, TimeUnit.SECONDS)
1193+
.assertNever(thread)
1194+
.assertNoErrors()
1195+
.assertComplete();
1196+
}
11691197
}

0 commit comments

Comments
 (0)