From e39599cc4fc01864d6aa1297b85c414d53bc97a2 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 23 Dec 2014 09:38:37 -0800 Subject: [PATCH] Make Publish Operator Release RingBuffer - it was retaining the RxRingBuffer reference between subscribes which meant it was never released to the object pool --- .../internal/operators/OperatorPublish.java | 30 +++++++++++++------ .../operators/OnSubscribeRefCountTest.java | 2 +- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorPublish.java b/src/main/java/rx/internal/operators/OperatorPublish.java index 3b04da4058..6e2328e012 100644 --- a/src/main/java/rx/internal/operators/OperatorPublish.java +++ b/src/main/java/rx/internal/operators/OperatorPublish.java @@ -99,7 +99,7 @@ public void call() { public void connect(Action1 connection) { // each time we connect we create a new Subscription boolean shouldSubscribe = false; - + // subscription is the state of whether we are connected or not OriginSubscriber origin = requestHandler.state.getOrigin(); if (origin == null) { @@ -113,7 +113,7 @@ public void connect(Action1 connection) { connection.call(Subscriptions.create(new Action0() { @Override public void call() { - Subscription s = requestHandler.state.getOrigin(); + OriginSubscriber s = requestHandler.state.getOrigin(); requestHandler.state.setOrigin(null); if (s != null) { s.unsubscribe(); @@ -135,9 +135,11 @@ private static class OriginSubscriber extends Subscriber { private final RequestHandler requestHandler; private final AtomicLong originOutstanding = new AtomicLong(); private final long THRESHOLD = RxRingBuffer.SIZE / 4; + private final RxRingBuffer buffer = RxRingBuffer.getSpmcInstance(); OriginSubscriber(RequestHandler requestHandler) { this.requestHandler = requestHandler; + add(buffer); } @Override @@ -199,6 +201,8 @@ public void onNext(T t) { * with a complicated state machine so I'm sticking with mutex locks and just trying to make sure the work done while holding the * lock is small (such as never emitting data). * + * This does however mean we can't rely on a reference to State being consistent. For example, it can end up with a null OriginSubscriber. + * * @param */ private static class State { @@ -288,7 +292,7 @@ private long resetAfterSubscriberUpdate() { private static class RequestHandler { private final NotificationLite notifier = NotificationLite.instance(); - private final RxRingBuffer buffer = RxRingBuffer.getSpmcInstance(); + private final State state = new State(); @SuppressWarnings("unused") volatile long wip; @@ -297,16 +301,24 @@ private static class RequestHandler { public void requestFromChildSubscriber(Subscriber subscriber, Long request) { state.requestFromSubscriber(subscriber, request); - drainQueue(); + OriginSubscriber originSubscriber = state.getOrigin(); + if(originSubscriber != null) { + drainQueue(originSubscriber); + } } public void emit(Object t) throws MissingBackpressureException { + OriginSubscriber originSubscriber = state.getOrigin(); + if(originSubscriber == null) { + // unsubscribed so break ... we are done + return; + } if (notifier.isCompleted(t)) { - buffer.onCompleted(); + originSubscriber.buffer.onCompleted(); } else { - buffer.onNext(notifier.getValue(t)); + originSubscriber.buffer.onNext(notifier.getValue(t)); } - drainQueue(); + drainQueue(originSubscriber); } private void requestMoreAfterEmission(int emitted) { @@ -319,7 +331,7 @@ private void requestMoreAfterEmission(int emitted) { } } - public void drainQueue() { + public void drainQueue(OriginSubscriber originSubscriber) { if (WIP.getAndIncrement(this) == 0) { int emitted = 0; do { @@ -338,7 +350,7 @@ public void drainQueue() { if (!shouldEmit) { break; } - Object o = buffer.poll(); + Object o = originSubscriber.buffer.poll(); if (o == null) { // nothing in buffer so increment outstanding back again state.incrementOutstandingAfterFailedEmit(); diff --git a/src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java b/src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java index d9b8364071..bdd6f47e3e 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java @@ -154,7 +154,7 @@ public void call(Integer l) { s2.unsubscribe(); // unsubscribe s2 first as we're counting in 1 and there can be a race between unsubscribe and one subscriber getting a value but not the other s1.unsubscribe(); - System.out.println("onNext: " + nextCount.get()); + System.out.println("onNext Count: " + nextCount.get()); // it will emit twice because it is synchronous assertEquals(nextCount.get(), receivedCount.get() * 2);