Skip to content

Commit 23a77e8

Browse files
authored
2.x: coverage and cleanup 10/04-1 (#4666)
1 parent df94c0d commit 23a77e8

31 files changed

+3126
-119
lines changed

src/main/java/io/reactivex/Maybe.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -2670,13 +2670,13 @@ public final Maybe<T> hide() {
26702670
* <dd>{@code ignoreElement} does not operate by default on a particular {@link Scheduler}.</dd>
26712671
* </dl>
26722672
*
2673-
* @return an empty Maybe that only calls {@code onComplete} or {@code onError}, based on which one is
2673+
* @return an empty Completable that only calls {@code onComplete} or {@code onError}, based on which one is
26742674
* called by the source Maybe
26752675
* @see <a href="http://reactivex.io/documentation/operators/ignoreelements.html">ReactiveX operators documentation: IgnoreElements</a>
26762676
*/
26772677
@SchedulerSupport(SchedulerSupport.NONE)
2678-
public final Maybe<T> ignoreElement() {
2679-
return RxJavaPlugins.onAssembly(new MaybeIgnoreElement<T>(this));
2678+
public final Completable ignoreElement() {
2679+
return RxJavaPlugins.onAssembly(new MaybeIgnoreElementCompletable<T>(this));
26802680
}
26812681

26822682
/**

src/main/java/io/reactivex/internal/disposables/DisposableHelper.java

+5
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,11 @@ public static boolean replace(AtomicReference<Disposable> field, Disposable d) {
9797
}
9898
}
9999

100+
/**
101+
* Atomically disposes the Disposable in the field if not already disposed.
102+
* @param field the target field
103+
* @return true if the curren thread managed to dispose the Disposable
104+
*/
100105
public static boolean dispose(AtomicReference<Disposable> field) {
101106
Disposable current = field.get();
102107
Disposable d = DISPOSED;

src/main/java/io/reactivex/internal/observers/ToNotificationObserver.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import io.reactivex.*;
1717
import io.reactivex.disposables.Disposable;
18-
import io.reactivex.exceptions.Exceptions;
18+
import io.reactivex.exceptions.*;
1919
import io.reactivex.functions.Consumer;
2020
import io.reactivex.internal.disposables.DisposableHelper;
2121
import io.reactivex.plugins.RxJavaPlugins;
@@ -58,7 +58,7 @@ public void onError(Throwable t) {
5858
consumer.accept(Notification.<Object>createOnError(t));
5959
} catch (Throwable ex) {
6060
Exceptions.throwIfFatal(ex);
61-
RxJavaPlugins.onError(ex);
61+
RxJavaPlugins.onError(new CompositeException(t, ex));
6262
}
6363
}
6464

src/main/java/io/reactivex/internal/operators/flowable/FlowableFromIterable.java

-44
Original file line numberDiff line numberDiff line change
@@ -261,28 +261,6 @@ void slowPath(long r) {
261261

262262
r = get();
263263
if (e == r) {
264-
265-
if (cancelled) {
266-
return;
267-
}
268-
269-
boolean b;
270-
271-
try {
272-
b = it.hasNext();
273-
} catch (Throwable ex) {
274-
Exceptions.throwIfFatal(ex);
275-
a.onError(ex);
276-
return;
277-
}
278-
279-
if (!b) {
280-
if (!cancelled) {
281-
a.onComplete();
282-
}
283-
return;
284-
}
285-
286264
r = addAndGet(-e);
287265
if (r == 0L) {
288266
return;
@@ -423,28 +401,6 @@ void slowPath(long r) {
423401

424402
r = get();
425403
if (e == r) {
426-
427-
if (cancelled) {
428-
return;
429-
}
430-
431-
boolean hasNext;
432-
433-
try {
434-
hasNext = it.hasNext();
435-
} catch (Throwable ex) {
436-
Exceptions.throwIfFatal(ex);
437-
a.onError(ex);
438-
return;
439-
}
440-
441-
if (!hasNext) {
442-
if (!cancelled) {
443-
a.onComplete();
444-
}
445-
return;
446-
}
447-
448404
r = addAndGet(-e);
449405
if (r == 0L) {
450406
return;

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.reactivex.*;
2424
import io.reactivex.Scheduler.Worker;
2525
import io.reactivex.disposables.Disposable;
26-
import io.reactivex.exceptions.Exceptions;
26+
import io.reactivex.exceptions.*;
2727
import io.reactivex.internal.disposables.DisposableHelper;
2828
import io.reactivex.internal.fuseable.SimpleQueue;
2929
import io.reactivex.internal.queue.MpscLinkedQueue;
@@ -126,7 +126,7 @@ public void onSubscribe(Subscription s) {
126126
} else {
127127
cancelled = true;
128128
s.cancel();
129-
a.onError(new IllegalStateException("Could not deliver first window due to lack of requests."));
129+
a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests."));
130130
return;
131131
}
132132

@@ -281,7 +281,7 @@ void drainLoop() {
281281
queue.clear();
282282
s.cancel();
283283
dispose();
284-
a.onError(new IllegalStateException("Could not deliver first window due to lack of requests."));
284+
a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests."));
285285
return;
286286
}
287287
} else {
@@ -374,7 +374,7 @@ public void onSubscribe(Subscription s) {
374374
} else {
375375
cancelled = true;
376376
s.cancel();
377-
a.onError(new IllegalStateException("Could not deliver initial window due to lack of requests."));
377+
a.onError(new MissingBackpressureException("Could not deliver initial window due to lack of requests."));
378378
return;
379379
}
380380

@@ -436,7 +436,7 @@ public void onNext(T t) {
436436
window = null;
437437
s.cancel();
438438
dispose();
439-
actual.onError(new IllegalStateException("Could not deliver window due to lack of requests"));
439+
actual.onError(new MissingBackpressureException("Could not deliver window due to lack of requests"));
440440
return;
441441
}
442442
} else {
@@ -572,7 +572,7 @@ void drainLoop() {
572572
queue.clear();
573573
s.cancel();
574574
dispose();
575-
a.onError(new IllegalStateException("Could not deliver first window due to lack of requests."));
575+
a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests."));
576576
return;
577577
}
578578
}
@@ -613,7 +613,7 @@ void drainLoop() {
613613
window = null;
614614
s.cancel();
615615
dispose();
616-
actual.onError(new IllegalStateException("Could not deliver window due to lack of requests"));
616+
actual.onError(new MissingBackpressureException("Could not deliver window due to lack of requests"));
617617
return;
618618
}
619619
} else {
@@ -719,7 +719,7 @@ public void run() {
719719

720720
} else {
721721
s.cancel();
722-
actual.onError(new IllegalStateException("Could not emit the first window due to lack of requests"));
722+
actual.onError(new MissingBackpressureException("Could not emit the first window due to lack of requests"));
723723
}
724724
}
725725

@@ -878,7 +878,7 @@ public void run() {
878878
}
879879
}, timespan, unit);
880880
} else {
881-
a.onError(new IllegalStateException("Can't emit window due to lack of requests"));
881+
a.onError(new MissingBackpressureException("Can't emit window due to lack of requests"));
882882
continue;
883883
}
884884
} else {

src/main/java/io/reactivex/internal/operators/maybe/MaybeFromFuture.java

+27-33
Original file line numberDiff line numberDiff line change
@@ -42,43 +42,37 @@ public MaybeFromFuture(Future<? extends T> future, long timeout, TimeUnit unit)
4242
protected void subscribeActual(MaybeObserver<? super T> observer) {
4343
Disposable d = Disposables.empty();
4444
observer.onSubscribe(d);
45-
if (d.isDisposed()) {
46-
return;
47-
}
48-
49-
T v;
50-
try {
51-
if (timeout <= 0L) {
52-
v = future.get();
53-
} else {
54-
v = future.get(timeout, unit);
55-
}
56-
} catch (InterruptedException ex) {
57-
if (d.isDisposed()) {
45+
if (!d.isDisposed()) {
46+
T v;
47+
try {
48+
if (timeout <= 0L) {
49+
v = future.get();
50+
} else {
51+
v = future.get(timeout, unit);
52+
}
53+
} catch (InterruptedException ex) {
54+
if (!d.isDisposed()) {
55+
observer.onError(ex);
56+
}
5857
return;
59-
}
60-
observer.onError(ex);
61-
return;
62-
} catch (ExecutionException ex) {
63-
if (d.isDisposed()) {
58+
} catch (ExecutionException ex) {
59+
if (!d.isDisposed()) {
60+
observer.onError(ex.getCause());
61+
}
6462
return;
65-
}
66-
observer.onError(ex.getCause());
67-
return;
68-
} catch (TimeoutException ex) {
69-
if (d.isDisposed()) {
63+
} catch (TimeoutException ex) {
64+
if (!d.isDisposed()) {
65+
observer.onError(ex);
66+
}
7067
return;
7168
}
72-
observer.onError(ex);
73-
return;
74-
}
75-
if (d.isDisposed()) {
76-
return;
77-
}
78-
if (v == null) {
79-
observer.onComplete();
80-
} else {
81-
observer.onSuccess(v);
69+
if (!d.isDisposed()) {
70+
if (v == null) {
71+
observer.onComplete();
72+
} else {
73+
observer.onSuccess(v);
74+
}
75+
}
8276
}
8377
}
8478
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import io.reactivex.*;
17+
import io.reactivex.disposables.Disposable;
18+
import io.reactivex.internal.disposables.DisposableHelper;
19+
import io.reactivex.internal.fuseable.FuseToMaybe;
20+
import io.reactivex.plugins.RxJavaPlugins;
21+
22+
/**
23+
* Turns an onSuccess into an onComplete, onError and onComplete is relayed as is.
24+
*
25+
* @param <T> the value type
26+
*/
27+
public final class MaybeIgnoreElementCompletable<T> extends Completable implements FuseToMaybe<T> {
28+
29+
final MaybeSource<T> source;
30+
31+
public MaybeIgnoreElementCompletable(MaybeSource<T> source) {
32+
this.source = source;
33+
}
34+
35+
@Override
36+
protected void subscribeActual(CompletableObserver observer) {
37+
source.subscribe(new IgnoreMaybeObserver<T>(observer));
38+
}
39+
40+
@Override
41+
public Maybe<T> fuseToMaybe() {
42+
return RxJavaPlugins.onAssembly(new MaybeIgnoreElement<T>(source));
43+
}
44+
45+
static final class IgnoreMaybeObserver<T> implements MaybeObserver<T>, Disposable {
46+
47+
final CompletableObserver actual;
48+
49+
Disposable d;
50+
51+
IgnoreMaybeObserver(CompletableObserver actual) {
52+
this.actual = actual;
53+
}
54+
55+
@Override
56+
public void onSubscribe(Disposable d) {
57+
if (DisposableHelper.validate(this.d, d)) {
58+
this.d = d;
59+
60+
actual.onSubscribe(this);
61+
}
62+
}
63+
64+
@Override
65+
public void onSuccess(T value) {
66+
d = DisposableHelper.DISPOSED;
67+
actual.onComplete();
68+
}
69+
70+
@Override
71+
public void onError(Throwable e) {
72+
d = DisposableHelper.DISPOSED;
73+
actual.onError(e);
74+
}
75+
76+
@Override
77+
public void onComplete() {
78+
d = DisposableHelper.DISPOSED;
79+
actual.onComplete();
80+
}
81+
82+
@Override
83+
public boolean isDisposed() {
84+
return d.isDisposed();
85+
}
86+
87+
@Override
88+
public void dispose() {
89+
d.dispose();
90+
d = DisposableHelper.DISPOSED;
91+
}
92+
93+
}
94+
95+
}

0 commit comments

Comments
 (0)