Skip to content

Commit 6a5c9aa

Browse files
committed
2.x: add full implementation for Single.flatMapPublisher so doesn't batch requests (#6015)
1 parent 07e126f commit 6a5c9aa

File tree

3 files changed

+206
-1
lines changed

3 files changed

+206
-1
lines changed

src/main/java/io/reactivex/Single.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -2460,7 +2460,8 @@ public final <R> Maybe<R> flatMapMaybe(final Function<? super T, ? extends Maybe
24602460
@CheckReturnValue
24612461
@SchedulerSupport(SchedulerSupport.NONE)
24622462
public final <R> Flowable<R> flatMapPublisher(Function<? super T, ? extends Publisher<? extends R>> mapper) {
2463-
return toFlowable().flatMap(mapper);
2463+
ObjectHelper.requireNonNull(mapper, "mapper is null");
2464+
return RxJavaPlugins.onAssembly(new SingleFlatMapPublisher<T, R>(this, mapper));
24642465
}
24652466

24662467
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.single;
15+
16+
import java.util.concurrent.atomic.AtomicLong;
17+
import java.util.concurrent.atomic.AtomicReference;
18+
19+
import org.reactivestreams.Publisher;
20+
import org.reactivestreams.Subscriber;
21+
import org.reactivestreams.Subscription;
22+
23+
import io.reactivex.Flowable;
24+
import io.reactivex.SingleObserver;
25+
import io.reactivex.SingleSource;
26+
import io.reactivex.disposables.Disposable;
27+
import io.reactivex.exceptions.Exceptions;
28+
import io.reactivex.functions.Function;
29+
import io.reactivex.internal.functions.ObjectHelper;
30+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
31+
32+
public final class SingleFlatMapPublisher<S, T> extends Flowable<T> {
33+
34+
final SingleSource<S> source;
35+
final Function<? super S, ? extends Publisher<? extends T>> mapper;
36+
37+
public SingleFlatMapPublisher(SingleSource<S> source,
38+
Function<? super S, ? extends Publisher<? extends T>> mapper) {
39+
this.source = source;
40+
this.mapper = mapper;
41+
}
42+
43+
@Override
44+
protected void subscribeActual(Subscriber<? super T> actual) {
45+
source.subscribe(new SingleFlatMapPublisherObserver<S, T>(actual, mapper));
46+
}
47+
48+
static final class SingleFlatMapPublisherObserver<S, T> extends AtomicLong
49+
implements SingleObserver<S>, Subscriber<T>, Subscription {
50+
51+
private static final long serialVersionUID = 7759721921468635667L;
52+
53+
final Subscriber<? super T> actual;
54+
final Function<? super S, ? extends Publisher<? extends T>> mapper;
55+
final AtomicReference<Subscription> parent = new AtomicReference<Subscription>();
56+
Disposable disposable;
57+
58+
SingleFlatMapPublisherObserver(Subscriber<? super T> actual,
59+
Function<? super S, ? extends Publisher<? extends T>> mapper) {
60+
this.actual = actual;
61+
this.mapper = mapper;
62+
}
63+
64+
@Override
65+
public void onSubscribe(Disposable d) {
66+
this.disposable = d;
67+
actual.onSubscribe(this);
68+
}
69+
70+
@Override
71+
public void onSuccess(S value) {
72+
Publisher<? extends T> f;
73+
try {
74+
f = ObjectHelper.requireNonNull(mapper.apply(value), "mapper returns null");
75+
} catch (Exception e) {
76+
Exceptions.throwIfFatal(e);
77+
actual.onError(e);
78+
return;
79+
}
80+
f.subscribe(this);
81+
}
82+
83+
@Override
84+
public void onSubscribe(Subscription s) {
85+
SubscriptionHelper.deferredSetOnce(parent, this, s);
86+
}
87+
88+
@Override
89+
public void onNext(T t) {
90+
actual.onNext(t);
91+
}
92+
93+
@Override
94+
public void onComplete() {
95+
actual.onComplete();
96+
}
97+
98+
@Override
99+
public void onError(Throwable e) {
100+
actual.onError(e);
101+
}
102+
103+
@Override
104+
public void request(long n) {
105+
SubscriptionHelper.deferredRequest(parent, this, n);
106+
}
107+
108+
@Override
109+
public void cancel() {
110+
disposable.dispose();
111+
SubscriptionHelper.cancel(parent);
112+
}
113+
}
114+
115+
}

src/test/java/io/reactivex/internal/operators/single/SingleFlatMapTest.java

+89
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515

1616
import static org.junit.Assert.*;
1717

18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
1820
import org.junit.Test;
1921
import org.reactivestreams.Publisher;
2022

2123
import io.reactivex.*;
2224
import io.reactivex.exceptions.TestException;
2325
import io.reactivex.functions.*;
26+
import io.reactivex.subscribers.TestSubscriber;
2427

2528
public class SingleFlatMapTest {
2629

@@ -126,6 +129,92 @@ public Publisher<Integer> apply(Integer v) throws Exception {
126129
.test()
127130
.assertResult(1, 2, 3, 4, 5);
128131
}
132+
133+
@Test(expected = NullPointerException.class)
134+
public void flatMapPublisherMapperNull() {
135+
Single.just(1).flatMapPublisher(null);
136+
}
137+
138+
@Test
139+
public void flatMapPublisherMapperThrows() {
140+
final TestException ex = new TestException();
141+
Single.just(1)
142+
.flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
143+
@Override
144+
public Publisher<Integer> apply(Integer v) throws Exception {
145+
throw ex;
146+
}
147+
})
148+
.test()
149+
.assertNoValues()
150+
.assertError(ex);
151+
}
152+
153+
@Test
154+
public void flatMapPublisherSingleError() {
155+
final TestException ex = new TestException();
156+
Single.<Integer>error(ex)
157+
.flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
158+
@Override
159+
public Publisher<Integer> apply(Integer v) throws Exception {
160+
return Flowable.just(1);
161+
}
162+
})
163+
.test()
164+
.assertNoValues()
165+
.assertError(ex);
166+
}
167+
168+
@Test
169+
public void flatMapPublisherCancelDuringSingle() {
170+
final AtomicBoolean disposed = new AtomicBoolean();
171+
TestSubscriber<Integer> ts = Single.<Integer>never()
172+
.doOnDispose(new Action() {
173+
@Override
174+
public void run() throws Exception {
175+
disposed.set(true);
176+
}
177+
})
178+
.flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
179+
@Override
180+
public Publisher<Integer> apply(Integer v) throws Exception {
181+
return Flowable.range(v, 5);
182+
}
183+
})
184+
.test()
185+
.assertNoValues()
186+
.assertNotTerminated();
187+
assertFalse(disposed.get());
188+
ts.cancel();
189+
assertTrue(disposed.get());
190+
ts.assertNotTerminated();
191+
}
192+
193+
@Test
194+
public void flatMapPublisherCancelDuringFlowable() {
195+
final AtomicBoolean disposed = new AtomicBoolean();
196+
TestSubscriber<Integer> ts =
197+
Single.just(1)
198+
.flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
199+
@Override
200+
public Publisher<Integer> apply(Integer v) throws Exception {
201+
return Flowable.<Integer>never()
202+
.doOnCancel(new Action() {
203+
@Override
204+
public void run() throws Exception {
205+
disposed.set(true);
206+
}
207+
});
208+
}
209+
})
210+
.test()
211+
.assertNoValues()
212+
.assertNotTerminated();
213+
assertFalse(disposed.get());
214+
ts.cancel();
215+
assertTrue(disposed.get());
216+
ts.assertNotTerminated();
217+
}
129218

130219
@Test(expected = NullPointerException.class)
131220
public void flatMapNull() {

0 commit comments

Comments
 (0)