Skip to content

Commit 7e301d4

Browse files
authored
2.x: Add Completable.takeUntil(Completable) operator (#6079)
1 parent fd76594 commit 7e301d4

File tree

3 files changed

+406
-0
lines changed

3 files changed

+406
-0
lines changed

src/main/java/io/reactivex/Completable.java

+26
Original file line numberDiff line numberDiff line change
@@ -2031,6 +2031,32 @@ public final Completable subscribeOn(final Scheduler scheduler) {
20312031
return RxJavaPlugins.onAssembly(new CompletableSubscribeOn(this, scheduler));
20322032
}
20332033

2034+
/**
2035+
* Terminates the downstream if this or the other {@code Completable}
2036+
* terminates (wins the termination race) while disposing the connection to the losing source.
2037+
* <p>
2038+
* <img width="640" height="468" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.takeuntil.c.png" alt="">
2039+
* <dl>
2040+
* <dt><b>Scheduler:</b></dt>
2041+
* <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
2042+
* <dt><b>Error handling:</b></dt>
2043+
* <dd>If both this and the other sources signal an error, only one of the errors
2044+
* is signaled to the downstream and the other error is signaled to the global
2045+
* error handler via {@link RxJavaPlugins#onError(Throwable)}.</dd>
2046+
* </dl>
2047+
* @param other the other completable source to observe for the terminal signals
2048+
* @return the new Completable instance
2049+
* @since 2.1.17 - experimental
2050+
*/
2051+
@CheckReturnValue
2052+
@Experimental
2053+
@SchedulerSupport(SchedulerSupport.NONE)
2054+
public final Completable takeUntil(CompletableSource other) {
2055+
ObjectHelper.requireNonNull(other, "other is null");
2056+
2057+
return RxJavaPlugins.onAssembly(new CompletableTakeUntilCompletable(this, other));
2058+
}
2059+
20342060
/**
20352061
* Returns a Completable that runs this Completable and emits a TimeoutException in case
20362062
* this Completable doesn't complete within the given time.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import java.util.concurrent.atomic.*;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.annotations.Experimental;
20+
import io.reactivex.disposables.Disposable;
21+
import io.reactivex.internal.disposables.DisposableHelper;
22+
import io.reactivex.plugins.RxJavaPlugins;
23+
24+
/**
25+
* Terminates the sequence if either the main or the other Completable terminate.
26+
* @since 2.1.17 - experimental
27+
*/
28+
@Experimental
29+
public final class CompletableTakeUntilCompletable extends Completable {
30+
31+
final Completable source;
32+
33+
final CompletableSource other;
34+
35+
public CompletableTakeUntilCompletable(Completable source,
36+
CompletableSource other) {
37+
this.source = source;
38+
this.other = other;
39+
}
40+
41+
@Override
42+
protected void subscribeActual(CompletableObserver s) {
43+
TakeUntilMainObserver parent = new TakeUntilMainObserver(s);
44+
s.onSubscribe(parent);
45+
46+
other.subscribe(parent.other);
47+
source.subscribe(parent);
48+
}
49+
50+
static final class TakeUntilMainObserver extends AtomicReference<Disposable>
51+
implements CompletableObserver, Disposable {
52+
53+
private static final long serialVersionUID = 3533011714830024923L;
54+
55+
final CompletableObserver downstream;
56+
57+
final OtherObserver other;
58+
59+
final AtomicBoolean once;
60+
61+
TakeUntilMainObserver(CompletableObserver downstream) {
62+
this.downstream = downstream;
63+
this.other = new OtherObserver(this);
64+
this.once = new AtomicBoolean();
65+
}
66+
67+
@Override
68+
public void dispose() {
69+
if (once.compareAndSet(false, true)) {
70+
DisposableHelper.dispose(this);
71+
DisposableHelper.dispose(other);
72+
}
73+
}
74+
75+
@Override
76+
public boolean isDisposed() {
77+
return once.get();
78+
}
79+
80+
@Override
81+
public void onSubscribe(Disposable d) {
82+
DisposableHelper.setOnce(this, d);
83+
}
84+
85+
@Override
86+
public void onComplete() {
87+
if (once.compareAndSet(false, true)) {
88+
DisposableHelper.dispose(other);
89+
downstream.onComplete();
90+
}
91+
}
92+
93+
@Override
94+
public void onError(Throwable e) {
95+
if (once.compareAndSet(false, true)) {
96+
DisposableHelper.dispose(other);
97+
downstream.onError(e);
98+
} else {
99+
RxJavaPlugins.onError(e);
100+
}
101+
}
102+
103+
void innerComplete() {
104+
if (once.compareAndSet(false, true)) {
105+
DisposableHelper.dispose(this);
106+
downstream.onComplete();
107+
}
108+
}
109+
110+
void innerError(Throwable e) {
111+
if (once.compareAndSet(false, true)) {
112+
DisposableHelper.dispose(this);
113+
downstream.onError(e);
114+
} else {
115+
RxJavaPlugins.onError(e);
116+
}
117+
}
118+
119+
static final class OtherObserver extends AtomicReference<Disposable>
120+
implements CompletableObserver {
121+
122+
private static final long serialVersionUID = 5176264485428790318L;
123+
final TakeUntilMainObserver parent;
124+
125+
OtherObserver(TakeUntilMainObserver parent) {
126+
this.parent = parent;
127+
}
128+
129+
@Override
130+
public void onSubscribe(Disposable d) {
131+
DisposableHelper.setOnce(this, d);
132+
}
133+
134+
@Override
135+
public void onComplete() {
136+
parent.innerComplete();
137+
}
138+
139+
@Override
140+
public void onError(Throwable e) {
141+
parent.innerError(e);
142+
}
143+
}
144+
}
145+
}

0 commit comments

Comments
 (0)