Skip to content

Commit cb05a26

Browse files
authored
2.x: Improve coverage & related cleanup 03/05 (#5891)
* 2.x: Improve coverage & related cleanup 03/05 * Fix camelCase local variable naming errors in tests.
1 parent 51dd03b commit cb05a26

File tree

93 files changed

+2523
-433
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

93 files changed

+2523
-433
lines changed

src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,7 @@ public T next() {
125125

126126
@Override
127127
public void onSubscribe(Subscription s) {
128-
if (SubscriptionHelper.setOnce(this, s)) {
129-
s.request(batchSize);
130-
}
128+
SubscriptionHelper.setOnce(this, s, batchSize);
131129
}
132130

133131
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java

+2-6
Original file line numberDiff line numberDiff line change
@@ -327,9 +327,7 @@ static final class BufferOpenSubscriber<Open>
327327

328328
@Override
329329
public void onSubscribe(Subscription s) {
330-
if (SubscriptionHelper.setOnce(this, s)) {
331-
s.request(Long.MAX_VALUE);
332-
}
330+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
333331
}
334332

335333
@Override
@@ -378,9 +376,7 @@ static final class BufferCloseSubscriber<T, C extends Collection<? super T>>
378376

379377
@Override
380378
public void onSubscribe(Subscription s) {
381-
if (SubscriptionHelper.setOnce(this, s)) {
382-
s.request(Long.MAX_VALUE);
383-
}
379+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
384380
}
385381

386382
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,7 @@ public void removeChild(ReplaySubscription<T> p) {
180180

181181
@Override
182182
public void onSubscribe(Subscription s) {
183-
if (SubscriptionHelper.setOnce(connection, s)) {
184-
s.request(Long.MAX_VALUE);
185-
}
183+
SubscriptionHelper.setOnce(connection, s, Long.MAX_VALUE);
186184
}
187185

188186
/**

src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -516,9 +516,7 @@ static final class CombineLatestInnerSubscriber<T>
516516

517517
@Override
518518
public void onSubscribe(Subscription s) {
519-
if (SubscriptionHelper.setOnce(this, s)) {
520-
s.request(prefetch);
521-
}
519+
SubscriptionHelper.setOnce(this, s, prefetch);
522520
}
523521

524522
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupJoin.java

+2-6
Original file line numberDiff line numberDiff line change
@@ -417,9 +417,7 @@ public boolean isDisposed() {
417417

418418
@Override
419419
public void onSubscribe(Subscription s) {
420-
if (SubscriptionHelper.setOnce(this, s)) {
421-
s.request(Long.MAX_VALUE);
422-
}
420+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
423421
}
424422

425423
@Override
@@ -470,9 +468,7 @@ public boolean isDisposed() {
470468

471469
@Override
472470
public void onSubscribe(Subscription s) {
473-
if (SubscriptionHelper.setOnce(this, s)) {
474-
s.request(Long.MAX_VALUE);
475-
}
471+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
476472
}
477473

478474
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybe.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,8 @@ static final class MergeWithObserver<T> extends AtomicInteger
9898
}
9999

100100
@Override
101-
public void onSubscribe(Subscription d) {
102-
if (SubscriptionHelper.setOnce(mainSubscription, d)) {
103-
d.request(prefetch);
104-
}
101+
public void onSubscribe(Subscription s) {
102+
SubscriptionHelper.setOnce(mainSubscription, s, prefetch);
105103
}
106104

107105
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingle.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,8 @@ static final class MergeWithObserver<T> extends AtomicInteger
9898
}
9999

100100
@Override
101-
public void onSubscribe(Subscription d) {
102-
if (SubscriptionHelper.setOnce(mainSubscription, d)) {
103-
d.request(prefetch);
104-
}
101+
public void onSubscribe(Subscription s) {
102+
SubscriptionHelper.setOnce(mainSubscription, s, prefetch);
105103
}
106104

107105
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

+10-30
Original file line numberDiff line numberDiff line change
@@ -526,36 +526,16 @@ static final class InnerSubscription<T> extends AtomicLong implements Subscripti
526526
public void request(long n) {
527527
// ignore negative requests
528528
if (SubscriptionHelper.validate(n)) {
529-
// In general, RxJava doesn't prevent concurrent requests (with each other or with
530-
// a cancel) so we need a CAS-loop, but we need to handle
531-
// request overflow and cancelled/not requested state as well.
532-
for (;;) {
533-
// get the current request amount
534-
long r = get();
535-
// if child called cancel() do nothing
536-
if (r == CANCELLED) {
537-
return;
538-
}
539-
// ignore zero requests except any first that sets in zero
540-
if (r >= 0L && n == 0) {
541-
return;
542-
}
543-
// otherwise, increase the request count
544-
long u = BackpressureHelper.addCap(r, n);
545-
546-
// try setting the new request value
547-
if (compareAndSet(r, u)) {
548-
// increment the total request counter
549-
BackpressureHelper.add(totalRequested, n);
550-
// if successful, notify the parent dispatcher this child can receive more
551-
// elements
552-
parent.manageRequests();
553-
554-
parent.buffer.replay(this);
555-
return;
556-
}
557-
// otherwise, someone else changed the state (perhaps a concurrent
558-
// request or cancellation) so retry
529+
// add to the current requested and cap it at MAX_VALUE
530+
// except when there was a concurrent cancellation
531+
if (BackpressureHelper.addCancel(this, n) != CANCELLED) {
532+
// increment the total request counter
533+
BackpressureHelper.add(totalRequested, n);
534+
// if successful, notify the parent dispatcher this child can receive more
535+
// elements
536+
parent.manageRequests();
537+
// try replaying any cached content
538+
parent.buffer.replay(this);
559539
}
560540
}
561541
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableSamplePublisher.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,8 @@ public void onComplete() {
9393
completeMain();
9494
}
9595

96-
boolean setOther(Subscription o) {
97-
return SubscriptionHelper.setOnce(other, o);
96+
void setOther(Subscription o) {
97+
SubscriptionHelper.setOnce(other, o, Long.MAX_VALUE);
9898
}
9999

100100
@Override
@@ -150,9 +150,7 @@ static final class SamplerSubscriber<T> implements FlowableSubscriber<Object> {
150150

151151
@Override
152152
public void onSubscribe(Subscription s) {
153-
if (parent.setOther(s)) {
154-
s.request(Long.MAX_VALUE);
155-
}
153+
parent.setOther(s);
156154
}
157155

158156
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableSkipUntil.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,7 @@ final class OtherSubscriber extends AtomicReference<Subscription>
114114

115115
@Override
116116
public void onSubscribe(Subscription s) {
117-
if (SubscriptionHelper.setOnce(this, s)) {
118-
s.request(Long.MAX_VALUE);
119-
}
117+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
120118
}
121119

122120
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeUntil.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,7 @@ final class OtherSubscriber extends AtomicReference<Subscription> implements Flo
9999

100100
@Override
101101
public void onSubscribe(Subscription s) {
102-
if (SubscriptionHelper.setOnce(this, s)) {
103-
s.request(Long.MAX_VALUE);
104-
}
102+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
105103
}
106104

107105
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -343,9 +343,7 @@ static final class TimeoutConsumer extends AtomicReference<Subscription>
343343

344344
@Override
345345
public void onSubscribe(Subscription s) {
346-
if (SubscriptionHelper.setOnce(this, s)) {
347-
s.request(Long.MAX_VALUE);
348-
}
346+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
349347
}
350348

351349
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,7 @@ static final class WindowBoundaryMainSubscriber<T, B>
9595

9696
@Override
9797
public void onSubscribe(Subscription d) {
98-
if (SubscriptionHelper.setOnce(upstream, d)) {
99-
d.request(Long.MAX_VALUE);
100-
}
98+
SubscriptionHelper.setOnce(upstream, d, Long.MAX_VALUE);
10199
}
102100

103101
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -268,9 +268,7 @@ static final class WithLatestInnerSubscriber
268268

269269
@Override
270270
public void onSubscribe(Subscription s) {
271-
if (SubscriptionHelper.setOnce(this, s)) {
272-
s.request(Long.MAX_VALUE);
273-
}
271+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
274272
}
275273

276274
@Override

src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayOtherPublisher.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,7 @@ static final class OtherSubscriber<T> extends
120120

121121
@Override
122122
public void onSubscribe(Subscription s) {
123-
if (SubscriptionHelper.setOnce(this, s)) {
124-
s.request(Long.MAX_VALUE);
125-
}
123+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
126124
}
127125

128126
@Override

src/main/java/io/reactivex/internal/operators/maybe/MaybeFromFuture.java

+5-10
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.reactivex.*;
1919
import io.reactivex.disposables.*;
20+
import io.reactivex.exceptions.Exceptions;
2021

2122
/**
2223
* Waits until the source Future completes or the wait times out; treats a {@code null}
@@ -50,17 +51,11 @@ protected void subscribeActual(MaybeObserver<? super T> observer) {
5051
} else {
5152
v = future.get(timeout, unit);
5253
}
53-
} catch (InterruptedException ex) {
54-
if (!d.isDisposed()) {
55-
observer.onError(ex);
56-
}
57-
return;
58-
} catch (ExecutionException ex) {
59-
if (!d.isDisposed()) {
60-
observer.onError(ex.getCause());
54+
} catch (Throwable ex) {
55+
if (ex instanceof ExecutionException) {
56+
ex = ex.getCause();
6157
}
62-
return;
63-
} catch (TimeoutException ex) {
58+
Exceptions.throwIfFatal(ex);
6459
if (!d.isDisposed()) {
6560
observer.onError(ex);
6661
}

src/main/java/io/reactivex/internal/operators/maybe/MaybeTakeUntilPublisher.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,7 @@ static final class TakeUntilOtherMaybeObserver<U>
132132

133133
@Override
134134
public void onSubscribe(Subscription s) {
135-
if (SubscriptionHelper.setOnce(this, s)) {
136-
s.request(Long.MAX_VALUE);
137-
}
135+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
138136
}
139137

140138
@Override

src/main/java/io/reactivex/internal/operators/maybe/MaybeTimeoutPublisher.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,7 @@ static final class TimeoutOtherMaybeObserver<T, U>
155155

156156
@Override
157157
public void onSubscribe(Subscription s) {
158-
if (SubscriptionHelper.setOnce(this, s)) {
159-
s.request(Long.MAX_VALUE);
160-
}
158+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
161159
}
162160

163161
@Override

src/main/java/io/reactivex/internal/operators/observable/ObservableSkip.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import io.reactivex.*;
1717
import io.reactivex.disposables.Disposable;
18+
import io.reactivex.internal.disposables.DisposableHelper;
1819

1920
public final class ObservableSkip<T> extends AbstractObservableWithUpstream<T, T> {
2021
final long n;
@@ -40,9 +41,11 @@ static final class SkipObserver<T> implements Observer<T>, Disposable {
4041
}
4142

4243
@Override
43-
public void onSubscribe(Disposable s) {
44-
this.d = s;
45-
actual.onSubscribe(this);
44+
public void onSubscribe(Disposable d) {
45+
if (DisposableHelper.validate(this.d, d)) {
46+
this.d = d;
47+
actual.onSubscribe(this);
48+
}
4649
}
4750

4851
@Override

src/main/java/io/reactivex/internal/operators/parallel/ParallelJoin.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -516,9 +516,7 @@ static final class JoinInnerSubscriber<T>
516516

517517
@Override
518518
public void onSubscribe(Subscription s) {
519-
if (SubscriptionHelper.setOnce(this, s)) {
520-
s.request(prefetch);
521-
}
519+
SubscriptionHelper.setOnce(this, s, prefetch);
522520
}
523521

524522
@Override

src/main/java/io/reactivex/internal/operators/parallel/ParallelReduceFull.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,7 @@ static final class ParallelReduceFullInnerSubscriber<T>
180180

181181
@Override
182182
public void onSubscribe(Subscription s) {
183-
if (SubscriptionHelper.setOnce(this, s)) {
184-
s.request(Long.MAX_VALUE);
185-
}
183+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
186184
}
187185

188186
@Override

src/main/java/io/reactivex/internal/operators/parallel/ParallelSortedJoin.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -280,9 +280,7 @@ static final class SortedJoinInnerSubscriber<T>
280280

281281
@Override
282282
public void onSubscribe(Subscription s) {
283-
if (SubscriptionHelper.setOnce(this, s)) {
284-
s.request(Long.MAX_VALUE);
285-
}
283+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
286284
}
287285

288286
@Override

src/main/java/io/reactivex/internal/operators/single/SingleDelayWithSingle.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ static final class OtherObserver<T, U>
5454

5555
@Override
5656
public void onSubscribe(Disposable d) {
57-
if (DisposableHelper.set(this, d)) {
57+
if (DisposableHelper.setOnce(this, d)) {
5858

5959
actual.onSubscribe(this);
6060
}

src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,7 @@ static final class TakeUntilOtherSubscriber
139139

140140
@Override
141141
public void onSubscribe(Subscription s) {
142-
if (SubscriptionHelper.setOnce(this, s)) {
143-
s.request(Long.MAX_VALUE);
144-
}
142+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
145143
}
146144

147145
@Override

0 commit comments

Comments
 (0)