Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Test cleanup #6119

Merged
merged 5 commits into from
Aug 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 2 additions & 2 deletions src/jmh/java/io/reactivex/InputWithIncrementingInteger.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void subscribe(Subscriber<? super Integer> s) {
}

public Iterable<Integer> iterable;
public Flowable<Integer> observable;
public Flowable<Integer> flowable;
public Flowable<Integer> firehose;
public Blackhole bh;

Expand All @@ -104,7 +104,7 @@ public void subscribe(Subscriber<? super Integer> s) {
public void setup(final Blackhole bh) {
this.bh = bh;
final int size = getSize();
observable = Flowable.range(0, size);
flowable = Flowable.range(0, size);

firehose = Flowable.unsafeCreate(new IncrementingPublisher(size));
iterable = new IncrementingIterable(size);
Expand Down
6 changes: 3 additions & 3 deletions src/jmh/java/io/reactivex/OperatorFlatMapPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public int getSize() {

@Benchmark
public void flatMapIntPassthruSync(Input input) throws InterruptedException {
input.observable.flatMap(new Function<Integer, Publisher<Integer>>() {
input.flowable.flatMap(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer v) {
return Flowable.just(v);
Expand All @@ -53,7 +53,7 @@ public Publisher<Integer> apply(Integer v) {
@Benchmark
public void flatMapIntPassthruAsync(Input input) throws InterruptedException {
PerfSubscriber latchedObserver = input.newLatchedObserver();
input.observable.flatMap(new Function<Integer, Publisher<Integer>>() {
input.flowable.flatMap(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer i) {
return Flowable.just(i).subscribeOn(Schedulers.computation());
Expand All @@ -71,7 +71,7 @@ public void flatMapTwoNestedSync(final Input input) throws InterruptedException
Flowable.range(1, 2).flatMap(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer i) {
return input.observable;
return input.flowable;
}
}).subscribe(input.newSubscriber());
}
Expand Down
4 changes: 2 additions & 2 deletions src/jmh/java/io/reactivex/OperatorMergePerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Flowable<Integer> apply(Integer i) {

@Benchmark
public void mergeNSyncStreamsOfN(final InputThousand input) throws InterruptedException {
Flowable<Flowable<Integer>> os = input.observable.map(new Function<Integer, Flowable<Integer>>() {
Flowable<Flowable<Integer>> os = input.flowable.map(new Function<Integer, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(Integer i) {
return Flowable.range(0, input.size);
Expand All @@ -85,7 +85,7 @@ public Flowable<Integer> apply(Integer i) {

@Benchmark
public void mergeNAsyncStreamsOfN(final InputThousand input) throws InterruptedException {
Flowable<Flowable<Integer>> os = input.observable.map(new Function<Integer, Flowable<Integer>>() {
Flowable<Flowable<Integer>> os = input.flowable.map(new Function<Integer, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(Integer i) {
return Flowable.range(0, input.size).subscribeOn(Schedulers.computation());
Expand Down
30 changes: 15 additions & 15 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2155,20 +2155,20 @@ public final Completable hide() {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
EmptyCompletableObserver s = new EmptyCompletableObserver();
subscribe(s);
return s;
EmptyCompletableObserver observer = new EmptyCompletableObserver();
subscribe(observer);
return observer;
}

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(CompletableObserver s) {
ObjectHelper.requireNonNull(s, "s is null");
public final void subscribe(CompletableObserver observer) {
ObjectHelper.requireNonNull(observer, "s is null");
try {

s = RxJavaPlugins.onSubscribe(this, s);
observer = RxJavaPlugins.onSubscribe(this, observer);

subscribeActual(s);
subscribeActual(observer);
} catch (NullPointerException ex) { // NOPMD
throw ex;
} catch (Throwable ex) {
Expand All @@ -2184,9 +2184,9 @@ public final void subscribe(CompletableObserver s) {
* <p>There is no need to call any of the plugin hooks on the current {@code Completable} instance or
* the {@code CompletableObserver}; all hooks and basic safeguards have been
* applied by {@link #subscribe(CompletableObserver)} before this method gets called.
* @param s the CompletableObserver instance, never null
* @param observer the CompletableObserver instance, never null
*/
protected abstract void subscribeActual(CompletableObserver s);
protected abstract void subscribeActual(CompletableObserver observer);

/**
* Subscribes a given CompletableObserver (subclass) to this Completable and returns the given
Expand Down Expand Up @@ -2240,9 +2240,9 @@ public final Disposable subscribe(final Action onComplete, final Consumer<? supe
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");

CallbackCompletableObserver s = new CallbackCompletableObserver(onError, onComplete);
subscribe(s);
return s;
CallbackCompletableObserver observer = new CallbackCompletableObserver(onError, onComplete);
subscribe(observer);
return observer;
}

/**
Expand All @@ -2266,9 +2266,9 @@ public final Disposable subscribe(final Action onComplete, final Consumer<? supe
public final Disposable subscribe(final Action onComplete) {
ObjectHelper.requireNonNull(onComplete, "onComplete is null");

CallbackCompletableObserver s = new CallbackCompletableObserver(onComplete);
subscribe(s);
return s;
CallbackCompletableObserver observer = new CallbackCompletableObserver(onComplete);
subscribe(observer);
return observer;
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/FlowableOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
public interface FlowableOperator<Downstream, Upstream> {
/**
* Applies a function to the child Subscriber and returns a new parent Subscriber.
* @param observer the child Subscriber instance
* @param subscriber the child Subscriber instance
* @return the parent Subscriber instance
* @throws Exception on failure
*/
@NonNull
Subscriber<? super Upstream> apply(@NonNull Subscriber<? super Downstream> observer) throws Exception;
Subscriber<? super Upstream> apply(@NonNull Subscriber<? super Downstream> subscriber) throws Exception;
}
48 changes: 24 additions & 24 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4982,9 +4982,9 @@ public final <R> R as(@NonNull ObservableConverter<T, ? extends R> converter) {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final T blockingFirst() {
BlockingFirstObserver<T> s = new BlockingFirstObserver<T>();
subscribe(s);
T v = s.blockingGet();
BlockingFirstObserver<T> observer = new BlockingFirstObserver<T>();
subscribe(observer);
T v = observer.blockingGet();
if (v != null) {
return v;
}
Expand All @@ -5010,9 +5010,9 @@ public final T blockingFirst() {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final T blockingFirst(T defaultItem) {
BlockingFirstObserver<T> s = new BlockingFirstObserver<T>();
subscribe(s);
T v = s.blockingGet();
BlockingFirstObserver<T> observer = new BlockingFirstObserver<T>();
subscribe(observer);
T v = observer.blockingGet();
return v != null ? v : defaultItem;
}

Expand Down Expand Up @@ -5119,9 +5119,9 @@ public final Iterable<T> blockingIterable(int bufferSize) {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final T blockingLast() {
BlockingLastObserver<T> s = new BlockingLastObserver<T>();
subscribe(s);
T v = s.blockingGet();
BlockingLastObserver<T> observer = new BlockingLastObserver<T>();
subscribe(observer);
T v = observer.blockingGet();
if (v != null) {
return v;
}
Expand Down Expand Up @@ -5151,9 +5151,9 @@ public final T blockingLast() {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final T blockingLast(T defaultItem) {
BlockingLastObserver<T> s = new BlockingLastObserver<T>();
subscribe(s);
T v = s.blockingGet();
BlockingLastObserver<T> observer = new BlockingLastObserver<T>();
subscribe(observer);
T v = observer.blockingGet();
return v != null ? v : defaultItem;
}

Expand Down Expand Up @@ -10998,16 +10998,16 @@ public final Observable<T> retryWhen(
* <dt><b>Scheduler:</b></dt>
* <dd>{@code safeSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param s the incoming Observer instance
* @param observer the incoming Observer instance
* @throws NullPointerException if s is null
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void safeSubscribe(Observer<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");
if (s instanceof SafeObserver) {
subscribe(s);
public final void safeSubscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "s is null");
if (observer instanceof SafeObserver) {
subscribe(observer);
} else {
subscribe(new SafeObserver<T>(s));
subscribe(new SafeObserver<T>(observer));
}
}

Expand Down Expand Up @@ -14072,19 +14072,19 @@ public final <K, V> Single<Map<K, Collection<V>>> toMultimap(
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> toFlowable(BackpressureStrategy strategy) {
Flowable<T> o = new FlowableFromObservable<T>(this);
Flowable<T> f = new FlowableFromObservable<T>(this);

switch (strategy) {
case DROP:
return o.onBackpressureDrop();
return f.onBackpressureDrop();
case LATEST:
return o.onBackpressureLatest();
return f.onBackpressureLatest();
case MISSING:
return o;
return f;
case ERROR:
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureError<T>(o));
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureError<T>(f));
default:
return o.onBackpressureBuffer();
return f.onBackpressureBuffer();
}
}

Expand Down
22 changes: 11 additions & 11 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -3322,9 +3322,9 @@ public final Disposable subscribe() {
public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable> onCallback) {
ObjectHelper.requireNonNull(onCallback, "onCallback is null");

BiConsumerSingleObserver<T> s = new BiConsumerSingleObserver<T>(onCallback);
subscribe(s);
return s;
BiConsumerSingleObserver<T> observer = new BiConsumerSingleObserver<T>(onCallback);
subscribe(observer);
return observer;
}

/**
Expand Down Expand Up @@ -3376,22 +3376,22 @@ public final Disposable subscribe(final Consumer<? super T> onSuccess, final Con
ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
ObjectHelper.requireNonNull(onError, "onError is null");

ConsumerSingleObserver<T> s = new ConsumerSingleObserver<T>(onSuccess, onError);
subscribe(s);
return s;
ConsumerSingleObserver<T> observer = new ConsumerSingleObserver<T>(onSuccess, onError);
subscribe(observer);
return observer;
}

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(SingleObserver<? super T> subscriber) {
ObjectHelper.requireNonNull(subscriber, "subscriber is null");
public final void subscribe(SingleObserver<? super T> observer) {
ObjectHelper.requireNonNull(observer, "subscriber is null");

subscriber = RxJavaPlugins.onSubscribe(this, subscriber);
observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");
ObjectHelper.requireNonNull(observer, "subscriber returned by the RxJavaPlugins hook is null");

try {
subscribeActual(subscriber);
subscribeActual(observer);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,39 +48,39 @@ public boolean isDisposed() {
return this == INSTANCE;
}

public static void complete(Observer<?> s) {
s.onSubscribe(INSTANCE);
s.onComplete();
public static void complete(Observer<?> observer) {
observer.onSubscribe(INSTANCE);
observer.onComplete();
}

public static void complete(MaybeObserver<?> s) {
s.onSubscribe(INSTANCE);
s.onComplete();
public static void complete(MaybeObserver<?> observer) {
observer.onSubscribe(INSTANCE);
observer.onComplete();
}

public static void error(Throwable e, Observer<?> s) {
s.onSubscribe(INSTANCE);
s.onError(e);
public static void error(Throwable e, Observer<?> observer) {
observer.onSubscribe(INSTANCE);
observer.onError(e);
}

public static void complete(CompletableObserver s) {
s.onSubscribe(INSTANCE);
s.onComplete();
public static void complete(CompletableObserver observer) {
observer.onSubscribe(INSTANCE);
observer.onComplete();
}

public static void error(Throwable e, CompletableObserver s) {
s.onSubscribe(INSTANCE);
s.onError(e);
public static void error(Throwable e, CompletableObserver observer) {
observer.onSubscribe(INSTANCE);
observer.onError(e);
}

public static void error(Throwable e, SingleObserver<?> s) {
s.onSubscribe(INSTANCE);
s.onError(e);
public static void error(Throwable e, SingleObserver<?> observer) {
observer.onSubscribe(INSTANCE);
observer.onError(e);
}

public static void error(Throwable e, MaybeObserver<?> s) {
s.onSubscribe(INSTANCE);
s.onError(e);
public static void error(Throwable e, MaybeObserver<?> observer) {
observer.onSubscribe(INSTANCE);
observer.onError(e);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ public final boolean fastEnter() {
}

protected final void fastPathEmit(U value, boolean delayError, Disposable dispose) {
final Observer<? super V> s = actual;
final Observer<? super V> observer = actual;
final SimplePlainQueue<U> q = queue;

if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
accept(s, value);
accept(observer, value);
if (leave(-1) == 0) {
return;
}
Expand All @@ -76,7 +76,7 @@ protected final void fastPathEmit(U value, boolean delayError, Disposable dispos
return;
}
}
QueueDrainHelper.drainLoop(q, s, delayError, dispose, this);
QueueDrainHelper.drainLoop(q, observer, delayError, dispose, this);
}

/**
Expand All @@ -86,12 +86,12 @@ protected final void fastPathEmit(U value, boolean delayError, Disposable dispos
* @param disposable the resource to dispose if the drain terminates
*/
protected final void fastPathOrderedEmit(U value, boolean delayError, Disposable disposable) {
final Observer<? super V> s = actual;
final Observer<? super V> observer = actual;
final SimplePlainQueue<U> q = queue;

if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
if (q.isEmpty()) {
accept(s, value);
accept(observer, value);
if (leave(-1) == 0) {
return;
}
Expand All @@ -104,7 +104,7 @@ protected final void fastPathOrderedEmit(U value, boolean delayError, Disposable
return;
}
}
QueueDrainHelper.drainLoop(q, s, delayError, disposable, this);
QueueDrainHelper.drainLoop(q, observer, delayError, disposable, this);
}

@Override
Expand Down
Loading