Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BoundedReplayBuffer no longer leaks memory #5282

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -617,23 +617,20 @@ final void removeFirst() {
// can't just move the head because it would retain the very first value
// can't null out the head's value because of late replayers would see null
setFirst(next);
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When testing you can just call removeFirst() repeatedly. No need to introduce a test-only method. Plus, doing this reduces some memory leak false positives.

/* test */ final void removeSome(int n) {
Node head = get();
while (n > 0) {
head = head.get();
n--;
size--;
}

setFirst(head);
}
/**
* Arranges the given node is the new head from now on.
* @param n the Node instance to set as first
* @param next the Node instance to set as first
*/
final void setFirst(Node n) {
set(n);
final void setFirst(Node next) {
Node newHead = new Node(null);
Node newNext = next.get();
newHead.set(newNext);
set(newHead);
if (newNext == null) {
tail = newHead;
}
}

@Override
Expand Down Expand Up @@ -734,7 +731,31 @@ Object leaveTransform(Object value) {
void truncateFinal() {

}
/* test */ final void collect(Collection<? super T> output) {
/**
* Collect all values in memory even if they will be ignored by subsequent calls to replay(). This is useful
* for tracking memory leaks.
* @param output all values.
*/
/* test */ final void collectValuesInMemory(Collection<? super T> output) {
Node next = getHead();
for (;;) {
if (next == null) {
break;
} else if (next.value != null) {
Object o = next.value;
Object v = leaveTransform(o);
if (NotificationLite.isComplete(v) || NotificationLite.isError(v)) {
break;
}
T value = NotificationLite.<T>getValue(v);
if (value != null) {
output.add(value);
}
}
next = next.get();
}
}
/* test */ final void collect(Collection<? super T> output) {
Node n = getHead();
for (;;) {
Node next = n.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/

package io.reactivex.internal.operators.observable;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
Expand Down Expand Up @@ -731,24 +730,24 @@ void truncate() {
buf.addLast(new Node(5));

List<Integer> values = new ArrayList<Integer>();
buf.collect(values);
collectAndAssertNoLeaks(buf, values);

Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), values);

buf.removeSome(2);
buf.removeFirst();
buf.removeSome(2);
buf.removeFirst();
buf.removeFirst();
buf.removeFirst();
buf.removeFirst();

values.clear();
buf.collect(values);
collectAndAssertNoLeaks(buf, values);
Assert.assertTrue(values.isEmpty());

buf.addLast(new Node(5));
buf.addLast(new Node(6));
buf.collect(values);
collectAndAssertNoLeaks(buf, values);

Assert.assertEquals(Arrays.asList(5, 6), values);

}

@Test
Expand All @@ -766,8 +765,7 @@ public void testTimedAndSizedTruncation() {

buf.next(3);
buf.next(4);
values.clear();
buf.collect(values);
collectAndAssertNoLeaks(buf, values);
Assert.assertEquals(Arrays.asList(3, 4), values);

test.advanceTimeBy(2, TimeUnit.SECONDS);
Expand Down Expand Up @@ -809,8 +807,7 @@ public void testTimedAndSizedTruncationError() {

buf.next(3);
buf.next(4);
values.clear();
buf.collect(values);
collectAndAssertNoLeaks(buf, values);
Assert.assertEquals(Arrays.asList(3, 4), values);

test.advanceTimeBy(2, TimeUnit.SECONDS);
Expand Down Expand Up @@ -841,26 +838,23 @@ public void testSizedTruncation() {

buf.next(1);
buf.next(2);
buf.collect(values);
collectAndAssertNoLeaks(buf, values);
Assert.assertEquals(Arrays.asList(1, 2), values);

buf.next(3);
buf.next(4);
values.clear();
buf.collect(values);
collectAndAssertNoLeaks(buf, values);
Assert.assertEquals(Arrays.asList(3, 4), values);

buf.next(5);

values.clear();
buf.collect(values);
collectAndAssertNoLeaks(buf, values);
Assert.assertEquals(Arrays.asList(4, 5), values);
Assert.assertFalse(buf.hasCompleted());

buf.complete();

values.clear();
buf.collect(values);
collectAndAssertNoLeaks(buf, values);
Assert.assertEquals(Arrays.asList(4, 5), values);

Assert.assertEquals(3, buf.size);
Expand Down Expand Up @@ -1528,4 +1522,12 @@ public void timedNoOutdatedData() {

source.test().assertResult();
}

private <T> void collectAndAssertNoLeaks(BoundedReplayBuffer<T> buffer, Collection<T> collection) {
Collection<T> allObjects = new ArrayList<T>();
collection.clear();
buffer.collectValuesInMemory(allObjects);
buffer.collect(collection);
assertEquals(allObjects, collection);
}
}