Skip to content

Commit 62d2b31

Browse files
authored
3.x: Make using() resource disposal order consistent with eager-mode (#6534)
1 parent 1fe60f5 commit 62d2b31

File tree

15 files changed

+392
-39
lines changed

15 files changed

+392
-39
lines changed

src/main/java/io/reactivex/Completable.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -1032,8 +1032,11 @@ public static <R> Completable using(Supplier<R> resourceSupplier,
10321032
* @param completableFunction the function that given a resource returns a non-null
10331033
* Completable instance that will be subscribed to
10341034
* @param disposer the consumer that disposes the resource created by the resource supplier
1035-
* @param eager if true, the resource is disposed before the terminal event is emitted, if false, the
1036-
* resource is disposed after the terminal event has been emitted
1035+
* @param eager
1036+
* If {@code true} then resource disposal will happen either on a {@code dispose()} call before the upstream is disposed
1037+
* or just before the emission of a terminal event ({@code onComplete} or {@code onError}).
1038+
* If {@code false} the resource disposal will happen either on a {@code dispose()} call after the upstream is disposed
1039+
* or just after the emission of a terminal event ({@code onComplete} or {@code onError}).
10371040
* @return the new Completable instance
10381041
*/
10391042
@CheckReturnValue

src/main/java/io/reactivex/Flowable.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -4609,8 +4609,10 @@ public static <T, D> Flowable<T> using(Supplier<? extends D> resourceSupplier,
46094609
* @param resourceDisposer
46104610
* the function that will dispose of the resource
46114611
* @param eager
4612-
* if {@code true} then disposal will happen either on cancellation or just before emission of
4613-
* a terminal event ({@code onComplete} or {@code onError}).
4612+
* If {@code true} then resource disposal will happen either on a {@code cancel()} call before the upstream is disposed
4613+
* or just before the emission of a terminal event ({@code onComplete} or {@code onError}).
4614+
* If {@code false} the resource disposal will happen either on a {@code cancel()} call after the upstream is disposed
4615+
* or just after the emission of a terminal event ({@code onComplete} or {@code onError}).
46144616
* @return the Publisher whose lifetime controls the lifetime of the dependent resource object
46154617
* @see <a href="http://reactivex.io/documentation/operators/using.html">ReactiveX operators documentation: Using</a>
46164618
* @since 2.0

src/main/java/io/reactivex/Maybe.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1772,8 +1772,10 @@ public static <T, D> Maybe<T> using(Supplier<? extends D> resourceSupplier,
17721772
* @param resourceDisposer
17731773
* the function that will dispose of the resource
17741774
* @param eager
1775-
* if {@code true} then disposal will happen either on a dispose() call or just before emission of
1776-
* a terminal event ({@code onComplete} or {@code onError}).
1775+
* If {@code true} then resource disposal will happen either on a {@code dispose()} call before the upstream is disposed
1776+
* or just before the emission of a terminal event ({@code onSuccess}, {@code onComplete} or {@code onError}).
1777+
* If {@code false} the resource disposal will happen either on a {@code dispose()} call after the upstream is disposed
1778+
* or just after the emission of a terminal event ({@code onSuccess}, {@code onComplete} or {@code onError}).
17771779
* @return the Maybe whose lifetime controls the lifetime of the dependent resource object
17781780
* @see <a href="http://reactivex.io/documentation/operators/using.html">ReactiveX operators documentation: Using</a>
17791781
*/

src/main/java/io/reactivex/Observable.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -4095,8 +4095,10 @@ public static <T, D> Observable<T> using(Supplier<? extends D> resourceSupplier,
40954095
* @param disposer
40964096
* the function that will dispose of the resource
40974097
* @param eager
4098-
* if {@code true} then disposal will happen either on a dispose() call or just before emission of
4099-
* a terminal event ({@code onComplete} or {@code onError}).
4098+
* If {@code true} then resource disposal will happen either on a {@code dispose()} call before the upstream is disposed
4099+
* or just before the emission of a terminal event ({@code onComplete} or {@code onError}).
4100+
* If {@code false} the resource disposal will happen either on a {@code dispose()} call after the upstream is disposed
4101+
* or just after the emission of a terminal event ({@code onComplete} or {@code onError}).
41004102
* @return the ObservableSource whose lifetime controls the lifetime of the dependent resource object
41014103
* @see <a href="http://reactivex.io/documentation/operators/using.html">ReactiveX operators documentation: Using</a>
41024104
* @since 2.0

src/main/java/io/reactivex/Single.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1483,8 +1483,10 @@ public static <T, U> Single<T> using(Supplier<U> resourceSupplier,
14831483
* that particular resource when the generated SingleSource terminates
14841484
* (successfully or with an error) or gets disposed.
14851485
* @param eager
1486-
* if true, the disposer is called before the terminal event is signalled
1487-
* if false, the disposer is called after the terminal event is delivered to downstream
1486+
* If {@code true} then resource disposal will happen either on a {@code dispose()} call before the upstream is disposed
1487+
* or just before the emission of a terminal event ({@code onSuccess} or {@code onError}).
1488+
* If {@code false} the resource disposal will happen either on a {@code dispose()} call after the upstream is disposed
1489+
* or just after the emission of a terminal event ({@code onSuccess} or {@code onError}).
14881490
* @return the new Single instance
14891491
* @since 2.0
14901492
*/

src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,19 @@ static final class UsingObserver<R>
106106

107107
@Override
108108
public void dispose() {
109-
upstream.dispose();
110-
upstream = DisposableHelper.DISPOSED;
111-
disposeResourceAfter();
109+
if (eager) {
110+
disposeResource();
111+
upstream.dispose();
112+
upstream = DisposableHelper.DISPOSED;
113+
} else {
114+
upstream.dispose();
115+
upstream = DisposableHelper.DISPOSED;
116+
disposeResource();
117+
}
112118
}
113119

114120
@SuppressWarnings("unchecked")
115-
void disposeResourceAfter() {
121+
void disposeResource() {
116122
Object resource = getAndSet(this);
117123
if (resource != this) {
118124
try {
@@ -159,7 +165,7 @@ public void onError(Throwable e) {
159165
downstream.onError(e);
160166

161167
if (!eager) {
162-
disposeResourceAfter();
168+
disposeResource();
163169
}
164170
}
165171

@@ -185,7 +191,7 @@ public void onComplete() {
185191
downstream.onComplete();
186192

187193
if (!eager) {
188-
disposeResourceAfter();
194+
disposeResource();
189195
}
190196
}
191197
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableUsing.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public void onError(Throwable t) {
126126
} else {
127127
downstream.onError(t);
128128
upstream.cancel();
129-
disposeAfter();
129+
disposeResource();
130130
}
131131
}
132132

@@ -148,7 +148,7 @@ public void onComplete() {
148148
} else {
149149
downstream.onComplete();
150150
upstream.cancel();
151-
disposeAfter();
151+
disposeResource();
152152
}
153153
}
154154

@@ -159,11 +159,18 @@ public void request(long n) {
159159

160160
@Override
161161
public void cancel() {
162-
disposeAfter();
163-
upstream.cancel();
162+
if (eager) {
163+
disposeResource();
164+
upstream.cancel();
165+
upstream = SubscriptionHelper.CANCELLED;
166+
} else {
167+
upstream.cancel();
168+
upstream = SubscriptionHelper.CANCELLED;
169+
disposeResource();
170+
}
164171
}
165172

166-
void disposeAfter() {
173+
void disposeResource() {
167174
if (compareAndSet(false, true)) {
168175
try {
169176
disposer.accept(resource);

src/main/java/io/reactivex/internal/operators/maybe/MaybeUsing.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,19 @@ static final class UsingObserver<T, D>
117117

118118
@Override
119119
public void dispose() {
120-
upstream.dispose();
121-
upstream = DisposableHelper.DISPOSED;
122-
disposeResourceAfter();
120+
if (eager) {
121+
disposeResource();
122+
upstream.dispose();
123+
upstream = DisposableHelper.DISPOSED;
124+
} else {
125+
upstream.dispose();
126+
upstream = DisposableHelper.DISPOSED;
127+
disposeResource();
128+
}
123129
}
124130

125131
@SuppressWarnings("unchecked")
126-
void disposeResourceAfter() {
132+
void disposeResource() {
127133
Object resource = getAndSet(this);
128134
if (resource != this) {
129135
try {
@@ -171,7 +177,7 @@ public void onSuccess(T value) {
171177
downstream.onSuccess(value);
172178

173179
if (!eager) {
174-
disposeResourceAfter();
180+
disposeResource();
175181
}
176182
}
177183

@@ -196,7 +202,7 @@ public void onError(Throwable e) {
196202
downstream.onError(e);
197203

198204
if (!eager) {
199-
disposeResourceAfter();
205+
disposeResource();
200206
}
201207
}
202208

@@ -222,7 +228,7 @@ public void onComplete() {
222228
downstream.onComplete();
223229

224230
if (!eager) {
225-
disposeResourceAfter();
231+
disposeResource();
226232
}
227233
}
228234
}

src/main/java/io/reactivex/internal/operators/observable/ObservableUsing.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public void onError(Throwable t) {
120120
} else {
121121
downstream.onError(t);
122122
upstream.dispose();
123-
disposeAfter();
123+
disposeResource();
124124
}
125125
}
126126

@@ -142,22 +142,29 @@ public void onComplete() {
142142
} else {
143143
downstream.onComplete();
144144
upstream.dispose();
145-
disposeAfter();
145+
disposeResource();
146146
}
147147
}
148148

149149
@Override
150150
public void dispose() {
151-
disposeAfter();
152-
upstream.dispose();
151+
if (eager) {
152+
disposeResource();
153+
upstream.dispose();
154+
upstream = DisposableHelper.DISPOSED;
155+
} else {
156+
upstream.dispose();
157+
upstream = DisposableHelper.DISPOSED;
158+
disposeResource();
159+
}
153160
}
154161

155162
@Override
156163
public boolean isDisposed() {
157164
return get();
158165
}
159166

160-
void disposeAfter() {
167+
void disposeResource() {
161168
if (compareAndSet(false, true)) {
162169
try {
163170
disposer.accept(resource);

src/main/java/io/reactivex/internal/operators/single/SingleUsing.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,15 @@ static final class UsingSingleObserver<T, U> extends
106106

107107
@Override
108108
public void dispose() {
109-
upstream.dispose();
110-
upstream = DisposableHelper.DISPOSED;
111-
disposeAfter();
109+
if (eager) {
110+
disposeResource();
111+
upstream.dispose();
112+
upstream = DisposableHelper.DISPOSED;
113+
} else {
114+
upstream.dispose();
115+
upstream = DisposableHelper.DISPOSED;
116+
disposeResource();
117+
}
112118
}
113119

114120
@Override
@@ -148,7 +154,7 @@ public void onSuccess(T value) {
148154
downstream.onSuccess(value);
149155

150156
if (!eager) {
151-
disposeAfter();
157+
disposeResource();
152158
}
153159
}
154160

@@ -174,12 +180,12 @@ public void onError(Throwable e) {
174180
downstream.onError(e);
175181

176182
if (!eager) {
177-
disposeAfter();
183+
disposeResource();
178184
}
179185
}
180186

181187
@SuppressWarnings("unchecked")
182-
void disposeAfter() {
188+
void disposeResource() {
183189
Object u = getAndSet(this);
184190
if (u != this) {
185191
try {

src/test/java/io/reactivex/internal/operators/completable/CompletableUsingTest.java

+63
Original file line numberDiff line numberDiff line change
@@ -570,4 +570,67 @@ public void run() {
570570
}
571571
}
572572

573+
@Test
574+
public void eagerDisposeResourceThenDisposeUpstream() {
575+
final StringBuilder sb = new StringBuilder();
576+
577+
TestObserver<Void> to = Completable.using(Functions.justSupplier(1),
578+
new Function<Integer, Completable>() {
579+
@Override
580+
public Completable apply(Integer t) throws Throwable {
581+
return Completable.never()
582+
.doOnDispose(new Action() {
583+
@Override
584+
public void run() throws Throwable {
585+
sb.append("Dispose");
586+
}
587+
})
588+
;
589+
}
590+
}, new Consumer<Integer>() {
591+
@Override
592+
public void accept(Integer t) throws Throwable {
593+
sb.append("Resource");
594+
}
595+
}, true)
596+
.test()
597+
;
598+
to.assertEmpty();
599+
600+
to.dispose();
601+
602+
assertEquals("ResourceDispose", sb.toString());
603+
}
604+
605+
@Test
606+
public void nonEagerDisposeUpstreamThenDisposeResource() {
607+
final StringBuilder sb = new StringBuilder();
608+
609+
TestObserver<Void> to = Completable.using(Functions.justSupplier(1),
610+
new Function<Integer, Completable>() {
611+
@Override
612+
public Completable apply(Integer t) throws Throwable {
613+
return Completable.never()
614+
.doOnDispose(new Action() {
615+
@Override
616+
public void run() throws Throwable {
617+
sb.append("Dispose");
618+
}
619+
})
620+
;
621+
}
622+
}, new Consumer<Integer>() {
623+
@Override
624+
public void accept(Integer t) throws Throwable {
625+
sb.append("Resource");
626+
}
627+
}, false)
628+
.test()
629+
;
630+
to.assertEmpty();
631+
632+
to.dispose();
633+
634+
assertEquals("DisposeResource", sb.toString());
635+
}
573636
}

0 commit comments

Comments
 (0)