Skip to content

Commit 5273cf0

Browse files
JakeWhartonakarnokd
authored andcommitted
Promote anonymous Observables to top-level types. (#4291)
1 parent c0aff9c commit 5273cf0

File tree

5 files changed

+186
-74
lines changed

5 files changed

+186
-74
lines changed

src/main/java/io/reactivex/Observable.java

+6-74
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.reactivex.disposables.*;
2424
import io.reactivex.exceptions.Exceptions;
2525
import io.reactivex.functions.*;
26-
import io.reactivex.internal.disposables.EmptyDisposable;
2726
import io.reactivex.internal.functions.Functions;
2827
import io.reactivex.internal.functions.Objects;
2928
import io.reactivex.internal.operators.flowable.FlowableFromObservable;
@@ -43,23 +42,6 @@
4342
public abstract class Observable<T> implements ObservableSource<T> {
4443
static final Object OBJECT = new Object();
4544

46-
/** An empty observable instance as there is no need to instantiate this more than once. */
47-
static final Observable<Object> EMPTY = new Observable<Object>() {
48-
@Override
49-
protected void subscribeActual(Observer<? super Object> o) {
50-
o.onSubscribe(EmptyDisposable.INSTANCE);
51-
o.onComplete();
52-
}
53-
};
54-
55-
/** A never observable instance as there is no need to instantiate this more than once. */
56-
static final Observable<Object> NEVER = new Observable<Object>() {
57-
@Override
58-
protected void subscribeActual(Observer<? super Object> o) {
59-
o.onSubscribe(EmptyDisposable.INSTANCE);
60-
}
61-
};
62-
6345
public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources) {
6446
Objects.requireNonNull(sources, "sources is null");
6547
return new ObservableAmb<T>(null, sources);
@@ -368,7 +350,7 @@ public static <T> Observable<T> defer(Callable<? extends ObservableSource<? exte
368350
@SchedulerSupport(SchedulerSupport.NONE)
369351
@SuppressWarnings("unchecked")
370352
public static <T> Observable<T> empty() {
371-
return (Observable<T>)EMPTY;
353+
return (Observable<T>) ObservableEmpty.INSTANCE;
372354
}
373355

374356
@SchedulerSupport(SchedulerSupport.NONE)
@@ -445,35 +427,7 @@ public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
445427

446428
public static <T> Observable<T> fromPublisher(final Publisher<? extends T> publisher) {
447429
Objects.requireNonNull(publisher, "publisher is null");
448-
return new Observable<T>() {
449-
@Override
450-
protected void subscribeActual(final Observer<? super T> o) {
451-
publisher.subscribe(new Subscriber<T>() {
452-
453-
@Override
454-
public void onComplete() {
455-
o.onComplete();
456-
}
457-
458-
@Override
459-
public void onError(Throwable t) {
460-
o.onError(t);
461-
}
462-
463-
@Override
464-
public void onNext(T t) {
465-
o.onNext(t);
466-
}
467-
468-
@Override
469-
public void onSubscribe(Subscription inner) {
470-
o.onSubscribe(Disposables.from(inner));
471-
inner.request(Long.MAX_VALUE);
472-
}
473-
474-
});
475-
}
476-
};
430+
return new ObservableFromPublisher<T>(publisher);
477431
}
478432

479433
@SchedulerSupport(SchedulerSupport.NONE)
@@ -850,7 +804,7 @@ public static <T> Observable<T> mergeDelayError(ObservableSource<? extends T>...
850804
@SchedulerSupport(SchedulerSupport.NONE)
851805
@SuppressWarnings("unchecked")
852806
public static <T> Observable<T> never() {
853-
return (Observable<T>)NEVER;
807+
return (Observable<T>) ObservableNever.INSTANCE;
854808
}
855809

856810
@SchedulerSupport(SchedulerSupport.NONE)
@@ -867,21 +821,7 @@ public static Observable<Integer> range(final int start, final int count) {
867821
if ((long)start + (count - 1) > Integer.MAX_VALUE) {
868822
throw new IllegalArgumentException("Integer overflow");
869823
}
870-
return new Observable<Integer>() {
871-
@Override
872-
protected void subscribeActual(Observer<? super Integer> o) {
873-
Disposable d = Disposables.empty();
874-
o.onSubscribe(d);
875-
876-
long end = start - 1L + count;
877-
for (long i = start; i <= end && !d.isDisposed(); i++) {
878-
o.onNext((int)i);
879-
}
880-
if (!d.isDisposed()) {
881-
o.onComplete();
882-
}
883-
}
884-
};
824+
return new ObservableRange(start, count);
885825
}
886826

887827
@SchedulerSupport(SchedulerSupport.NONE)
@@ -1110,13 +1050,7 @@ public final Observable<Boolean> any(Predicate<? super T> predicate) {
11101050

11111051
@SchedulerSupport(SchedulerSupport.NONE)
11121052
public final Observable<T> asObservable() {
1113-
final Observable<T> outer = this;
1114-
return new Observable<T>() {
1115-
@Override
1116-
protected void subscribeActual(Observer<? super T> o) {
1117-
outer.subscribe(o);
1118-
}
1119-
};
1053+
return new ObservableWrapper<T>(this);
11201054
}
11211055

11221056
@SchedulerSupport(SchedulerSupport.NONE)
@@ -3303,6 +3237,4 @@ public final <U, R> Observable<R> zipWith(ObservableSource<? extends U> other, B
33033237
public final <U, R> Observable<R> zipWith(ObservableSource<? extends U> other, BiFunction<? super T, ? super U, ? extends R> zipper, boolean delayError, int bufferSize) {
33043238
return zip(this, other, zipper, delayError, bufferSize);
33053239
}
3306-
3307-
3308-
}
3240+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.internal.operators.observable;
14+
15+
import io.reactivex.Observable;
16+
import io.reactivex.Observer;
17+
import io.reactivex.internal.disposables.EmptyDisposable;
18+
19+
public final class ObservableEmpty extends Observable<Object> {
20+
public static final Observable<Object> INSTANCE = new ObservableEmpty();
21+
22+
private ObservableEmpty() {
23+
}
24+
25+
@Override
26+
protected void subscribeActual(Observer<? super Object> o) {
27+
o.onSubscribe(EmptyDisposable.INSTANCE);
28+
o.onComplete();
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.internal.operators.observable;
14+
15+
import io.reactivex.Observable;
16+
import io.reactivex.Observer;
17+
import io.reactivex.disposables.Disposable;
18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
import org.reactivestreams.Publisher;
20+
import org.reactivestreams.Subscriber;
21+
import org.reactivestreams.Subscription;
22+
23+
public final class ObservableFromPublisher<T> extends Observable<T> {
24+
private final Publisher<? extends T> publisher;
25+
26+
public ObservableFromPublisher(Publisher<? extends T> publisher) {
27+
this.publisher = publisher;
28+
}
29+
30+
@Override
31+
protected void subscribeActual(final Observer<? super T> o) {
32+
publisher.subscribe(new PublisherSubscriber<T>(o));
33+
}
34+
35+
static final class PublisherSubscriber<T>
36+
extends AtomicBoolean
37+
implements Subscriber<T>, Disposable {
38+
39+
private final Observer<? super T> o;
40+
private Subscription inner;
41+
42+
PublisherSubscriber(Observer<? super T> o) {
43+
this.o = o;
44+
}
45+
46+
@Override
47+
public void onComplete() {
48+
o.onComplete();
49+
}
50+
51+
@Override
52+
public void onError(Throwable t) {
53+
o.onError(t);
54+
}
55+
56+
@Override
57+
public void onNext(T t) {
58+
o.onNext(t);
59+
}
60+
61+
@Override
62+
public void onSubscribe(Subscription inner) {
63+
this.inner = inner;
64+
o.onSubscribe(this);
65+
inner.request(Long.MAX_VALUE);
66+
}
67+
68+
@Override public void dispose() {
69+
if (compareAndSet(false, true)) {
70+
inner.cancel();
71+
inner = null;
72+
}
73+
}
74+
75+
@Override public boolean isDisposed() {
76+
return get();
77+
}
78+
}
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.internal.operators.observable;
14+
15+
import io.reactivex.Observable;
16+
import io.reactivex.Observer;
17+
import io.reactivex.internal.disposables.EmptyDisposable;
18+
19+
public final class ObservableNever extends Observable<Object> {
20+
public static final Observable<Object> INSTANCE = new ObservableNever();
21+
22+
private ObservableNever() {
23+
}
24+
25+
@Override
26+
protected void subscribeActual(Observer<? super Object> o) {
27+
o.onSubscribe(EmptyDisposable.INSTANCE);
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.internal.operators.observable;
14+
15+
import io.reactivex.Observable;
16+
import io.reactivex.Observer;
17+
import io.reactivex.disposables.Disposable;
18+
import io.reactivex.disposables.Disposables;
19+
20+
public final class ObservableRange extends Observable<Integer> {
21+
private final int start;
22+
private final int count;
23+
24+
public ObservableRange(int start, int count) {
25+
this.start = start;
26+
this.count = count;
27+
}
28+
29+
@Override
30+
protected void subscribeActual(Observer<? super Integer> o) {
31+
Disposable d = Disposables.empty();
32+
o.onSubscribe(d);
33+
34+
long end = start - 1L + count;
35+
for (long i = start; i <= end && !d.isDisposed(); i++) {
36+
o.onNext((int)i);
37+
}
38+
if (!d.isDisposed()) {
39+
o.onComplete();
40+
}
41+
}
42+
}

0 commit comments

Comments
 (0)