Skip to content

Commit e84651e

Browse files
JakeWhartonakarnokd
authored andcommitted
Use DisposableHelper and SubscriptionHelper to reduce duplication. (ReactiveX#4058)
A lot of operators contained their own marker instance and class definition of Disposable or Subscription. This changes almost all of them to use the shared instance provided by their respetive helper class. Some duplication still exists but can be cleaned up in a subsequent change.
1 parent c67eb03 commit e84651e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+192
-1076
lines changed

Diff for: src/main/java/io/reactivex/disposables/RefCountDisposable.java

+5-17
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.disposables;
1515

16+
import io.reactivex.internal.disposables.DisposableHelper;
1617
import java.util.concurrent.atomic.*;
1718

1819
import io.reactivex.internal.functions.Objects;
@@ -21,10 +22,7 @@ public final class RefCountDisposable implements Disposable {
2122

2223
final AtomicReference<Disposable> resource = new AtomicReference<Disposable>();
2324

24-
static final Disposable DISPOSED = new Disposable() {
25-
@Override
26-
public void dispose() { }
27-
};
25+
static final Disposable DISPOSED = DisposableHelper.DISPOSED;
2826

2927
final AtomicInteger count = new AtomicInteger();
3028

@@ -40,29 +38,19 @@ public RefCountDisposable(Disposable resource) {
4038
public void dispose() {
4139
if (once.compareAndSet(false, true)) {
4240
if (count.decrementAndGet() == 0) {
43-
disposeActual();
41+
DisposableHelper.dispose(resource);
4442
}
4543
}
4644
}
47-
48-
void disposeActual() {
49-
Disposable d = resource.get();
50-
if (d != DISPOSED) {
51-
d = resource.getAndSet(DISPOSED);
52-
if (d != DISPOSED && d != null) {
53-
d.dispose();
54-
}
55-
}
56-
}
57-
45+
5846
public Disposable get() {
5947
count.getAndIncrement();
6048
return new InnerDisposable(this);
6149
}
6250

6351
void release() {
6452
if (count.decrementAndGet() == 0) {
65-
disposeActual();
53+
DisposableHelper.dispose(resource);
6654
}
6755
}
6856

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableAmb.java

+4-22
Original file line numberDiff line numberDiff line change
@@ -150,19 +150,7 @@ static final class AmbInnerSubscriber<T> extends AtomicReference<Subscription> i
150150
boolean won;
151151

152152
final AtomicLong missedRequested = new AtomicLong();
153-
154-
static final Subscription CANCELLED = new Subscription() {
155-
@Override
156-
public void request(long n) {
157-
158-
}
159-
160-
@Override
161-
public void cancel() {
162-
163-
}
164-
};
165-
153+
166154
public AmbInnerSubscriber(AmbCoordinator<T> parent, int index, Subscriber<? super T> actual) {
167155
this.parent = parent;
168156
this.index = index;
@@ -173,7 +161,7 @@ public AmbInnerSubscriber(AmbCoordinator<T> parent, int index, Subscriber<? supe
173161
public void onSubscribe(Subscription s) {
174162
if (!compareAndSet(null, s)) {
175163
s.cancel();
176-
if (get() != CANCELLED) {
164+
if (get() != SubscriptionHelper.CANCELLED) {
177165
SubscriptionHelper.reportSubscriptionSet();
178166
}
179167
return;
@@ -196,7 +184,7 @@ public void request(long n) {
196184
}
197185
BackpressureHelper.add(missedRequested, n);
198186
s = get();
199-
if (s != null && s != CANCELLED) {
187+
if (s != null && s != SubscriptionHelper.CANCELLED) {
200188
long r = missedRequested.getAndSet(0L);
201189
if (r != 0L) {
202190
s.request(r);
@@ -250,13 +238,7 @@ public void onComplete() {
250238

251239
@Override
252240
public void cancel() {
253-
Subscription s = get();
254-
if (s != CANCELLED) {
255-
s = getAndSet(CANCELLED);
256-
if (s != CANCELLED && s != null) {
257-
s.cancel();
258-
}
259-
}
241+
SubscriptionHelper.dispose(this);
260242
}
261243

262244
}

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java

+7-21
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16+
import io.reactivex.internal.disposables.DisposableHelper;
1617
import java.util.*;
1718
import java.util.concurrent.*;
1819
import java.util.concurrent.atomic.AtomicReference;
@@ -93,11 +94,6 @@ static final class BufferExactUnboundedSubscriber<T, U extends Collection<? supe
9394

9495
final AtomicReference<Disposable> timer = new AtomicReference<Disposable>();
9596

96-
static final Disposable CANCELLED = new Disposable() {
97-
@Override
98-
public void dispose() { }
99-
};
100-
10197
public BufferExactUnboundedSubscriber(
10298
Subscriber<? super U> actual, Supplier<U> bufferSupplier,
10399
long timespan, TimeUnit unit, Scheduler scheduler) {
@@ -158,7 +154,7 @@ public void onNext(T t) {
158154

159155
@Override
160156
public void onError(Throwable t) {
161-
disposeTimer();
157+
DisposableHelper.dispose(timer);
162158
synchronized (this) {
163159
buffer = null;
164160
}
@@ -167,7 +163,7 @@ public void onError(Throwable t) {
167163

168164
@Override
169165
public void onComplete() {
170-
disposeTimer();
166+
DisposableHelper.dispose(timer);
171167
U b;
172168
synchronized (this) {
173169
b = buffer;
@@ -190,21 +186,11 @@ public void request(long n) {
190186

191187
@Override
192188
public void cancel() {
193-
disposeTimer();
194-
189+
DisposableHelper.dispose(timer);
190+
195191
s.cancel();
196192
}
197-
198-
void disposeTimer() {
199-
Disposable d = timer.get();
200-
if (d != CANCELLED) {
201-
d = timer.getAndSet(CANCELLED);
202-
if (d != CANCELLED && d != null) {
203-
d.dispose();
204-
}
205-
}
206-
}
207-
193+
208194
@Override
209195
public void run() {
210196
/*
@@ -246,7 +232,7 @@ public void run() {
246232

247233
if (current == null) {
248234
selfCancel = true;
249-
disposeTimer();
235+
DisposableHelper.dispose(timer);
250236
return;
251237
}
252238

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java

+3-21
Original file line numberDiff line numberDiff line change
@@ -346,19 +346,7 @@ static final class CombinerSubscriber<T, R> implements Subscriber<T>, Subscripti
346346
boolean done;
347347

348348
final AtomicReference<Subscription> s = new AtomicReference<Subscription>();
349-
350-
static final Subscription CANCELLED = new Subscription() {
351-
@Override
352-
public void request(long n) {
353-
354-
}
355-
356-
@Override
357-
public void cancel() {
358-
359-
}
360-
};
361-
349+
362350
public CombinerSubscriber(LatestCoordinator<T, R> parent, int index) {
363351
this.parent = parent;
364352
this.index = index;
@@ -368,7 +356,7 @@ public CombinerSubscriber(LatestCoordinator<T, R> parent, int index) {
368356
public void onSubscribe(Subscription s) {
369357
if (!this.s.compareAndSet(null, s)) {
370358
s.cancel();
371-
if (this.s.get() != CANCELLED) {
359+
if (this.s.get() != SubscriptionHelper.CANCELLED) {
372360
SubscriptionHelper.reportSubscriptionSet();
373361
}
374362
return;
@@ -411,13 +399,7 @@ public void request(long n) {
411399

412400
@Override
413401
public void cancel() {
414-
Subscription a = s.get();
415-
if (a != CANCELLED) {
416-
a = s.getAndSet(CANCELLED);
417-
if (a != CANCELLED && a != null) {
418-
a.cancel();
419-
}
420-
}
402+
SubscriptionHelper.dispose(s);
421403
}
422404
}
423405
}

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounce.java

+6-20
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16+
import io.reactivex.internal.disposables.DisposableHelper;
1617
import java.util.concurrent.atomic.*;
1718

1819
import org.reactivestreams.*;
@@ -53,11 +54,6 @@ static final class DebounceSubscriber<T, U> extends AtomicLong
5354

5455
final AtomicReference<Disposable> debouncer = new AtomicReference<Disposable>();
5556

56-
static final Disposable CANCELLED = new Disposable() {
57-
@Override
58-
public void dispose() { }
59-
};
60-
6157
volatile long index;
6258

6359
boolean done;
@@ -118,7 +114,7 @@ public void onNext(T t) {
118114

119115
@Override
120116
public void onError(Throwable t) {
121-
disposeDebouncer();
117+
DisposableHelper.dispose(debouncer);
122118
actual.onError(t);
123119
}
124120

@@ -129,11 +125,11 @@ public void onComplete() {
129125
}
130126
done = true;
131127
Disposable d = debouncer.get();
132-
if (d != CANCELLED) {
128+
if (!DisposableHelper.isDisposed(d)) {
133129
@SuppressWarnings("unchecked")
134130
DebounceInnerSubscriber<T, U> dis = (DebounceInnerSubscriber<T, U>)d;
135131
dis.emit();
136-
disposeDebouncer();
132+
DisposableHelper.dispose(debouncer);
137133
actual.onComplete();
138134
}
139135
}
@@ -150,19 +146,9 @@ public void request(long n) {
150146
@Override
151147
public void cancel() {
152148
s.cancel();
153-
disposeDebouncer();
149+
DisposableHelper.dispose(debouncer);
154150
}
155-
156-
public void disposeDebouncer() {
157-
Disposable d = debouncer.get();
158-
if (d != CANCELLED) {
159-
d = debouncer.getAndSet(CANCELLED);
160-
if (d != CANCELLED && d != null) {
161-
d.dispose();
162-
}
163-
}
164-
}
165-
151+
166152
void emit(long idx, T value) {
167153
if (idx == index) {
168154
long r = get();

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java

+5-25
Original file line numberDiff line numberDiff line change
@@ -60,16 +60,6 @@ static final class DebounceTimedSubscriber<T> extends AtomicLong
6060

6161
final AtomicReference<Disposable> timer = new AtomicReference<Disposable>();
6262

63-
static final Disposable CANCELLED = new Disposable() {
64-
@Override
65-
public void dispose() { }
66-
};
67-
68-
static final Disposable NEW_TIMER = new Disposable() {
69-
@Override
70-
public void dispose() { }
71-
};
72-
7363
volatile long index;
7464

7565
boolean done;
@@ -80,17 +70,7 @@ public DebounceTimedSubscriber(Subscriber<? super T> actual, long timeout, TimeU
8070
this.unit = unit;
8171
this.worker = worker;
8272
}
83-
84-
public void disposeTimer() {
85-
Disposable d = timer.get();
86-
if (d != CANCELLED) {
87-
d = timer.getAndSet(CANCELLED);
88-
if (d != CANCELLED && d != null) {
89-
d.dispose();
90-
}
91-
}
92-
}
93-
73+
9474
@Override
9575
public void onSubscribe(Subscription s) {
9676
if (SubscriptionHelper.validateSubscription(this.s, s)) {
@@ -132,7 +112,7 @@ public void onError(Throwable t) {
132112
return;
133113
}
134114
done = true;
135-
disposeTimer();
115+
DisposableHelper.dispose(timer);
136116
actual.onError(t);
137117
}
138118

@@ -144,11 +124,11 @@ public void onComplete() {
144124
done = true;
145125

146126
Disposable d = timer.get();
147-
if (d != CANCELLED) {
127+
if (!DisposableHelper.isDisposed(d)) {
148128
@SuppressWarnings("unchecked")
149129
DebounceEmitter<T> de = (DebounceEmitter<T>)d;
150130
de.emit();
151-
disposeTimer();
131+
DisposableHelper.dispose(timer);
152132
worker.dispose();
153133
actual.onComplete();
154134
}
@@ -164,7 +144,7 @@ public void request(long n) {
164144

165145
@Override
166146
public void cancel() {
167-
disposeTimer();
147+
DisposableHelper.dispose(timer);
168148
worker.dispose();
169149
s.cancel();
170150
}

0 commit comments

Comments
 (0)