Skip to content

Commit 6b0f1b0

Browse files
JakeWhartonakarnokd
authored andcommitted
Elide more create/callback wrappers in favor of direct Observables. (#4289)
1 parent d4e8f29 commit 6b0f1b0

File tree

4 files changed

+29
-28
lines changed

4 files changed

+29
-28
lines changed

src/main/java/io/reactivex/Observable.java

+19-18
Original file line numberDiff line numberDiff line change
@@ -451,35 +451,35 @@ public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
451451

452452
public static <T> Observable<T> fromPublisher(final Publisher<? extends T> publisher) {
453453
Objects.requireNonNull(publisher, "publisher is null");
454-
return create(new ObservableConsumable<T>() {
454+
return new Observable<T>() {
455455
@Override
456-
public void subscribe(final Observer<? super T> s) {
456+
protected void subscribeActual(final Observer<? super T> o) {
457457
publisher.subscribe(new Subscriber<T>() {
458458

459459
@Override
460460
public void onComplete() {
461-
s.onComplete();
461+
o.onComplete();
462462
}
463463

464464
@Override
465465
public void onError(Throwable t) {
466-
s.onError(t);
466+
o.onError(t);
467467
}
468468

469469
@Override
470470
public void onNext(T t) {
471-
s.onNext(t);
471+
o.onNext(t);
472472
}
473473

474474
@Override
475475
public void onSubscribe(Subscription inner) {
476-
s.onSubscribe(Disposables.from(inner));
476+
o.onSubscribe(Disposables.from(inner));
477477
inner.request(Long.MAX_VALUE);
478478
}
479-
479+
480480
});
481481
}
482-
});
482+
};
483483
}
484484

485485
@SchedulerSupport(SchedulerSupport.NONE)
@@ -873,21 +873,21 @@ public static Observable<Integer> range(final int start, final int count) {
873873
if ((long)start + (count - 1) > Integer.MAX_VALUE) {
874874
throw new IllegalArgumentException("Integer overflow");
875875
}
876-
return create(new ObservableConsumable<Integer>() {
876+
return new Observable<Integer>() {
877877
@Override
878-
public void subscribe(Observer<? super Integer> s) {
878+
protected void subscribeActual(Observer<? super Integer> o) {
879879
Disposable d = Disposables.empty();
880-
s.onSubscribe(d);
880+
o.onSubscribe(d);
881881

882882
long end = start - 1L + count;
883883
for (long i = start; i <= end && !d.isDisposed(); i++) {
884-
s.onNext((int)i);
884+
o.onNext((int)i);
885885
}
886886
if (!d.isDisposed()) {
887-
s.onComplete();
887+
o.onComplete();
888888
}
889889
}
890-
});
890+
};
891891
}
892892

893893
@SchedulerSupport(SchedulerSupport.NONE)
@@ -1116,12 +1116,13 @@ public final Observable<Boolean> any(Predicate<? super T> predicate) {
11161116

11171117
@SchedulerSupport(SchedulerSupport.NONE)
11181118
public final Observable<T> asObservable() {
1119-
return create(new ObservableConsumable<T>() {
1119+
final Observable<T> outer = this;
1120+
return new Observable<T>() {
11201121
@Override
1121-
public void subscribe(Observer<? super T> s) {
1122-
Observable.this.subscribe(s);
1122+
protected void subscribeActual(Observer<? super T> o) {
1123+
outer.subscribe(o);
11231124
}
1124-
});
1125+
};
11251126
}
11261127

11271128
@SchedulerSupport(SchedulerSupport.NONE)

src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,12 @@ public void subscribe(Observer<? super T> child) {
119119

120120
public static <T, R> Observable<R> create(final ObservableConsumable<? extends T> source,
121121
final Function<? super Observable<T>, ? extends ObservableConsumable<R>> selector, final int bufferSize) {
122-
return create(new ObservableConsumable<R>() {
122+
return new Observable<R>() {
123123
@Override
124-
public void subscribe(Observer<? super R> sr) {
125-
ConnectableObservable<T> op = create(source, bufferSize);
124+
protected void subscribeActual(Observer<? super R> o) {
125+
ConnectableObservable<T> op = ObservablePublish.create(source, bufferSize);
126126

127-
final ObserverResourceWrapper<R> srw = new ObserverResourceWrapper<R>(sr);
127+
final ObserverResourceWrapper<R> srw = new ObserverResourceWrapper<R>(o);
128128

129129
ObservableConsumable<R> target;
130130

@@ -145,7 +145,7 @@ public void accept(Disposable r) {
145145
}
146146
});
147147
}
148-
});
148+
};
149149
}
150150

151151
private ObservablePublish(ObservableConsumable<T> onSubscribe, ObservableConsumable<? extends T> source,

src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ public Object call() {
5858
public static <U, R> Observable<R> multicastSelector(
5959
final Callable<? extends ConnectableObservable<U>> connectableFactory,
6060
final Function<? super Observable<U>, ? extends ObservableConsumable<R>> selector) {
61-
return Observable.create(new ObservableConsumable<R>() {
61+
return new Observable<R>() {
6262
@Override
63-
public void subscribe(Observer<? super R> child) {
63+
protected void subscribeActual(Observer<? super R> child) {
6464
ConnectableObservable<U> co;
6565
ObservableConsumable<R> observable;
6666
try {
@@ -82,7 +82,7 @@ public void accept(Disposable r) {
8282
}
8383
});
8484
}
85-
});
85+
};
8686
}
8787

8888
/**

src/main/java/io/reactivex/observables/ConnectableObservable.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void accept(Disposable d) {
7575
* @see <a href="http://reactivex.io/documentation/operators/refcount.html">ReactiveX documentation: RefCount</a>
7676
*/
7777
public Observable<T> refCount() {
78-
return create(new ObservableRefCount<T>(this));
78+
return new ObservableRefCount<T>(this);
7979
}
8080

8181
/**
@@ -121,6 +121,6 @@ public Observable<T> autoConnect(int numberOfSubscribers, Consumer<? super Dispo
121121
this.connect(connection);
122122
return this;
123123
}
124-
return create(new ObservableAutoConnect<T>(this, numberOfSubscribers, connection));
124+
return new ObservableAutoConnect<T>(this, numberOfSubscribers, connection);
125125
}
126126
}

0 commit comments

Comments
 (0)