Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Fix boundary fusion of concatMap and publish operator #6145

Merged
merged 1 commit into from
Aug 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public final void onSubscribe(Subscription s) {

if (s instanceof QueueSubscription) {
@SuppressWarnings("unchecked") QueueSubscription<T> f = (QueueSubscription<T>)s;
int m = f.requestFusion(QueueSubscription.ANY);
int m = f.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);
if (m == QueueSubscription.SYNC) {
sourceMode = m;
queue = f;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ static final class PublishSubscriber<T>
*/
final AtomicBoolean shouldConnect;

final AtomicReference<Subscription> s = new AtomicReference<Subscription>();
final AtomicReference<Subscription> upstream = new AtomicReference<Subscription>();

/** Contains either an onComplete or an onError token from upstream. */
volatile Object terminalEvent;
Expand All @@ -180,7 +180,7 @@ public void dispose() {
InnerSubscriber[] ps = subscribers.getAndSet(TERMINATED);
if (ps != TERMINATED) {
current.compareAndSet(PublishSubscriber.this, null);
SubscriptionHelper.cancel(s);
SubscriptionHelper.cancel(upstream);
}
}
}
Expand All @@ -192,12 +192,12 @@ public boolean isDisposed() {

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.setOnce(this.s, s)) {
if (SubscriptionHelper.setOnce(this.upstream, s)) {
if (s instanceof QueueSubscription) {
@SuppressWarnings("unchecked")
QueueSubscription<T> qs = (QueueSubscription<T>) s;

int m = qs.requestFusion(QueueSubscription.ANY);
int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);
if (m == QueueSubscription.SYNC) {
sourceMode = m;
queue = qs;
Expand Down Expand Up @@ -482,7 +482,7 @@ void dispatch() {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.get().cancel();
upstream.get().cancel();
term = NotificationLite.error(ex);
terminalEvent = term;
v = null;
Expand All @@ -493,7 +493,7 @@ void dispatch() {
}
// otherwise, just ask for a new value
if (sourceMode != QueueSubscription.SYNC) {
s.get().request(1);
upstream.get().request(1);
}
// and retry emitting to potential new child subscribers
continue;
Expand All @@ -510,7 +510,7 @@ void dispatch() {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.get().cancel();
upstream.get().cancel();
term = NotificationLite.error(ex);
terminalEvent = term;
v = null;
Expand Down Expand Up @@ -562,7 +562,7 @@ void dispatch() {
// if we did emit at least one element, request more to replenish the queue
if (d > 0) {
if (sourceMode != QueueSubscription.SYNC) {
s.get().request(d);
upstream.get().request(d);
}
}
// if we have requests but not an empty queue after emission
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,16 @@

package io.reactivex.internal.operators.flowable;

import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.reactivestreams.Publisher;

import io.reactivex.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Function;
import io.reactivex.internal.operators.flowable.FlowableConcatMap.WeakScalarSubscription;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;

public class FlowableConcatMapTest {
Expand All @@ -39,4 +46,101 @@ public void weakSubscriptionRequest() {
ts.assertResult(1);
}

@Test
public void boundaryFusion() {
Flowable.range(1, 10000)
.observeOn(Schedulers.single())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer t) throws Exception {
String name = Thread.currentThread().getName();
if (name.contains("RxSingleScheduler")) {
return "RxSingleScheduler";
}
return name;
}
})
.concatMap(new Function<String, Publisher<? extends Object>>() {
@Override
public Publisher<? extends Object> apply(String v)
throws Exception {
return Flowable.just(v);
}
})
.observeOn(Schedulers.computation())
.distinct()
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult("RxSingleScheduler");
}

@Test
public void boundaryFusionDelayError() {
Flowable.range(1, 10000)
.observeOn(Schedulers.single())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer t) throws Exception {
String name = Thread.currentThread().getName();
if (name.contains("RxSingleScheduler")) {
return "RxSingleScheduler";
}
return name;
}
})
.concatMapDelayError(new Function<String, Publisher<? extends Object>>() {
@Override
public Publisher<? extends Object> apply(String v)
throws Exception {
return Flowable.just(v);
}
})
.observeOn(Schedulers.computation())
.distinct()
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult("RxSingleScheduler");
}

@Test
public void pollThrows() {
Flowable.just(1)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(TestHelper.<Integer>flowableStripBoundary())
.concatMap(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer v)
throws Exception {
return Flowable.just(v);
}
})
.test()
.assertFailure(TestException.class);
}

@Test
public void pollThrowsDelayError() {
Flowable.just(1)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(TestHelper.<Integer>flowableStripBoundary())
.concatMapDelayError(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer v)
throws Exception {
return Flowable.just(v);
}
})
.test()
.assertFailure(TestException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -824,12 +824,36 @@ public Object apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(TestHelper.flowableStripBoundary())
.publish()
.autoConnect()
.test()
.assertFailure(TestException.class);
}

@Test
public void pollThrowsNoSubscribers() {
ConnectableFlowable<Integer> cf = Flowable.just(1, 2)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer v) throws Exception {
if (v == 2) {
throw new TestException();
}
return v;
}
})
.compose(TestHelper.<Integer>flowableStripBoundary())
.publish();

TestSubscriber<Integer> ts = cf.take(1)
.test();

cf.connect();

ts.assertResult(1);
}

@Test
public void dryRunCrash() {
List<Throwable> errors = TestHelper.trackPluginErrors();
Expand Down Expand Up @@ -1316,4 +1340,31 @@ public void onComplete() {
ts1.assertEmpty();
ts2.assertValuesOnly(1);
}

@Test
public void boundaryFusion() {
Flowable.range(1, 10000)
.observeOn(Schedulers.single())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer t) throws Exception {
String name = Thread.currentThread().getName();
if (name.contains("RxSingleScheduler")) {
return "RxSingleScheduler";
}
return name;
}
})
.share()
.observeOn(Schedulers.computation())
.distinct()
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult("RxSingleScheduler");
}

@Test
public void badRequest() {
TestHelper.assertBadRequestReported(Flowable.range(1, 5).publish());
}
}