From 069a0b1e91ef1a82ad426b46a96830cfc2390d1a Mon Sep 17 00:00:00 2001 From: wrightm Date: Wed, 2 Sep 2015 23:42:43 +0100 Subject: [PATCH] Added more tests and cleaned up SubscriptionList to program to List interface and removed static method. --- .../rx/internal/util/SubscriptionList.java | 10 +- .../internal/util/SubscriptionListTest.java | 347 +++++++++--------- 2 files changed, 175 insertions(+), 182 deletions(-) diff --git a/src/main/java/rx/internal/util/SubscriptionList.java b/src/main/java/rx/internal/util/SubscriptionList.java index 6f6f391dde..bc2825f87d 100644 --- a/src/main/java/rx/internal/util/SubscriptionList.java +++ b/src/main/java/rx/internal/util/SubscriptionList.java @@ -27,7 +27,7 @@ */ public final class SubscriptionList implements Subscription { - private LinkedList subscriptions; + private List subscriptions; private volatile boolean unsubscribed; public SubscriptionList() { @@ -62,7 +62,7 @@ public void add(final Subscription s) { if (!unsubscribed) { synchronized (this) { if (!unsubscribed) { - LinkedList subs = subscriptions; + List subs = subscriptions; if (subs == null) { subs = new LinkedList(); subscriptions = subs; @@ -80,7 +80,7 @@ public void remove(final Subscription s) { if (!unsubscribed) { boolean unsubscribe = false; synchronized (this) { - LinkedList subs = subscriptions; + List subs = subscriptions; if (unsubscribed || subs == null) { return; } @@ -114,7 +114,7 @@ public void unsubscribe() { } } - private static void unsubscribeFromAll(Collection subscriptions) { + private void unsubscribeFromAll(Collection subscriptions) { if (subscriptions == null) { return; } @@ -124,7 +124,7 @@ private static void unsubscribeFromAll(Collection subscriptions) { s.unsubscribe(); } catch (Throwable e) { if (es == null) { - es = new ArrayList(); + es = new LinkedList(); } es.add(e); } diff --git a/src/test/java/rx/internal/util/SubscriptionListTest.java b/src/test/java/rx/internal/util/SubscriptionListTest.java index 614b55445d..5de69177ab 100644 --- a/src/test/java/rx/internal/util/SubscriptionListTest.java +++ b/src/test/java/rx/internal/util/SubscriptionListTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; import org.junit.Test; import rx.Subscription; @@ -34,34 +35,10 @@ public class SubscriptionListTest { @Test public void testSuccess() { final AtomicInteger counter = new AtomicInteger(); - SubscriptionList s = new SubscriptionList(); - s.add(new Subscription() { + final int numberOfSubscriptions = 2; + SubscriptionList subscriptions = createSubscriptions(counter, numberOfSubscriptions); - @Override - public void unsubscribe() { - counter.incrementAndGet(); - } - - @Override - public boolean isUnsubscribed() { - return false; - } - }); - - s.add(new Subscription() { - - @Override - public void unsubscribe() { - counter.incrementAndGet(); - } - - @Override - public boolean isUnsubscribed() { - return false; - } - }); - - s.unsubscribe(); + subscriptions.unsubscribe(); assertEquals(2, counter.get()); } @@ -69,82 +46,32 @@ public boolean isUnsubscribed() { @Test(timeout = 1000) public void shouldUnsubscribeAll() throws InterruptedException { final AtomicInteger counter = new AtomicInteger(); - final SubscriptionList s = new SubscriptionList(); - - final int count = 10; + final int numberOfSubscriptions = 10; + + SubscriptionList subscriptions = createSubscriptions(counter, numberOfSubscriptions); + final CountDownLatch start = new CountDownLatch(1); - for (int i = 0; i < count; i++) { - s.add(new Subscription() { - - @Override - public void unsubscribe() { - counter.incrementAndGet(); - } - - @Override - public boolean isUnsubscribed() { - return false; - } - }); - } - - final List threads = new ArrayList(); - for (int i = 0; i < count; i++) { - final Thread t = new Thread() { - @Override - public void run() { - try { - start.await(); - s.unsubscribe(); - } catch (final InterruptedException e) { - fail(e.getMessage()); - } - } - }; - t.start(); - threads.add(t); - } - + + final List threads = createUnsubscribeThreads(start, subscriptions, numberOfSubscriptions); + start.countDown(); - for (final Thread t : threads) { - t.join(); - } + + joinThreads(threads); - assertEquals(count, counter.get()); + assertEquals(numberOfSubscriptions, counter.get()); } @Test - public void testException() { + public void testRuntimeException() { + final AtomicInteger counter = new AtomicInteger(); - SubscriptionList s = new SubscriptionList(); - s.add(new Subscription() { - - @Override - public void unsubscribe() { - throw new RuntimeException("failed on first one"); - } - - @Override - public boolean isUnsubscribed() { - return false; - } - }); - - s.add(new Subscription() { - - @Override - public void unsubscribe() { - counter.incrementAndGet(); - } - - @Override - public boolean isUnsubscribed() { - return false; - } - }); - + final int numberOfSubscriptions = 1; + + SubscriptionList subscriptions = createSubscriptions(counter, numberOfSubscriptions); + addThrowableSubscriptions(subscriptions, new RuntimeException("failed on first one"), numberOfSubscriptions); + try { - s.unsubscribe(); + subscriptions.unsubscribe(); fail("Expecting an exception"); } catch (RuntimeException e) { // we expect this @@ -157,49 +84,15 @@ public boolean isUnsubscribed() { @Test public void testCompositeException() { - final AtomicInteger counter = new AtomicInteger(); - SubscriptionList s = new SubscriptionList(); - s.add(new Subscription() { - - @Override - public void unsubscribe() { - throw new RuntimeException("failed on first one"); - } - - @Override - public boolean isUnsubscribed() { - return false; - } - }); - - s.add(new Subscription() { - - @Override - public void unsubscribe() { - throw new RuntimeException("failed on second one too"); - } - - @Override - public boolean isUnsubscribed() { - return false; - } - }); - - s.add(new Subscription() { - - @Override - public void unsubscribe() { - counter.incrementAndGet(); - } - - @Override - public boolean isUnsubscribed() { - return false; - } - }); + final AtomicInteger counter = new AtomicInteger(); + final int numberOfSubscriptions = 1; + + SubscriptionList subscriptions = createSubscriptions(counter, numberOfSubscriptions); + addThrowableSubscriptions(subscriptions, new RuntimeException("failed on first one"), numberOfSubscriptions); + addThrowableSubscriptions(subscriptions, new RuntimeException("failed on second one too"), numberOfSubscriptions); try { - s.unsubscribe(); + subscriptions.unsubscribe(); fail("Expecting an exception"); } catch (CompositeException e) { // we expect this @@ -213,24 +106,14 @@ public boolean isUnsubscribed() { @Test public void testUnsubscribeIdempotence() { - final AtomicInteger counter = new AtomicInteger(); - SubscriptionList s = new SubscriptionList(); - s.add(new Subscription() { - - @Override - public void unsubscribe() { - counter.incrementAndGet(); - } - - @Override - public boolean isUnsubscribed() { - return false; - } - }); + final AtomicInteger counter = new AtomicInteger(); + final int numberOfSubscriptions = 1; + + SubscriptionList subscriptions = createSubscriptions(counter, numberOfSubscriptions); - s.unsubscribe(); - s.unsubscribe(); - s.unsubscribe(); + subscriptions.unsubscribe(); + subscriptions.unsubscribe(); + subscriptions.unsubscribe(); // we should have only unsubscribed once assertEquals(1, counter.get()); @@ -239,32 +122,125 @@ public boolean isUnsubscribed() { @Test(timeout = 1000) public void testUnsubscribeIdempotenceConcurrently() throws InterruptedException { - final AtomicInteger counter = new AtomicInteger(); - final SubscriptionList s = new SubscriptionList(); + final AtomicInteger counter = new AtomicInteger(); + final int numberOfSubscriptions = 1; - final int count = 10; - final CountDownLatch start = new CountDownLatch(1); - s.add(new Subscription() { + SubscriptionList subscriptions = createSubscriptions(counter, numberOfSubscriptions); - @Override - public void unsubscribe() { - counter.incrementAndGet(); - } + + final CountDownLatch start = new CountDownLatch(1); + final int numberOfThreads = 10; + final List threads = createUnsubscribeThreads(start, subscriptions, numberOfThreads); + + start.countDown(); + + joinThreads(threads); + - @Override - public boolean isUnsubscribed() { - return false; - } - }); + start.countDown(); - final List threads = new ArrayList(); - for (int i = 0; i < count; i++) { + // we should have only unsubscribed once + assertEquals(1, counter.get()); + } + + @Test + public void testRemoveSuccess(){ + SubscriptionList subscriptions = new SubscriptionList(); + Subscription subscription = createSubscribedSubscription(); + + subscriptions.add(subscription); + Assert.assertFalse(subscription.isUnsubscribed()); + + subscriptions.remove(subscription); + Assert.assertTrue(subscription.isUnsubscribed()); + } + + @Test + public void testHasSubscriptionsAfterRemove(){ + SubscriptionList subscriptions = new SubscriptionList(); + Subscription subscription = createSubscribedSubscription(); + + subscriptions.add(subscription); + Assert.assertTrue(subscriptions.hasSubscriptions()); + + subscriptions.remove(subscription); + Assert.assertFalse(subscriptions.hasSubscriptions()); + } + + @Test + public void testHasSubscriptionsAfterUnsubscribe(){ + SubscriptionList subscriptions = new SubscriptionList(); + Subscription subscription = createSubscribedSubscription(); + + subscriptions.add(subscription); + Assert.assertTrue(subscriptions.hasSubscriptions()); + + subscriptions.unsubscribe(); + Assert.assertFalse(subscriptions.hasSubscriptions()); + } + + @Test + public void testClearSuccess(){ + SubscriptionList subscriptions = new SubscriptionList(); + Subscription subscription = createSubscribedSubscription(); + subscriptions.add(subscription); + + Assert.assertFalse(subscription.isUnsubscribed()); + + subscriptions.clear(); + + Assert.assertTrue(subscription.isUnsubscribed()); + Assert.assertFalse(subscriptions.hasSubscriptions()); + } + + private SubscriptionList createSubscriptions(final AtomicInteger counter, final int numberOfSubscriptions){ + SubscriptionList subscriptions = new SubscriptionList(); + for(int i = 0; i < numberOfSubscriptions; ++i){ + subscriptions.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + } + + return subscriptions; + } + + private SubscriptionList addThrowableSubscriptions(final SubscriptionList subscriptions, final RuntimeException runtimeException, final int numberOfSubscriptions){ + for(int i = 0; i < numberOfSubscriptions; ++i){ + subscriptions.add(new Subscription() { + + @Override + public void unsubscribe() { + throw runtimeException; + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + } + + return subscriptions; + } + + private List createUnsubscribeThreads(final CountDownLatch start, final SubscriptionList subscriptions, final int numberOfThreads){ + final List threads = new ArrayList(); + for (int i = 0; i < numberOfThreads; i++) { final Thread t = new Thread() { @Override public void run() { try { start.await(); - s.unsubscribe(); + subscriptions.unsubscribe(); } catch (final InterruptedException e) { fail(e.getMessage()); } @@ -273,13 +249,30 @@ public void run() { t.start(); threads.add(t); } - - start.countDown(); - for (final Thread t : threads) { + return threads; + } + + private void joinThreads(List threads) throws InterruptedException{ + for (final Thread t : threads) { t.join(); } - - // we should have only unsubscribed once - assertEquals(1, counter.get()); - } + } + + private Subscription createSubscribedSubscription(){ + return new Subscription() { + + private boolean unsubscribed = false; + + @Override + public void unsubscribe() { + unsubscribed = true; + } + + @Override + public boolean isUnsubscribed() { + return unsubscribed; + } + }; + } + }