Skip to content

Commit 2edea6b

Browse files
authored
2.x: Fix the extra retention problem in ReplaySubject (#5892)
* 2.x: Fix the extra retention problem in ReplaySubject * Cover the already-trimmed case.
1 parent 95dde6f commit 2edea6b

File tree

2 files changed

+166
-6
lines changed

2 files changed

+166
-6
lines changed

src/main/java/io/reactivex/subjects/ReplaySubject.java

+77-6
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,14 @@
1313

1414
package io.reactivex.subjects;
1515

16-
import io.reactivex.annotations.Nullable;
1716
import java.lang.reflect.Array;
1817
import java.util.*;
1918
import java.util.concurrent.TimeUnit;
2019
import java.util.concurrent.atomic.*;
2120

2221
import io.reactivex.Observer;
2322
import io.reactivex.Scheduler;
24-
import io.reactivex.annotations.CheckReturnValue;
23+
import io.reactivex.annotations.*;
2524
import io.reactivex.disposables.Disposable;
2625
import io.reactivex.internal.functions.ObjectHelper;
2726
import io.reactivex.internal.util.NotificationLite;
@@ -94,8 +93,9 @@
9493
* in a non-blocking and thread-safe manner via {@link #hasValue()}, {@link #getValue()},
9594
* {@link #getValues()} or {@link #getValues(Object[])}.
9695
* <p>
97-
* Note that due to concurrency requirements, a size-bounded {@code ReplaySubject} may hold strong references to more
98-
* source emissions than specified.
96+
* Note that due to concurrency requirements, a size- and time-bounded {@code ReplaySubject} may hold strong references to more
97+
* source emissions than specified while it isn't terminated yet. Use the {@link #cleanupBuffer()} to allow
98+
* such inaccessible items to be cleaned up by GC once no consumer references it anymore.
9999
* <dl>
100100
* <dt><b>Scheduler:</b></dt>
101101
* <dd>{@code ReplaySubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and
@@ -415,6 +415,24 @@ public T getValue() {
415415
return buffer.getValue();
416416
}
417417

418+
/**
419+
* Makes sure the item cached by the head node in a bounded
420+
* ReplaySubject is released (as it is never part of a replay).
421+
* <p>
422+
* By default, live bounded buffers will remember one item before
423+
* the currently receivable one to ensure subscribers can always
424+
* receive a continuous sequence of items. A terminated ReplaySubject
425+
* automatically releases this inaccessible item.
426+
* <p>
427+
* The method must be called sequentially, similar to the standard
428+
* {@code onXXX} methods.
429+
* @since 2.1.11 - experimental
430+
*/
431+
@Experimental
432+
public void cleanupBuffer() {
433+
buffer.trimHead();
434+
}
435+
418436
/** An empty array to avoid allocation in getValues(). */
419437
private static final Object[] EMPTY_ARRAY = new Object[0];
420438

@@ -563,6 +581,12 @@ interface ReplayBuffer<T> {
563581
* @return true if successful
564582
*/
565583
boolean compareAndSet(Object expected, Object next);
584+
585+
/**
586+
* Make sure an old inaccessible head value is released
587+
* in a bounded buffer.
588+
*/
589+
void trimHead();
566590
}
567591

568592
static final class ReplayDisposable<T> extends AtomicInteger implements Disposable {
@@ -619,10 +643,16 @@ public void add(T value) {
619643
@Override
620644
public void addFinal(Object notificationLite) {
621645
buffer.add(notificationLite);
646+
trimHead();
622647
size++;
623648
done = true;
624649
}
625650

651+
@Override
652+
public void trimHead() {
653+
// no-op in this type of buffer
654+
}
655+
626656
@Override
627657
@Nullable
628658
@SuppressWarnings("unchecked")
@@ -839,9 +869,24 @@ public void addFinal(Object notificationLite) {
839869
size++;
840870
t.lazySet(n); // releases both the tail and size
841871

872+
trimHead();
842873
done = true;
843874
}
844875

876+
/**
877+
* Replace a non-empty head node with an empty one to
878+
* allow the GC of the inaccessible old value.
879+
*/
880+
@Override
881+
public void trimHead() {
882+
Node<Object> h = head;
883+
if (h.value != null) {
884+
Node<Object> n = new Node<Object>(null);
885+
n.lazySet(h.get());
886+
head = n;
887+
}
888+
}
889+
845890
@Override
846891
@Nullable
847892
@SuppressWarnings("unchecked")
@@ -1047,12 +1092,24 @@ void trimFinal() {
10471092
for (;;) {
10481093
TimedNode<Object> next = h.get();
10491094
if (next.get() == null) {
1050-
head = h;
1095+
if (h.value != null) {
1096+
TimedNode<Object> lasth = new TimedNode<Object>(null, 0L);
1097+
lasth.lazySet(h.get());
1098+
head = lasth;
1099+
} else {
1100+
head = h;
1101+
}
10511102
break;
10521103
}
10531104

10541105
if (next.time > limit) {
1055-
head = h;
1106+
if (h.value != null) {
1107+
TimedNode<Object> lasth = new TimedNode<Object>(null, 0L);
1108+
lasth.lazySet(h.get());
1109+
head = lasth;
1110+
} else {
1111+
head = h;
1112+
}
10561113
break;
10571114
}
10581115

@@ -1085,6 +1142,20 @@ public void addFinal(Object notificationLite) {
10851142
done = true;
10861143
}
10871144

1145+
/**
1146+
* Replace a non-empty head node with an empty one to
1147+
* allow the GC of the inaccessible old value.
1148+
*/
1149+
@Override
1150+
public void trimHead() {
1151+
TimedNode<Object> h = head;
1152+
if (h.value != null) {
1153+
TimedNode<Object> n = new TimedNode<Object>(null, 0);
1154+
n.lazySet(h.get());
1155+
head = n;
1156+
}
1157+
}
1158+
10881159
@Override
10891160
@Nullable
10901161
@SuppressWarnings("unchecked")

src/test/java/io/reactivex/subjects/ReplaySubjectTest.java

+89
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.reactivex.functions.Function;
3131
import io.reactivex.observers.*;
3232
import io.reactivex.schedulers.*;
33+
import io.reactivex.subjects.ReplaySubject.*;
3334

3435
public class ReplaySubjectTest extends SubjectTest<Integer> {
3536

@@ -1184,4 +1185,92 @@ public void timedNoOutdatedData() {
11841185

11851186
source.test().assertResult();
11861187
}
1188+
1189+
@Test
1190+
public void noHeadRetentionCompleteSize() {
1191+
ReplaySubject<Integer> source = ReplaySubject.createWithSize(1);
1192+
1193+
source.onNext(1);
1194+
source.onNext(2);
1195+
source.onComplete();
1196+
1197+
SizeBoundReplayBuffer<Integer> buf = (SizeBoundReplayBuffer<Integer>)source.buffer;
1198+
1199+
assertNull(buf.head.value);
1200+
1201+
Object o = buf.head;
1202+
1203+
source.cleanupBuffer();
1204+
1205+
assertSame(o, buf.head);
1206+
}
1207+
1208+
1209+
@Test
1210+
public void noHeadRetentionSize() {
1211+
ReplaySubject<Integer> source = ReplaySubject.createWithSize(1);
1212+
1213+
source.onNext(1);
1214+
source.onNext(2);
1215+
1216+
SizeBoundReplayBuffer<Integer> buf = (SizeBoundReplayBuffer<Integer>)source.buffer;
1217+
1218+
assertNotNull(buf.head.value);
1219+
1220+
source.cleanupBuffer();
1221+
1222+
assertNull(buf.head.value);
1223+
1224+
Object o = buf.head;
1225+
1226+
source.cleanupBuffer();
1227+
1228+
assertSame(o, buf.head);
1229+
}
1230+
1231+
@Test
1232+
public void noHeadRetentionCompleteTime() {
1233+
ReplaySubject<Integer> source = ReplaySubject.createWithTime(1, TimeUnit.MINUTES, Schedulers.computation());
1234+
1235+
source.onNext(1);
1236+
source.onNext(2);
1237+
source.onComplete();
1238+
1239+
SizeAndTimeBoundReplayBuffer<Integer> buf = (SizeAndTimeBoundReplayBuffer<Integer>)source.buffer;
1240+
1241+
assertNull(buf.head.value);
1242+
1243+
Object o = buf.head;
1244+
1245+
source.cleanupBuffer();
1246+
1247+
assertSame(o, buf.head);
1248+
}
1249+
1250+
@Test
1251+
public void noHeadRetentionTime() {
1252+
TestScheduler sch = new TestScheduler();
1253+
1254+
ReplaySubject<Integer> source = ReplaySubject.createWithTime(1, TimeUnit.MILLISECONDS, sch);
1255+
1256+
source.onNext(1);
1257+
1258+
sch.advanceTimeBy(2, TimeUnit.MILLISECONDS);
1259+
1260+
source.onNext(2);
1261+
1262+
SizeAndTimeBoundReplayBuffer<Integer> buf = (SizeAndTimeBoundReplayBuffer<Integer>)source.buffer;
1263+
1264+
assertNotNull(buf.head.value);
1265+
1266+
source.cleanupBuffer();
1267+
1268+
assertNull(buf.head.value);
1269+
1270+
Object o = buf.head;
1271+
1272+
source.cleanupBuffer();
1273+
1274+
assertSame(o, buf.head);
1275+
}
11871276
}

0 commit comments

Comments
 (0)