From 637a2493a6ee6db3b080f7a3b835feeed899c030 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 30 Aug 2018 12:21:11 +0200 Subject: [PATCH 1/3] 2.x: Fix refCount termination-reconnect race --- .../disposables/ResettableConnectable.java | 35 ++++++++++++++++++ .../operators/flowable/FlowableRefCount.java | 11 +++++- .../operators/flowable/FlowableReplay.java | 14 +++---- .../observable/ObservableRefCount.java | 11 +++++- .../observable/ObservableReplay.java | 13 ++----- .../flowable/FlowableRefCountTest.java | 37 ++++++++++++++++--- .../observable/ObservableRefCountTest.java | 37 ++++++++++++++++--- 7 files changed, 126 insertions(+), 32 deletions(-) create mode 100644 src/main/java/io/reactivex/internal/disposables/ResettableConnectable.java diff --git a/src/main/java/io/reactivex/internal/disposables/ResettableConnectable.java b/src/main/java/io/reactivex/internal/disposables/ResettableConnectable.java new file mode 100644 index 0000000000..32b986f76d --- /dev/null +++ b/src/main/java/io/reactivex/internal/disposables/ResettableConnectable.java @@ -0,0 +1,35 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.disposables; + +import io.reactivex.annotations.Experimental; +import io.reactivex.flowables.ConnectableFlowable; +import io.reactivex.observables.ConnectableObservable; + +/** + * Interface allowing conditional resetting of connections in {@link ConnectableObservable}s + * and {@link ConnectableFlowable}s. + * @since 2.2.2 - experimental + */ +@Experimental +public interface ResettableConnectable { + + /** + * Reset the connectable if the current internal connection object is the + * same as the provided object. + * @param connectionObject the connection object identifying the last known + * active connection + */ + void resetIf(Object connectionObject); +} diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java index 82a1373228..57579c0e92 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java @@ -95,7 +95,7 @@ protected void subscribeActual(Subscriber s) { void cancel(RefConnection rc) { SequentialDisposable sd; synchronized (this) { - if (connection == null) { + if (connection == null || connection != rc) { return; } long c = rc.subscriberCount - 1; @@ -116,13 +116,17 @@ void cancel(RefConnection rc) { void terminated(RefConnection rc) { synchronized (this) { - if (connection != null) { + if (connection != null && connection == rc) { connection = null; if (rc.timer != null) { rc.timer.dispose(); } + } + if (--rc.subscriberCount == 0) { if (source instanceof Disposable) { ((Disposable)source).dispose(); + } else if (source instanceof ResettableConnectable) { + ((ResettableConnectable)source).resetIf(rc.get()); } } } @@ -132,9 +136,12 @@ void timeout(RefConnection rc) { synchronized (this) { if (rc.subscriberCount == 0 && rc == connection) { connection = null; + Object connectionObject = rc.get(); DisposableHelper.dispose(rc); if (source instanceof Disposable) { ((Disposable)source).dispose(); + } else if (source instanceof ResettableConnectable) { + ((ResettableConnectable)source).resetIf(connectionObject); } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java index 51943b9c48..4b9ac2a969 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java @@ -24,6 +24,7 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.*; +import io.reactivex.internal.disposables.ResettableConnectable; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.fuseable.HasUpstreamPublisher; import io.reactivex.internal.subscribers.SubscriberResourceWrapper; @@ -32,7 +33,7 @@ import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Timed; -public final class FlowableReplay extends ConnectableFlowable implements HasUpstreamPublisher, Disposable { +public final class FlowableReplay extends ConnectableFlowable implements HasUpstreamPublisher, ResettableConnectable { /** The source observable. */ final Flowable source; /** Holds the current subscriber that is, will be or just was subscribed to the source observable. */ @@ -161,15 +162,10 @@ protected void subscribeActual(Subscriber s) { onSubscribe.subscribe(s); } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public void dispose() { - current.lazySet(null); - } - - @Override - public boolean isDisposed() { - Disposable d = current.get(); - return d == null || d.isDisposed(); + public void resetIf(Object connectionObject) { + current.compareAndSet((ReplaySubscriber)connectionObject, null); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java index 59b571640d..7969e56e10 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java @@ -92,7 +92,7 @@ protected void subscribeActual(Observer observer) { void cancel(RefConnection rc) { SequentialDisposable sd; synchronized (this) { - if (connection == null) { + if (connection == null || connection != rc) { return; } long c = rc.subscriberCount - 1; @@ -113,13 +113,17 @@ void cancel(RefConnection rc) { void terminated(RefConnection rc) { synchronized (this) { - if (connection != null) { + if (connection != null && connection == rc) { connection = null; if (rc.timer != null) { rc.timer.dispose(); } + } + if (--rc.subscriberCount == 0) { if (source instanceof Disposable) { ((Disposable)source).dispose(); + } else if (source instanceof ResettableConnectable) { + ((ResettableConnectable)source).resetIf(rc.get()); } } } @@ -129,9 +133,12 @@ void timeout(RefConnection rc) { synchronized (this) { if (rc.subscriberCount == 0 && rc == connection) { connection = null; + Object connectionObject = rc.get(); DisposableHelper.dispose(rc); if (source instanceof Disposable) { ((Disposable)source).dispose(); + } else if (source instanceof ResettableConnectable) { + ((ResettableConnectable)source).resetIf(connectionObject); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index a1c75b67c0..c6e7aa9abe 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -31,7 +31,7 @@ import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Timed; -public final class ObservableReplay extends ConnectableObservable implements HasUpstreamObservableSource, Disposable { +public final class ObservableReplay extends ConnectableObservable implements HasUpstreamObservableSource, ResettableConnectable { /** The source observable. */ final ObservableSource source; /** Holds the current subscriber that is, will be or just was subscribed to the source observable. */ @@ -159,15 +159,10 @@ public ObservableSource source() { return source; } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public void dispose() { - current.lazySet(null); - } - - @Override - public boolean isDisposed() { - Disposable d = current.get(); - return d == null || d.isDisposed(); + public void resetIf(Object connectionObject) { + current.compareAndSet((ReplayObserver)connectionObject, null); } @Override diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java index 8eefc701e3..2951031a0c 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java @@ -788,15 +788,17 @@ public void replayIsUnsubscribed() { ConnectableFlowable cf = Flowable.just(1) .replay(); - assertTrue(((Disposable)cf).isDisposed()); + if (cf instanceof Disposable) { + assertTrue(((Disposable)cf).isDisposed()); - Disposable connection = cf.connect(); + Disposable connection = cf.connect(); - assertFalse(((Disposable)cf).isDisposed()); + assertFalse(((Disposable)cf).isDisposed()); - connection.dispose(); + connection.dispose(); - assertTrue(((Disposable)cf).isDisposed()); + assertTrue(((Disposable)cf).isDisposed()); + } } static final class BadFlowableSubscribe extends ConnectableFlowable { @@ -1326,4 +1328,29 @@ public void cancelTerminateStateExclusion() { o.connection = rc; o.cancel(rc); } + + @Test + public void replayRefCountShallBeThreadSafe() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + Flowable flowable = Flowable.just(1).replay(1).refCount(); + + TestSubscriber ts1 = flowable + .subscribeOn(Schedulers.io()) + .test(); + + TestSubscriber ts2 = flowable + .subscribeOn(Schedulers.io()) + .test(); + + ts1 + .withTag("" + i) + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + + ts2 + .withTag("" + i) + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java index 37efa77570..e212703c8c 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java @@ -765,15 +765,17 @@ public void replayIsUnsubscribed() { ConnectableObservable co = Observable.just(1).concatWith(Observable.never()) .replay(); - assertTrue(((Disposable)co).isDisposed()); + if (co instanceof Disposable) { + assertTrue(((Disposable)co).isDisposed()); - Disposable connection = co.connect(); + Disposable connection = co.connect(); - assertFalse(((Disposable)co).isDisposed()); + assertFalse(((Disposable)co).isDisposed()); - connection.dispose(); + connection.dispose(); - assertTrue(((Disposable)co).isDisposed()); + assertTrue(((Disposable)co).isDisposed()); + } } static final class BadObservableSubscribe extends ConnectableObservable { @@ -1275,4 +1277,29 @@ public void cancelTerminateStateExclusion() { o.connection = rc; o.cancel(rc); } + + @Test + public void replayRefCountShallBeThreadSafe() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + Observable observable = Observable.just(1).replay(1).refCount(); + + TestObserver observer1 = observable + .subscribeOn(Schedulers.io()) + .test(); + + TestObserver observer2 = observable + .subscribeOn(Schedulers.io()) + .test(); + + observer1 + .withTag("" + i) + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + + observer2 + .withTag("" + i) + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + } + } } From 65d48afbfc107dfd8b10be5c1b69a485fc3380a3 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 30 Aug 2018 13:03:03 +0200 Subject: [PATCH 2/3] Add/restore coverage --- .../flowable/FlowableRefCountTest.java | 41 ++++++++++++++++++ .../observable/ObservableRefCountTest.java | 43 +++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java index 2951031a0c..6bd46295e3 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java @@ -1327,6 +1327,9 @@ public void cancelTerminateStateExclusion() { rc.connected = true; o.connection = rc; o.cancel(rc); + + o.connection = rc; + o.cancel(new RefConnection(o)); } @Test @@ -1353,4 +1356,42 @@ public void replayRefCountShallBeThreadSafe() { .assertResult(1); } } + + static final class TestConnectableFlowable extends ConnectableFlowable + implements Disposable { + + volatile boolean disposed; + + @Override + public void dispose() { + disposed = true; + } + + @Override + public boolean isDisposed() { + return disposed; + } + + @Override + public void connect(Consumer connection) { + // not relevant + } + + @Override + protected void subscribeActual(Subscriber subscriber) { + // not relevant + } + } + + @Test + public void timeoutDisposesSource() { + FlowableRefCount o = (FlowableRefCount)new TestConnectableFlowable().refCount(); + + RefConnection rc = new RefConnection(o); + o.connection = rc; + + o.timeout(rc); + + assertTrue(((Disposable)o.source).isDisposed()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java index e212703c8c..d75498bc69 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java @@ -1241,6 +1241,8 @@ public void cancelTerminateStateExclusion() { o.cancel(null); + o.cancel(new RefConnection(o)); + RefConnection rc = new RefConnection(o); o.connection = null; rc.subscriberCount = 0; @@ -1276,6 +1278,9 @@ public void cancelTerminateStateExclusion() { rc.connected = true; o.connection = rc; o.cancel(rc); + + o.connection = rc; + o.cancel(new RefConnection(o)); } @Test @@ -1302,4 +1307,42 @@ public void replayRefCountShallBeThreadSafe() { .assertResult(1); } } + + static final class TestConnectableObservable extends ConnectableObservable + implements Disposable { + + volatile boolean disposed; + + @Override + public void dispose() { + disposed = true; + } + + @Override + public boolean isDisposed() { + return disposed; + } + + @Override + public void connect(Consumer connection) { + // not relevant + } + + @Override + protected void subscribeActual(Observer observer) { + // not relevant + } + } + + @Test + public void timeoutDisposesSource() { + ObservableRefCount o = (ObservableRefCount)new TestConnectableObservable().refCount(); + + RefConnection rc = new RefConnection(o); + o.connection = rc; + + o.timeout(rc); + + assertTrue(((Disposable)o.source).isDisposed()); + } } From 05ac5d1d943f7429d41d79fc1316c7787ea528d7 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 30 Aug 2018 15:12:35 +0200 Subject: [PATCH 3/3] Update ResettableConnectable interface and definitions --- .../disposables/ResettableConnectable.java | 29 +++++++++++++++---- .../operators/flowable/FlowableRefCount.java | 2 +- .../operators/flowable/FlowableReplay.java | 2 +- .../observable/ObservableRefCount.java | 2 +- .../observable/ObservableReplay.java | 2 +- 5 files changed, 28 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/reactivex/internal/disposables/ResettableConnectable.java b/src/main/java/io/reactivex/internal/disposables/ResettableConnectable.java index 32b986f76d..a111080a77 100644 --- a/src/main/java/io/reactivex/internal/disposables/ResettableConnectable.java +++ b/src/main/java/io/reactivex/internal/disposables/ResettableConnectable.java @@ -14,6 +14,7 @@ package io.reactivex.internal.disposables; import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.observables.ConnectableObservable; @@ -26,10 +27,28 @@ public interface ResettableConnectable { /** - * Reset the connectable if the current internal connection object is the - * same as the provided object. - * @param connectionObject the connection object identifying the last known - * active connection + * Reset the connectable source only if the given {@link Disposable} {@code connection} instance + * is still representing a connection established by a previous {@code connect()} connection. + *

+ * For example, an immediately previous connection should reset the connectable source: + *


+     * Disposable d = connectable.connect();
+     * 
+     * ((ResettableConnectable)connectable).resetIf(d);
+     * 
+ * However, if the connection indicator {@code Disposable} is from a much earlier connection, + * it should not affect the current connection: + *

+     * Disposable d1 = connectable.connect();
+     * d.dispose();
+     *
+     * Disposable d2 = connectable.connect();
+     *
+     * ((ResettableConnectable)connectable).resetIf(d);
+     * 
+     * assertFalse(d2.isDisposed());
+     * 
+ * @param connection the disposable received from a previous {@code connect()} call. */ - void resetIf(Object connectionObject); + void resetIf(Disposable connection); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java index 57579c0e92..f966f01365 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java @@ -136,7 +136,7 @@ void timeout(RefConnection rc) { synchronized (this) { if (rc.subscriberCount == 0 && rc == connection) { connection = null; - Object connectionObject = rc.get(); + Disposable connectionObject = rc.get(); DisposableHelper.dispose(rc); if (source instanceof Disposable) { ((Disposable)source).dispose(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java index 4b9ac2a969..21b1b1d39c 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java @@ -164,7 +164,7 @@ protected void subscribeActual(Subscriber s) { @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public void resetIf(Object connectionObject) { + public void resetIf(Disposable connectionObject) { current.compareAndSet((ReplaySubscriber)connectionObject, null); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java index 7969e56e10..3dced24de6 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java @@ -133,7 +133,7 @@ void timeout(RefConnection rc) { synchronized (this) { if (rc.subscriberCount == 0 && rc == connection) { connection = null; - Object connectionObject = rc.get(); + Disposable connectionObject = rc.get(); DisposableHelper.dispose(rc); if (source instanceof Disposable) { ((Disposable)source).dispose(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index c6e7aa9abe..89db184d6b 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -161,7 +161,7 @@ public ObservableSource source() { @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public void resetIf(Object connectionObject) { + public void resetIf(Disposable connectionObject) { current.compareAndSet((ReplayObserver)connectionObject, null); }