Skip to content

Commit 2f8e8bc

Browse files
authored
2.x: add doFinally to the rest of the reactive base classes (#4832)
1 parent 91d2b93 commit 2f8e8bc

File tree

15 files changed

+1388
-6
lines changed

15 files changed

+1388
-6
lines changed

src/main/java/io/reactivex/Completable.java

+22-2
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@
2525
import io.reactivex.internal.observers.*;
2626
import io.reactivex.internal.operators.completable.*;
2727
import io.reactivex.internal.operators.flowable.FlowableDelaySubscriptionOther;
28-
import io.reactivex.internal.operators.maybe.MaybeFromCompletable;
29-
import io.reactivex.internal.operators.maybe.MaybeDelayWithCompletable;
28+
import io.reactivex.internal.operators.maybe.*;
3029
import io.reactivex.internal.operators.observable.ObservableDelaySubscriptionOther;
3130
import io.reactivex.internal.operators.single.SingleDelayWithCompletable;
3231
import io.reactivex.internal.util.ExceptionHelper;
@@ -1171,6 +1170,27 @@ public final Completable doAfterTerminate(final Action onAfterTerminate) {
11711170
onAfterTerminate,
11721171
Functions.EMPTY_ACTION);
11731172
}
1173+
/**
1174+
* Calls the specified action after this Completable signals onError or onComplete or gets disposed by
1175+
* the downstream.
1176+
* <p>In case of a race between a terminal event and a dispose call, the provided {@code onFinally} action
1177+
* is executed once per subscription.
1178+
* <p>Note that the {@code onFinally} action is shared between subscriptions and as such
1179+
* should be thread-safe.
1180+
* <dl>
1181+
* <dt><b>Scheduler:</b></dt>
1182+
* <dd>{@code doFinally} does not operate by default on a particular {@link Scheduler}.</dd>
1183+
* </dl>
1184+
* @param onFinally the action called when this Completable terminates or gets cancelled
1185+
* @return the new Completable instance
1186+
* @since 2.0.1 - experimental
1187+
*/
1188+
@SchedulerSupport(SchedulerSupport.NONE)
1189+
@Experimental
1190+
public final Completable doFinally(Action onFinally) {
1191+
ObjectHelper.requireNonNull(onFinally, "onFinally is null");
1192+
return RxJavaPlugins.onAssembly(new CompletableDoFinally(this, onFinally));
1193+
}
11741194

11751195
/**
11761196
* <strong>Advanced use without safeguards:</strong> lifts a CompletableOperator

src/main/java/io/reactivex/Flowable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -7328,7 +7328,7 @@ public final Flowable<T> distinctUntilChanged(BiPredicate<? super T, ? super T>
73287328
* Calls the specified action after this Flowable signals onError or onCompleted or gets cancelled by
73297329
* the downstream.
73307330
* <p>In case of a race between a terminal event and a cancellation, the provided {@code onFinally} action
7331-
* is executed at once per subscription.
7331+
* is executed once per subscription.
73327332
* <p>Note that the {@code onFinally} action is shared between subscriptions and as such
73337333
* should be thread-safe.
73347334
* <dl>

src/main/java/io/reactivex/Maybe.java

+22
Original file line numberDiff line numberDiff line change
@@ -2302,6 +2302,28 @@ public final Maybe<T> doAfterTerminate(Action onAfterTerminate) {
23022302
));
23032303
}
23042304

2305+
/**
2306+
* Calls the specified action after this Maybe signals onSuccess, onError or onComplete or gets disposed by
2307+
* the downstream.
2308+
* <p>In case of a race between a terminal event and a dispose call, the provided {@code onFinally} action
2309+
* is executed once per subscription.
2310+
* <p>Note that the {@code onFinally} action is shared between subscriptions and as such
2311+
* should be thread-safe.
2312+
* <dl>
2313+
* <dt><b>Scheduler:</b></dt>
2314+
* <dd>{@code doFinally} does not operate by default on a particular {@link Scheduler}.</dd>
2315+
* </dl>
2316+
* @param onFinally the action called when this Maybe terminates or gets cancelled
2317+
* @return the new Maybe instance
2318+
* @since 2.0.1 - experimental
2319+
*/
2320+
@SchedulerSupport(SchedulerSupport.NONE)
2321+
@Experimental
2322+
public final Maybe<T> doFinally(Action onFinally) {
2323+
ObjectHelper.requireNonNull(onFinally, "onFinally is null");
2324+
return RxJavaPlugins.onAssembly(new MaybeDoFinally<T>(this, onFinally));
2325+
}
2326+
23052327
/**
23062328
* Calls the shared runnable if a MaybeObserver subscribed to the current Maybe
23072329
* disposes the common Disposable it received via onSubscribe.

src/main/java/io/reactivex/Observable.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import java.util.*;
1717
import java.util.concurrent.*;
1818

19-
import io.reactivex.internal.operators.flowable.FlowableOnBackpressureError;
2019
import org.reactivestreams.Publisher;
2120

2221
import io.reactivex.annotations.*;
@@ -26,7 +25,7 @@
2625
import io.reactivex.internal.functions.*;
2726
import io.reactivex.internal.fuseable.ScalarCallable;
2827
import io.reactivex.internal.observers.*;
29-
import io.reactivex.internal.operators.flowable.FlowableFromObservable;
28+
import io.reactivex.internal.operators.flowable.*;
3029
import io.reactivex.internal.operators.observable.*;
3130
import io.reactivex.internal.util.*;
3231
import io.reactivex.observables.*;
@@ -6440,6 +6439,30 @@ public final Observable<T> doAfterTerminate(Action onFinally) {
64406439
return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.EMPTY_ACTION, onFinally);
64416440
}
64426441

6442+
/**
6443+
* Calls the specified action after this Observable signals onError or onCompleted or gets disposed by
6444+
* the downstream.
6445+
* <p>In case of a race between a terminal event and a dispose call, the provided {@code onFinally} action
6446+
* is executed once per subscription.
6447+
* <p>Note that the {@code onFinally} action is shared between subscriptions and as such
6448+
* should be thread-safe.
6449+
* <dl>
6450+
* <dt><b>Scheduler:</b></dt>
6451+
* <dd>{@code doFinally} does not operate by default on a particular {@link Scheduler}.</dd>
6452+
* <td><b>Operator-fusion:</b></dt>
6453+
* <dd>This operator supports boundary-limited synchronous or asynchronous queue-fusion.</dd>
6454+
* </dl>
6455+
* @param onFinally the action called when this Observable terminates or gets cancelled
6456+
* @return the new Observable instance
6457+
* @since 2.0.1 - experimental
6458+
*/
6459+
@SchedulerSupport(SchedulerSupport.NONE)
6460+
@Experimental
6461+
public final Observable<T> doFinally(Action onFinally) {
6462+
ObjectHelper.requireNonNull(onFinally, "onFinally is null");
6463+
return RxJavaPlugins.onAssembly(new ObservableDoFinally<T>(this, onFinally));
6464+
}
6465+
64436466
/**
64446467
* Calls the unsubscribe {@code Action} if the downstream disposes the sequence.
64456468
* <p>

src/main/java/io/reactivex/Single.java

+22
Original file line numberDiff line numberDiff line change
@@ -1716,6 +1716,28 @@ public final <U> Single<T> delaySubscription(long time, TimeUnit unit, Scheduler
17161716
return delaySubscription(Observable.timer(time, unit, scheduler));
17171717
}
17181718

1719+
/**
1720+
* Calls the specified action after this Single signals onSuccess or onError or gets disposed by
1721+
* the downstream.
1722+
* <p>In case of a race between a terminal event and a dispose call, the provided {@code onFinally} action
1723+
* is executed once per subscription.
1724+
* <p>Note that the {@code onFinally} action is shared between subscriptions and as such
1725+
* should be thread-safe.
1726+
* <dl>
1727+
* <dt><b>Scheduler:</b></dt>
1728+
* <dd>{@code doFinally} does not operate by default on a particular {@link Scheduler}.</dd>
1729+
* </dl>
1730+
* @param onFinally the action called when this Single terminates or gets cancelled
1731+
* @return the new Single instance
1732+
* @since 2.0.1 - experimental
1733+
*/
1734+
@SchedulerSupport(SchedulerSupport.NONE)
1735+
@Experimental
1736+
public final Single<T> doFinally(Action onFinally) {
1737+
ObjectHelper.requireNonNull(onFinally, "onFinally is null");
1738+
return RxJavaPlugins.onAssembly(new SingleDoFinally<T>(this, onFinally));
1739+
}
1740+
17191741
/**
17201742
* Calls the shared consumer with the Disposable sent through the onSubscribe for each
17211743
* SingleObserver that subscribes to the current Single.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import java.util.concurrent.atomic.AtomicInteger;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.annotations.Experimental;
20+
import io.reactivex.disposables.Disposable;
21+
import io.reactivex.exceptions.Exceptions;
22+
import io.reactivex.functions.Action;
23+
import io.reactivex.internal.disposables.DisposableHelper;
24+
import io.reactivex.plugins.RxJavaPlugins;
25+
26+
/**
27+
* Execute an action after an onError, onComplete or a dispose event.
28+
*
29+
* @since 2.0.1 - experimental
30+
*/
31+
@Experimental
32+
public final class CompletableDoFinally extends Completable {
33+
34+
final CompletableSource source;
35+
36+
final Action onFinally;
37+
38+
public CompletableDoFinally(CompletableSource source, Action onFinally) {
39+
this.source = source;
40+
this.onFinally = onFinally;
41+
}
42+
43+
@Override
44+
protected void subscribeActual(CompletableObserver s) {
45+
source.subscribe(new DoFinallyObserver(s, onFinally));
46+
}
47+
48+
static final class DoFinallyObserver extends AtomicInteger implements CompletableObserver, Disposable {
49+
50+
private static final long serialVersionUID = 4109457741734051389L;
51+
52+
final CompletableObserver actual;
53+
54+
final Action onFinally;
55+
56+
Disposable d;
57+
58+
DoFinallyObserver(CompletableObserver actual, Action onFinally) {
59+
this.actual = actual;
60+
this.onFinally = onFinally;
61+
}
62+
63+
@Override
64+
public void onSubscribe(Disposable d) {
65+
if (DisposableHelper.validate(this.d, d)) {
66+
this.d = d;
67+
68+
actual.onSubscribe(this);
69+
}
70+
}
71+
72+
@Override
73+
public void onError(Throwable t) {
74+
actual.onError(t);
75+
runFinally();
76+
}
77+
78+
@Override
79+
public void onComplete() {
80+
actual.onComplete();
81+
runFinally();
82+
}
83+
84+
@Override
85+
public void dispose() {
86+
d.dispose();
87+
runFinally();
88+
}
89+
90+
@Override
91+
public boolean isDisposed() {
92+
return d.isDisposed();
93+
}
94+
95+
void runFinally() {
96+
if (compareAndSet(0, 1)) {
97+
try {
98+
onFinally.run();
99+
} catch (Throwable ex) {
100+
Exceptions.throwIfFatal(ex);
101+
RxJavaPlugins.onError(ex);
102+
}
103+
}
104+
}
105+
}
106+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import java.util.concurrent.atomic.AtomicInteger;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.annotations.Experimental;
20+
import io.reactivex.disposables.Disposable;
21+
import io.reactivex.exceptions.Exceptions;
22+
import io.reactivex.functions.Action;
23+
import io.reactivex.internal.disposables.DisposableHelper;
24+
import io.reactivex.plugins.RxJavaPlugins;
25+
26+
/**
27+
* Execute an action after an onSuccess, onError, onComplete or a dispose event.
28+
*
29+
* @param <T> the value type
30+
* @since 2.0.1 - experimental
31+
*/
32+
@Experimental
33+
public final class MaybeDoFinally<T> extends AbstractMaybeWithUpstream<T, T> {
34+
35+
final Action onFinally;
36+
37+
public MaybeDoFinally(MaybeSource<T> source, Action onFinally) {
38+
super(source);
39+
this.onFinally = onFinally;
40+
}
41+
42+
@Override
43+
protected void subscribeActual(MaybeObserver<? super T> s) {
44+
source.subscribe(new DoFinallyObserver<T>(s, onFinally));
45+
}
46+
47+
static final class DoFinallyObserver<T> extends AtomicInteger implements MaybeObserver<T>, Disposable {
48+
49+
private static final long serialVersionUID = 4109457741734051389L;
50+
51+
final MaybeObserver<? super T> actual;
52+
53+
final Action onFinally;
54+
55+
Disposable d;
56+
57+
DoFinallyObserver(MaybeObserver<? super T> actual, Action onFinally) {
58+
this.actual = actual;
59+
this.onFinally = onFinally;
60+
}
61+
62+
@Override
63+
public void onSubscribe(Disposable d) {
64+
if (DisposableHelper.validate(this.d, d)) {
65+
this.d = d;
66+
67+
actual.onSubscribe(this);
68+
}
69+
}
70+
71+
@Override
72+
public void onSuccess(T t) {
73+
actual.onSuccess(t);
74+
runFinally();
75+
}
76+
77+
@Override
78+
public void onError(Throwable t) {
79+
actual.onError(t);
80+
runFinally();
81+
}
82+
83+
@Override
84+
public void onComplete() {
85+
actual.onComplete();
86+
runFinally();
87+
}
88+
89+
@Override
90+
public void dispose() {
91+
d.dispose();
92+
runFinally();
93+
}
94+
95+
@Override
96+
public boolean isDisposed() {
97+
return d.isDisposed();
98+
}
99+
100+
void runFinally() {
101+
if (compareAndSet(0, 1)) {
102+
try {
103+
onFinally.run();
104+
} catch (Throwable ex) {
105+
Exceptions.throwIfFatal(ex);
106+
RxJavaPlugins.onError(ex);
107+
}
108+
}
109+
}
110+
}
111+
}

0 commit comments

Comments
 (0)