Skip to content

Commit 2de974a

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Rename Observable Base Interface Types for consistency (#4300)
1 parent e81d399 commit 2de974a

File tree

138 files changed

+809
-777
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

138 files changed

+809
-777
lines changed

src/main/java/io/reactivex/Completable.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ public Object call() throws Exception {
266266
* @throws NullPointerException if flowable is null
267267
*/
268268
@SchedulerSupport(SchedulerSupport.NONE)
269-
public static <T> Completable fromObservable(final ObservableConsumable<T> observable) {
269+
public static <T> Completable fromObservable(final ObservableSource<T> observable) {
270270
Objects.requireNonNull(observable, "observable is null");
271271
return new CompletableFromObservable<T>(observable);
272272
}
@@ -817,7 +817,7 @@ public final Completable endWith(CompletableSource other) {
817817
* @throws NullPointerException if next is null
818818
*/
819819
@SchedulerSupport(SchedulerSupport.CUSTOM)
820-
public final <T> Observable<T> endWith(ObservableConsumable<T> next) {
820+
public final <T> Observable<T> endWith(ObservableSource<T> next) {
821821
return this.<T>toObservable().endWith(next);
822822
}
823823

src/main/java/io/reactivex/Observable.java

+229-237
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import io.reactivex.functions.Function;
17+
18+
public interface ObservableOperator<Downstream, Upstream> extends Function<Observer<? super Downstream>, Observer<? super Upstream>> {
19+
20+
}

src/main/java/io/reactivex/ObservableConsumable.java src/main/java/io/reactivex/ObservableSource.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717
* consumable via an {@link Observer}.
1818
* <p>
1919
* This class also serves the base type for custom operators wrapped into
20-
* Observable via {@link Observable#create(ObservableConsumable)}.
20+
* Observable via {@link Observable#create(ObservableSource)}.
2121
*
2222
* @param <T> the element type
2323
* @since 2.0
2424
*/
25-
public interface ObservableConsumable<T> {
25+
public interface ObservableSource<T> {
2626

2727
void subscribe(Observer<? super T> observer);
2828
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import io.reactivex.functions.Function;
17+
18+
public interface ObservableTransformer<Upstream, Downstream> extends Function<Observable<Upstream>, ObservableSource<Downstream>> {
19+
20+
}

src/main/java/io/reactivex/Single.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,7 @@ public final <U> Single<T> delaySubscription(SingleSource<U> other) {
641641
return new SingleDelayWithSingle<T, U>(this, other);
642642
}
643643

644-
public final <U> Single<T> delaySubscription(ObservableConsumable<U> other) {
644+
public final <U> Single<T> delaySubscription(ObservableSource<U> other) {
645645
return new SingleDelayWithObservable<T, U>(this, other);
646646
}
647647

src/main/java/io/reactivex/internal/operators/completable/CompletableFromObservable.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818

1919
public final class CompletableFromObservable<T> extends Completable {
2020

21-
final ObservableConsumable<T> observable;
21+
final ObservableSource<T> observable;
2222

23-
public CompletableFromObservable(ObservableConsumable<T> observable) {
23+
public CompletableFromObservable(ObservableSource<T> observable) {
2424
this.observable = observable;
2525
}
2626

src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@ public enum BlockingObservableLatest {
4040
* @return an {@code Iterable} that blocks until or unless the {@code Observable} emits an item that has not
4141
* been returned by the {@code Iterable}, then returns that item
4242
*/
43-
public static <T> Iterable<T> latest(final ObservableConsumable<? extends T> source) {
43+
public static <T> Iterable<T> latest(final ObservableSource<? extends T> source) {
4444
return new Iterable<T>() {
4545
@Override
4646
public Iterator<T> iterator() {
4747
LatestObserverIterator<T> lio = new LatestObserverIterator<T>();
4848

4949
@SuppressWarnings("unchecked")
50-
Observable<Try<Optional<T>>> materialized = Observable.wrap((ObservableConsumable<T>)source).materialize();
50+
Observable<Try<Optional<T>>> materialized = Observable.wrap((ObservableSource<T>)source).materialize();
5151

5252
materialized.subscribe(lio);
5353
return lio;
@@ -126,4 +126,4 @@ public void remove() {
126126
}
127127

128128
}
129-
}
129+
}

src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import java.util.*;
1818

19-
import io.reactivex.ObservableConsumable;
19+
import io.reactivex.ObservableSource;
2020
import io.reactivex.exceptions.Exceptions;
2121
import io.reactivex.internal.util.NotificationLite;
2222
import io.reactivex.observers.DefaultObserver;
@@ -41,7 +41,7 @@ public enum BlockingObservableMostRecent {
4141
* @return an {@code Iterable} that always returns the item most recently emitted by {@code source}, or
4242
* {@code initialValue} if {@code source} has not yet emitted any items
4343
*/
44-
public static <T> Iterable<T> mostRecent(final ObservableConsumable<? extends T> source, final T initialValue) {
44+
public static <T> Iterable<T> mostRecent(final ObservableSource<? extends T> source, final T initialValue) {
4545
return new Iterable<T>() {
4646
@Override
4747
public Iterator<T> iterator() {

src/main/java/io/reactivex/internal/operators/observable/ObservableAll.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
import io.reactivex.plugins.RxJavaPlugins;
2020

2121
public final class ObservableAll<T> extends Observable<Boolean> {
22-
final ObservableConsumable<T> source;
22+
final ObservableSource<T> source;
2323

2424
final Predicate<? super T> predicate;
25-
public ObservableAll(ObservableConsumable<T> source, Predicate<? super T> predicate) {
25+
public ObservableAll(ObservableSource<T> source, Predicate<? super T> predicate) {
2626
this.source = source;
2727
this.predicate = predicate;
2828
}

src/main/java/io/reactivex/internal/operators/observable/ObservableAmb.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,22 @@
2121
import io.reactivex.plugins.RxJavaPlugins;
2222

2323
public final class ObservableAmb<T> extends Observable<T> {
24-
final ObservableConsumable<? extends T>[] sources;
25-
final Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable;
24+
final ObservableSource<? extends T>[] sources;
25+
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
2626

27-
public ObservableAmb(ObservableConsumable<? extends T>[] sources, Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable) {
27+
public ObservableAmb(ObservableSource<? extends T>[] sources, Iterable<? extends ObservableSource<? extends T>> sourcesIterable) {
2828
this.sources = sources;
2929
this.sourcesIterable = sourcesIterable;
3030
}
3131

3232
@Override
3333
@SuppressWarnings("unchecked")
3434
public void subscribeActual(Observer<? super T> s) {
35-
ObservableConsumable<? extends T>[] sources = this.sources;
35+
ObservableSource<? extends T>[] sources = this.sources;
3636
int count = 0;
3737
if (sources == null) {
3838
sources = new Observable[8];
39-
for (ObservableConsumable<? extends T> p : sourcesIterable) {
39+
for (ObservableSource<? extends T> p : sourcesIterable) {
4040
if (count == sources.length) {
4141
Observable<? extends T>[] b = new Observable[count + (count >> 2)];
4242
System.arraycopy(sources, 0, b, 0, count);
@@ -73,7 +73,7 @@ public AmbCoordinator(Observer<? super T> actual, int count) {
7373
this.subscribers = new AmbInnerSubscriber[count];
7474
}
7575

76-
public void subscribe(ObservableConsumable<? extends T>[] sources) {
76+
public void subscribe(ObservableSource<? extends T>[] sources) {
7777
AmbInnerSubscriber<T>[] as = subscribers;
7878
int len = as.length;
7979
for (int i = 0; i < len; i++) {
@@ -199,4 +199,4 @@ public boolean isDisposed() {
199199
return get() == DisposableHelper.DISPOSED;
200200
}
201201
}
202-
}
202+
}

src/main/java/io/reactivex/internal/operators/observable/ObservableAny.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
import io.reactivex.internal.disposables.DisposableHelper;
1919

2020
public final class ObservableAny<T> extends Observable<Boolean> {
21-
final ObservableConsumable<T> source;
21+
final ObservableSource<T> source;
2222
final Predicate<? super T> predicate;
23-
public ObservableAny(ObservableConsumable<T> source, Predicate<? super T> predicate) {
23+
public ObservableAny(ObservableSource<T> source, Predicate<? super T> predicate) {
2424
this.source = source;
2525
this.predicate = predicate;
2626
}

src/main/java/io/reactivex/internal/operators/observable/ObservableBuffer.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,18 @@
1818
import java.util.concurrent.atomic.AtomicBoolean;
1919

2020
import io.reactivex.Observable;
21-
import io.reactivex.ObservableConsumable;
21+
import io.reactivex.ObservableSource;
2222
import io.reactivex.Observer;
2323
import io.reactivex.disposables.Disposable;
2424
import io.reactivex.internal.disposables.*;
2525

2626
public final class ObservableBuffer<T, U extends Collection<? super T>> extends Observable<U> {
27-
final ObservableConsumable<T> source;
27+
final ObservableSource<T> source;
2828
final int count;
2929
final int skip;
3030
final Callable<U> bufferSupplier;
3131

32-
public ObservableBuffer(ObservableConsumable<T> source, int count, int skip, Callable<U> bufferSupplier) {
32+
public ObservableBuffer(ObservableSource<T> source, int count, int skip, Callable<U> bufferSupplier) {
3333
this.source = source;
3434
this.count = count;
3535
this.skip = skip;

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import java.util.concurrent.atomic.AtomicInteger;
1919

2020
import io.reactivex.Observable;
21-
import io.reactivex.ObservableConsumable;
21+
import io.reactivex.ObservableSource;
2222
import io.reactivex.Observer;
2323
import io.reactivex.disposables.*;
2424
import io.reactivex.functions.Function;
@@ -32,13 +32,13 @@
3232

3333
public final class ObservableBufferBoundary<T, U extends Collection<? super T>, Open, Close>
3434
extends Observable<U> {
35-
final ObservableConsumable<T> source;
35+
final ObservableSource<T> source;
3636
final Callable<U> bufferSupplier;
37-
final ObservableConsumable<? extends Open> bufferOpen;
38-
final Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose;
37+
final ObservableSource<? extends Open> bufferOpen;
38+
final Function<? super Open, ? extends ObservableSource<? extends Close>> bufferClose;
3939

40-
public ObservableBufferBoundary(ObservableConsumable<T> source, ObservableConsumable<? extends Open> bufferOpen,
41-
Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose, Callable<U> bufferSupplier) {
40+
public ObservableBufferBoundary(ObservableSource<T> source, ObservableSource<? extends Open> bufferOpen,
41+
Function<? super Open, ? extends ObservableSource<? extends Close>> bufferClose, Callable<U> bufferSupplier) {
4242
this.source = source;
4343
this.bufferOpen = bufferOpen;
4444
this.bufferClose = bufferClose;
@@ -55,8 +55,8 @@ protected void subscribeActual(Observer<? super U> t) {
5555

5656
static final class BufferBoundarySubscriber<T, U extends Collection<? super T>, Open, Close>
5757
extends QueueDrainObserver<T, U, U> implements Disposable {
58-
final ObservableConsumable<? extends Open> bufferOpen;
59-
final Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose;
58+
final ObservableSource<? extends Open> bufferOpen;
59+
final Function<? super Open, ? extends ObservableSource<? extends Close>> bufferClose;
6060
final Callable<U> bufferSupplier;
6161
final CompositeDisposable resources;
6262

@@ -67,8 +67,8 @@ static final class BufferBoundarySubscriber<T, U extends Collection<? super T>,
6767
final AtomicInteger windows = new AtomicInteger();
6868

6969
public BufferBoundarySubscriber(Observer<? super U> actual,
70-
ObservableConsumable<? extends Open> bufferOpen,
71-
Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose,
70+
ObservableSource<? extends Open> bufferOpen,
71+
Function<? super Open, ? extends ObservableSource<? extends Close>> bufferClose,
7272
Callable<U> bufferSupplier) {
7373
super(actual, new MpscLinkedQueue<U>());
7474
this.bufferOpen = bufferOpen;
@@ -171,7 +171,7 @@ void open(Open window) {
171171
return;
172172
}
173173

174-
ObservableConsumable<? extends Close> p;
174+
ObservableSource<? extends Close> p;
175175

176176
try {
177177
p = bufferClose.apply(window);

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@
2828

2929
public final class ObservableBufferBoundarySupplier<T, U extends Collection<? super T>, B>
3030
extends Observable<U> {
31-
final ObservableConsumable<T> source;
32-
final Callable<? extends ObservableConsumable<B>> boundarySupplier;
31+
final ObservableSource<T> source;
32+
final Callable<? extends ObservableSource<B>> boundarySupplier;
3333
final Callable<U> bufferSupplier;
3434

35-
public ObservableBufferBoundarySupplier(ObservableConsumable<T> source, Callable<? extends ObservableConsumable<B>> boundarySupplier, Callable<U> bufferSupplier) {
35+
public ObservableBufferBoundarySupplier(ObservableSource<T> source, Callable<? extends ObservableSource<B>> boundarySupplier, Callable<U> bufferSupplier) {
3636
this.source = source;
3737
this.boundarySupplier = boundarySupplier;
3838
this.bufferSupplier = bufferSupplier;
@@ -47,7 +47,7 @@ static final class BufferBondarySupplierSubscriber<T, U extends Collection<? sup
4747
extends QueueDrainObserver<T, U, U> implements Observer<T>, Disposable {
4848
/** */
4949
final Callable<U> bufferSupplier;
50-
final Callable<? extends ObservableConsumable<B>> boundarySupplier;
50+
final Callable<? extends ObservableSource<B>> boundarySupplier;
5151

5252
Disposable s;
5353

@@ -56,7 +56,7 @@ static final class BufferBondarySupplierSubscriber<T, U extends Collection<? sup
5656
U buffer;
5757

5858
public BufferBondarySupplierSubscriber(Observer<? super U> actual, Callable<U> bufferSupplier,
59-
Callable<? extends ObservableConsumable<B>> boundarySupplier) {
59+
Callable<? extends ObservableSource<B>> boundarySupplier) {
6060
super(actual, new MpscLinkedQueue<U>());
6161
this.bufferSupplier = bufferSupplier;
6262
this.boundarySupplier = boundarySupplier;
@@ -88,7 +88,7 @@ public void onSubscribe(Disposable s) {
8888
}
8989
buffer = b;
9090

91-
ObservableConsumable<B> boundary;
91+
ObservableSource<B> boundary;
9292

9393
try {
9494
boundary = boundarySupplier.call();
@@ -191,7 +191,7 @@ void next() {
191191
return;
192192
}
193193

194-
ObservableConsumable<B> boundary;
194+
ObservableSource<B> boundary;
195195

196196
try {
197197
boundary = boundarySupplier.call();

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferExactBoundary.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@
2626

2727
public final class ObservableBufferExactBoundary<T, U extends Collection<? super T>, B>
2828
extends Observable<U> {
29-
final ObservableConsumable<T> source;
30-
final ObservableConsumable<B> boundary;
29+
final ObservableSource<T> source;
30+
final ObservableSource<B> boundary;
3131
final Callable<U> bufferSupplier;
3232

33-
public ObservableBufferExactBoundary(ObservableConsumable<T> source, ObservableConsumable<B> boundary, Callable<U> bufferSupplier) {
33+
public ObservableBufferExactBoundary(ObservableSource<T> source, ObservableSource<B> boundary, Callable<U> bufferSupplier) {
3434
this.source = source;
3535
this.boundary = boundary;
3636
this.bufferSupplier = bufferSupplier;
@@ -45,7 +45,7 @@ static final class BufferExactBondarySubscriber<T, U extends Collection<? super
4545
extends QueueDrainObserver<T, U, U> implements Observer<T>, Disposable {
4646
/** */
4747
final Callable<U> bufferSupplier;
48-
final ObservableConsumable<B> boundary;
48+
final ObservableSource<B> boundary;
4949

5050
Disposable s;
5151

@@ -54,7 +54,7 @@ static final class BufferExactBondarySubscriber<T, U extends Collection<? super
5454
U buffer;
5555

5656
public BufferExactBondarySubscriber(Observer<? super U> actual, Callable<U> bufferSupplier,
57-
ObservableConsumable<B> boundary) {
57+
ObservableSource<B> boundary) {
5858
super(actual, new MpscLinkedQueue<U>());
5959
this.bufferSupplier = bufferSupplier;
6060
this.boundary = boundary;

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
public final class ObservableBufferTimed<T, U extends Collection<? super T>>
3232
extends Observable<U> {
3333

34-
final ObservableConsumable<T> source;
34+
final ObservableSource<T> source;
3535
final long timespan;
3636
final long timeskip;
3737
final TimeUnit unit;
@@ -40,8 +40,8 @@ public final class ObservableBufferTimed<T, U extends Collection<? super T>>
4040
final int maxSize;
4141
final boolean restartTimerOnMaxSize;
4242

43-
public ObservableBufferTimed(ObservableConsumable<T> source, long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, Callable<U> bufferSupplier, int maxSize,
44-
boolean restartTimerOnMaxSize) {
43+
public ObservableBufferTimed(ObservableSource<T> source, long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, Callable<U> bufferSupplier, int maxSize,
44+
boolean restartTimerOnMaxSize) {
4545
this.source = source;
4646
this.timespan = timespan;
4747
this.timeskip = timeskip;

0 commit comments

Comments
 (0)