Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Rename Observable Base Interface Types for consistency #4300

Merged
merged 1 commit into from
Aug 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public Object call() throws Exception {
* @throws NullPointerException if flowable is null
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Completable fromObservable(final ObservableConsumable<T> observable) {
public static <T> Completable fromObservable(final ObservableSource<T> observable) {
Objects.requireNonNull(observable, "observable is null");
return new CompletableFromObservable<T>(observable);
}
Expand Down Expand Up @@ -817,7 +817,7 @@ public final Completable endWith(CompletableSource other) {
* @throws NullPointerException if next is null
*/
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final <T> Observable<T> endWith(ObservableConsumable<T> next) {
public final <T> Observable<T> endWith(ObservableSource<T> next) {
return this.<T>toObservable().endWith(next);
}

Expand Down
466 changes: 229 additions & 237 deletions src/main/java/io/reactivex/Observable.java

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions src/main/java/io/reactivex/ObservableOperator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import io.reactivex.functions.Function;

public interface ObservableOperator<Downstream, Upstream> extends Function<Observer<? super Downstream>, Observer<? super Upstream>> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
* consumable via an {@link Observer}.
* <p>
* This class also serves the base type for custom operators wrapped into
* Observable via {@link Observable#create(ObservableConsumable)}.
* Observable via {@link Observable#create(ObservableSource)}.
*
* @param <T> the element type
* @since 2.0
*/
public interface ObservableConsumable<T> {
public interface ObservableSource<T> {

void subscribe(Observer<? super T> observer);
}
20 changes: 20 additions & 0 deletions src/main/java/io/reactivex/ObservableTransformer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import io.reactivex.functions.Function;

public interface ObservableTransformer<Upstream, Downstream> extends Function<Observable<Upstream>, ObservableSource<Downstream>> {

}
2 changes: 1 addition & 1 deletion src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ public final <U> Single<T> delaySubscription(SingleSource<U> other) {
return new SingleDelayWithSingle<T, U>(this, other);
}

public final <U> Single<T> delaySubscription(ObservableConsumable<U> other) {
public final <U> Single<T> delaySubscription(ObservableSource<U> other) {
return new SingleDelayWithObservable<T, U>(this, other);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

public final class CompletableFromObservable<T> extends Completable {

final ObservableConsumable<T> observable;
final ObservableSource<T> observable;

public CompletableFromObservable(ObservableConsumable<T> observable) {
public CompletableFromObservable(ObservableSource<T> observable) {
this.observable = observable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ public enum BlockingObservableLatest {
* @return an {@code Iterable} that blocks until or unless the {@code Observable} emits an item that has not
* been returned by the {@code Iterable}, then returns that item
*/
public static <T> Iterable<T> latest(final ObservableConsumable<? extends T> source) {
public static <T> Iterable<T> latest(final ObservableSource<? extends T> source) {
return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
LatestObserverIterator<T> lio = new LatestObserverIterator<T>();

@SuppressWarnings("unchecked")
Observable<Try<Optional<T>>> materialized = Observable.wrap((ObservableConsumable<T>)source).materialize();
Observable<Try<Optional<T>>> materialized = Observable.wrap((ObservableSource<T>)source).materialize();

materialized.subscribe(lio);
return lio;
Expand Down Expand Up @@ -126,4 +126,4 @@ public void remove() {
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import java.util.*;

import io.reactivex.ObservableConsumable;
import io.reactivex.ObservableSource;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.internal.util.NotificationLite;
import io.reactivex.observers.DefaultObserver;
Expand All @@ -41,7 +41,7 @@ public enum BlockingObservableMostRecent {
* @return an {@code Iterable} that always returns the item most recently emitted by {@code source}, or
* {@code initialValue} if {@code source} has not yet emitted any items
*/
public static <T> Iterable<T> mostRecent(final ObservableConsumable<? extends T> source, final T initialValue) {
public static <T> Iterable<T> mostRecent(final ObservableSource<? extends T> source, final T initialValue) {
return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import io.reactivex.plugins.RxJavaPlugins;

public final class ObservableAll<T> extends Observable<Boolean> {
final ObservableConsumable<T> source;
final ObservableSource<T> source;

final Predicate<? super T> predicate;
public ObservableAll(ObservableConsumable<T> source, Predicate<? super T> predicate) {
public ObservableAll(ObservableSource<T> source, Predicate<? super T> predicate) {
this.source = source;
this.predicate = predicate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@
import io.reactivex.plugins.RxJavaPlugins;

public final class ObservableAmb<T> extends Observable<T> {
final ObservableConsumable<? extends T>[] sources;
final Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable;
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;

public ObservableAmb(ObservableConsumable<? extends T>[] sources, Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable) {
public ObservableAmb(ObservableSource<? extends T>[] sources, Iterable<? extends ObservableSource<? extends T>> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
}

@Override
@SuppressWarnings("unchecked")
public void subscribeActual(Observer<? super T> s) {
ObservableConsumable<? extends T>[] sources = this.sources;
ObservableSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new Observable[8];
for (ObservableConsumable<? extends T> p : sourcesIterable) {
for (ObservableSource<? extends T> p : sourcesIterable) {
if (count == sources.length) {
Observable<? extends T>[] b = new Observable[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
Expand Down Expand Up @@ -73,7 +73,7 @@ public AmbCoordinator(Observer<? super T> actual, int count) {
this.subscribers = new AmbInnerSubscriber[count];
}

public void subscribe(ObservableConsumable<? extends T>[] sources) {
public void subscribe(ObservableSource<? extends T>[] sources) {
AmbInnerSubscriber<T>[] as = subscribers;
int len = as.length;
for (int i = 0; i < len; i++) {
Expand Down Expand Up @@ -199,4 +199,4 @@ public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import io.reactivex.internal.disposables.DisposableHelper;

public final class ObservableAny<T> extends Observable<Boolean> {
final ObservableConsumable<T> source;
final ObservableSource<T> source;
final Predicate<? super T> predicate;
public ObservableAny(ObservableConsumable<T> source, Predicate<? super T> predicate) {
public ObservableAny(ObservableSource<T> source, Predicate<? super T> predicate) {
this.source = source;
this.predicate = predicate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@
import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.Observable;
import io.reactivex.ObservableConsumable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.*;

public final class ObservableBuffer<T, U extends Collection<? super T>> extends Observable<U> {
final ObservableConsumable<T> source;
final ObservableSource<T> source;
final int count;
final int skip;
final Callable<U> bufferSupplier;

public ObservableBuffer(ObservableConsumable<T> source, int count, int skip, Callable<U> bufferSupplier) {
public ObservableBuffer(ObservableSource<T> source, int count, int skip, Callable<U> bufferSupplier) {
this.source = source;
this.count = count;
this.skip = skip;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import io.reactivex.Observable;
import io.reactivex.ObservableConsumable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.*;
import io.reactivex.functions.Function;
Expand All @@ -32,13 +32,13 @@

public final class ObservableBufferBoundary<T, U extends Collection<? super T>, Open, Close>
extends Observable<U> {
final ObservableConsumable<T> source;
final ObservableSource<T> source;
final Callable<U> bufferSupplier;
final ObservableConsumable<? extends Open> bufferOpen;
final Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose;
final ObservableSource<? extends Open> bufferOpen;
final Function<? super Open, ? extends ObservableSource<? extends Close>> bufferClose;

public ObservableBufferBoundary(ObservableConsumable<T> source, ObservableConsumable<? extends Open> bufferOpen,
Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose, Callable<U> bufferSupplier) {
public ObservableBufferBoundary(ObservableSource<T> source, ObservableSource<? extends Open> bufferOpen,
Function<? super Open, ? extends ObservableSource<? extends Close>> bufferClose, Callable<U> bufferSupplier) {
this.source = source;
this.bufferOpen = bufferOpen;
this.bufferClose = bufferClose;
Expand All @@ -55,8 +55,8 @@ protected void subscribeActual(Observer<? super U> t) {

static final class BufferBoundarySubscriber<T, U extends Collection<? super T>, Open, Close>
extends QueueDrainObserver<T, U, U> implements Disposable {
final ObservableConsumable<? extends Open> bufferOpen;
final Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose;
final ObservableSource<? extends Open> bufferOpen;
final Function<? super Open, ? extends ObservableSource<? extends Close>> bufferClose;
final Callable<U> bufferSupplier;
final CompositeDisposable resources;

Expand All @@ -67,8 +67,8 @@ static final class BufferBoundarySubscriber<T, U extends Collection<? super T>,
final AtomicInteger windows = new AtomicInteger();

public BufferBoundarySubscriber(Observer<? super U> actual,
ObservableConsumable<? extends Open> bufferOpen,
Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose,
ObservableSource<? extends Open> bufferOpen,
Function<? super Open, ? extends ObservableSource<? extends Close>> bufferClose,
Callable<U> bufferSupplier) {
super(actual, new MpscLinkedQueue<U>());
this.bufferOpen = bufferOpen;
Expand Down Expand Up @@ -171,7 +171,7 @@ void open(Open window) {
return;
}

ObservableConsumable<? extends Close> p;
ObservableSource<? extends Close> p;

try {
p = bufferClose.apply(window);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@

public final class ObservableBufferBoundarySupplier<T, U extends Collection<? super T>, B>
extends Observable<U> {
final ObservableConsumable<T> source;
final Callable<? extends ObservableConsumable<B>> boundarySupplier;
final ObservableSource<T> source;
final Callable<? extends ObservableSource<B>> boundarySupplier;
final Callable<U> bufferSupplier;

public ObservableBufferBoundarySupplier(ObservableConsumable<T> source, Callable<? extends ObservableConsumable<B>> boundarySupplier, Callable<U> bufferSupplier) {
public ObservableBufferBoundarySupplier(ObservableSource<T> source, Callable<? extends ObservableSource<B>> boundarySupplier, Callable<U> bufferSupplier) {
this.source = source;
this.boundarySupplier = boundarySupplier;
this.bufferSupplier = bufferSupplier;
Expand All @@ -47,7 +47,7 @@ static final class BufferBondarySupplierSubscriber<T, U extends Collection<? sup
extends QueueDrainObserver<T, U, U> implements Observer<T>, Disposable {
/** */
final Callable<U> bufferSupplier;
final Callable<? extends ObservableConsumable<B>> boundarySupplier;
final Callable<? extends ObservableSource<B>> boundarySupplier;

Disposable s;

Expand All @@ -56,7 +56,7 @@ static final class BufferBondarySupplierSubscriber<T, U extends Collection<? sup
U buffer;

public BufferBondarySupplierSubscriber(Observer<? super U> actual, Callable<U> bufferSupplier,
Callable<? extends ObservableConsumable<B>> boundarySupplier) {
Callable<? extends ObservableSource<B>> boundarySupplier) {
super(actual, new MpscLinkedQueue<U>());
this.bufferSupplier = bufferSupplier;
this.boundarySupplier = boundarySupplier;
Expand Down Expand Up @@ -88,7 +88,7 @@ public void onSubscribe(Disposable s) {
}
buffer = b;

ObservableConsumable<B> boundary;
ObservableSource<B> boundary;

try {
boundary = boundarySupplier.call();
Expand Down Expand Up @@ -191,7 +191,7 @@ void next() {
return;
}

ObservableConsumable<B> boundary;
ObservableSource<B> boundary;

try {
boundary = boundarySupplier.call();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@

public final class ObservableBufferExactBoundary<T, U extends Collection<? super T>, B>
extends Observable<U> {
final ObservableConsumable<T> source;
final ObservableConsumable<B> boundary;
final ObservableSource<T> source;
final ObservableSource<B> boundary;
final Callable<U> bufferSupplier;

public ObservableBufferExactBoundary(ObservableConsumable<T> source, ObservableConsumable<B> boundary, Callable<U> bufferSupplier) {
public ObservableBufferExactBoundary(ObservableSource<T> source, ObservableSource<B> boundary, Callable<U> bufferSupplier) {
this.source = source;
this.boundary = boundary;
this.bufferSupplier = bufferSupplier;
Expand All @@ -45,7 +45,7 @@ static final class BufferExactBondarySubscriber<T, U extends Collection<? super
extends QueueDrainObserver<T, U, U> implements Observer<T>, Disposable {
/** */
final Callable<U> bufferSupplier;
final ObservableConsumable<B> boundary;
final ObservableSource<B> boundary;

Disposable s;

Expand All @@ -54,7 +54,7 @@ static final class BufferExactBondarySubscriber<T, U extends Collection<? super
U buffer;

public BufferExactBondarySubscriber(Observer<? super U> actual, Callable<U> bufferSupplier,
ObservableConsumable<B> boundary) {
ObservableSource<B> boundary) {
super(actual, new MpscLinkedQueue<U>());
this.bufferSupplier = bufferSupplier;
this.boundary = boundary;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
public final class ObservableBufferTimed<T, U extends Collection<? super T>>
extends Observable<U> {

final ObservableConsumable<T> source;
final ObservableSource<T> source;
final long timespan;
final long timeskip;
final TimeUnit unit;
Expand All @@ -40,8 +40,8 @@ public final class ObservableBufferTimed<T, U extends Collection<? super T>>
final int maxSize;
final boolean restartTimerOnMaxSize;

public ObservableBufferTimed(ObservableConsumable<T> source, long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, Callable<U> bufferSupplier, int maxSize,
boolean restartTimerOnMaxSize) {
public ObservableBufferTimed(ObservableSource<T> source, long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, Callable<U> bufferSupplier, int maxSize,
boolean restartTimerOnMaxSize) {
this.source = source;
this.timespan = timespan;
this.timeskip = timeskip;
Expand Down
Loading