Skip to content

Commit 12c0e30

Browse files
authored
2.x: Add efficient concatWith(Single|Maybe|Completable) overloads (#5845)
* 2.x: Add efficient concatWith(Single|Maybe|Completable) overloads * Correct the concatWith(Completable) TCK file name * Increase coverage * Change local variable names.
1 parent ba79413 commit 12c0e30

21 files changed

+1717
-3
lines changed

src/main/java/io/reactivex/Flowable.java

+77
Original file line numberDiff line numberDiff line change
@@ -7170,6 +7170,83 @@ public final Flowable<T> concatWith(Publisher<? extends T> other) {
71707170
return concat(this, other);
71717171
}
71727172

7173+
/**
7174+
* Returns a {@code Flowable} that emits the items from this {@code Flowable} followed by the success item or error event
7175+
* of the other {@link SingleSource}.
7176+
* <p>
7177+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concat.png" alt="">
7178+
* <dl>
7179+
* <dt><b>Backpressure:</b></dt>
7180+
* <dd>The operator supports backpressure and makes sure the success item of the other {@code SingleSource}
7181+
* is only emitted when there is a demand for it.</dd>
7182+
* <dt><b>Scheduler:</b></dt>
7183+
* <dd>{@code concatWith} does not operate by default on a particular {@link Scheduler}.</dd>
7184+
* </dl>
7185+
* @param other the SingleSource whose signal should be emitted after this {@code Flowable} completes normally.
7186+
* @return the new Flowable instance
7187+
* @since 2.1.10 - experimental
7188+
*/
7189+
@CheckReturnValue
7190+
@BackpressureSupport(BackpressureKind.FULL)
7191+
@SchedulerSupport(SchedulerSupport.NONE)
7192+
@Experimental
7193+
public final Flowable<T> concatWith(@NonNull SingleSource<? extends T> other) {
7194+
ObjectHelper.requireNonNull(other, "other is null");
7195+
return RxJavaPlugins.onAssembly(new FlowableConcatWithSingle<T>(this, other));
7196+
}
7197+
7198+
/**
7199+
* Returns a {@code Flowable} that emits the items from this {@code Flowable} followed by the success item or terminal events
7200+
* of the other {@link MaybeSource}.
7201+
* <p>
7202+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concat.png" alt="">
7203+
* <dl>
7204+
* <dt><b>Backpressure:</b></dt>
7205+
* <dd>The operator supports backpressure and makes sure the success item of the other {@code MaybeSource}
7206+
* is only emitted when there is a demand for it.</dd>
7207+
* <dt><b>Scheduler:</b></dt>
7208+
* <dd>{@code concatWith} does not operate by default on a particular {@link Scheduler}.</dd>
7209+
* </dl>
7210+
* @param other the MaybeSource whose signal should be emitted after this Flowable completes normally.
7211+
* @return the new Flowable instance
7212+
* @since 2.1.10 - experimental
7213+
*/
7214+
@CheckReturnValue
7215+
@BackpressureSupport(BackpressureKind.FULL)
7216+
@SchedulerSupport(SchedulerSupport.NONE)
7217+
@Experimental
7218+
public final Flowable<T> concatWith(@NonNull MaybeSource<? extends T> other) {
7219+
ObjectHelper.requireNonNull(other, "other is null");
7220+
return RxJavaPlugins.onAssembly(new FlowableConcatWithMaybe<T>(this, other));
7221+
}
7222+
7223+
/**
7224+
* Returns a {@code Flowable} that emits items from this {@code Flowable} and when it completes normally, the
7225+
* other {@link CompletableSource} is subscribed to and the returned {@code Flowable} emits its terminal events.
7226+
* <p>
7227+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concat.png" alt="">
7228+
* <dl>
7229+
* <dt><b>Backpressure:</b></dt>
7230+
* <dd>The operator does not interfere with backpressure between the current Flowable and the
7231+
* downstream consumer (i.e., acts as pass-through). When the operator switches to the
7232+
* {@code Completable}, backpressure is no longer present because {@code Completable} doesn't
7233+
* have items to apply backpressure to.</dd>
7234+
* <dt><b>Scheduler:</b></dt>
7235+
* <dd>{@code concatWith} does not operate by default on a particular {@link Scheduler}.</dd>
7236+
* </dl>
7237+
* @param other the {@code CompletableSource} to subscribe to once the current {@code Flowable} completes normally
7238+
* @return the new Flowable instance
7239+
* @since 2.1.10 - experimental
7240+
*/
7241+
@CheckReturnValue
7242+
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
7243+
@SchedulerSupport(SchedulerSupport.NONE)
7244+
@Experimental
7245+
public final Flowable<T> concatWith(@NonNull CompletableSource other) {
7246+
ObjectHelper.requireNonNull(other, "other is null");
7247+
return RxJavaPlugins.onAssembly(new FlowableConcatWithCompletable<T>(this, other));
7248+
}
7249+
71737250
/**
71747251
* Returns a Single that emits a Boolean that indicates whether the source Publisher emitted a
71757252
* specified item.

src/main/java/io/reactivex/Observable.java

+63
Original file line numberDiff line numberDiff line change
@@ -6573,6 +6573,69 @@ public final Observable<T> concatWith(ObservableSource<? extends T> other) {
65736573
return concat(this, other);
65746574
}
65756575

6576+
/**
6577+
* Returns an {@code Observable} that emits the items from this {@code Observable} followed by the success item or error event
6578+
* of the other {@link SingleSource}.
6579+
* <p>
6580+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concat.png" alt="">
6581+
* <dl>
6582+
* <dt><b>Scheduler:</b></dt>
6583+
* <dd>{@code concatWith} does not operate by default on a particular {@link Scheduler}.</dd>
6584+
* </dl>
6585+
* @param other the SingleSource whose signal should be emitted after this {@code Observable} completes normally.
6586+
* @return the new Observable instance
6587+
* @since 2.1.10 - experimental
6588+
*/
6589+
@CheckReturnValue
6590+
@SchedulerSupport(SchedulerSupport.NONE)
6591+
@Experimental
6592+
public final Observable<T> concatWith(@NonNull SingleSource<? extends T> other) {
6593+
ObjectHelper.requireNonNull(other, "other is null");
6594+
return RxJavaPlugins.onAssembly(new ObservableConcatWithSingle<T>(this, other));
6595+
}
6596+
6597+
/**
6598+
* Returns an {@code Observable} that emits the items from this {@code Observable} followed by the success item or terminal events
6599+
* of the other {@link MaybeSource}.
6600+
* <p>
6601+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concat.png" alt="">
6602+
* <dl>
6603+
* <dt><b>Scheduler:</b></dt>
6604+
* <dd>{@code concatWith} does not operate by default on a particular {@link Scheduler}.</dd>
6605+
* </dl>
6606+
* @param other the MaybeSource whose signal should be emitted after this Observable completes normally.
6607+
* @return the new Observable instance
6608+
* @since 2.1.10 - experimental
6609+
*/
6610+
@CheckReturnValue
6611+
@SchedulerSupport(SchedulerSupport.NONE)
6612+
@Experimental
6613+
public final Observable<T> concatWith(@NonNull MaybeSource<? extends T> other) {
6614+
ObjectHelper.requireNonNull(other, "other is null");
6615+
return RxJavaPlugins.onAssembly(new ObservableConcatWithMaybe<T>(this, other));
6616+
}
6617+
6618+
/**
6619+
* Returns an {@code Observable} that emits items from this {@code Observable} and when it completes normally, the
6620+
* other {@link CompletableSource} is subscribed to and the returned {@code Observable} emits its terminal events.
6621+
* <p>
6622+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concat.png" alt="">
6623+
* <dl>
6624+
* <dt><b>Scheduler:</b></dt>
6625+
* <dd>{@code concatWith} does not operate by default on a particular {@link Scheduler}.</dd>
6626+
* </dl>
6627+
* @param other the {@code CompletableSource} to subscribe to once the current {@code Observable} completes normally
6628+
* @return the new Observable instance
6629+
* @since 2.1.10 - experimental
6630+
*/
6631+
@CheckReturnValue
6632+
@SchedulerSupport(SchedulerSupport.NONE)
6633+
@Experimental
6634+
public final Observable<T> concatWith(@NonNull CompletableSource other) {
6635+
ObjectHelper.requireNonNull(other, "other is null");
6636+
return RxJavaPlugins.onAssembly(new ObservableConcatWithCompletable<T>(this, other));
6637+
}
6638+
65766639
/**
65776640
* Returns a Single that emits a Boolean that indicates whether the source ObservableSource emitted a
65786641
* specified item.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
import org.reactivestreams.*;
19+
20+
import io.reactivex.*;
21+
import io.reactivex.disposables.Disposable;
22+
import io.reactivex.internal.disposables.DisposableHelper;
23+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
24+
25+
/**
26+
* Subscribe to a main Flowable first, then when it completes normally, subscribe to a Completable
27+
* and terminate when it terminates.
28+
* @param <T> the element type of the main source and output type
29+
* @since 2.1.10 - experimental
30+
*/
31+
public final class FlowableConcatWithCompletable<T> extends AbstractFlowableWithUpstream<T, T> {
32+
33+
final CompletableSource other;
34+
35+
public FlowableConcatWithCompletable(Flowable<T> source, CompletableSource other) {
36+
super(source);
37+
this.other = other;
38+
}
39+
40+
@Override
41+
protected void subscribeActual(Subscriber<? super T> s) {
42+
source.subscribe(new ConcatWithSubscriber<T>(s, other));
43+
}
44+
45+
static final class ConcatWithSubscriber<T>
46+
extends AtomicReference<Disposable>
47+
implements FlowableSubscriber<T>, CompletableObserver, Subscription {
48+
49+
private static final long serialVersionUID = -7346385463600070225L;
50+
51+
final Subscriber<? super T> actual;
52+
53+
Subscription upstream;
54+
55+
CompletableSource other;
56+
57+
boolean inCompletable;
58+
59+
ConcatWithSubscriber(Subscriber<? super T> actual, CompletableSource other) {
60+
this.actual = actual;
61+
this.other = other;
62+
}
63+
64+
@Override
65+
public void onSubscribe(Subscription s) {
66+
if (SubscriptionHelper.validate(upstream, s)) {
67+
this.upstream = s;
68+
actual.onSubscribe(this);
69+
}
70+
}
71+
72+
@Override
73+
public void onSubscribe(Disposable d) {
74+
DisposableHelper.setOnce(this, d);
75+
}
76+
77+
@Override
78+
public void onNext(T t) {
79+
actual.onNext(t);
80+
}
81+
82+
@Override
83+
public void onError(Throwable t) {
84+
actual.onError(t);
85+
}
86+
87+
@Override
88+
public void onComplete() {
89+
if (inCompletable) {
90+
actual.onComplete();
91+
} else {
92+
inCompletable = true;
93+
upstream = SubscriptionHelper.CANCELLED;
94+
CompletableSource cs = other;
95+
other = null;
96+
cs.subscribe(this);
97+
}
98+
}
99+
100+
@Override
101+
public void request(long n) {
102+
upstream.request(n);
103+
}
104+
105+
@Override
106+
public void cancel() {
107+
upstream.cancel();
108+
DisposableHelper.dispose(this);
109+
}
110+
}
111+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import java.util.concurrent.atomic.*;
17+
18+
import org.reactivestreams.*;
19+
20+
import io.reactivex.*;
21+
import io.reactivex.disposables.Disposable;
22+
import io.reactivex.internal.disposables.DisposableHelper;
23+
import io.reactivex.internal.subscribers.SinglePostCompleteSubscriber;
24+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
25+
26+
/**
27+
* Subscribe to a main Flowable first, then when it completes normally, subscribe to a Maybe,
28+
* signal its success value followed by a completion or signal its error or completion signal as is.
29+
* @param <T> the element type of the main source and output type
30+
* @since 2.1.10 - experimental
31+
*/
32+
public final class FlowableConcatWithMaybe<T> extends AbstractFlowableWithUpstream<T, T> {
33+
34+
final MaybeSource<? extends T> other;
35+
36+
public FlowableConcatWithMaybe(Flowable<T> source, MaybeSource<? extends T> other) {
37+
super(source);
38+
this.other = other;
39+
}
40+
41+
@Override
42+
protected void subscribeActual(Subscriber<? super T> s) {
43+
source.subscribe(new ConcatWithSubscriber<T>(s, other));
44+
}
45+
46+
static final class ConcatWithSubscriber<T>
47+
extends SinglePostCompleteSubscriber<T, T>
48+
implements MaybeObserver<T> {
49+
50+
private static final long serialVersionUID = -7346385463600070225L;
51+
52+
final AtomicReference<Disposable> otherDisposable;
53+
54+
MaybeSource<? extends T> other;
55+
56+
boolean inMaybe;
57+
58+
ConcatWithSubscriber(Subscriber<? super T> actual, MaybeSource<? extends T> other) {
59+
super(actual);
60+
this.other = other;
61+
this.otherDisposable = new AtomicReference<Disposable>();
62+
}
63+
64+
@Override
65+
public void onSubscribe(Disposable d) {
66+
DisposableHelper.setOnce(otherDisposable, d);
67+
}
68+
69+
@Override
70+
public void onNext(T t) {
71+
produced++;
72+
actual.onNext(t);
73+
}
74+
75+
@Override
76+
public void onError(Throwable t) {
77+
actual.onError(t);
78+
}
79+
80+
@Override
81+
public void onSuccess(T t) {
82+
complete(t);
83+
}
84+
85+
@Override
86+
public void onComplete() {
87+
if (inMaybe) {
88+
actual.onComplete();
89+
} else {
90+
inMaybe = true;
91+
s = SubscriptionHelper.CANCELLED;
92+
MaybeSource<? extends T> ms = other;
93+
other = null;
94+
ms.subscribe(this);
95+
}
96+
}
97+
98+
@Override
99+
public void cancel() {
100+
super.cancel();
101+
DisposableHelper.dispose(otherDisposable);
102+
}
103+
}
104+
}

0 commit comments

Comments
 (0)