diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index af25fa9bf2..1e57343789 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -617,23 +617,20 @@ final void removeFirst() { // can't just move the head because it would retain the very first value // can't null out the head's value because of late replayers would see null setFirst(next); - } - /* test */ final void removeSome(int n) { - Node head = get(); - while (n > 0) { - head = head.get(); - n--; - size--; - } - setFirst(head); } /** * Arranges the given node is the new head from now on. - * @param n the Node instance to set as first + * @param next the Node instance to set as first */ - final void setFirst(Node n) { - set(n); + final void setFirst(Node next) { + Node newHead = new Node(null); + Node newNext = next.get(); + newHead.set(newNext); + set(newHead); + if (newNext == null) { + tail = newHead; + } } @Override @@ -734,7 +731,31 @@ Object leaveTransform(Object value) { void truncateFinal() { } - /* test */ final void collect(Collection output) { + /** + * Collect all values in memory even if they will be ignored by subsequent calls to replay(). This is useful + * for tracking memory leaks. + * @param output all values. + */ + /* test */ final void collectValuesInMemory(Collection output) { + Node next = getHead(); + for (;;) { + if (next == null) { + break; + } else if (next.value != null) { + Object o = next.value; + Object v = leaveTransform(o); + if (NotificationLite.isComplete(v) || NotificationLite.isError(v)) { + break; + } + T value = NotificationLite.getValue(v); + if (value != null) { + output.add(value); + } + } + next = next.get(); + } + } + /* test */ final void collect(Collection output) { Node n = getHead(); for (;;) { Node next = n.get(); diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java index 2057e33692..de97bd47f9 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java @@ -12,7 +12,6 @@ */ package io.reactivex.internal.operators.observable; - import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; @@ -731,24 +730,24 @@ void truncate() { buf.addLast(new Node(5)); List values = new ArrayList(); - buf.collect(values); + collectAndAssertNoLeaks(buf, values); Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), values); - buf.removeSome(2); buf.removeFirst(); - buf.removeSome(2); + buf.removeFirst(); + buf.removeFirst(); + buf.removeFirst(); + buf.removeFirst(); - values.clear(); - buf.collect(values); + collectAndAssertNoLeaks(buf, values); Assert.assertTrue(values.isEmpty()); buf.addLast(new Node(5)); buf.addLast(new Node(6)); - buf.collect(values); + collectAndAssertNoLeaks(buf, values); Assert.assertEquals(Arrays.asList(5, 6), values); - } @Test @@ -766,8 +765,7 @@ public void testTimedAndSizedTruncation() { buf.next(3); buf.next(4); - values.clear(); - buf.collect(values); + collectAndAssertNoLeaks(buf, values); Assert.assertEquals(Arrays.asList(3, 4), values); test.advanceTimeBy(2, TimeUnit.SECONDS); @@ -809,8 +807,7 @@ public void testTimedAndSizedTruncationError() { buf.next(3); buf.next(4); - values.clear(); - buf.collect(values); + collectAndAssertNoLeaks(buf, values); Assert.assertEquals(Arrays.asList(3, 4), values); test.advanceTimeBy(2, TimeUnit.SECONDS); @@ -841,26 +838,23 @@ public void testSizedTruncation() { buf.next(1); buf.next(2); - buf.collect(values); + collectAndAssertNoLeaks(buf, values); Assert.assertEquals(Arrays.asList(1, 2), values); buf.next(3); buf.next(4); - values.clear(); - buf.collect(values); + collectAndAssertNoLeaks(buf, values); Assert.assertEquals(Arrays.asList(3, 4), values); buf.next(5); - values.clear(); - buf.collect(values); + collectAndAssertNoLeaks(buf, values); Assert.assertEquals(Arrays.asList(4, 5), values); Assert.assertFalse(buf.hasCompleted()); buf.complete(); - values.clear(); - buf.collect(values); + collectAndAssertNoLeaks(buf, values); Assert.assertEquals(Arrays.asList(4, 5), values); Assert.assertEquals(3, buf.size); @@ -1528,4 +1522,12 @@ public void timedNoOutdatedData() { source.test().assertResult(); } + + private void collectAndAssertNoLeaks(BoundedReplayBuffer buffer, Collection collection) { + Collection allObjects = new ArrayList(); + collection.clear(); + buffer.collectValuesInMemory(allObjects); + buffer.collect(collection); + assertEquals(allObjects, collection); + } }