Skip to content

Commit 5b929c4

Browse files
authored
2.x: Fix refCount() connect/subscribe/cancel deadlock (#5975)
* 2.x: Fix refCount() connect/subscribe/cancel deadlock * Add more coverage. * Improve logic and coverage. * Factor out common cleanup code. * Fix termination cleanup.
1 parent 99366ee commit 5b929c4

File tree

4 files changed

+973
-307
lines changed

4 files changed

+973
-307
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java

+164-158
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,19 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16+
import java.util.concurrent.TimeUnit;
1617
import java.util.concurrent.atomic.*;
17-
import java.util.concurrent.locks.ReentrantLock;
1818

1919
import org.reactivestreams.*;
2020

21-
import io.reactivex.FlowableSubscriber;
22-
import io.reactivex.disposables.*;
21+
import io.reactivex.*;
22+
import io.reactivex.disposables.Disposable;
2323
import io.reactivex.flowables.ConnectableFlowable;
2424
import io.reactivex.functions.Consumer;
25+
import io.reactivex.internal.disposables.*;
2526
import io.reactivex.internal.subscriptions.SubscriptionHelper;
27+
import io.reactivex.plugins.RxJavaPlugins;
28+
import io.reactivex.schedulers.Schedulers;
2629

2730
/**
2831
* Returns an observable sequence that stays connected to the source as long as
@@ -31,199 +34,202 @@
3134
* @param <T>
3235
* the value type
3336
*/
34-
public final class FlowableRefCount<T> extends AbstractFlowableWithUpstream<T, T> {
37+
public final class FlowableRefCount<T> extends Flowable<T> {
38+
3539
final ConnectableFlowable<T> source;
36-
volatile CompositeDisposable baseDisposable = new CompositeDisposable();
37-
final AtomicInteger subscriptionCount = new AtomicInteger();
38-
39-
/**
40-
* Use this lock for every subscription and disconnect action.
41-
*/
42-
final ReentrantLock lock = new ReentrantLock();
43-
44-
final class ConnectionSubscriber
45-
extends AtomicReference<Subscription>
46-
implements FlowableSubscriber<T>, Subscription {
47-
48-
private static final long serialVersionUID = 152064694420235350L;
49-
final Subscriber<? super T> subscriber;
50-
final CompositeDisposable currentBase;
51-
final Disposable resource;
52-
53-
final AtomicLong requested;
54-
55-
ConnectionSubscriber(Subscriber<? super T> subscriber,
56-
CompositeDisposable currentBase, Disposable resource) {
57-
this.subscriber = subscriber;
58-
this.currentBase = currentBase;
59-
this.resource = resource;
60-
this.requested = new AtomicLong();
61-
}
6240

63-
@Override
64-
public void onSubscribe(Subscription s) {
65-
SubscriptionHelper.deferredSetOnce(this, requested, s);
66-
}
41+
final int n;
6742

68-
@Override
69-
public void onError(Throwable e) {
70-
cleanup();
71-
subscriber.onError(e);
72-
}
43+
final long timeout;
7344

74-
@Override
75-
public void onNext(T t) {
76-
subscriber.onNext(t);
77-
}
45+
final TimeUnit unit;
7846

79-
@Override
80-
public void onComplete() {
81-
cleanup();
82-
subscriber.onComplete();
83-
}
84-
85-
@Override
86-
public void request(long n) {
87-
SubscriptionHelper.deferredRequest(this, requested, n);
88-
}
47+
final Scheduler scheduler;
8948

90-
@Override
91-
public void cancel() {
92-
SubscriptionHelper.cancel(this);
93-
resource.dispose();
94-
}
49+
RefConnection connection;
9550

96-
void cleanup() {
97-
// on error or completion we need to dispose the base CompositeDisposable
98-
// and set the subscriptionCount to 0
99-
lock.lock();
100-
try {
101-
if (baseDisposable == currentBase) {
102-
if (source instanceof Disposable) {
103-
((Disposable)source).dispose();
104-
}
105-
baseDisposable.dispose();
106-
baseDisposable = new CompositeDisposable();
107-
subscriptionCount.set(0);
108-
}
109-
} finally {
110-
lock.unlock();
111-
}
112-
}
51+
public FlowableRefCount(ConnectableFlowable<T> source) {
52+
this(source, 1, 0L, TimeUnit.NANOSECONDS, Schedulers.trampoline());
11353
}
11454

115-
/**
116-
* Constructor.
117-
*
118-
* @param source
119-
* observable to apply ref count to
120-
*/
121-
public FlowableRefCount(ConnectableFlowable<T> source) {
122-
super(source);
55+
public FlowableRefCount(ConnectableFlowable<T> source, int n, long timeout, TimeUnit unit,
56+
Scheduler scheduler) {
12357
this.source = source;
58+
this.n = n;
59+
this.timeout = timeout;
60+
this.unit = unit;
61+
this.scheduler = scheduler;
12462
}
12563

12664
@Override
127-
public void subscribeActual(final Subscriber<? super T> subscriber) {
128-
129-
lock.lock();
130-
if (subscriptionCount.incrementAndGet() == 1) {
131-
132-
final AtomicBoolean writeLocked = new AtomicBoolean(true);
133-
134-
try {
135-
// need to use this overload of connect to ensure that
136-
// baseSubscription is set in the case that source is a
137-
// synchronous Observable
138-
source.connect(onSubscribe(subscriber, writeLocked));
139-
} finally {
140-
// need to cover the case where the source is subscribed to
141-
// outside of this class thus preventing the Consumer passed
142-
// to source.connect above being called
143-
if (writeLocked.get()) {
144-
// Consumer passed to source.connect was not called
145-
lock.unlock();
146-
}
65+
protected void subscribeActual(Subscriber<? super T> s) {
66+
67+
RefConnection conn;
68+
69+
boolean connect = false;
70+
synchronized (this) {
71+
conn = connection;
72+
if (conn == null) {
73+
conn = new RefConnection(this);
74+
connection = conn;
75+
}
76+
77+
long c = conn.subscriberCount;
78+
if (c == 0L && conn.timer != null) {
79+
conn.timer.dispose();
14780
}
148-
} else {
149-
try {
150-
// ready to subscribe to source so do it
151-
doSubscribe(subscriber, baseDisposable);
152-
} finally {
153-
// release the read lock
154-
lock.unlock();
81+
conn.subscriberCount = c + 1;
82+
if (!conn.connected && c + 1 == n) {
83+
connect = true;
84+
conn.connected = true;
15585
}
15686
}
15787

88+
source.subscribe(new RefCountSubscriber<T>(s, this, conn));
89+
90+
if (connect) {
91+
source.connect(conn);
92+
}
15893
}
15994

160-
private Consumer<Disposable> onSubscribe(final Subscriber<? super T> subscriber,
161-
final AtomicBoolean writeLocked) {
162-
return new DisposeConsumer(subscriber, writeLocked);
95+
void cancel(RefConnection rc) {
96+
SequentialDisposable sd;
97+
synchronized (this) {
98+
if (connection == null) {
99+
return;
100+
}
101+
long c = rc.subscriberCount - 1;
102+
rc.subscriberCount = c;
103+
if (c != 0L || !rc.connected) {
104+
return;
105+
}
106+
if (timeout == 0L) {
107+
timeout(rc);
108+
return;
109+
}
110+
sd = new SequentialDisposable();
111+
rc.timer = sd;
112+
}
113+
114+
sd.replace(scheduler.scheduleDirect(rc, timeout, unit));
163115
}
164116

165-
void doSubscribe(final Subscriber<? super T> subscriber, final CompositeDisposable currentBase) {
166-
// handle disposing from the base subscription
167-
Disposable d = disconnect(currentBase);
168117

169-
ConnectionSubscriber connection = new ConnectionSubscriber(subscriber, currentBase, d);
170-
subscriber.onSubscribe(connection);
118+
void terminated(RefConnection rc) {
119+
synchronized (this) {
120+
if (connection != null) {
121+
connection = null;
122+
if (rc.timer != null) {
123+
rc.timer.dispose();
124+
}
125+
if (source instanceof Disposable) {
126+
((Disposable)source).dispose();
127+
}
128+
}
129+
}
130+
}
171131

172-
source.subscribe(connection);
132+
void timeout(RefConnection rc) {
133+
synchronized (this) {
134+
if (rc.subscriberCount == 0 && rc == connection) {
135+
connection = null;
136+
DisposableHelper.dispose(rc);
137+
if (source instanceof Disposable) {
138+
((Disposable)source).dispose();
139+
}
140+
}
141+
}
173142
}
174143

175-
private Disposable disconnect(final CompositeDisposable current) {
176-
return Disposables.fromRunnable(new DisposeTask(current));
144+
static final class RefConnection extends AtomicReference<Disposable>
145+
implements Runnable, Consumer<Disposable> {
146+
147+
private static final long serialVersionUID = -4552101107598366241L;
148+
149+
final FlowableRefCount<?> parent;
150+
151+
Disposable timer;
152+
153+
long subscriberCount;
154+
155+
boolean connected;
156+
157+
RefConnection(FlowableRefCount<?> parent) {
158+
this.parent = parent;
159+
}
160+
161+
@Override
162+
public void run() {
163+
parent.timeout(this);
164+
}
165+
166+
@Override
167+
public void accept(Disposable t) throws Exception {
168+
DisposableHelper.replace(this, t);
169+
}
177170
}
178171

179-
final class DisposeConsumer implements Consumer<Disposable> {
180-
private final Subscriber<? super T> subscriber;
181-
private final AtomicBoolean writeLocked;
172+
static final class RefCountSubscriber<T>
173+
extends AtomicBoolean implements FlowableSubscriber<T>, Subscription {
174+
175+
private static final long serialVersionUID = -7419642935409022375L;
176+
177+
final Subscriber<? super T> actual;
178+
179+
final FlowableRefCount<T> parent;
180+
181+
final RefConnection connection;
182182

183-
DisposeConsumer(Subscriber<? super T> subscriber, AtomicBoolean writeLocked) {
184-
this.subscriber = subscriber;
185-
this.writeLocked = writeLocked;
183+
Subscription upstream;
184+
185+
RefCountSubscriber(Subscriber<? super T> actual, FlowableRefCount<T> parent, RefConnection connection) {
186+
this.actual = actual;
187+
this.parent = parent;
188+
this.connection = connection;
189+
}
190+
191+
@Override
192+
public void onNext(T t) {
193+
actual.onNext(t);
186194
}
187195

188196
@Override
189-
public void accept(Disposable subscription) {
190-
try {
191-
baseDisposable.add(subscription);
192-
// ready to subscribe to source so do it
193-
doSubscribe(subscriber, baseDisposable);
194-
} finally {
195-
// release the write lock
196-
lock.unlock();
197-
writeLocked.set(false);
197+
public void onError(Throwable t) {
198+
if (compareAndSet(false, true)) {
199+
parent.terminated(connection);
200+
actual.onError(t);
201+
} else {
202+
RxJavaPlugins.onError(t);
198203
}
199204
}
200-
}
201205

202-
final class DisposeTask implements Runnable {
203-
private final CompositeDisposable current;
206+
@Override
207+
public void onComplete() {
208+
if (compareAndSet(false, true)) {
209+
parent.terminated(connection);
210+
actual.onComplete();
211+
}
212+
}
204213

205-
DisposeTask(CompositeDisposable current) {
206-
this.current = current;
214+
@Override
215+
public void request(long n) {
216+
upstream.request(n);
207217
}
208218

209219
@Override
210-
public void run() {
211-
lock.lock();
212-
try {
213-
if (baseDisposable == current) {
214-
if (subscriptionCount.decrementAndGet() == 0) {
215-
if (source instanceof Disposable) {
216-
((Disposable)source).dispose();
217-
}
218-
219-
baseDisposable.dispose();
220-
// need a new baseDisposable because once
221-
// disposed stays that way
222-
baseDisposable = new CompositeDisposable();
223-
}
224-
}
225-
} finally {
226-
lock.unlock();
220+
public void cancel() {
221+
upstream.cancel();
222+
if (compareAndSet(false, true)) {
223+
parent.cancel(connection);
224+
}
225+
}
226+
227+
@Override
228+
public void onSubscribe(Subscription s) {
229+
if (SubscriptionHelper.validate(upstream, s)) {
230+
this.upstream = s;
231+
232+
actual.onSubscribe(this);
227233
}
228234
}
229235
}

0 commit comments

Comments
 (0)