Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: flatMap{Completable, Maybe, Single} operators #4667

Merged
merged 4 commits into from
Oct 5, 2016
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
2.x: flatMapCompletable operator
akarnokd committed Oct 4, 2016
commit 80b545900e6405f6460618a0312817c801e3475d
45 changes: 44 additions & 1 deletion src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@
import io.reactivex.internal.functions.*;
import io.reactivex.internal.fuseable.ScalarCallable;
import io.reactivex.internal.operators.flowable.*;
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
import io.reactivex.internal.operators.observable.*;
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.internal.subscribers.*;
import io.reactivex.internal.util.*;
@@ -8266,6 +8266,49 @@ public final <U, R> Flowable<R> flatMap(Function<? super T, ? extends Publisher<
return flatMap(mapper, combiner, false, maxConcurrency, bufferSize());
}


/**
* Maps each element of the upstream Flowable into CompletableSources, subscribes to them and
* waits until the upstream and all CompletableSources complete.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the upstream in an unbounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param mapper the function that received each source value and transforms them into CompletableSources.
* @return the new Completable instance
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable flatMapCompletable(Function<? super T, ? extends CompletableSource> mapper) {
return flatMapCompletable(mapper, false, Integer.MAX_VALUE);
}

/**
* Maps each element of the upstream Flowable into CompletableSources, subscribes to them and
* waits until the upstream and all CompletableSources complete, optionally delaying all errors.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>If {@code maxConcurrency == Integer.MAX_VALUE} the operator consumes the upstream in an unbounded manner.
* Otherwise the operator expects the upstream to honor backpressure. If the upstream doesn't support backpressure
* the operator behaves as if {@code maxConcurrency == Integer.MAX_VALUE} was used.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param mapper the function that received each source value and transforms them into CompletableSources.
* @param delayErrors if true errors from the upstream and inner CompletableSources are delayed until each of them
* terminates.
* @param maxConcurrency the maximum number of active subscriptions to the CompletableSources.
* @return the new Completable instance
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable flatMapCompletable(Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors, int maxConcurrency) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new FlowableFlatMapCompletableCompletable<T>(this, mapper, delayErrors, maxConcurrency));
}

/**
* Returns a Flowable that merges each item emitted by the source Publisher with the values in an
* Iterable corresponding to that item that is generated by a selector.
33 changes: 33 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
@@ -7166,6 +7166,39 @@ public final <U, R> Observable<R> flatMap(Function<? super T, ? extends Observab
return flatMap(mapper, combiner, false, maxConcurrency, bufferSize());
}

/**
* Maps each element of the upstream Observable into CompletableSources, subscribes to them and
* waits until the upstream and all CompletableSources complete.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param mapper the function that received each source value and transforms them into CompletableSources.
* @return the new Completable instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable flatMapCompletable(Function<? super T, ? extends CompletableSource> mapper) {
return flatMapCompletable(mapper, false);
}

/**
* Maps each element of the upstream Observable into CompletableSources, subscribes to them and
* waits until the upstream and all CompletableSources complete, optionally delaying all errors.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param mapper the function that received each source value and transforms them into CompletableSources.
* @param delayErrors if true errors from the upstream and inner CompletableSources are delayed until each of them
* terminates.
* @return the new Completable instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable flatMapCompletable(Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableFlatMapCompletableCompletable<T>(this, mapper, delayErrors));
}

/**
* Returns an Observable that merges each item emitted by the source ObservableSource with the values in an
* Iterable corresponding to that item that is generated by a selector.
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.flowable;

import java.util.concurrent.atomic.AtomicReference;

import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.*;
import io.reactivex.internal.util.AtomicThrowable;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Maps a sequence of values into CompletableSources and awaits their termination.
* @param <T> the value type
*/
public final class FlowableFlatMapCompletable<T> extends AbstractFlowableWithUpstream<T, T> {

final Function<? super T, ? extends CompletableSource> mapper;

final int maxConcurrency;

final boolean delayErrors;

public FlowableFlatMapCompletable(Publisher<T> source,
Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors,
int maxConcurrency) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
}

@Override
protected void subscribeActual(Subscriber<? super T> observer) {
source.subscribe(new FlatMapCompletableMainSubscriber<T>(observer, mapper, delayErrors, maxConcurrency));
}

static final class FlatMapCompletableMainSubscriber<T> extends BasicIntQueueSubscription<T>
implements Subscriber<T> {
private static final long serialVersionUID = 8443155186132538303L;

final Subscriber<? super T> actual;

final AtomicThrowable errors;

final Function<? super T, ? extends CompletableSource> mapper;

final boolean delayErrors;

final CompositeDisposable set;

final int maxConcurrency;

Subscription s;

public FlatMapCompletableMainSubscriber(Subscriber<? super T> observer,
Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors,
int maxConcurrency) {
this.actual = observer;
this.mapper = mapper;
this.delayErrors = delayErrors;
this.errors = new AtomicThrowable();
this.set = new CompositeDisposable();
this.maxConcurrency = maxConcurrency;
this.lazySet(1);
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;

actual.onSubscribe(this);

int m = maxConcurrency;
if (m == Integer.MAX_VALUE) {
s.request(Long.MAX_VALUE);
} else {
s.request(m);
}
}
}

@Override
public void onNext(T value) {
CompletableSource cs;

try {
cs = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null CompletableSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
onError(ex);
return;
}

getAndIncrement();

InnerConsumer inner = new InnerConsumer();

set.add(inner);

cs.subscribe(inner);
}

@Override
public void onError(Throwable e) {
if (errors.addThrowable(e)) {
if (delayErrors) {
if (decrementAndGet() == 0) {
Throwable ex = errors.terminate();
actual.onError(ex);
return;
} else {
if (maxConcurrency != Integer.MAX_VALUE) {
s.request(1);
}
}
} else {
cancel();
if (getAndSet(0) > 0) {
Throwable ex = errors.terminate();
actual.onError(ex);
return;
}
}
} else {
RxJavaPlugins.onError(e);
}
}

@Override
public void onComplete() {
if (decrementAndGet() == 0) {
if (delayErrors) {
Throwable ex = errors.terminate();
if (ex != null) {
actual.onError(ex);
return;
}
}
actual.onComplete();
} else {
if (maxConcurrency != Integer.MAX_VALUE) {
s.request(1);
}
}
}

@Override
public void cancel() {
s.cancel();
set.dispose();
}

@Override
public void request(long n) {
// ignored, no values emitted
}

@Override
public T poll() throws Exception {
return null; // always empty
}

@Override
public boolean isEmpty() {
return true; // always empty
}

@Override
public void clear() {
// nothing to clear
}

@Override
public int requestFusion(int mode) {
return mode & ASYNC;
}

void innerComplete(InnerConsumer inner) {
set.delete(inner);
onComplete();
}

void innerError(InnerConsumer inner, Throwable e) {
set.delete(inner);
onError(e);
}

final class InnerConsumer extends AtomicReference<Disposable> implements CompletableObserver, Disposable {
private static final long serialVersionUID = 8606673141535671828L;

@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
}

@Override
public void onComplete() {
innerComplete(this);
}

@Override
public void onError(Throwable e) {
innerError(this, e);
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
}
}
Loading