Skip to content

Commit ed9a2c4

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Rename Single Base Interface Types for consistency (#4297)
1 parent 4aa80cd commit ed9a2c4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+401
-369
lines changed

src/main/java/io/reactivex/Completable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ public static Completable fromRunnable(final Runnable run) {
310310
* @throws NullPointerException if single is null
311311
*/
312312
@SchedulerSupport(SchedulerSupport.NONE)
313-
public static <T> Completable fromSingle(final SingleConsumable<T> single) {
313+
public static <T> Completable fromSingle(final SingleSource<T> single) {
314314
Objects.requireNonNull(single, "single is null");
315315
return new CompletableFromSingle<T>(single);
316316
}

src/main/java/io/reactivex/Observable.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -3152,9 +3152,9 @@ public void request(long n) {
31523152

31533153
@SchedulerSupport(SchedulerSupport.NONE)
31543154
public final Single<T> toSingle() {
3155-
return Single.create(new SingleConsumable<T>() {
3155+
return Single.create(new SingleSource<T>() {
31563156
@Override
3157-
public void subscribe(final SingleSubscriber<? super T> s) {
3157+
public void subscribe(final SingleObserver<? super T> s) {
31583158
Observable.this.subscribe(new Observer<T>() {
31593159
T last;
31603160
@Override

src/main/java/io/reactivex/Single.java

+114-122
Large diffs are not rendered by default.

src/main/java/io/reactivex/SingleSubscriber.java src/main/java/io/reactivex/SingleObserver.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515

1616
import io.reactivex.disposables.Disposable;
1717

18-
public interface SingleSubscriber<T> {
18+
public interface SingleObserver<T> {
1919

2020
void onSubscribe(Disposable d);
2121

2222
void onSuccess(T value);
2323

2424
void onError(Throwable e);
25-
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import io.reactivex.functions.Function;
17+
18+
public interface SingleOperator<Downstream, Upstream> extends Function<SingleObserver<? super Downstream>, SingleObserver<? super Upstream>> {
19+
20+
}

src/main/java/io/reactivex/SingleConsumable.java src/main/java/io/reactivex/SingleSource.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414

1515
/**
1616
* Represents a basic {@link Single} source base interface,
17-
* consumable via an {@link SingleSubscriber}.
17+
* consumable via an {@link SingleObserver}.
1818
* <p>
1919
* This class also serves the base type for custom operators wrapped into
20-
* Single via {@link Single#create(SingleConsumable)}.
20+
* Single via {@link Single#create(SingleSource)}.
2121
*
2222
* @param <T> the element type
2323
* @since 2.0
2424
*/
25-
public interface SingleConsumable<T> {
25+
public interface SingleSource<T> {
2626

27-
void subscribe(SingleSubscriber<? super T> s);
27+
void subscribe(SingleObserver<? super T> s);
2828
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import io.reactivex.functions.Function;
17+
18+
public interface SingleTransformer<Upstream, Downstream> extends Function<Single<Upstream>, SingleSource<Downstream>> {
19+
20+
}

src/main/java/io/reactivex/internal/disposables/EmptyDisposable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public static void error(Throwable e, CompletableSubscriber s) {
5050
s.onError(e);
5151
}
5252

53-
public static void error(Throwable e, SingleSubscriber<?> s) {
53+
public static void error(Throwable e, SingleObserver<?> s) {
5454
s.onSubscribe(INSTANCE);
5555
s.onError(e);
5656
}

src/main/java/io/reactivex/internal/operators/completable/CompletableFromSingle.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@
1818

1919
public final class CompletableFromSingle<T> extends Completable {
2020

21-
final SingleConsumable<T> single;
21+
final SingleSource<T> single;
2222

23-
public CompletableFromSingle(SingleConsumable<T> single) {
23+
public CompletableFromSingle(SingleSource<T> single) {
2424
this.single = single;
2525
}
2626

2727
@Override
2828
protected void subscribeActual(final CompletableSubscriber s) {
29-
single.subscribe(new SingleSubscriber<T>() {
29+
single.subscribe(new SingleObserver<T>() {
3030

3131
@Override
3232
public void onError(Throwable e) {

src/main/java/io/reactivex/internal/operators/completable/CompletableToSingle.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public CompletableToSingle(CompletableConsumable source,
3333
}
3434

3535
@Override
36-
protected void subscribeActual(final SingleSubscriber<? super T> s) {
36+
protected void subscribeActual(final SingleObserver<? super T> s) {
3737
source.subscribe(new CompletableSubscriber() {
3838

3939
@Override

src/main/java/io/reactivex/internal/operators/single/SingleAmbArray.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,20 @@
2121

2222
public final class SingleAmbArray<T> extends Single<T> {
2323

24-
final SingleConsumable<? extends T>[] sources;
24+
final SingleSource<? extends T>[] sources;
2525

26-
public SingleAmbArray(SingleConsumable<? extends T>[] sources) {
26+
public SingleAmbArray(SingleSource<? extends T>[] sources) {
2727
this.sources = sources;
2828
}
2929

3030
@Override
31-
protected void subscribeActual(final SingleSubscriber<? super T> s) {
31+
protected void subscribeActual(final SingleObserver<? super T> s) {
3232

3333
final AtomicBoolean once = new AtomicBoolean();
3434
final CompositeDisposable set = new CompositeDisposable();
3535
s.onSubscribe(set);
3636

37-
for (SingleConsumable<? extends T> s1 : sources) {
37+
for (SingleSource<? extends T> s1 : sources) {
3838
if (once.get()) {
3939
return;
4040
}
@@ -50,7 +50,7 @@ protected void subscribeActual(final SingleSubscriber<? super T> s) {
5050
return;
5151
}
5252

53-
s1.subscribe(new SingleSubscriber<T>() {
53+
s1.subscribe(new SingleObserver<T>() {
5454

5555
@Override
5656
public void onSubscribe(Disposable d) {

src/main/java/io/reactivex/internal/operators/single/SingleAmbIterable.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,18 @@
2222

2323
public final class SingleAmbIterable<T> extends Single<T> {
2424

25-
final Iterable<? extends SingleConsumable<? extends T>> sources;
25+
final Iterable<? extends SingleSource<? extends T>> sources;
2626

27-
public SingleAmbIterable(Iterable<? extends SingleConsumable<? extends T>> sources) {
27+
public SingleAmbIterable(Iterable<? extends SingleSource<? extends T>> sources) {
2828
this.sources = sources;
2929
}
3030

3131
@Override
32-
protected void subscribeActual(final SingleSubscriber<? super T> s) {
32+
protected void subscribeActual(final SingleObserver<? super T> s) {
3333
final CompositeDisposable set = new CompositeDisposable();
3434
s.onSubscribe(set);
3535

36-
Iterator<? extends SingleConsumable<? extends T>> iterator;
36+
Iterator<? extends SingleSource<? extends T>> iterator;
3737

3838
try {
3939
iterator = sources.iterator();
@@ -76,7 +76,7 @@ protected void subscribeActual(final SingleSubscriber<? super T> s) {
7676
return;
7777
}
7878

79-
SingleConsumable<? extends T> s1;
79+
SingleSource<? extends T> s1;
8080

8181
try {
8282
s1 = iterator.next();
@@ -92,7 +92,7 @@ protected void subscribeActual(final SingleSubscriber<? super T> s) {
9292
return;
9393
}
9494

95-
s1.subscribe(new SingleSubscriber<T>() {
95+
s1.subscribe(new SingleObserver<T>() {
9696

9797
@Override
9898
public void onSubscribe(Disposable d) {

src/main/java/io/reactivex/internal/operators/single/SingleAwait.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@
2323
public enum SingleAwait {
2424
;
2525

26-
public static <T> T get(SingleConsumable<T> source) {
26+
public static <T> T get(SingleSource<T> source) {
2727
final AtomicReference<T> valueRef = new AtomicReference<T>();
2828
final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
2929
final CountDownLatch cdl = new CountDownLatch(1);
3030

31-
source.subscribe(new SingleSubscriber<T>() {
31+
source.subscribe(new SingleObserver<T>() {
3232
@Override
3333
public void onError(Throwable e) {
3434
errorRef.lazySet(e);

src/main/java/io/reactivex/internal/operators/single/SingleCache.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,21 @@
2323

2424
public final class SingleCache<T> extends Single<T> {
2525

26-
final SingleConsumable<? extends T> source;
26+
final SingleSource<? extends T> source;
2727

2828
final AtomicInteger wip;
2929
final AtomicReference<Object> notification;
30-
final List<SingleSubscriber<? super T>> subscribers;
30+
final List<SingleObserver<? super T>> subscribers;
3131

32-
public SingleCache(SingleConsumable<? extends T> source) {
32+
public SingleCache(SingleSource<? extends T> source) {
3333
this.source = source;
3434
this.wip = new AtomicInteger();
3535
this.notification = new AtomicReference<Object>();
36-
this.subscribers = new ArrayList<SingleSubscriber<? super T>>();
36+
this.subscribers = new ArrayList<SingleObserver<? super T>>();
3737
}
3838

3939
@Override
40-
protected void subscribeActual(SingleSubscriber<? super T> s) {
40+
protected void subscribeActual(SingleObserver<? super T> s) {
4141

4242
Object o = notification.get();
4343
if (o != null) {
@@ -70,7 +70,7 @@ protected void subscribeActual(SingleSubscriber<? super T> s) {
7070
return;
7171
}
7272

73-
source.subscribe(new SingleSubscriber<T>() {
73+
source.subscribe(new SingleObserver<T>() {
7474

7575
@Override
7676
public void onSubscribe(Disposable d) {
@@ -80,25 +80,25 @@ public void onSubscribe(Disposable d) {
8080
@Override
8181
public void onSuccess(T value) {
8282
notification.set(NotificationLite.next(value));
83-
List<SingleSubscriber<? super T>> list;
83+
List<SingleObserver<? super T>> list;
8484
synchronized (subscribers) {
85-
list = new ArrayList<SingleSubscriber<? super T>>(subscribers);
85+
list = new ArrayList<SingleObserver<? super T>>(subscribers);
8686
subscribers.clear();
8787
}
88-
for (SingleSubscriber<? super T> s1 : list) {
88+
for (SingleObserver<? super T> s1 : list) {
8989
s1.onSuccess(value);
9090
}
9191
}
9292

9393
@Override
9494
public void onError(Throwable e) {
9595
notification.set(NotificationLite.error(e));
96-
List<SingleSubscriber<? super T>> list;
96+
List<SingleObserver<? super T>> list;
9797
synchronized (subscribers) {
98-
list = new ArrayList<SingleSubscriber<? super T>>(subscribers);
98+
list = new ArrayList<SingleObserver<? super T>>(subscribers);
9999
subscribers.clear();
100100
}
101-
for (SingleSubscriber<? super T> s1 : list) {
101+
for (SingleObserver<? super T> s1 : list) {
102102
s1.onError(e);
103103
}
104104
}

src/main/java/io/reactivex/internal/operators/single/SingleContains.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,22 @@
2020

2121
public final class SingleContains<T> extends Single<Boolean> {
2222

23-
final SingleConsumable<T> source;
23+
final SingleSource<T> source;
2424

2525
final Object value;
2626

2727
final BiPredicate<Object, Object> comparer;
2828

29-
public SingleContains(SingleConsumable<T> source, Object value, BiPredicate<Object, Object> comparer) {
29+
public SingleContains(SingleSource<T> source, Object value, BiPredicate<Object, Object> comparer) {
3030
this.source = source;
3131
this.value = value;
3232
this.comparer = comparer;
3333
}
3434

3535
@Override
36-
protected void subscribeActual(final SingleSubscriber<? super Boolean> s) {
36+
protected void subscribeActual(final SingleObserver<? super Boolean> s) {
3737

38-
source.subscribe(new SingleSubscriber<T>() {
38+
source.subscribe(new SingleObserver<T>() {
3939

4040
@Override
4141
public void onSubscribe(Disposable d) {

src/main/java/io/reactivex/internal/operators/single/SingleDefer.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@
2020

2121
public final class SingleDefer<T> extends Single<T> {
2222

23-
final Callable<? extends SingleConsumable<? extends T>> singleSupplier;
23+
final Callable<? extends SingleSource<? extends T>> singleSupplier;
2424

25-
public SingleDefer(Callable<? extends SingleConsumable<? extends T>> singleSupplier) {
25+
public SingleDefer(Callable<? extends SingleSource<? extends T>> singleSupplier) {
2626
this.singleSupplier = singleSupplier;
2727
}
2828

2929
@Override
30-
protected void subscribeActual(SingleSubscriber<? super T> s) {
31-
SingleConsumable<? extends T> next;
30+
protected void subscribeActual(SingleObserver<? super T> s) {
31+
SingleSource<? extends T> next;
3232

3333
try {
3434
next = singleSupplier.call();

src/main/java/io/reactivex/internal/operators/single/SingleDelay.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,24 @@
2121
public final class SingleDelay<T> extends Single<T> {
2222

2323

24-
final SingleConsumable<? extends T> source;
24+
final SingleSource<? extends T> source;
2525
final long time;
2626
final TimeUnit unit;
2727
final Scheduler scheduler;
2828

29-
public SingleDelay(SingleConsumable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
29+
public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
3030
this.source = source;
3131
this.time = time;
3232
this.unit = unit;
3333
this.scheduler = scheduler;
3434
}
3535

3636
@Override
37-
protected void subscribeActual(final SingleSubscriber<? super T> s) {
37+
protected void subscribeActual(final SingleObserver<? super T> s) {
3838

3939
final SerialDisposable sd = new SerialDisposable();
4040
s.onSubscribe(sd);
41-
source.subscribe(new SingleSubscriber<T>() {
41+
source.subscribe(new SingleObserver<T>() {
4242
@Override
4343
public void onSubscribe(Disposable d) {
4444
sd.replace(d);

0 commit comments

Comments
 (0)