Skip to content

Commit 4ce25cb

Browse files
authored
2.x: fix operator RefCount, disable FindBugs (due to Travis OOM) (#4506)
1 parent 043f37d commit 4ce25cb

File tree

4 files changed

+33
-28
lines changed

4 files changed

+33
-28
lines changed

build.gradle

+4-3
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ group = 'io.reactivex.rxjava2'
1111
description = 'RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.'
1212

1313
apply plugin: 'java'
14-
// apply plugin: 'pmd'
15-
apply plugin: 'findbugs'
14+
// apply plugin: 'pmd' // disabled because runs out of memory on Travis
15+
// apply plugin: 'findbugs' // disabled because runs out of memory on Travis
1616
apply plugin: 'checkstyle'
1717
apply plugin: 'jacoco'
1818
apply plugin: 'ru.vyarus.animalsniffer'
@@ -126,6 +126,7 @@ checkstyle {
126126
ignoreFailures = true
127127
}
128128

129+
/*
129130
findbugs {
130131
ignoreFailures true
131132
toolVersion = '3.0.1'
@@ -140,4 +141,4 @@ findbugsMain {
140141
xml.enabled = true
141142
}
142143
}
143-
144+
*/

src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java

+14-11
Original file line numberDiff line numberDiff line change
@@ -33,33 +33,35 @@
3333
public final class FlowableRefCount<T> extends AbstractFlowableWithUpstream<T, T> {
3434
final ConnectableFlowable<? extends T> source;
3535
volatile CompositeDisposable baseSubscription = new CompositeDisposable();
36-
final AtomicInteger subscriptionCount = new AtomicInteger(0);
36+
final AtomicInteger subscriptionCount = new AtomicInteger();
3737

3838
/**
3939
* Use this lock for every subscription and disconnect action.
4040
*/
4141
final ReentrantLock lock = new ReentrantLock();
4242

43-
final class ConnectionSubscriber implements Subscriber<T>, Subscription {
43+
final class ConnectionSubscriber
44+
extends AtomicReference<Subscription>
45+
implements Subscriber<T>, Subscription {
46+
/** */
47+
private static final long serialVersionUID = 152064694420235350L;
4448
final Subscriber<? super T> subscriber;
4549
final CompositeDisposable currentBase;
4650
final Disposable resource;
4751

48-
Subscription s;
52+
final AtomicLong requested;
4953

5054
ConnectionSubscriber(Subscriber<? super T> subscriber,
5155
CompositeDisposable currentBase, Disposable resource) {
5256
this.subscriber = subscriber;
5357
this.currentBase = currentBase;
5458
this.resource = resource;
59+
this.requested = new AtomicLong();
5560
}
5661

5762
@Override
5863
public void onSubscribe(Subscription s) {
59-
if (SubscriptionHelper.validate(this.s, s)) {
60-
this.s = s;
61-
subscriber.onSubscribe(this);
62-
}
64+
SubscriptionHelper.deferredSetOnce(this, requested, s);
6365
}
6466

6567
@Override
@@ -81,12 +83,12 @@ public void onComplete() {
8183

8284
@Override
8385
public void request(long n) {
84-
s.request(n);
86+
SubscriptionHelper.deferredRequest(this, requested, n);
8587
}
8688

8789
@Override
8890
public void cancel() {
89-
s.cancel();
91+
SubscriptionHelper.cancel(this);
9092
resource.dispose();
9193
}
9294

@@ -173,9 +175,10 @@ void doSubscribe(final Subscriber<? super T> subscriber, final CompositeDisposab
173175
// handle unsubscribing from the base subscription
174176
Disposable d = disconnect(currentBase);
175177

176-
ConnectionSubscriber s = new ConnectionSubscriber(subscriber, currentBase, d);
178+
ConnectionSubscriber connection = new ConnectionSubscriber(subscriber, currentBase, d);
179+
subscriber.onSubscribe(connection);
177180

178-
source.subscribe(s);
181+
source.subscribe(connection);
179182
}
180183

181184
private Disposable disconnect(final CompositeDisposable current) {

src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java

+14-13
Original file line numberDiff line numberDiff line change
@@ -87,15 +87,15 @@ public void subscribeActual(final Observer<? super T> subscriber) {
8787

8888
}
8989

90-
private Consumer<Disposable> onSubscribe(final Observer<? super T> subscriber,
90+
private Consumer<Disposable> onSubscribe(final Observer<? super T> observer,
9191
final AtomicBoolean writeLocked) {
9292
return new Consumer<Disposable>() {
9393
@Override
9494
public void accept(Disposable subscription) {
9595
try {
9696
baseSubscription.add(subscription);
9797
// ready to subscribe to source so do it
98-
doSubscribe(subscriber, baseSubscription);
98+
doSubscribe(observer, baseSubscription);
9999
} finally {
100100
// release the write lock
101101
lock.unlock();
@@ -105,11 +105,12 @@ public void accept(Disposable subscription) {
105105
};
106106
}
107107

108-
void doSubscribe(final Observer<? super T> subscriber, final CompositeDisposable currentBase) {
108+
void doSubscribe(final Observer<? super T> observer, final CompositeDisposable currentBase) {
109109
// handle unsubscribing from the base subscription
110110
Disposable d = disconnect(currentBase);
111111

112-
ConnectionSubscriber s = new ConnectionSubscriber(subscriber, currentBase, d);
112+
ConnectionSubscriber s = new ConnectionSubscriber(observer, currentBase, d);
113+
observer.onSubscribe(s);
113114

114115
source.subscribe(s);
115116
}
@@ -135,13 +136,16 @@ public void run() {
135136
});
136137
}
137138

138-
final class ConnectionSubscriber implements Observer<T>, Disposable {
139+
final class ConnectionSubscriber
140+
extends AtomicReference<Disposable>
141+
implements Observer<T>, Disposable {
142+
/** */
143+
private static final long serialVersionUID = 3813126992133394324L;
144+
139145
final Observer<? super T> subscriber;
140146
final CompositeDisposable currentBase;
141147
final Disposable resource;
142148

143-
Disposable s;
144-
145149
ConnectionSubscriber(Observer<? super T> subscriber,
146150
CompositeDisposable currentBase, Disposable resource) {
147151
this.subscriber = subscriber;
@@ -151,10 +155,7 @@ final class ConnectionSubscriber implements Observer<T>, Disposable {
151155

152156
@Override
153157
public void onSubscribe(Disposable s) {
154-
if (DisposableHelper.validate(this.s, s)) {
155-
this.s = s;
156-
subscriber.onSubscribe(this);
157-
}
158+
DisposableHelper.setOnce(this, s);
158159
}
159160

160161
@Override
@@ -176,13 +177,13 @@ public void onComplete() {
176177

177178
@Override
178179
public void dispose() {
179-
s.dispose();
180+
DisposableHelper.dispose(this);
180181
resource.dispose();
181182
}
182183

183184
@Override
184185
public boolean isDisposed() {
185-
return s.isDisposed();
186+
return DisposableHelper.isDisposed(get());
186187
}
187188

188189
void cleanup() {

src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithSizeTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public void accept(Integer t1) {
132132
if (count.incrementAndGet() == 500000) {
133133
// give it a small break halfway through
134134
try {
135-
Thread.sleep(1);
135+
Thread.sleep(5);
136136
} catch (InterruptedException ex) {
137137
// ignored
138138
}

0 commit comments

Comments
 (0)