Skip to content

Commit acd5466

Browse files
authoredNov 6, 2018
2.x: Add dematerialize(selector), deprecate old (#6281)
* 2.x: Add dematerialize(selector), deprecate old * Restore full coverage * Fix parameter naming
1 parent bc9d594 commit acd5466

File tree

8 files changed

+335
-41
lines changed

8 files changed

+335
-41
lines changed
 

‎src/main/java/io/reactivex/Flowable.java

+68-5
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import io.reactivex.internal.fuseable.*;
2828
import io.reactivex.internal.operators.flowable.*;
2929
import io.reactivex.internal.operators.mixed.*;
30-
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
30+
import io.reactivex.internal.operators.observable.*;
3131
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
3232
import io.reactivex.internal.subscribers.*;
3333
import io.reactivex.internal.util.*;
@@ -8484,13 +8484,15 @@ public final Flowable<T> delaySubscription(long delay, TimeUnit unit, Scheduler
84848484
* <pre><code>
84858485
* Flowable.just(createOnNext(1), createOnComplete(), createOnNext(2))
84868486
* .doOnCancel(() -&gt; System.out.println("Cancelled!"));
8487+
* .dematerialize()
84878488
* .test()
84888489
* .assertResult(1);
84898490
* </code></pre>
84908491
* If the upstream signals {@code onError} or {@code onComplete} directly, the flow is terminated
84918492
* with the same event.
84928493
* <pre><code>
84938494
* Flowable.just(createOnNext(1), createOnNext(2))
8495+
* .dematerialize()
84948496
* .test()
84958497
* .assertResult(1, 2);
84968498
* </code></pre>
@@ -8508,14 +8510,74 @@ public final Flowable<T> delaySubscription(long delay, TimeUnit unit, Scheduler
85088510
* @return a Flowable that emits the items and notifications embedded in the {@link Notification} objects
85098511
* emitted by the source Publisher
85108512
* @see <a href="http://reactivex.io/documentation/operators/materialize-dematerialize.html">ReactiveX operators documentation: Dematerialize</a>
8513+
* @see #dematerialize(Function)
8514+
* @deprecated in 2.2.4; inherently type-unsafe as it overrides the output generic type. Use {@link #dematerialize(Function)} instead.
85118515
*/
85128516
@CheckReturnValue
8513-
@BackpressureSupport(BackpressureKind.FULL)
85148517
@SchedulerSupport(SchedulerSupport.NONE)
8518+
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
8519+
@Deprecated
8520+
@SuppressWarnings({ "unchecked", "rawtypes" })
85158521
public final <T2> Flowable<T2> dematerialize() {
8516-
@SuppressWarnings("unchecked")
8517-
Flowable<Notification<T2>> m = (Flowable<Notification<T2>>)this;
8518-
return RxJavaPlugins.onAssembly(new FlowableDematerialize<T2>(m));
8522+
return RxJavaPlugins.onAssembly(new FlowableDematerialize(this, Functions.identity()));
8523+
}
8524+
8525+
/**
8526+
* Returns a Flowable that reverses the effect of {@link #materialize materialize} by transforming the
8527+
* {@link Notification} objects extracted from the source items via a selector function
8528+
* into their respective {@code Subscriber} signal types.
8529+
* <p>
8530+
* <img width="640" height="335" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/dematerialize.png" alt="">
8531+
* <p>
8532+
* The intended use of the {@code selector} function is to perform a
8533+
* type-safe identity mapping (see example) on a source that is already of type
8534+
* {@code Notification<T>}. The Java language doesn't allow
8535+
* limiting instance methods to a certain generic argument shape, therefore,
8536+
* a function is used to ensure the conversion remains type safe.
8537+
* <p>
8538+
* When the upstream signals an {@link Notification#createOnError(Throwable) onError} or
8539+
* {@link Notification#createOnComplete() onComplete} item, the
8540+
* returned Flowable cancels of the flow and terminates with that type of terminal event:
8541+
* <pre><code>
8542+
* Flowable.just(createOnNext(1), createOnComplete(), createOnNext(2))
8543+
* .doOnCancel(() -&gt; System.out.println("Canceled!"));
8544+
* .dematerialize(notification -&gt; notification)
8545+
* .test()
8546+
* .assertResult(1);
8547+
* </code></pre>
8548+
* If the upstream signals {@code onError} or {@code onComplete} directly, the flow is terminated
8549+
* with the same event.
8550+
* <pre><code>
8551+
* Flowable.just(createOnNext(1), createOnNext(2))
8552+
* .dematerialize(notification -&gt; notification)
8553+
* .test()
8554+
* .assertResult(1, 2);
8555+
* </code></pre>
8556+
* If this behavior is not desired, the completion can be suppressed by applying {@link #concatWith(Publisher)}
8557+
* with a {@link #never()} source.
8558+
* <dl>
8559+
* <dt><b>Backpressure:</b></dt>
8560+
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
8561+
* backpressure behavior.</dd>
8562+
* <dt><b>Scheduler:</b></dt>
8563+
* <dd>{@code dematerialize} does not operate by default on a particular {@link Scheduler}.</dd>
8564+
* </dl>
8565+
*
8566+
* @param <R> the output value type
8567+
* @param selector function that returns the upstream item and should return a Notification to signal
8568+
* the corresponding {@code Subscriber} event to the downstream.
8569+
* @return a Flowable that emits the items and notifications embedded in the {@link Notification} objects
8570+
* selected from the items emitted by the source Flowable
8571+
* @see <a href="http://reactivex.io/documentation/operators/materialize-dematerialize.html">ReactiveX operators documentation: Dematerialize</a>
8572+
* @since 2.2.4 - experimental
8573+
*/
8574+
@Experimental
8575+
@CheckReturnValue
8576+
@SchedulerSupport(SchedulerSupport.NONE)
8577+
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
8578+
public final <R> Flowable<R> dematerialize(Function<? super T, Notification<R>> selector) {
8579+
ObjectHelper.requireNonNull(selector, "selector is null");
8580+
return RxJavaPlugins.onAssembly(new FlowableDematerialize<T, R>(this, selector));
85198581
}
85208582

85218583
/**
@@ -11069,6 +11131,7 @@ public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper) {
1106911131
* @return a Flowable that emits items that are the result of materializing the items and notifications
1107011132
* of the source Publisher
1107111133
* @see <a href="http://reactivex.io/documentation/operators/materialize-dematerialize.html">ReactiveX operators documentation: Materialize</a>
11134+
* @see #dematerialize(Function)
1107211135
*/
1107311136
@CheckReturnValue
1107411137
@BackpressureSupport(BackpressureKind.FULL)

‎src/main/java/io/reactivex/Observable.java

+62-3
Original file line numberDiff line numberDiff line change
@@ -7587,13 +7587,15 @@ public final Observable<T> delaySubscription(long delay, TimeUnit unit, Schedule
75877587
* <pre><code>
75887588
* Observable.just(createOnNext(1), createOnComplete(), createOnNext(2))
75897589
* .doOnDispose(() -&gt; System.out.println("Disposed!"));
7590+
* .dematerialize()
75907591
* .test()
75917592
* .assertResult(1);
75927593
* </code></pre>
75937594
* If the upstream signals {@code onError} or {@code onComplete} directly, the flow is terminated
75947595
* with the same event.
75957596
* <pre><code>
75967597
* Observable.just(createOnNext(1), createOnNext(2))
7598+
* .dematerialize()
75977599
* .test()
75987600
* .assertResult(1, 2);
75997601
* </code></pre>
@@ -7608,13 +7610,69 @@ public final Observable<T> delaySubscription(long delay, TimeUnit unit, Schedule
76087610
* @return an Observable that emits the items and notifications embedded in the {@link Notification} objects
76097611
* emitted by the source ObservableSource
76107612
* @see <a href="http://reactivex.io/documentation/operators/materialize-dematerialize.html">ReactiveX operators documentation: Dematerialize</a>
7613+
* @see #dematerialize(Function)
7614+
* @deprecated in 2.2.4; inherently type-unsafe as it overrides the output generic type. Use {@link #dematerialize(Function)} instead.
76117615
*/
76127616
@CheckReturnValue
76137617
@SchedulerSupport(SchedulerSupport.NONE)
7618+
@Deprecated
7619+
@SuppressWarnings({ "unchecked", "rawtypes" })
76147620
public final <T2> Observable<T2> dematerialize() {
7615-
@SuppressWarnings("unchecked")
7616-
Observable<Notification<T2>> m = (Observable<Notification<T2>>)this;
7617-
return RxJavaPlugins.onAssembly(new ObservableDematerialize<T2>(m));
7621+
return RxJavaPlugins.onAssembly(new ObservableDematerialize(this, Functions.identity()));
7622+
}
7623+
7624+
/**
7625+
* Returns an Observable that reverses the effect of {@link #materialize materialize} by transforming the
7626+
* {@link Notification} objects extracted from the source items via a selector function
7627+
* into their respective {@code Observer} signal types.
7628+
* <p>
7629+
* <img width="640" height="335" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/dematerialize.png" alt="">
7630+
* <p>
7631+
* The intended use of the {@code selector} function is to perform a
7632+
* type-safe identity mapping (see example) on a source that is already of type
7633+
* {@code Notification<T>}. The Java language doesn't allow
7634+
* limiting instance methods to a certain generic argument shape, therefore,
7635+
* a function is used to ensure the conversion remains type safe.
7636+
* <p>
7637+
* When the upstream signals an {@link Notification#createOnError(Throwable) onError} or
7638+
* {@link Notification#createOnComplete() onComplete} item, the
7639+
* returned Observable disposes of the flow and terminates with that type of terminal event:
7640+
* <pre><code>
7641+
* Observable.just(createOnNext(1), createOnComplete(), createOnNext(2))
7642+
* .doOnDispose(() -&gt; System.out.println("Disposed!"));
7643+
* .dematerialize(notification -&gt; notification)
7644+
* .test()
7645+
* .assertResult(1);
7646+
* </code></pre>
7647+
* If the upstream signals {@code onError} or {@code onComplete} directly, the flow is terminated
7648+
* with the same event.
7649+
* <pre><code>
7650+
* Observable.just(createOnNext(1), createOnNext(2))
7651+
* .dematerialize(notification -&gt; notification)
7652+
* .test()
7653+
* .assertResult(1, 2);
7654+
* </code></pre>
7655+
* If this behavior is not desired, the completion can be suppressed by applying {@link #concatWith(ObservableSource)}
7656+
* with a {@link #never()} source.
7657+
* <dl>
7658+
* <dt><b>Scheduler:</b></dt>
7659+
* <dd>{@code dematerialize} does not operate by default on a particular {@link Scheduler}.</dd>
7660+
* </dl>
7661+
*
7662+
* @param <R> the output value type
7663+
* @param selector function that returns the upstream item and should return a Notification to signal
7664+
* the corresponding {@code Observer} event to the downstream.
7665+
* @return an Observable that emits the items and notifications embedded in the {@link Notification} objects
7666+
* selected from the items emitted by the source ObservableSource
7667+
* @see <a href="http://reactivex.io/documentation/operators/materialize-dematerialize.html">ReactiveX operators documentation: Dematerialize</a>
7668+
* @since 2.2.4 - experimental
7669+
*/
7670+
@Experimental
7671+
@CheckReturnValue
7672+
@SchedulerSupport(SchedulerSupport.NONE)
7673+
public final <R> Observable<R> dematerialize(Function<? super T, Notification<R>> selector) {
7674+
ObjectHelper.requireNonNull(selector, "selector is null");
7675+
return RxJavaPlugins.onAssembly(new ObservableDematerialize<T, R>(this, selector));
76187676
}
76197677

76207678
/**
@@ -9620,6 +9678,7 @@ public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
96209678
* @return an Observable that emits items that are the result of materializing the items and notifications
96219679
* of the source ObservableSource
96229680
* @see <a href="http://reactivex.io/documentation/operators/materialize-dematerialize.html">ReactiveX operators documentation: Materialize</a>
9681+
* @see #dematerialize(Function)
96239682
*/
96249683
@CheckReturnValue
96259684
@SchedulerSupport(SchedulerSupport.NONE)

‎src/main/java/io/reactivex/internal/operators/flowable/FlowableDematerialize.java

+37-14
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,39 @@
1616
import org.reactivestreams.*;
1717

1818
import io.reactivex.*;
19+
import io.reactivex.exceptions.Exceptions;
20+
import io.reactivex.functions.Function;
21+
import io.reactivex.internal.functions.ObjectHelper;
1922
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2023
import io.reactivex.plugins.RxJavaPlugins;
2124

22-
public final class FlowableDematerialize<T> extends AbstractFlowableWithUpstream<Notification<T>, T> {
25+
public final class FlowableDematerialize<T, R> extends AbstractFlowableWithUpstream<T, R> {
2326

24-
public FlowableDematerialize(Flowable<Notification<T>> source) {
27+
final Function<? super T, ? extends Notification<R>> selector;
28+
29+
public FlowableDematerialize(Flowable<T> source, Function<? super T, ? extends Notification<R>> selector) {
2530
super(source);
31+
this.selector = selector;
2632
}
2733

2834
@Override
29-
protected void subscribeActual(Subscriber<? super T> s) {
30-
source.subscribe(new DematerializeSubscriber<T>(s));
35+
protected void subscribeActual(Subscriber<? super R> subscriber) {
36+
source.subscribe(new DematerializeSubscriber<T, R>(subscriber, selector));
3137
}
3238

33-
static final class DematerializeSubscriber<T> implements FlowableSubscriber<Notification<T>>, Subscription {
34-
final Subscriber<? super T> downstream;
39+
static final class DematerializeSubscriber<T, R> implements FlowableSubscriber<T>, Subscription {
40+
41+
final Subscriber<? super R> downstream;
42+
43+
final Function<? super T, ? extends Notification<R>> selector;
3544

3645
boolean done;
3746

3847
Subscription upstream;
3948

40-
DematerializeSubscriber(Subscriber<? super T> downstream) {
49+
DematerializeSubscriber(Subscriber<? super R> downstream, Function<? super T, ? extends Notification<R>> selector) {
4150
this.downstream = downstream;
51+
this.selector = selector;
4252
}
4353

4454
@Override
@@ -50,22 +60,35 @@ public void onSubscribe(Subscription s) {
5060
}
5161

5262
@Override
53-
public void onNext(Notification<T> t) {
63+
public void onNext(T item) {
5464
if (done) {
55-
if (t.isOnError()) {
56-
RxJavaPlugins.onError(t.getError());
65+
if (item instanceof Notification) {
66+
Notification<?> notification = (Notification<?>)item;
67+
if (notification.isOnError()) {
68+
RxJavaPlugins.onError(notification.getError());
69+
}
5770
}
5871
return;
5972
}
60-
if (t.isOnError()) {
73+
74+
Notification<R> notification;
75+
76+
try {
77+
notification = ObjectHelper.requireNonNull(selector.apply(item), "The selector returned a null Notification");
78+
} catch (Throwable ex) {
79+
Exceptions.throwIfFatal(ex);
6180
upstream.cancel();
62-
onError(t.getError());
81+
onError(ex);
82+
return;
6383
}
64-
else if (t.isOnComplete()) {
84+
if (notification.isOnError()) {
85+
upstream.cancel();
86+
onError(notification.getError());
87+
} else if (notification.isOnComplete()) {
6588
upstream.cancel();
6689
onComplete();
6790
} else {
68-
downstream.onNext(t.getValue());
91+
downstream.onNext(notification.getValue());
6992
}
7093
}
7194

‎src/main/java/io/reactivex/internal/operators/observable/ObservableDematerialize.java

+37-14
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,38 @@
1515

1616
import io.reactivex.*;
1717
import io.reactivex.disposables.Disposable;
18+
import io.reactivex.exceptions.Exceptions;
19+
import io.reactivex.functions.Function;
1820
import io.reactivex.internal.disposables.DisposableHelper;
21+
import io.reactivex.internal.functions.ObjectHelper;
1922
import io.reactivex.plugins.RxJavaPlugins;
2023

21-
public final class ObservableDematerialize<T> extends AbstractObservableWithUpstream<Notification<T>, T> {
24+
public final class ObservableDematerialize<T, R> extends AbstractObservableWithUpstream<T, R> {
2225

23-
public ObservableDematerialize(ObservableSource<Notification<T>> source) {
26+
final Function<? super T, ? extends Notification<R>> selector;
27+
28+
public ObservableDematerialize(ObservableSource<T> source, Function<? super T, ? extends Notification<R>> selector) {
2429
super(source);
30+
this.selector = selector;
2531
}
2632

2733
@Override
28-
public void subscribeActual(Observer<? super T> t) {
29-
source.subscribe(new DematerializeObserver<T>(t));
34+
public void subscribeActual(Observer<? super R> observer) {
35+
source.subscribe(new DematerializeObserver<T, R>(observer, selector));
3036
}
3137

32-
static final class DematerializeObserver<T> implements Observer<Notification<T>>, Disposable {
33-
final Observer<? super T> downstream;
38+
static final class DematerializeObserver<T, R> implements Observer<T>, Disposable {
39+
final Observer<? super R> downstream;
40+
41+
final Function<? super T, ? extends Notification<R>> selector;
3442

3543
boolean done;
3644

3745
Disposable upstream;
3846

39-
DematerializeObserver(Observer<? super T> downstream) {
47+
DematerializeObserver(Observer<? super R> downstream, Function<? super T, ? extends Notification<R>> selector) {
4048
this.downstream = downstream;
49+
this.selector = selector;
4150
}
4251

4352
@Override
@@ -60,22 +69,36 @@ public boolean isDisposed() {
6069
}
6170

6271
@Override
63-
public void onNext(Notification<T> t) {
72+
public void onNext(T item) {
6473
if (done) {
65-
if (t.isOnError()) {
66-
RxJavaPlugins.onError(t.getError());
74+
if (item instanceof Notification) {
75+
Notification<?> notification = (Notification<?>)item;
76+
if (notification.isOnError()) {
77+
RxJavaPlugins.onError(notification.getError());
78+
}
6779
}
6880
return;
6981
}
70-
if (t.isOnError()) {
82+
83+
Notification<R> notification;
84+
85+
try {
86+
notification = ObjectHelper.requireNonNull(selector.apply(item), "The selector returned a null Notification");
87+
} catch (Throwable ex) {
88+
Exceptions.throwIfFatal(ex);
89+
upstream.dispose();
90+
onError(ex);
91+
return;
92+
}
93+
if (notification.isOnError()) {
7194
upstream.dispose();
72-
onError(t.getError());
95+
onError(notification.getError());
7396
}
74-
else if (t.isOnComplete()) {
97+
else if (notification.isOnComplete()) {
7598
upstream.dispose();
7699
onComplete();
77100
} else {
78-
downstream.onNext(t.getValue());
101+
downstream.onNext(notification.getValue());
79102
}
80103
}
81104

0 commit comments

Comments
 (0)
Please sign in to comment.