Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add dematerialize(selector), deprecate old #6281

Merged
merged 3 commits into from
Nov 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 68 additions & 5 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.operators.flowable.*;
import io.reactivex.internal.operators.mixed.*;
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
import io.reactivex.internal.operators.observable.*;
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.internal.subscribers.*;
import io.reactivex.internal.util.*;
Expand Down Expand Up @@ -8484,13 +8484,15 @@ public final Flowable<T> delaySubscription(long delay, TimeUnit unit, Scheduler
* <pre><code>
* Flowable.just(createOnNext(1), createOnComplete(), createOnNext(2))
* .doOnCancel(() -&gt; System.out.println("Cancelled!"));
* .dematerialize()
* .test()
* .assertResult(1);
* </code></pre>
* If the upstream signals {@code onError} or {@code onComplete} directly, the flow is terminated
* with the same event.
* <pre><code>
* Flowable.just(createOnNext(1), createOnNext(2))
* .dematerialize()
* .test()
* .assertResult(1, 2);
* </code></pre>
Expand All @@ -8508,14 +8510,74 @@ public final Flowable<T> delaySubscription(long delay, TimeUnit unit, Scheduler
* @return a Flowable that emits the items and notifications embedded in the {@link Notification} objects
* emitted by the source Publisher
* @see <a href="http://reactivex.io/documentation/operators/materialize-dematerialize.html">ReactiveX operators documentation: Dematerialize</a>
* @see #dematerialize(Function)
* @deprecated in 2.2.4; inherently type-unsafe as it overrides the output generic type. Use {@link #dematerialize(Function)} instead.
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@Deprecated
@SuppressWarnings({ "unchecked", "rawtypes" })
public final <T2> Flowable<T2> dematerialize() {
@SuppressWarnings("unchecked")
Flowable<Notification<T2>> m = (Flowable<Notification<T2>>)this;
return RxJavaPlugins.onAssembly(new FlowableDematerialize<T2>(m));
return RxJavaPlugins.onAssembly(new FlowableDematerialize(this, Functions.identity()));
}

/**
* Returns a Flowable that reverses the effect of {@link #materialize materialize} by transforming the
* {@link Notification} objects extracted from the source items via a selector function
* into their respective {@code Subscriber} signal types.
* <p>
* <img width="640" height="335" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/dematerialize.png" alt="">
* <p>
* The intended use of the {@code selector} function is to perform a
* type-safe identity mapping (see example) on a source that is already of type
* {@code Notification<T>}. The Java language doesn't allow
* limiting instance methods to a certain generic argument shape, therefore,
* a function is used to ensure the conversion remains type safe.
* <p>
* When the upstream signals an {@link Notification#createOnError(Throwable) onError} or
* {@link Notification#createOnComplete() onComplete} item, the
* returned Flowable cancels of the flow and terminates with that type of terminal event:
* <pre><code>
* Flowable.just(createOnNext(1), createOnComplete(), createOnNext(2))
* .doOnCancel(() -&gt; System.out.println("Canceled!"));
* .dematerialize(notification -&gt; notification)
* .test()
* .assertResult(1);
* </code></pre>
* If the upstream signals {@code onError} or {@code onComplete} directly, the flow is terminated
* with the same event.
* <pre><code>
* Flowable.just(createOnNext(1), createOnNext(2))
* .dematerialize(notification -&gt; notification)
* .test()
* .assertResult(1, 2);
* </code></pre>
* If this behavior is not desired, the completion can be suppressed by applying {@link #concatWith(Publisher)}
* with a {@link #never()} source.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
* backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code dematerialize} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the output value type
* @param selector function that returns the upstream item and should return a Notification to signal
* the corresponding {@code Subscriber} event to the downstream.
* @return a Flowable that emits the items and notifications embedded in the {@link Notification} objects
* selected from the items emitted by the source Flowable
* @see <a href="http://reactivex.io/documentation/operators/materialize-dematerialize.html">ReactiveX operators documentation: Dematerialize</a>
* @since 2.2.4 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final <R> Flowable<R> dematerialize(Function<? super T, Notification<R>> selector) {
ObjectHelper.requireNonNull(selector, "selector is null");
return RxJavaPlugins.onAssembly(new FlowableDematerialize<T, R>(this, selector));
}

/**
Expand Down Expand Up @@ -11069,6 +11131,7 @@ public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper) {
* @return a Flowable that emits items that are the result of materializing the items and notifications
* of the source Publisher
* @see <a href="http://reactivex.io/documentation/operators/materialize-dematerialize.html">ReactiveX operators documentation: Materialize</a>
* @see #dematerialize(Function)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
Expand Down
65 changes: 62 additions & 3 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7587,13 +7587,15 @@ public final Observable<T> delaySubscription(long delay, TimeUnit unit, Schedule
* <pre><code>
* Observable.just(createOnNext(1), createOnComplete(), createOnNext(2))
* .doOnDispose(() -&gt; System.out.println("Disposed!"));
* .dematerialize()
* .test()
* .assertResult(1);
* </code></pre>
* If the upstream signals {@code onError} or {@code onComplete} directly, the flow is terminated
* with the same event.
* <pre><code>
* Observable.just(createOnNext(1), createOnNext(2))
* .dematerialize()
* .test()
* .assertResult(1, 2);
* </code></pre>
Expand All @@ -7608,13 +7610,69 @@ public final Observable<T> delaySubscription(long delay, TimeUnit unit, Schedule
* @return an Observable that emits the items and notifications embedded in the {@link Notification} objects
* emitted by the source ObservableSource
* @see <a href="http://reactivex.io/documentation/operators/materialize-dematerialize.html">ReactiveX operators documentation: Dematerialize</a>
* @see #dematerialize(Function)
* @deprecated in 2.2.4; inherently type-unsafe as it overrides the output generic type. Use {@link #dematerialize(Function)} instead.
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@Deprecated
@SuppressWarnings({ "unchecked", "rawtypes" })
public final <T2> Observable<T2> dematerialize() {
@SuppressWarnings("unchecked")
Observable<Notification<T2>> m = (Observable<Notification<T2>>)this;
return RxJavaPlugins.onAssembly(new ObservableDematerialize<T2>(m));
return RxJavaPlugins.onAssembly(new ObservableDematerialize(this, Functions.identity()));
}

/**
* Returns an Observable that reverses the effect of {@link #materialize materialize} by transforming the
* {@link Notification} objects extracted from the source items via a selector function
* into their respective {@code Observer} signal types.
* <p>
* <img width="640" height="335" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/dematerialize.png" alt="">
* <p>
* The intended use of the {@code selector} function is to perform a
* type-safe identity mapping (see example) on a source that is already of type
* {@code Notification<T>}. The Java language doesn't allow
* limiting instance methods to a certain generic argument shape, therefore,
* a function is used to ensure the conversion remains type safe.
* <p>
* When the upstream signals an {@link Notification#createOnError(Throwable) onError} or
* {@link Notification#createOnComplete() onComplete} item, the
* returned Observable disposes of the flow and terminates with that type of terminal event:
* <pre><code>
* Observable.just(createOnNext(1), createOnComplete(), createOnNext(2))
* .doOnDispose(() -&gt; System.out.println("Disposed!"));
* .dematerialize(notification -&gt; notification)
* .test()
* .assertResult(1);
* </code></pre>
* If the upstream signals {@code onError} or {@code onComplete} directly, the flow is terminated
* with the same event.
* <pre><code>
* Observable.just(createOnNext(1), createOnNext(2))
* .dematerialize(notification -&gt; notification)
* .test()
* .assertResult(1, 2);
* </code></pre>
* If this behavior is not desired, the completion can be suppressed by applying {@link #concatWith(ObservableSource)}
* with a {@link #never()} source.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code dematerialize} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the output value type
* @param selector function that returns the upstream item and should return a Notification to signal
* the corresponding {@code Observer} event to the downstream.
* @return an Observable that emits the items and notifications embedded in the {@link Notification} objects
* selected from the items emitted by the source ObservableSource
* @see <a href="http://reactivex.io/documentation/operators/materialize-dematerialize.html">ReactiveX operators documentation: Dematerialize</a>
* @since 2.2.4 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> dematerialize(Function<? super T, Notification<R>> selector) {
ObjectHelper.requireNonNull(selector, "selector is null");
return RxJavaPlugins.onAssembly(new ObservableDematerialize<T, R>(this, selector));
}

/**
Expand Down Expand Up @@ -9620,6 +9678,7 @@ public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
* @return an Observable that emits items that are the result of materializing the items and notifications
* of the source ObservableSource
* @see <a href="http://reactivex.io/documentation/operators/materialize-dematerialize.html">ReactiveX operators documentation: Materialize</a>
* @see #dematerialize(Function)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,39 @@
import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class FlowableDematerialize<T> extends AbstractFlowableWithUpstream<Notification<T>, T> {
public final class FlowableDematerialize<T, R> extends AbstractFlowableWithUpstream<T, R> {

public FlowableDematerialize(Flowable<Notification<T>> source) {
final Function<? super T, ? extends Notification<R>> selector;

public FlowableDematerialize(Flowable<T> source, Function<? super T, ? extends Notification<R>> selector) {
super(source);
this.selector = selector;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new DematerializeSubscriber<T>(s));
protected void subscribeActual(Subscriber<? super R> subscriber) {
source.subscribe(new DematerializeSubscriber<T, R>(subscriber, selector));
}

static final class DematerializeSubscriber<T> implements FlowableSubscriber<Notification<T>>, Subscription {
final Subscriber<? super T> downstream;
static final class DematerializeSubscriber<T, R> implements FlowableSubscriber<T>, Subscription {

final Subscriber<? super R> downstream;

final Function<? super T, ? extends Notification<R>> selector;

boolean done;

Subscription upstream;

DematerializeSubscriber(Subscriber<? super T> downstream) {
DematerializeSubscriber(Subscriber<? super R> downstream, Function<? super T, ? extends Notification<R>> selector) {
this.downstream = downstream;
this.selector = selector;
}

@Override
Expand All @@ -50,22 +60,35 @@ public void onSubscribe(Subscription s) {
}

@Override
public void onNext(Notification<T> t) {
public void onNext(T item) {
if (done) {
if (t.isOnError()) {
RxJavaPlugins.onError(t.getError());
if (item instanceof Notification) {
Notification<?> notification = (Notification<?>)item;
if (notification.isOnError()) {
RxJavaPlugins.onError(notification.getError());
}
}
return;
}
if (t.isOnError()) {

Notification<R> notification;

try {
notification = ObjectHelper.requireNonNull(selector.apply(item), "The selector returned a null Notification");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
onError(t.getError());
onError(ex);
return;
}
else if (t.isOnComplete()) {
if (notification.isOnError()) {
upstream.cancel();
onError(notification.getError());
} else if (notification.isOnComplete()) {
upstream.cancel();
onComplete();
} else {
downstream.onNext(t.getValue());
downstream.onNext(notification.getValue());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,38 @@

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class ObservableDematerialize<T> extends AbstractObservableWithUpstream<Notification<T>, T> {
public final class ObservableDematerialize<T, R> extends AbstractObservableWithUpstream<T, R> {

public ObservableDematerialize(ObservableSource<Notification<T>> source) {
final Function<? super T, ? extends Notification<R>> selector;

public ObservableDematerialize(ObservableSource<T> source, Function<? super T, ? extends Notification<R>> selector) {
super(source);
this.selector = selector;
}

@Override
public void subscribeActual(Observer<? super T> t) {
source.subscribe(new DematerializeObserver<T>(t));
public void subscribeActual(Observer<? super R> observer) {
source.subscribe(new DematerializeObserver<T, R>(observer, selector));
}

static final class DematerializeObserver<T> implements Observer<Notification<T>>, Disposable {
final Observer<? super T> downstream;
static final class DematerializeObserver<T, R> implements Observer<T>, Disposable {
final Observer<? super R> downstream;

final Function<? super T, ? extends Notification<R>> selector;

boolean done;

Disposable upstream;

DematerializeObserver(Observer<? super T> downstream) {
DematerializeObserver(Observer<? super R> downstream, Function<? super T, ? extends Notification<R>> selector) {
this.downstream = downstream;
this.selector = selector;
}

@Override
Expand All @@ -60,22 +69,36 @@ public boolean isDisposed() {
}

@Override
public void onNext(Notification<T> t) {
public void onNext(T item) {
if (done) {
if (t.isOnError()) {
RxJavaPlugins.onError(t.getError());
if (item instanceof Notification) {
Notification<?> notification = (Notification<?>)item;
if (notification.isOnError()) {
RxJavaPlugins.onError(notification.getError());
}
}
return;
}
if (t.isOnError()) {

Notification<R> notification;

try {
notification = ObjectHelper.requireNonNull(selector.apply(item), "The selector returned a null Notification");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.dispose();
onError(ex);
return;
}
if (notification.isOnError()) {
upstream.dispose();
onError(t.getError());
onError(notification.getError());
}
else if (t.isOnComplete()) {
else if (notification.isOnComplete()) {
upstream.dispose();
onComplete();
} else {
downstream.onNext(t.getValue());
downstream.onNext(notification.getValue());
}
}

Expand Down
Loading