Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions src/main/java/rx/internal/operators/OperatorPublish.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void call() {
public void connect(Action1<? super Subscription> connection) {
// each time we connect we create a new Subscription
boolean shouldSubscribe = false;

// subscription is the state of whether we are connected or not
OriginSubscriber<T> origin = requestHandler.state.getOrigin();
if (origin == null) {
Expand All @@ -113,7 +113,7 @@ public void connect(Action1<? super Subscription> connection) {
connection.call(Subscriptions.create(new Action0() {
@Override
public void call() {
Subscription s = requestHandler.state.getOrigin();
OriginSubscriber<T> s = requestHandler.state.getOrigin();
requestHandler.state.setOrigin(null);
if (s != null) {
s.unsubscribe();
Expand All @@ -135,9 +135,11 @@ private static class OriginSubscriber<T> extends Subscriber<T> {
private final RequestHandler<T> requestHandler;
private final AtomicLong originOutstanding = new AtomicLong();
private final long THRESHOLD = RxRingBuffer.SIZE / 4;
private final RxRingBuffer buffer = RxRingBuffer.getSpmcInstance();

OriginSubscriber(RequestHandler<T> requestHandler) {
this.requestHandler = requestHandler;
add(buffer);
}

@Override
Expand Down Expand Up @@ -199,6 +201,8 @@ public void onNext(T t) {
* with a complicated state machine so I'm sticking with mutex locks and just trying to make sure the work done while holding the
* lock is small (such as never emitting data).
*
* This does however mean we can't rely on a reference to State being consistent. For example, it can end up with a null OriginSubscriber.
*
* @param <T>
*/
private static class State<T> {
Expand Down Expand Up @@ -288,7 +292,7 @@ private long resetAfterSubscriberUpdate() {

private static class RequestHandler<T> {
private final NotificationLite<T> notifier = NotificationLite.instance();
private final RxRingBuffer buffer = RxRingBuffer.getSpmcInstance();

private final State<T> state = new State<T>();
@SuppressWarnings("unused")
volatile long wip;
Expand All @@ -297,16 +301,24 @@ private static class RequestHandler<T> {

public void requestFromChildSubscriber(Subscriber<? super T> subscriber, Long request) {
state.requestFromSubscriber(subscriber, request);
drainQueue();
OriginSubscriber<T> originSubscriber = state.getOrigin();
if(originSubscriber != null) {
drainQueue(originSubscriber);
}
}

public void emit(Object t) throws MissingBackpressureException {
OriginSubscriber<T> originSubscriber = state.getOrigin();
if(originSubscriber == null) {
// unsubscribed so break ... we are done
return;
}
if (notifier.isCompleted(t)) {
buffer.onCompleted();
originSubscriber.buffer.onCompleted();
} else {
buffer.onNext(notifier.getValue(t));
originSubscriber.buffer.onNext(notifier.getValue(t));
}
drainQueue();
drainQueue(originSubscriber);
}

private void requestMoreAfterEmission(int emitted) {
Expand All @@ -319,7 +331,7 @@ private void requestMoreAfterEmission(int emitted) {
}
}

public void drainQueue() {
public void drainQueue(OriginSubscriber<T> originSubscriber) {
if (WIP.getAndIncrement(this) == 0) {
int emitted = 0;
do {
Expand All @@ -338,7 +350,7 @@ public void drainQueue() {
if (!shouldEmit) {
break;
}
Object o = buffer.poll();
Object o = originSubscriber.buffer.poll();
if (o == null) {
// nothing in buffer so increment outstanding back again
state.incrementOutstandingAfterFailedEmit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void call(Integer l) {
s2.unsubscribe(); // unsubscribe s2 first as we're counting in 1 and there can be a race between unsubscribe and one subscriber getting a value but not the other
s1.unsubscribe();

System.out.println("onNext: " + nextCount.get());
System.out.println("onNext Count: " + nextCount.get());

// it will emit twice because it is synchronous
assertEquals(nextCount.get(), receivedCount.get() * 2);
Expand Down