Skip to content

Commit c7d91c6

Browse files
authored
2.x: Fix refCount termination-reconnect race (#6187)
* 2.x: Fix refCount termination-reconnect race * Add/restore coverage * Update ResettableConnectable interface and definitions
1 parent 2e566fb commit c7d91c6

File tree

7 files changed

+229
-32
lines changed

7 files changed

+229
-32
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.disposables;
15+
16+
import io.reactivex.annotations.Experimental;
17+
import io.reactivex.disposables.Disposable;
18+
import io.reactivex.flowables.ConnectableFlowable;
19+
import io.reactivex.observables.ConnectableObservable;
20+
21+
/**
22+
* Interface allowing conditional resetting of connections in {@link ConnectableObservable}s
23+
* and {@link ConnectableFlowable}s.
24+
* @since 2.2.2 - experimental
25+
*/
26+
@Experimental
27+
public interface ResettableConnectable {
28+
29+
/**
30+
* Reset the connectable source only if the given {@link Disposable} {@code connection} instance
31+
* is still representing a connection established by a previous {@code connect()} connection.
32+
* <p>
33+
* For example, an immediately previous connection should reset the connectable source:
34+
* <pre><code>
35+
* Disposable d = connectable.connect();
36+
*
37+
* ((ResettableConnectable)connectable).resetIf(d);
38+
* </code></pre>
39+
* However, if the connection indicator {@code Disposable} is from a much earlier connection,
40+
* it should not affect the current connection:
41+
* <pre><code>
42+
* Disposable d1 = connectable.connect();
43+
* d.dispose();
44+
*
45+
* Disposable d2 = connectable.connect();
46+
*
47+
* ((ResettableConnectable)connectable).resetIf(d);
48+
*
49+
* assertFalse(d2.isDisposed());
50+
* </code></pre>
51+
* @param connection the disposable received from a previous {@code connect()} call.
52+
*/
53+
void resetIf(Disposable connection);
54+
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ protected void subscribeActual(Subscriber<? super T> s) {
9595
void cancel(RefConnection rc) {
9696
SequentialDisposable sd;
9797
synchronized (this) {
98-
if (connection == null) {
98+
if (connection == null || connection != rc) {
9999
return;
100100
}
101101
long c = rc.subscriberCount - 1;
@@ -116,13 +116,17 @@ void cancel(RefConnection rc) {
116116

117117
void terminated(RefConnection rc) {
118118
synchronized (this) {
119-
if (connection != null) {
119+
if (connection != null && connection == rc) {
120120
connection = null;
121121
if (rc.timer != null) {
122122
rc.timer.dispose();
123123
}
124+
}
125+
if (--rc.subscriberCount == 0) {
124126
if (source instanceof Disposable) {
125127
((Disposable)source).dispose();
128+
} else if (source instanceof ResettableConnectable) {
129+
((ResettableConnectable)source).resetIf(rc.get());
126130
}
127131
}
128132
}
@@ -132,9 +136,12 @@ void timeout(RefConnection rc) {
132136
synchronized (this) {
133137
if (rc.subscriberCount == 0 && rc == connection) {
134138
connection = null;
139+
Disposable connectionObject = rc.get();
135140
DisposableHelper.dispose(rc);
136141
if (source instanceof Disposable) {
137142
((Disposable)source).dispose();
143+
} else if (source instanceof ResettableConnectable) {
144+
((ResettableConnectable)source).resetIf(connectionObject);
138145
}
139146
}
140147
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

+5-9
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.reactivex.exceptions.Exceptions;
2525
import io.reactivex.flowables.ConnectableFlowable;
2626
import io.reactivex.functions.*;
27+
import io.reactivex.internal.disposables.ResettableConnectable;
2728
import io.reactivex.internal.functions.ObjectHelper;
2829
import io.reactivex.internal.fuseable.HasUpstreamPublisher;
2930
import io.reactivex.internal.subscribers.SubscriberResourceWrapper;
@@ -32,7 +33,7 @@
3233
import io.reactivex.plugins.RxJavaPlugins;
3334
import io.reactivex.schedulers.Timed;
3435

35-
public final class FlowableReplay<T> extends ConnectableFlowable<T> implements HasUpstreamPublisher<T>, Disposable {
36+
public final class FlowableReplay<T> extends ConnectableFlowable<T> implements HasUpstreamPublisher<T>, ResettableConnectable {
3637
/** The source observable. */
3738
final Flowable<T> source;
3839
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
@@ -161,15 +162,10 @@ protected void subscribeActual(Subscriber<? super T> s) {
161162
onSubscribe.subscribe(s);
162163
}
163164

165+
@SuppressWarnings({ "unchecked", "rawtypes" })
164166
@Override
165-
public void dispose() {
166-
current.lazySet(null);
167-
}
168-
169-
@Override
170-
public boolean isDisposed() {
171-
Disposable d = current.get();
172-
return d == null || d.isDisposed();
167+
public void resetIf(Disposable connectionObject) {
168+
current.compareAndSet((ReplaySubscriber)connectionObject, null);
173169
}
174170

175171
@Override

src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ protected void subscribeActual(Observer<? super T> observer) {
9292
void cancel(RefConnection rc) {
9393
SequentialDisposable sd;
9494
synchronized (this) {
95-
if (connection == null) {
95+
if (connection == null || connection != rc) {
9696
return;
9797
}
9898
long c = rc.subscriberCount - 1;
@@ -113,13 +113,17 @@ void cancel(RefConnection rc) {
113113

114114
void terminated(RefConnection rc) {
115115
synchronized (this) {
116-
if (connection != null) {
116+
if (connection != null && connection == rc) {
117117
connection = null;
118118
if (rc.timer != null) {
119119
rc.timer.dispose();
120120
}
121+
}
122+
if (--rc.subscriberCount == 0) {
121123
if (source instanceof Disposable) {
122124
((Disposable)source).dispose();
125+
} else if (source instanceof ResettableConnectable) {
126+
((ResettableConnectable)source).resetIf(rc.get());
123127
}
124128
}
125129
}
@@ -129,9 +133,12 @@ void timeout(RefConnection rc) {
129133
synchronized (this) {
130134
if (rc.subscriberCount == 0 && rc == connection) {
131135
connection = null;
136+
Disposable connectionObject = rc.get();
132137
DisposableHelper.dispose(rc);
133138
if (source instanceof Disposable) {
134139
((Disposable)source).dispose();
140+
} else if (source instanceof ResettableConnectable) {
141+
((ResettableConnectable)source).resetIf(connectionObject);
135142
}
136143
}
137144
}

src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java

+4-9
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import io.reactivex.plugins.RxJavaPlugins;
3232
import io.reactivex.schedulers.Timed;
3333

34-
public final class ObservableReplay<T> extends ConnectableObservable<T> implements HasUpstreamObservableSource<T>, Disposable {
34+
public final class ObservableReplay<T> extends ConnectableObservable<T> implements HasUpstreamObservableSource<T>, ResettableConnectable {
3535
/** The source observable. */
3636
final ObservableSource<T> source;
3737
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
@@ -159,15 +159,10 @@ public ObservableSource<T> source() {
159159
return source;
160160
}
161161

162+
@SuppressWarnings({ "unchecked", "rawtypes" })
162163
@Override
163-
public void dispose() {
164-
current.lazySet(null);
165-
}
166-
167-
@Override
168-
public boolean isDisposed() {
169-
Disposable d = current.get();
170-
return d == null || d.isDisposed();
164+
public void resetIf(Disposable connectionObject) {
165+
current.compareAndSet((ReplayObserver)connectionObject, null);
171166
}
172167

173168
@Override

src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java

+73-5
Original file line numberDiff line numberDiff line change
@@ -788,15 +788,17 @@ public void replayIsUnsubscribed() {
788788
ConnectableFlowable<Integer> cf = Flowable.just(1)
789789
.replay();
790790

791-
assertTrue(((Disposable)cf).isDisposed());
791+
if (cf instanceof Disposable) {
792+
assertTrue(((Disposable)cf).isDisposed());
792793

793-
Disposable connection = cf.connect();
794+
Disposable connection = cf.connect();
794795

795-
assertFalse(((Disposable)cf).isDisposed());
796+
assertFalse(((Disposable)cf).isDisposed());
796797

797-
connection.dispose();
798+
connection.dispose();
798799

799-
assertTrue(((Disposable)cf).isDisposed());
800+
assertTrue(((Disposable)cf).isDisposed());
801+
}
800802
}
801803

802804
static final class BadFlowableSubscribe extends ConnectableFlowable<Object> {
@@ -1325,5 +1327,71 @@ public void cancelTerminateStateExclusion() {
13251327
rc.connected = true;
13261328
o.connection = rc;
13271329
o.cancel(rc);
1330+
1331+
o.connection = rc;
1332+
o.cancel(new RefConnection(o));
1333+
}
1334+
1335+
@Test
1336+
public void replayRefCountShallBeThreadSafe() {
1337+
for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
1338+
Flowable<Integer> flowable = Flowable.just(1).replay(1).refCount();
1339+
1340+
TestSubscriber<Integer> ts1 = flowable
1341+
.subscribeOn(Schedulers.io())
1342+
.test();
1343+
1344+
TestSubscriber<Integer> ts2 = flowable
1345+
.subscribeOn(Schedulers.io())
1346+
.test();
1347+
1348+
ts1
1349+
.withTag("" + i)
1350+
.awaitDone(5, TimeUnit.SECONDS)
1351+
.assertResult(1);
1352+
1353+
ts2
1354+
.withTag("" + i)
1355+
.awaitDone(5, TimeUnit.SECONDS)
1356+
.assertResult(1);
1357+
}
1358+
}
1359+
1360+
static final class TestConnectableFlowable<T> extends ConnectableFlowable<T>
1361+
implements Disposable {
1362+
1363+
volatile boolean disposed;
1364+
1365+
@Override
1366+
public void dispose() {
1367+
disposed = true;
1368+
}
1369+
1370+
@Override
1371+
public boolean isDisposed() {
1372+
return disposed;
1373+
}
1374+
1375+
@Override
1376+
public void connect(Consumer<? super Disposable> connection) {
1377+
// not relevant
1378+
}
1379+
1380+
@Override
1381+
protected void subscribeActual(Subscriber<? super T> subscriber) {
1382+
// not relevant
1383+
}
1384+
}
1385+
1386+
@Test
1387+
public void timeoutDisposesSource() {
1388+
FlowableRefCount<Object> o = (FlowableRefCount<Object>)new TestConnectableFlowable<Object>().refCount();
1389+
1390+
RefConnection rc = new RefConnection(o);
1391+
o.connection = rc;
1392+
1393+
o.timeout(rc);
1394+
1395+
assertTrue(((Disposable)o.source).isDisposed());
13281396
}
13291397
}

src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java

+75-5
Original file line numberDiff line numberDiff line change
@@ -765,15 +765,17 @@ public void replayIsUnsubscribed() {
765765
ConnectableObservable<Integer> co = Observable.just(1).concatWith(Observable.<Integer>never())
766766
.replay();
767767

768-
assertTrue(((Disposable)co).isDisposed());
768+
if (co instanceof Disposable) {
769+
assertTrue(((Disposable)co).isDisposed());
769770

770-
Disposable connection = co.connect();
771+
Disposable connection = co.connect();
771772

772-
assertFalse(((Disposable)co).isDisposed());
773+
assertFalse(((Disposable)co).isDisposed());
773774

774-
connection.dispose();
775+
connection.dispose();
775776

776-
assertTrue(((Disposable)co).isDisposed());
777+
assertTrue(((Disposable)co).isDisposed());
778+
}
777779
}
778780

779781
static final class BadObservableSubscribe extends ConnectableObservable<Object> {
@@ -1239,6 +1241,8 @@ public void cancelTerminateStateExclusion() {
12391241

12401242
o.cancel(null);
12411243

1244+
o.cancel(new RefConnection(o));
1245+
12421246
RefConnection rc = new RefConnection(o);
12431247
o.connection = null;
12441248
rc.subscriberCount = 0;
@@ -1274,5 +1278,71 @@ public void cancelTerminateStateExclusion() {
12741278
rc.connected = true;
12751279
o.connection = rc;
12761280
o.cancel(rc);
1281+
1282+
o.connection = rc;
1283+
o.cancel(new RefConnection(o));
1284+
}
1285+
1286+
@Test
1287+
public void replayRefCountShallBeThreadSafe() {
1288+
for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
1289+
Observable<Integer> observable = Observable.just(1).replay(1).refCount();
1290+
1291+
TestObserver<Integer> observer1 = observable
1292+
.subscribeOn(Schedulers.io())
1293+
.test();
1294+
1295+
TestObserver<Integer> observer2 = observable
1296+
.subscribeOn(Schedulers.io())
1297+
.test();
1298+
1299+
observer1
1300+
.withTag("" + i)
1301+
.awaitDone(5, TimeUnit.SECONDS)
1302+
.assertResult(1);
1303+
1304+
observer2
1305+
.withTag("" + i)
1306+
.awaitDone(5, TimeUnit.SECONDS)
1307+
.assertResult(1);
1308+
}
1309+
}
1310+
1311+
static final class TestConnectableObservable<T> extends ConnectableObservable<T>
1312+
implements Disposable {
1313+
1314+
volatile boolean disposed;
1315+
1316+
@Override
1317+
public void dispose() {
1318+
disposed = true;
1319+
}
1320+
1321+
@Override
1322+
public boolean isDisposed() {
1323+
return disposed;
1324+
}
1325+
1326+
@Override
1327+
public void connect(Consumer<? super Disposable> connection) {
1328+
// not relevant
1329+
}
1330+
1331+
@Override
1332+
protected void subscribeActual(Observer<? super T> observer) {
1333+
// not relevant
1334+
}
1335+
}
1336+
1337+
@Test
1338+
public void timeoutDisposesSource() {
1339+
ObservableRefCount<Object> o = (ObservableRefCount<Object>)new TestConnectableObservable<Object>().refCount();
1340+
1341+
RefConnection rc = new RefConnection(o);
1342+
o.connection = rc;
1343+
1344+
o.timeout(rc);
1345+
1346+
assertTrue(((Disposable)o.source).isDisposed());
12771347
}
12781348
}

0 commit comments

Comments
 (0)