diff --git a/language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/RxExamples.groovy b/language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/RxExamples.groovy index 971c65f330..3df7a88689 100644 --- a/language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/RxExamples.groovy +++ b/language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/RxExamples.groovy @@ -19,6 +19,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0 import rx.util.functions.Func1; // -------------------------------------------------- @@ -123,13 +124,13 @@ def customObservableNonBlocking() { }); t.start(); - return new Subscription() { - public void unsubscribe() { + return Subscriptions.create(new Action0() { + public void call() { // Ask the thread to stop doing work. // For this simple example it just interrupts. t.interrupt(); } - }; + }); }; }); } diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 9a093e98cf..1bc87c0ee5 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -530,12 +530,7 @@ def class ObservableTests { observer.onNext("hello_" + count); observer.onCompleted(); - return new Subscription() { - - public void unsubscribe() { - // unregister ... will never be called here since we are executing synchronously - } - }; + return Subscriptions.empty(); } } } \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala index 69ab35c1cd..618e0abe8d 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala @@ -44,6 +44,7 @@ trait Subscription { private [scala] val unsubscribed = new AtomicBoolean(false) private [scala] val asJavaSubscription: rx.Subscription = new rx.Subscription { override def unsubscribe() { unsubscribed.compareAndSet(false, true) } + override def isUnsubscribed(): Boolean = { unsubscribed.get() } } @@ -81,6 +82,7 @@ object Subscription { def apply(u: => Unit): Subscription = new Subscription() { override val asJavaSubscription = new rx.Subscription { override def unsubscribe() { if(unsubscribed.compareAndSet(false, true)) { u } } + override def isUnsubscribed(): Boolean = { unsubscribed.get() } } } diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperationObserveFromAndroidComponent.java b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperationObserveFromAndroidComponent.java index 5fa5584927..169c99e9af 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperationObserveFromAndroidComponent.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperationObserveFromAndroidComponent.java @@ -19,6 +19,8 @@ import rx.Observer; import rx.Subscription; import rx.android.schedulers.AndroidSchedulers; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; import android.app.Activity; import android.os.Looper; import android.util.Log; @@ -94,14 +96,14 @@ public void onNext(T args) { } } }); - return new Subscription() { + return Subscriptions.create(new Action0() { @Override - public void unsubscribe() { + public void call() { log("unsubscribing from source sequence"); releaseReferences(); sourceSub.unsubscribe(); } - }; + }); } private void releaseReferences() { diff --git a/rxjava-core/src/main/java/rx/Subscription.java b/rxjava-core/src/main/java/rx/Subscription.java index 690e5b93c3..20a11ad664 100644 --- a/rxjava-core/src/main/java/rx/Subscription.java +++ b/rxjava-core/src/main/java/rx/Subscription.java @@ -32,5 +32,7 @@ public interface Subscription { * This allows unregistering an {@link Subscriber} before it has finished receiving all events (ie. before onCompleted is called). */ public void unsubscribe(); + + public boolean isUnsubscribed(); } diff --git a/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java b/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java index 6fa8879777..77606a03e6 100644 --- a/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java +++ b/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java @@ -156,6 +156,7 @@ protected static class TimeAndSizeBasedChunks extends Chunks impleme private final long maxTime; private final TimeUnit unit; private final int maxSize; + private volatile boolean unsubscribed = false; public TimeAndSizeBasedChunks(Observer observer, Func0> chunkMaker, int maxSize, long maxTime, TimeUnit unit, Scheduler scheduler) { super(observer, chunkMaker); @@ -210,10 +211,16 @@ public void pushValue(T value) { @Override public void unsubscribe() { + unsubscribed = true; for (Subscription s : subscriptions.values()) { s.unsubscribe(); } } + + @Override + public boolean isUnsubscribed() { + return unsubscribed; + } } /** @@ -232,6 +239,7 @@ protected static class TimeBasedChunks extends OverlappingChunks imp private final Scheduler scheduler; private final long time; private final TimeUnit unit; + private volatile boolean unsubscribed = false; public TimeBasedChunks(Observer observer, Func0> chunkMaker, long time, TimeUnit unit, Scheduler scheduler) { super(observer, chunkMaker); @@ -260,11 +268,17 @@ public void emitChunk(Chunk chunk) { @Override public void unsubscribe() { + unsubscribed = true; for (Subscription s : subscriptions.values()) { s.unsubscribe(); } } + @Override + public boolean isUnsubscribed() { + return unsubscribed; + } + } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java index b0946f0351..688d8afe31 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java @@ -393,6 +393,11 @@ public void unsubscribe() { cc0.stop(); } } + + @Override + public boolean isUnsubscribed() { + return done.get(); + } } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java index 1d12200ba8..9eab8085f1 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java @@ -23,6 +23,8 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; /** * Returns an Observable that emits the items emitted by two or more Observables, one after the @@ -153,9 +155,9 @@ public void onCompleted() { } })); - return new Subscription() { + return Subscriptions.create(new Action0() { @Override - public void unsubscribe() { + public void call() { Subscription q; synchronized (nextSequences) { q = innerSubscription; @@ -165,7 +167,7 @@ public void unsubscribe() { } outerSubscription.unsubscribe(); } - }; + }); } } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupJoin.java b/rxjava-core/src/main/java/rx/operators/OperationGroupJoin.java index 8ec39d7d67..b32ffdf188 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupJoin.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupJoin.java @@ -99,6 +99,11 @@ public void init() { public void unsubscribe() { cancel.unsubscribe(); } + + @Override + public boolean isUnsubscribed() { + return cancel.isUnsubscribed(); + } void groupsOnCompleted() { List> list = new ArrayList>(leftMap.values()); @@ -299,6 +304,7 @@ public void onNext(D2 args) { onCompleted(); } } + } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java b/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java index dd1882e53c..d36225a5ed 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java @@ -25,8 +25,11 @@ import rx.Observer; import rx.Subscription; import rx.observers.SynchronizedObserver; +import rx.subscriptions.BooleanSubscription; import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; import rx.util.CompositeException; +import rx.util.functions.Action0; /** * This behaves like {@link OperatorMerge} except that if any of the merged Observables notify of @@ -69,29 +72,22 @@ public Subscription onSubscribe(Observer observer) { public static OnSubscribeFunc mergeDelayError(final Observable... sequences) { return mergeDelayError(Observable.create(new OnSubscribeFunc>() { - private volatile boolean unsubscribed = false; + private final BooleanSubscription s = new BooleanSubscription(); @Override public Subscription onSubscribe(Observer> observer) { for (Observable o : sequences) { - if (!unsubscribed) { + if (!s.isUnsubscribed()) { observer.onNext(o); } else { // break out of the loop if we are unsubscribed break; } } - if (!unsubscribed) { + if (!s.isUnsubscribed()) { observer.onCompleted(); } - return new Subscription() { - - @Override - public void unsubscribe() { - unsubscribed = true; - } - - }; + return s; } })); } @@ -99,30 +95,23 @@ public void unsubscribe() { public static OnSubscribeFunc mergeDelayError(final List> sequences) { return mergeDelayError(Observable.create(new OnSubscribeFunc>() { - private volatile boolean unsubscribed = false; + private final BooleanSubscription s = new BooleanSubscription(); @Override public Subscription onSubscribe(Observer> observer) { for (Observable o : sequences) { - if (!unsubscribed) { + if (!s.isUnsubscribed()) { observer.onNext(o); } else { // break out of the loop if we are unsubscribed break; } } - if (!unsubscribed) { + if (!s.isUnsubscribed()) { observer.onCompleted(); } - return new Subscription() { - - @Override - public void unsubscribe() { - unsubscribed = true; - } - - }; + return s; } })); } @@ -201,6 +190,11 @@ public boolean stop() { return false; } } + + @Override + public boolean isUnsubscribed() { + return stopped.get(); + } } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java index fc46b43fa0..903a885587 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java @@ -24,6 +24,7 @@ import rx.subjects.Subject; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; import rx.util.functions.Func0; import rx.util.functions.Func1; @@ -73,9 +74,9 @@ public void onNext(T args) { } } - return new Subscription() { + return Subscriptions.create(new Action0() { @Override - public void unsubscribe() { + public void call() { synchronized (lock) { if (subscription != null) { subscription.unsubscribe(); @@ -83,7 +84,7 @@ public void unsubscribe() { } } } - }; + }); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java index 5e4f8480ff..f4de4348cc 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java @@ -22,7 +22,9 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; import rx.util.CompositeException; +import rx.util.functions.Action0; import rx.util.functions.Func1; /** @@ -103,15 +105,15 @@ public void onCompleted() { } }))); - return new Subscription() { - public void unsubscribe() { + return Subscriptions.create(new Action0() { + public void call() { // this will get either the original, or the resumeSequence one and unsubscribe on it Subscription s = subscriptionRef.getAndSet(null); if (s != null) { s.unsubscribe(); } } - }; + }); } } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java index 1f4ac3d24c..4b4d987696 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java @@ -21,6 +21,8 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; /** * Instruct an Observable to pass control to another Observable rather than invoking @@ -97,15 +99,15 @@ public void onCompleted() { } })); - return new Subscription() { - public void unsubscribe() { + return Subscriptions.create(new Action0() { + public void call() { // this will get either the original, or the resumeSequence one and unsubscribe on it Subscription s = subscriptionRef.getAndSet(null); if (s != null) { s.unsubscribe(); } } - }; + }); } } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java index fc4a672195..35b4f2afc7 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java @@ -22,7 +22,9 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; import rx.util.CompositeException; +import rx.util.functions.Action0; import rx.util.functions.Func1; /** @@ -107,15 +109,15 @@ public void onCompleted() { } })); - return new Subscription() { - public void unsubscribe() { + return Subscriptions.create(new Action0() { + public void call() { // this will get either the original, or the resumeSequence one and unsubscribe on it Subscription s = subscriptionRef.getAndSet(null); if (s != null) { s.unsubscribe(); } } - }; + }); } } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnExceptionResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/operators/OperationOnExceptionResumeNextViaObservable.java index 1b079624f9..170b5d02b4 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnExceptionResumeNextViaObservable.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnExceptionResumeNextViaObservable.java @@ -21,6 +21,8 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; /** * Instruct an Observable to pass control to another Observable rather than invoking @@ -104,15 +106,15 @@ public void onCompleted() { } })); - return new Subscription() { - public void unsubscribe() { + return Subscriptions.create(new Action0() { + public void call() { // this will get either the original, or the resumeSequence one and unsubscribe on it Subscription s = subscriptionRef.getAndSet(null); if (s != null) { s.unsubscribe(); } } - }; + }); } } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationReplay.java b/rxjava-core/src/main/java/rx/operators/OperationReplay.java index b59b929980..a8e4c70042 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationReplay.java +++ b/rxjava-core/src/main/java/rx/operators/OperationReplay.java @@ -594,6 +594,11 @@ public void unsubscribe() { } } + @Override + public boolean isUnsubscribed() { + return once.get(); + } + }; Replayer rp = new Replayer(obs, s); replayers.put(s, rp); diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java b/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java index 4f6cd185e7..3bd44032b5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java @@ -74,6 +74,11 @@ public ResultManager init() { public void unsubscribe() { cancel.unsubscribe(); } + + @Override + public boolean isUnsubscribed() { + return cancel.isUnsubscribed(); + } @Override public void onNext(T args) { @@ -123,5 +128,6 @@ public void onCompleted() { } } + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java index c46d98b20f..1d244312e2 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java @@ -57,6 +57,7 @@ public Subscription call(Scheduler s, T t) { private static class ScheduledSubscription implements Subscription { private final Subscription underlying; private final Scheduler scheduler; + private volatile boolean unsubscribed = false; private ScheduledSubscription(Subscription underlying, Scheduler scheduler) { this.underlying = underlying; @@ -65,6 +66,7 @@ private ScheduledSubscription(Subscription underlying, Scheduler scheduler) { @Override public void unsubscribe() { + unsubscribed = true; scheduler.schedule(new Action0() { @Override public void call() { @@ -72,5 +74,10 @@ public void call() { } }); } + + @Override + public boolean isUnsubscribed() { + return unsubscribed; + } } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationZip.java b/rxjava-core/src/main/java/rx/operators/OperationZip.java index 5b05946913..7f6affae4f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationZip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationZip.java @@ -294,6 +294,11 @@ public void connect() { public void unsubscribe() { toSource.unsubscribe(); } + + @Override + public boolean isUnsubscribed() { + return toSource.isUnsubscribed(); + } private void runCollector() { if (rwLock.writeLock().tryLock()) { @@ -330,6 +335,7 @@ private void runCollector() { } } } + } } diff --git a/rxjava-core/src/main/java/rx/operators/SafeObservableSubscription.java b/rxjava-core/src/main/java/rx/operators/SafeObservableSubscription.java index fb4364b86c..af496c05d7 100644 --- a/rxjava-core/src/main/java/rx/operators/SafeObservableSubscription.java +++ b/rxjava-core/src/main/java/rx/operators/SafeObservableSubscription.java @@ -33,6 +33,11 @@ public final class SafeObservableSubscription implements Subscription { public void unsubscribe() { } + + @Override + public boolean isUnsubscribed() { + return true; + } }; private final AtomicReference actualSubscription = new AtomicReference(); diff --git a/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java index 090ec7c601..effc5101be 100644 --- a/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java @@ -135,6 +135,11 @@ public void unsubscribe() { childSubscription.unsubscribe(); } + @Override + public boolean isUnsubscribed() { + return childSubscription.isUnsubscribed(); + } + } /** diff --git a/rxjava-core/src/main/java/rx/schedulers/DiscardableAction.java b/rxjava-core/src/main/java/rx/schedulers/DiscardableAction.java index 0140af7c32..eafc825012 100644 --- a/rxjava-core/src/main/java/rx/schedulers/DiscardableAction.java +++ b/rxjava-core/src/main/java/rx/schedulers/DiscardableAction.java @@ -54,4 +54,9 @@ public void unsubscribe() { wrapper.unsubscribe(); } + @Override + public boolean isUnsubscribed() { + return wrapper.isUnsubscribed(); + } + } diff --git a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java index b838d3251b..bd31760395 100644 --- a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java @@ -243,6 +243,11 @@ public void unsubscribe() { childSubscription.unsubscribe(); } + @Override + public boolean isUnsubscribed() { + return childSubscription.isUnsubscribed(); + } + } } diff --git a/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java b/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java index ac18c55316..cb6126b83b 100644 --- a/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java @@ -23,6 +23,8 @@ import rx.Scheduler; import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; import rx.util.functions.Func2; public class TestScheduler extends Scheduler { @@ -110,11 +112,12 @@ public Subscription schedule(T state, Func2 timedAction = new TimedAction(this, time + unit.toNanos(delayTime), action, state); queue.add(timedAction); - return new Subscription() { + return Subscriptions.create(new Action0() { + @Override - public void unsubscribe() { + public void call() { timedAction.cancel(); } - }; + }); } } diff --git a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java index 945eef700f..81079eae1a 100644 --- a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java +++ b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java @@ -25,6 +25,8 @@ import rx.Subscription; import rx.Observable.OnSubscribe; import rx.operators.SafeObservableSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; import rx.util.functions.Action1; /* package */class SubjectSubscriptionManager { @@ -71,9 +73,10 @@ public void call(Subscriber actualOperator) { final SafeObservableSubscription subscription = new SafeObservableSubscription(); actualOperator.add(subscription); // add to parent if the Subject itself is unsubscribed addedObserver = true; - subscription.wrap(new Subscription() { + subscription.wrap(Subscriptions.create(new Action0() { + @Override - public void unsubscribe() { + public void call() { State current; State newState; do { @@ -82,7 +85,7 @@ public void unsubscribe() { newState = current.removeObserver(subscription); } while (!state.compareAndSet(current, newState)); } - }); + })); // on subscribe add it to the map of outbound observers to notify newState = current.addObserver(subscription, observer); diff --git a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java index 7731842371..6165b7f627 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java @@ -131,5 +131,10 @@ public void unsubscribe() { unsubscribeActualIfApplicable(newState); } } + + @Override + public boolean isUnsubscribed() { + return innerDone.get(); + } }; } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java index ad2195b4b2..6499967f65 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java +++ b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java @@ -44,11 +44,19 @@ public static Subscription empty() { public static Subscription create(final Action0 unsubscribe) { return new SafeObservableSubscription(new Subscription() { + private volatile boolean unsubscribed = false; + @Override public void unsubscribe() { + unsubscribed = true; unsubscribe.call(); } + @Override + public boolean isUnsubscribed() { + return unsubscribed; + } + }); } @@ -68,24 +76,9 @@ public void unsubscribe() { f.cancel(true); } - }; - } - - /** - * A {@link Subscription} that wraps a {@link Future} and cancels it when unsubscribed. - * - * - * @param f - * {@link Future} - * @return {@link Subscription} - * @deprecated Use {@link #from(Future)} instead - */ - public static Subscription create(final Future f) { - return new Subscription() { - @Override - public void unsubscribe() { - f.cancel(true); + public boolean isUnsubscribed() { + return f.isCancelled(); } }; @@ -103,24 +96,16 @@ public static CompositeSubscription from(Subscription... subscriptions) { return new CompositeSubscription(subscriptions); } - /** - * A {@link Subscription} that groups multiple Subscriptions together and unsubscribes from all of them together. - * - * @param subscriptions - * Subscriptions to group together - * @return {@link Subscription} - * @deprecated Use {@link #from(Subscription...)} instead - */ - - public static CompositeSubscription create(Subscription... subscriptions) { - return new CompositeSubscription(subscriptions); - } - /** * A {@link Subscription} that does nothing when its unsubscribe method is called. */ private static Subscription EMPTY = new Subscription() { public void unsubscribe() { } + + @Override + public boolean isUnsubscribed() { + return false; + } }; } diff --git a/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java b/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java index 4a3599fb0e..5cc1a588c1 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java @@ -35,6 +35,7 @@ import rx.Subscription; import rx.schedulers.TestScheduler; import rx.subscriptions.BooleanSubscription; +import rx.subscriptions.Subscriptions; public class OperationConcatTest { @@ -95,14 +96,7 @@ public Subscription onSubscribe(Observer> observer) { observer.onNext(even); observer.onCompleted(); - return new Subscription() { - - @Override - public void unsubscribe() { - // unregister ... will never be called here since we are executing synchronously - } - - }; + return Subscriptions.empty(); } }); @@ -348,13 +342,7 @@ public Subscription onSubscribe(Observer> observer) { observer.onNext(Observable.create(w2)); observer.onCompleted(); - return new Subscription() { - - @Override - public void unsubscribe() { - } - - }; + return Subscriptions.empty(); } }); @@ -482,6 +470,11 @@ public void unsubscribe() { subscribed = false; } + @Override + public boolean isUnsubscribed() { + return subscribed; + } + }; private final List values; private Thread t = null; diff --git a/rxjava-core/src/test/java/rx/operators/OperationMaterializeTest.java b/rxjava-core/src/test/java/rx/operators/OperationMaterializeTest.java index f00e6015a9..75dfaa59a7 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationMaterializeTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationMaterializeTest.java @@ -29,6 +29,7 @@ import rx.Observer; import rx.Subscriber; import rx.Subscription; +import rx.subscriptions.Subscriptions; public class OperationMaterializeTest { @@ -154,14 +155,7 @@ public void run() { }); t.start(); - return new Subscription() { - - @Override - public void unsubscribe() { - - } - - }; + return Subscriptions.empty(); } } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationMergeDelayErrorTest.java b/rxjava-core/src/test/java/rx/operators/OperationMergeDelayErrorTest.java index 9ff81f6181..6a6908698a 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationMergeDelayErrorTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationMergeDelayErrorTest.java @@ -31,6 +31,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; import rx.util.CompositeException; public class OperationMergeDelayErrorTest { @@ -225,14 +226,7 @@ public Subscription onSubscribe(Observer> observer) { observer.onNext(o2); observer.onCompleted(); - return new Subscription() { - - @Override - public void unsubscribe() { - // unregister ... will never be called here since we are executing synchronously - } - - }; + return Subscriptions.empty(); } }); @@ -330,14 +324,7 @@ public Subscription onSubscribe(Observer observer) { observer.onNext("hello"); observer.onCompleted(); - return new Subscription() { - - @Override - public void unsubscribe() { - // unregister ... will never be called here since we are executing synchronously - } - - }; + return Subscriptions.empty(); } } @@ -357,14 +344,7 @@ public void run() { }); t.start(); - return new Subscription() { - - @Override - public void unsubscribe() { - - } - - }; + return Subscriptions.empty(); } } @@ -383,6 +363,11 @@ public void unsubscribe() { } + @Override + public boolean isUnsubscribed() { + return unsubscribed; + } + }; /* used to simulate subscription */ @@ -434,14 +419,7 @@ public Subscription onSubscribe(Observer observer) { observer.onCompleted(); } - return new Subscription() { - - @Override - public void unsubscribe() { - // unregister ... will never be called here since we are executing synchronously - } - - }; + return Subscriptions.empty(); } } @@ -482,14 +460,7 @@ public void run() { }); t.start(); - return new Subscription() { - - @Override - public void unsubscribe() { - - } - - }; + return Subscriptions.empty(); } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationSynchronizeTest.java b/rxjava-core/src/test/java/rx/operators/OperationSynchronizeTest.java index 7d796b5b89..c94ba537ca 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSynchronizeTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationSynchronizeTest.java @@ -216,6 +216,11 @@ public void unsubscribe() { System.out.println("==> SynchronizeTest unsubscribe that does nothing!"); } + @Override + public boolean isUnsubscribed() { + return false; + } + }; } diff --git a/rxjava-core/src/test/java/rx/operators/OperationUsingTest.java b/rxjava-core/src/test/java/rx/operators/OperationUsingTest.java index 646dfc22db..4a7e8f419d 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationUsingTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationUsingTest.java @@ -102,6 +102,11 @@ public String getTextFromWeb() { public void unsubscribe() { } + @Override + public boolean isUnsubscribed() { + return false; + } + }; } }; diff --git a/rxjava-core/src/test/java/rx/operators/OperatorMergeTest.java b/rxjava-core/src/test/java/rx/operators/OperatorMergeTest.java index d20cc8acb9..7d30e46413 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorMergeTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorMergeTest.java @@ -66,14 +66,7 @@ public Subscription onSubscribe(Observer> observer) { observer.onNext(o2); observer.onCompleted(); - return new Subscription() { - - @Override - public void unsubscribe() { - // unregister ... will never be called here since we are executing synchronously - } - - }; + return Subscriptions.empty(); } }); @@ -329,14 +322,7 @@ public Subscription onSubscribe(Observer observer) { observer.onNext("hello"); observer.onCompleted(); - return new Subscription() { - - @Override - public void unsubscribe() { - // unregister ... will never be called here since we are executing synchronously - } - - }; + return Subscriptions.empty(); } } @@ -360,14 +346,7 @@ public void run() { }); t.start(); - return new Subscription() { - - @Override - public void unsubscribe() { - - } - - }; + return Subscriptions.empty(); } } @@ -386,6 +365,11 @@ public void unsubscribe() { } + @Override + public boolean isUnsubscribed() { + return unsubscribed; + } + }; /* used to simulate subscription */ @@ -432,14 +416,7 @@ public Subscription onSubscribe(Observer observer) { } observer.onCompleted(); - return new Subscription() { - - @Override - public void unsubscribe() { - // unregister ... will never be called here since we are executing synchronously - } - - }; + return Subscriptions.empty(); } } diff --git a/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java b/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java index a75fa703e8..5331bfa90a 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java @@ -145,6 +145,11 @@ public Subscription onSubscribe(Observer observer) { public void unsubscribe() { unSubscribed.set(true); } + + @Override + public boolean isUnsubscribed() { + return unSubscribed.get(); + } }; } }); diff --git a/rxjava-core/src/test/java/rx/subscriptions/CompositeSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/CompositeSubscriptionTest.java index 432a78ecdd..99782f6f0e 100644 --- a/rxjava-core/src/test/java/rx/subscriptions/CompositeSubscriptionTest.java +++ b/rxjava-core/src/test/java/rx/subscriptions/CompositeSubscriptionTest.java @@ -39,6 +39,11 @@ public void testSuccess() { public void unsubscribe() { counter.incrementAndGet(); } + + @Override + public boolean isUnsubscribed() { + return false; + } }); s.add(new Subscription() { @@ -47,6 +52,11 @@ public void unsubscribe() { public void unsubscribe() { counter.incrementAndGet(); } + + @Override + public boolean isUnsubscribed() { + return false; + } }); s.unsubscribe(); @@ -68,6 +78,11 @@ public void shouldUnsubscribeAll() throws InterruptedException { public void unsubscribe() { counter.incrementAndGet(); } + + @Override + public boolean isUnsubscribed() { + return false; + } }); } @@ -106,6 +121,11 @@ public void testException() { public void unsubscribe() { throw new RuntimeException("failed on first one"); } + + @Override + public boolean isUnsubscribed() { + return false; + } }); s.add(new Subscription() { @@ -114,6 +134,11 @@ public void unsubscribe() { public void unsubscribe() { counter.incrementAndGet(); } + + @Override + public boolean isUnsubscribed() { + return false; + } }); try { @@ -138,6 +163,11 @@ public void testCompositeException() { public void unsubscribe() { throw new RuntimeException("failed on first one"); } + + @Override + public boolean isUnsubscribed() { + return false; + } }); s.add(new Subscription() { @@ -146,6 +176,11 @@ public void unsubscribe() { public void unsubscribe() { throw new RuntimeException("failed on second one too"); } + + @Override + public boolean isUnsubscribed() { + return false; + } }); s.add(new Subscription() { @@ -154,6 +189,11 @@ public void unsubscribe() { public void unsubscribe() { counter.incrementAndGet(); } + + @Override + public boolean isUnsubscribed() { + return false; + } }); try { @@ -220,6 +260,11 @@ public void testUnsubscribeIdempotence() { public void unsubscribe() { counter.incrementAndGet(); } + + @Override + public boolean isUnsubscribed() { + return false; + } }); s.unsubscribe(); @@ -244,6 +289,11 @@ public void testUnsubscribeIdempotenceConcurrently() public void unsubscribe() { counter.incrementAndGet(); } + + @Override + public boolean isUnsubscribed() { + return false; + } }); final List threads = new ArrayList();