diff --git a/build.gradle b/build.gradle index 796c066c6e..c3975fa3c5 100644 --- a/build.gradle +++ b/build.gradle @@ -11,8 +11,8 @@ group = 'io.reactivex.rxjava2' description = 'RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.' apply plugin: 'java' -// apply plugin: 'pmd' -apply plugin: 'findbugs' +// apply plugin: 'pmd' // disabled because runs out of memory on Travis +// apply plugin: 'findbugs' // disabled because runs out of memory on Travis apply plugin: 'checkstyle' apply plugin: 'jacoco' apply plugin: 'ru.vyarus.animalsniffer' @@ -126,6 +126,7 @@ checkstyle { ignoreFailures = true } +/* findbugs { ignoreFailures true toolVersion = '3.0.1' @@ -140,4 +141,4 @@ findbugsMain { xml.enabled = true } } - +*/ diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java index 28bec338b3..9f6da012bc 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java @@ -33,33 +33,35 @@ public final class FlowableRefCount extends AbstractFlowableWithUpstream { final ConnectableFlowable source; volatile CompositeDisposable baseSubscription = new CompositeDisposable(); - final AtomicInteger subscriptionCount = new AtomicInteger(0); + final AtomicInteger subscriptionCount = new AtomicInteger(); /** * Use this lock for every subscription and disconnect action. */ final ReentrantLock lock = new ReentrantLock(); - final class ConnectionSubscriber implements Subscriber, Subscription { + final class ConnectionSubscriber + extends AtomicReference + implements Subscriber, Subscription { + /** */ + private static final long serialVersionUID = 152064694420235350L; final Subscriber subscriber; final CompositeDisposable currentBase; final Disposable resource; - Subscription s; + final AtomicLong requested; ConnectionSubscriber(Subscriber subscriber, CompositeDisposable currentBase, Disposable resource) { this.subscriber = subscriber; this.currentBase = currentBase; this.resource = resource; + this.requested = new AtomicLong(); } @Override public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; - subscriber.onSubscribe(this); - } + SubscriptionHelper.deferredSetOnce(this, requested, s); } @Override @@ -81,12 +83,12 @@ public void onComplete() { @Override public void request(long n) { - s.request(n); + SubscriptionHelper.deferredRequest(this, requested, n); } @Override public void cancel() { - s.cancel(); + SubscriptionHelper.cancel(this); resource.dispose(); } @@ -173,9 +175,10 @@ void doSubscribe(final Subscriber subscriber, final CompositeDisposab // handle unsubscribing from the base subscription Disposable d = disconnect(currentBase); - ConnectionSubscriber s = new ConnectionSubscriber(subscriber, currentBase, d); + ConnectionSubscriber connection = new ConnectionSubscriber(subscriber, currentBase, d); + subscriber.onSubscribe(connection); - source.subscribe(s); + source.subscribe(connection); } private Disposable disconnect(final CompositeDisposable current) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java index 0238553b36..5e7516f2ac 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java @@ -87,7 +87,7 @@ public void subscribeActual(final Observer subscriber) { } - private Consumer onSubscribe(final Observer subscriber, + private Consumer onSubscribe(final Observer observer, final AtomicBoolean writeLocked) { return new Consumer() { @Override @@ -95,7 +95,7 @@ public void accept(Disposable subscription) { try { baseSubscription.add(subscription); // ready to subscribe to source so do it - doSubscribe(subscriber, baseSubscription); + doSubscribe(observer, baseSubscription); } finally { // release the write lock lock.unlock(); @@ -105,11 +105,12 @@ public void accept(Disposable subscription) { }; } - void doSubscribe(final Observer subscriber, final CompositeDisposable currentBase) { + void doSubscribe(final Observer observer, final CompositeDisposable currentBase) { // handle unsubscribing from the base subscription Disposable d = disconnect(currentBase); - ConnectionSubscriber s = new ConnectionSubscriber(subscriber, currentBase, d); + ConnectionSubscriber s = new ConnectionSubscriber(observer, currentBase, d); + observer.onSubscribe(s); source.subscribe(s); } @@ -135,13 +136,16 @@ public void run() { }); } - final class ConnectionSubscriber implements Observer, Disposable { + final class ConnectionSubscriber + extends AtomicReference + implements Observer, Disposable { + /** */ + private static final long serialVersionUID = 3813126992133394324L; + final Observer subscriber; final CompositeDisposable currentBase; final Disposable resource; - Disposable s; - ConnectionSubscriber(Observer subscriber, CompositeDisposable currentBase, Disposable resource) { this.subscriber = subscriber; @@ -151,10 +155,7 @@ final class ConnectionSubscriber implements Observer, Disposable { @Override public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; - subscriber.onSubscribe(this); - } + DisposableHelper.setOnce(this, s); } @Override @@ -176,13 +177,13 @@ public void onComplete() { @Override public void dispose() { - s.dispose(); + DisposableHelper.dispose(this); resource.dispose(); } @Override public boolean isDisposed() { - return s.isDisposed(); + return DisposableHelper.isDisposed(get()); } void cleanup() { diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithSizeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithSizeTest.java index 24dd7f61b3..1593b39039 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithSizeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithSizeTest.java @@ -132,7 +132,7 @@ public void accept(Integer t1) { if (count.incrementAndGet() == 500000) { // give it a small break halfway through try { - Thread.sleep(1); + Thread.sleep(5); } catch (InterruptedException ex) { // ignored }