Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: collect, toList, toSortedList, toMap, toMultimap to return Single #4574

Merged
merged 1 commit into from
Sep 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 56 additions & 57 deletions src/main/java/io/reactivex/Flowable.java

Large diffs are not rendered by default.

110 changes: 55 additions & 55 deletions src/main/java/io/reactivex/Observable.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public final class FlowableAllSingle<T> extends Single<Boolean> implements FuseToFlowable<Boolean> {

final Publisher<T> source;

final Predicate<? super T> predicate;

public FlowableAllSingle(Publisher<T> source, Predicate<? super T> predicate) {
Expand All @@ -42,11 +42,11 @@ protected void subscribeActual(SingleObserver<? super Boolean> s) {
public Flowable<Boolean> fuseToFlowable() {
return RxJavaPlugins.onAssembly(new FlowableAll<T>(source, predicate));
}

static final class AllSubscriber<T> implements Subscriber<T>, Disposable {

final SingleObserver<? super Boolean> actual;

final Predicate<? super T> predicate;

Subscription s;
Expand Down Expand Up @@ -116,7 +116,7 @@ public void dispose() {
s.cancel();
s = SubscriptionHelper.CANCELLED;
}

@Override
public boolean isDisposed() {
return s == SubscriptionHelper.CANCELLED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@

public final class FlowableAnySingle<T> extends Single<Boolean> implements FuseToFlowable<Boolean> {
final Publisher<T> source;

final Predicate<? super T> predicate;

public FlowableAnySingle(Publisher<T> source, Predicate<? super T> predicate) {
this.source = source;
this.predicate = predicate;
Expand All @@ -36,7 +36,7 @@ public FlowableAnySingle(Publisher<T> source, Predicate<? super T> predicate) {
protected void subscribeActual(SingleObserver<? super Boolean> s) {
source.subscribe(new AnySubscriber<T>(s, predicate));
}

@Override
public Flowable<Boolean> fuseToFlowable() {
return RxJavaPlugins.onAssembly(new FlowableAny<T>(source, predicate));
Expand All @@ -45,7 +45,7 @@ public Flowable<Boolean> fuseToFlowable() {
static final class AnySubscriber<T> implements Subscriber<T>, Disposable {

final SingleObserver<? super Boolean> actual;

final Predicate<? super T> predicate;

Subscription s;
Expand Down Expand Up @@ -111,7 +111,7 @@ public void dispose() {
s.cancel();
s = SubscriptionHelper.CANCELLED;
}

@Override
public boolean isDisposed() {
return s == SubscriptionHelper.CANCELLED;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.internal.operators.flowable;

import java.util.concurrent.Callable;

import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BiConsumer;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.FuseToFlowable;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class FlowableCollectSingle<T, U> extends Single<U> implements FuseToFlowable<U> {

final Publisher<T> source;

final Callable<? extends U> initialSupplier;
final BiConsumer<? super U, ? super T> collector;

public FlowableCollectSingle(Publisher<T> source, Callable<? extends U> initialSupplier, BiConsumer<? super U, ? super T> collector) {
this.source = source;
this.initialSupplier = initialSupplier;
this.collector = collector;
}

@Override
protected void subscribeActual(SingleObserver<? super U> s) {
U u;
try {
u = ObjectHelper.requireNonNull(initialSupplier.call(), "The initialSupplier returned a null value");
} catch (Throwable e) {
EmptyDisposable.error(e, s);
return;
}

source.subscribe(new CollectSubscriber<T, U>(s, u, collector));
}

@Override
public Flowable<U> fuseToFlowable() {
return RxJavaPlugins.onAssembly(new FlowableCollect<T, U>(source, initialSupplier, collector));
}

static final class CollectSubscriber<T, U> implements Subscriber<T>, Disposable {

final SingleObserver<? super U> actual;

final BiConsumer<? super U, ? super T> collector;

final U u;

Subscription s;

boolean done;

CollectSubscriber(SingleObserver<? super U> actual, U u, BiConsumer<? super U, ? super T> collector) {
this.actual = actual;
this.collector = collector;
this.u = u;
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;
actual.onSubscribe(this);
s.request(Long.MAX_VALUE);
}
}

@Override
public void onNext(T t) {
if (done) {
return;
}
try {
collector.accept(u, t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.cancel();
onError(e);
}
}

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
done = true;
s = SubscriptionHelper.CANCELLED;
actual.onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
s = SubscriptionHelper.CANCELLED;
actual.onSuccess(u);
}

@Override
public void dispose() {
s.cancel();
s = SubscriptionHelper.CANCELLED;
}

@Override
public boolean isDisposed() {
return s == SubscriptionHelper.CANCELLED;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,31 @@
public final class FlowableLastSingle<T> extends Single<T> {

final Publisher<T> source;

final T defaultItem;

public FlowableLastSingle(Publisher<T> source, T defaultItem) {
this.source = source;
this.defaultItem = defaultItem;
}

// TODO fuse back to Flowable

@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
source.subscribe(new LastSubscriber<T>(observer, defaultItem));
}

static final class LastSubscriber<T> implements Subscriber<T>, Disposable {

final SingleObserver<? super T> actual;

final T defaultItem;

Subscription s;

T item;

public LastSubscriber(SingleObserver<? super T> actual, T defaultItem) {
this.actual = actual;
this.defaultItem = defaultItem;
Expand All @@ -75,9 +75,9 @@ public boolean isDisposed() {
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;

actual.onSubscribe(this);

s.request(Long.MAX_VALUE);
}
}
Expand Down Expand Up @@ -110,7 +110,5 @@ public void onComplete() {
}
}
}


}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.flowable;

import java.util.Collection;
import java.util.concurrent.Callable;

import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.internal.fuseable.FuseToFlowable;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.ArrayListSupplier;
import io.reactivex.plugins.RxJavaPlugins;

public final class FlowableToListSingle<T, U extends Collection<? super T>> extends Single<U> implements FuseToFlowable<U> {

final Publisher<T> source;

final Callable<U> collectionSupplier;

@SuppressWarnings("unchecked")
public FlowableToListSingle(Publisher<T> source) {
this(source, (Callable<U>)ArrayListSupplier.asCallable());
}

public FlowableToListSingle(Publisher<T> source, Callable<U> collectionSupplier) {
this.source = source;
this.collectionSupplier = collectionSupplier;
}

@Override
protected void subscribeActual(SingleObserver<? super U> s) {
U coll;
try {
coll = collectionSupplier.call();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, s);
return;
}
source.subscribe(new ToListSubscriber<T, U>(s, coll));
}

@Override
public Flowable<U> fuseToFlowable() {
return RxJavaPlugins.onAssembly(new FlowableToList<T, U>(source, collectionSupplier));
}

static final class ToListSubscriber<T, U extends Collection<? super T>>
implements Subscriber<T>, Disposable {

final SingleObserver<? super U> actual;

Subscription s;

U value;

ToListSubscriber(SingleObserver<? super U> actual, U collection) {
this.actual = actual;
this.value = collection;
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;
actual.onSubscribe(this);
s.request(Long.MAX_VALUE);
}
}

@Override
public void onNext(T t) {
value.add(t);
}

@Override
public void onError(Throwable t) {
value = null;
s = SubscriptionHelper.CANCELLED;
actual.onError(t);
}

@Override
public void onComplete() {
s = SubscriptionHelper.CANCELLED;
actual.onSuccess(value);
}

@Override
public void dispose() {
s.cancel();
s = SubscriptionHelper.CANCELLED;
}

@Override
public boolean isDisposed() {
return s == SubscriptionHelper.CANCELLED;
}
}
}
Loading