Skip to content

Commit cbb027d

Browse files
authored
2.x: fix replay().refCount() leaking items between connections (#5182)
1 parent 88c60b9 commit cbb027d

File tree

6 files changed

+343
-3
lines changed

6 files changed

+343
-3
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java

+7
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ void cleanup() {
9999
lock.lock();
100100
try {
101101
if (baseDisposable == currentBase) {
102+
if (source instanceof Disposable) {
103+
((Disposable)source).dispose();
104+
}
102105
baseDisposable.dispose();
103106
baseDisposable = new CompositeDisposable();
104107
subscriptionCount.set(0);
@@ -209,6 +212,10 @@ public void run() {
209212
try {
210213
if (baseDisposable == current) {
211214
if (subscriptionCount.decrementAndGet() == 0) {
215+
if (source instanceof Disposable) {
216+
((Disposable)source).dispose();
217+
}
218+
212219
baseDisposable.dispose();
213220
// need a new baseDisposable because once
214221
// disposed stays that way

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import io.reactivex.plugins.RxJavaPlugins;
3333
import io.reactivex.schedulers.Timed;
3434

35-
public final class FlowableReplay<T> extends ConnectableFlowable<T> implements HasUpstreamPublisher<T> {
35+
public final class FlowableReplay<T> extends ConnectableFlowable<T> implements HasUpstreamPublisher<T>, Disposable {
3636
/** The source observable. */
3737
final Flowable<T> source;
3838
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
@@ -161,6 +161,17 @@ protected void subscribeActual(Subscriber<? super T> s) {
161161
onSubscribe.subscribe(s);
162162
}
163163

164+
@Override
165+
public void dispose() {
166+
current.lazySet(null);
167+
}
168+
169+
@Override
170+
public boolean isDisposed() {
171+
Disposable d = current.get();
172+
return d == null || d.isDisposed();
173+
}
174+
164175
@Override
165176
public void connect(Consumer<? super Disposable> connection) {
166177
boolean doConnect;

src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java

+8
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ void cleanup() {
162162
lock.lock();
163163
try {
164164
if (baseDisposable == currentBase) {
165+
if (source instanceof Disposable) {
166+
((Disposable)source).dispose();
167+
}
168+
165169
baseDisposable.dispose();
166170
baseDisposable = new CompositeDisposable();
167171
subscriptionCount.set(0);
@@ -208,6 +212,10 @@ public void run() {
208212
try {
209213
if (baseDisposable == current) {
210214
if (subscriptionCount.decrementAndGet() == 0) {
215+
if (source instanceof Disposable) {
216+
((Disposable)source).dispose();
217+
}
218+
211219
baseDisposable.dispose();
212220
// need a new baseDisposable because once
213221
// disposed stays that way

src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import io.reactivex.plugins.RxJavaPlugins;
3131
import io.reactivex.schedulers.Timed;
3232

33-
public final class ObservableReplay<T> extends ConnectableObservable<T> implements HasUpstreamObservableSource<T> {
33+
public final class ObservableReplay<T> extends ConnectableObservable<T> implements HasUpstreamObservableSource<T>, Disposable {
3434
/** The source observable. */
3535
final ObservableSource<T> source;
3636
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
@@ -158,6 +158,17 @@ public ObservableSource<T> source() {
158158
return source;
159159
}
160160

161+
@Override
162+
public void dispose() {
163+
current.lazySet(null);
164+
}
165+
166+
@Override
167+
public boolean isDisposed() {
168+
Disposable d = current.get();
169+
return d == null || d.isDisposed();
170+
}
171+
161172
@Override
162173
protected void subscribeActual(Observer<? super T> observer) {
163174
onSubscribe.subscribe(observer);

src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java

+153-1
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,20 @@
1717
import static org.mockito.ArgumentMatchers.any;
1818
import static org.mockito.Mockito.*;
1919

20+
import java.lang.management.ManagementFactory;
2021
import java.util.*;
2122
import java.util.concurrent.*;
2223
import java.util.concurrent.atomic.*;
2324

24-
import org.junit.*;
25+
import org.junit.Test;
2526
import org.mockito.InOrder;
2627
import org.reactivestreams.*;
2728

2829
import io.reactivex.*;
2930
import io.reactivex.disposables.Disposable;
3031
import io.reactivex.flowables.ConnectableFlowable;
3132
import io.reactivex.functions.*;
33+
import io.reactivex.internal.functions.Functions;
3234
import io.reactivex.internal.subscriptions.BooleanSubscription;
3335
import io.reactivex.processors.ReplayProcessor;
3436
import io.reactivex.schedulers.*;
@@ -619,4 +621,154 @@ protected void subscribeActual(Subscriber<? super Integer> observer) {
619621

620622
assertEquals(1, calls[0]);
621623
}
624+
625+
Flowable<Object> source;
626+
627+
@Test
628+
public void replayNoLeak() throws Exception {
629+
System.gc();
630+
Thread.sleep(100);
631+
632+
long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
633+
634+
source = Flowable.fromCallable(new Callable<Object>() {
635+
@Override
636+
public Object call() throws Exception {
637+
return new byte[100 * 1000 * 1000];
638+
}
639+
})
640+
.replay(1)
641+
.refCount();
642+
643+
source.subscribe();
644+
645+
System.gc();
646+
Thread.sleep(100);
647+
648+
long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
649+
650+
source = null;
651+
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
652+
}
653+
654+
@Test
655+
public void replayNoLeak2() throws Exception {
656+
System.gc();
657+
Thread.sleep(100);
658+
659+
long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
660+
661+
source = Flowable.fromCallable(new Callable<Object>() {
662+
@Override
663+
public Object call() throws Exception {
664+
return new byte[100 * 1000 * 1000];
665+
}
666+
}).concatWith(Flowable.never())
667+
.replay(1)
668+
.refCount();
669+
670+
Disposable s1 = source.subscribe();
671+
Disposable s2 = source.subscribe();
672+
673+
s1.dispose();
674+
s2.dispose();
675+
676+
s1 = null;
677+
s2 = null;
678+
679+
System.gc();
680+
Thread.sleep(100);
681+
682+
long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
683+
684+
source = null;
685+
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
686+
}
687+
688+
static final class ExceptionData extends Exception {
689+
private static final long serialVersionUID = -6763898015338136119L;
690+
691+
public final Object data;
692+
693+
public ExceptionData(Object data) {
694+
this.data = data;
695+
}
696+
}
697+
698+
@Test
699+
public void publishNoLeak() throws Exception {
700+
System.gc();
701+
Thread.sleep(100);
702+
703+
long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
704+
705+
source = Flowable.fromCallable(new Callable<Object>() {
706+
@Override
707+
public Object call() throws Exception {
708+
throw new ExceptionData(new byte[100 * 1000 * 1000]);
709+
}
710+
})
711+
.publish()
712+
.refCount();
713+
714+
source.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer());
715+
716+
System.gc();
717+
Thread.sleep(100);
718+
719+
long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
720+
721+
source = null;
722+
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
723+
}
724+
725+
@Test
726+
public void publishNoLeak2() throws Exception {
727+
System.gc();
728+
Thread.sleep(100);
729+
730+
long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
731+
732+
source = Flowable.fromCallable(new Callable<Object>() {
733+
@Override
734+
public Object call() throws Exception {
735+
return new byte[100 * 1000 * 1000];
736+
}
737+
}).concatWith(Flowable.never())
738+
.publish()
739+
.refCount();
740+
741+
Disposable s1 = source.test();
742+
Disposable s2 = source.test();
743+
744+
s1.dispose();
745+
s2.dispose();
746+
747+
s1 = null;
748+
s2 = null;
749+
750+
System.gc();
751+
Thread.sleep(100);
752+
753+
long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
754+
755+
source = null;
756+
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
757+
}
758+
759+
@Test
760+
public void replayIsUnsubscribed() {
761+
ConnectableFlowable<Integer> co = Flowable.just(1)
762+
.replay();
763+
764+
assertTrue(((Disposable)co).isDisposed());
765+
766+
Disposable s = co.connect();
767+
768+
assertFalse(((Disposable)co).isDisposed());
769+
770+
s.dispose();
771+
772+
assertTrue(((Disposable)co).isDisposed());
773+
}
622774
}

0 commit comments

Comments
 (0)