Skip to content

Commit 7ede8ee

Browse files
authored
2.x: Add efficient mergeWith(Single|Maybe|Completable) overloads. (#5847)
* 2.x: Add efficient mergeWith(Single|Maybe|Completable) overloads. * Compact tests, use named constants
1 parent 4227a59 commit 7ede8ee

21 files changed

+3441
-4
lines changed

src/main/java/io/reactivex/Flowable.java

+84-1
Original file line numberDiff line numberDiff line change
@@ -5771,7 +5771,7 @@ public final Future<T> toFuture() {
57715771
}
57725772

57735773
/**
5774-
* Runs the source observable to a terminal event, ignoring any values and rethrowing any exception.
5774+
* Runs the source Flowable to a terminal event, ignoring any values and rethrowing any exception.
57755775
* <dl>
57765776
* <dt><b>Backpressure:</b></dt>
57775777
* <dd>The operator consumes the source {@code Flowable} in an unbounded manner
@@ -10112,6 +10112,89 @@ public final Flowable<T> mergeWith(Publisher<? extends T> other) {
1011210112
return merge(this, other);
1011310113
}
1011410114

10115+
/**
10116+
* Merges the sequence of items of this Flowable with the success value of the other SingleSource.
10117+
* <p>
10118+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
10119+
* <p>
10120+
* The success value of the other {@code SingleSource} can get interleaved at any point of this
10121+
* {@code Flowable} sequence.
10122+
* <dl>
10123+
* <dt><b>Backpressure:</b></dt>
10124+
* <dd>The operator honors backpressure from downstream and ensures the success item from the
10125+
* {@code SingleSource} is emitted only when there is a downstream demand.</dd>
10126+
* <dt><b>Scheduler:</b></dt>
10127+
* <dd>{@code mergeWith} does not operate by default on a particular {@link Scheduler}.</dd>
10128+
* </dl>
10129+
*
10130+
* @param other the {@code SingleSource} whose success value to merge with
10131+
* @return the new Flowable instance
10132+
* @since 2.1.10 - experimental
10133+
*/
10134+
@CheckReturnValue
10135+
@BackpressureSupport(BackpressureKind.FULL)
10136+
@SchedulerSupport(SchedulerSupport.NONE)
10137+
@Experimental
10138+
public final Flowable<T> mergeWith(@NonNull SingleSource<? extends T> other) {
10139+
ObjectHelper.requireNonNull(other, "other is null");
10140+
return RxJavaPlugins.onAssembly(new FlowableMergeWithSingle<T>(this, other));
10141+
}
10142+
10143+
/**
10144+
* Merges the sequence of items of this Flowable with the success value of the other MaybeSource
10145+
* or waits both to complete normally if the MaybeSource is empty.
10146+
* <p>
10147+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
10148+
* <p>
10149+
* The success value of the other {@code MaybeSource} can get interleaved at any point of this
10150+
* {@code Flowable} sequence.
10151+
* <dl>
10152+
* <dt><b>Backpressure:</b></dt>
10153+
* <dd>The operator honors backpressure from downstream and ensures the success item from the
10154+
* {@code MaybeSource} is emitted only when there is a downstream demand.</dd>
10155+
* <dt><b>Scheduler:</b></dt>
10156+
* <dd>{@code mergeWith} does not operate by default on a particular {@link Scheduler}.</dd>
10157+
* </dl>
10158+
*
10159+
* @param other the {@code MaybeSource} which provides a success value to merge with or completes
10160+
* @return the new Flowable instance
10161+
* @since 2.1.10 - experimental
10162+
*/
10163+
@CheckReturnValue
10164+
@BackpressureSupport(BackpressureKind.FULL)
10165+
@SchedulerSupport(SchedulerSupport.NONE)
10166+
@Experimental
10167+
public final Flowable<T> mergeWith(@NonNull MaybeSource<? extends T> other) {
10168+
ObjectHelper.requireNonNull(other, "other is null");
10169+
return RxJavaPlugins.onAssembly(new FlowableMergeWithMaybe<T>(this, other));
10170+
}
10171+
10172+
/**
10173+
* Relays the items of this Flowable and completes only when the other CompletableSource completes
10174+
* as well.
10175+
* <p>
10176+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
10177+
* <dl>
10178+
* <dt><b>Backpressure:</b></dt>
10179+
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
10180+
* behavior.</dd>
10181+
* <dt><b>Scheduler:</b></dt>
10182+
* <dd>{@code mergeWith} does not operate by default on a particular {@link Scheduler}.</dd>
10183+
* </dl>
10184+
*
10185+
* @param other the {@code CompletableSource} to await for completion
10186+
* @return the new Flowable instance
10187+
* @since 2.1.10 - experimental
10188+
*/
10189+
@CheckReturnValue
10190+
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
10191+
@SchedulerSupport(SchedulerSupport.NONE)
10192+
@Experimental
10193+
public final Flowable<T> mergeWith(@NonNull CompletableSource other) {
10194+
ObjectHelper.requireNonNull(other, "other is null");
10195+
return RxJavaPlugins.onAssembly(new FlowableMergeWithCompletable<T>(this, other));
10196+
}
10197+
1011510198
/**
1011610199
* Modifies a Publisher to perform its emissions and notifications on a specified {@link Scheduler},
1011710200
* asynchronously with a bounded buffer of {@link #bufferSize()} slots.

src/main/java/io/reactivex/Observable.java

+71
Original file line numberDiff line numberDiff line change
@@ -8991,6 +8991,77 @@ public final Observable<T> mergeWith(ObservableSource<? extends T> other) {
89918991
return merge(this, other);
89928992
}
89938993

8994+
/**
8995+
* Merges the sequence of items of this Observable with the success value of the other SingleSource.
8996+
* <p>
8997+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
8998+
* <p>
8999+
* The success value of the other {@code SingleSource} can get interleaved at any point of this
9000+
* {@code Observable} sequence.
9001+
* <dl>
9002+
* <dt><b>Scheduler:</b></dt>
9003+
* <dd>{@code mergeWith} does not operate by default on a particular {@link Scheduler}.</dd>
9004+
* </dl>
9005+
*
9006+
* @param other the {@code SingleSource} whose success value to merge with
9007+
* @return the new Observable instance
9008+
* @since 2.1.10 - experimental
9009+
*/
9010+
@CheckReturnValue
9011+
@SchedulerSupport(SchedulerSupport.NONE)
9012+
@Experimental
9013+
public final Observable<T> mergeWith(@NonNull SingleSource<? extends T> other) {
9014+
ObjectHelper.requireNonNull(other, "other is null");
9015+
return RxJavaPlugins.onAssembly(new ObservableMergeWithSingle<T>(this, other));
9016+
}
9017+
9018+
/**
9019+
* Merges the sequence of items of this Observable with the success value of the other MaybeSource
9020+
* or waits both to complete normally if the MaybeSource is empty.
9021+
* <p>
9022+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
9023+
* <p>
9024+
* The success value of the other {@code MaybeSource} can get interleaved at any point of this
9025+
* {@code Observable} sequence.
9026+
* <dl>
9027+
* <dt><b>Scheduler:</b></dt>
9028+
* <dd>{@code mergeWith} does not operate by default on a particular {@link Scheduler}.</dd>
9029+
* </dl>
9030+
*
9031+
* @param other the {@code MaybeSource} which provides a success value to merge with or completes
9032+
* @return the new Observable instance
9033+
* @since 2.1.10 - experimental
9034+
*/
9035+
@CheckReturnValue
9036+
@SchedulerSupport(SchedulerSupport.NONE)
9037+
@Experimental
9038+
public final Observable<T> mergeWith(@NonNull MaybeSource<? extends T> other) {
9039+
ObjectHelper.requireNonNull(other, "other is null");
9040+
return RxJavaPlugins.onAssembly(new ObservableMergeWithMaybe<T>(this, other));
9041+
}
9042+
9043+
/**
9044+
* Relays the items of this Observable and completes only when the other CompletableSource completes
9045+
* as well.
9046+
* <p>
9047+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
9048+
* <dl>
9049+
* <dt><b>Scheduler:</b></dt>
9050+
* <dd>{@code mergeWith} does not operate by default on a particular {@link Scheduler}.</dd>
9051+
* </dl>
9052+
*
9053+
* @param other the {@code CompletableSource} to await for completion
9054+
* @return the new Observable instance
9055+
* @since 2.1.10 - experimental
9056+
*/
9057+
@CheckReturnValue
9058+
@SchedulerSupport(SchedulerSupport.NONE)
9059+
@Experimental
9060+
public final Observable<T> mergeWith(@NonNull CompletableSource other) {
9061+
ObjectHelper.requireNonNull(other, "other is null");
9062+
return RxJavaPlugins.onAssembly(new ObservableMergeWithCompletable<T>(this, other));
9063+
}
9064+
89949065
/**
89959066
* Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
89969067
* asynchronously with an unbounded buffer with {@link Flowable#bufferSize()} "island size".
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import java.util.concurrent.atomic.*;
17+
18+
import org.reactivestreams.*;
19+
20+
import io.reactivex.*;
21+
import io.reactivex.disposables.Disposable;
22+
import io.reactivex.internal.disposables.DisposableHelper;
23+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
24+
import io.reactivex.internal.util.*;
25+
26+
/**
27+
* Merges a Flowable and a Completable by emitting the items of the Flowable and waiting until
28+
* both the Flowable and Completable complete normally.
29+
*
30+
* @param <T> the element type of the Flowable
31+
* @since 2.1.10 - experimental
32+
*/
33+
public final class FlowableMergeWithCompletable<T> extends AbstractFlowableWithUpstream<T, T> {
34+
35+
final CompletableSource other;
36+
37+
public FlowableMergeWithCompletable(Flowable<T> source, CompletableSource other) {
38+
super(source);
39+
this.other = other;
40+
}
41+
42+
@Override
43+
protected void subscribeActual(Subscriber<? super T> observer) {
44+
MergeWithSubscriber<T> parent = new MergeWithSubscriber<T>(observer);
45+
observer.onSubscribe(parent);
46+
source.subscribe(parent);
47+
other.subscribe(parent.otherObserver);
48+
}
49+
50+
static final class MergeWithSubscriber<T> extends AtomicInteger
51+
implements FlowableSubscriber<T>, Subscription {
52+
53+
private static final long serialVersionUID = -4592979584110982903L;
54+
55+
final Subscriber<? super T> actual;
56+
57+
final AtomicReference<Subscription> mainSubscription;
58+
59+
final OtherObserver otherObserver;
60+
61+
final AtomicThrowable error;
62+
63+
final AtomicLong requested;
64+
65+
volatile boolean mainDone;
66+
67+
volatile boolean otherDone;
68+
69+
MergeWithSubscriber(Subscriber<? super T> actual) {
70+
this.actual = actual;
71+
this.mainSubscription = new AtomicReference<Subscription>();
72+
this.otherObserver = new OtherObserver(this);
73+
this.error = new AtomicThrowable();
74+
this.requested = new AtomicLong();
75+
}
76+
77+
@Override
78+
public void onSubscribe(Subscription d) {
79+
SubscriptionHelper.deferredSetOnce(mainSubscription, requested, d);
80+
}
81+
82+
@Override
83+
public void onNext(T t) {
84+
HalfSerializer.onNext(actual, t, this, error);
85+
}
86+
87+
@Override
88+
public void onError(Throwable ex) {
89+
SubscriptionHelper.cancel(mainSubscription);
90+
HalfSerializer.onError(actual, ex, this, error);
91+
}
92+
93+
@Override
94+
public void onComplete() {
95+
mainDone = true;
96+
if (otherDone) {
97+
HalfSerializer.onComplete(actual, this, error);
98+
}
99+
}
100+
101+
@Override
102+
public void request(long n) {
103+
SubscriptionHelper.deferredRequest(mainSubscription, requested, n);
104+
}
105+
106+
@Override
107+
public void cancel() {
108+
SubscriptionHelper.cancel(mainSubscription);
109+
DisposableHelper.dispose(otherObserver);
110+
}
111+
112+
void otherError(Throwable ex) {
113+
SubscriptionHelper.cancel(mainSubscription);
114+
HalfSerializer.onError(actual, ex, this, error);
115+
}
116+
117+
void otherComplete() {
118+
otherDone = true;
119+
if (mainDone) {
120+
HalfSerializer.onComplete(actual, this, error);
121+
}
122+
}
123+
124+
static final class OtherObserver extends AtomicReference<Disposable>
125+
implements CompletableObserver {
126+
127+
private static final long serialVersionUID = -2935427570954647017L;
128+
129+
final MergeWithSubscriber<?> parent;
130+
131+
OtherObserver(MergeWithSubscriber<?> parent) {
132+
this.parent = parent;
133+
}
134+
135+
@Override
136+
public void onSubscribe(Disposable d) {
137+
DisposableHelper.setOnce(this, d);
138+
}
139+
140+
@Override
141+
public void onError(Throwable e) {
142+
parent.otherError(e);
143+
}
144+
145+
@Override
146+
public void onComplete() {
147+
parent.otherComplete();
148+
}
149+
}
150+
}
151+
}

0 commit comments

Comments
 (0)