Skip to content

Commit a6bbf46

Browse files
authored
2.x: remove Try+Optional, introduce Notification (#4370)
1 parent ec4eb4f commit a6bbf46

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+502
-749
lines changed

src/main/java/io/reactivex/Flowable.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -6816,7 +6816,7 @@ public final Flowable<T> delaySubscription(long delay, TimeUnit unit, Scheduler
68166816
@SchedulerSupport(SchedulerSupport.NONE)
68176817
public final <T2> Flowable<T2> dematerialize() {
68186818
@SuppressWarnings("unchecked")
6819-
Flowable<Try<Optional<T2>>> m = (Flowable<Try<Optional<T2>>>)this;
6819+
Flowable<Notification<T2>> m = (Flowable<Notification<T2>>)this;
68206820
return new FlowableDematerialize<T2>(m);
68216821
}
68226822

@@ -7108,7 +7108,7 @@ private Flowable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwa
71087108
*/
71097109
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
71107110
@SchedulerSupport(SchedulerSupport.NONE)
7111-
public final Flowable<T> doOnEach(final Consumer<? super Try<Optional<T>>> onNotification) {
7111+
public final Flowable<T> doOnEach(final Consumer<? super Notification<T>> onNotification) {
71127112
Objects.requireNonNull(onNotification, "consumer is null");
71137113
return doOnEach(
71147114
Functions.notificationOnNext(onNotification),
@@ -8670,7 +8670,7 @@ public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper) {
86708670
*/
86718671
@BackpressureSupport(BackpressureKind.FULL)
86728672
@SchedulerSupport(SchedulerSupport.NONE)
8673-
public final Flowable<Try<Optional<T>>> materialize() {
8673+
public final Flowable<Notification<T>> materialize() {
86748674
return new FlowableMaterialize<T>(this);
86758675
}
86768676

src/main/java/io/reactivex/Notification.java

+115-27
Original file line numberDiff line numberDiff line change
@@ -14,54 +14,142 @@
1414
package io.reactivex;
1515

1616
import io.reactivex.internal.functions.Objects;
17+
import io.reactivex.internal.util.NotificationLite;
1718

1819
/**
19-
* Utility class to help construct notification objects.
20+
* Represents the reactive signal types: onNext, onError and onComplete and
21+
* holds their parameter values (a value, a Throwable, nothing).
22+
* @param <T> the value type
2023
*/
21-
public final class Notification {
22-
static final Try<Optional<Object>> COMPLETE = Try.ofValue(Optional.<Object>empty());
24+
public final class Notification<T> {
2325

24-
private Notification() {
25-
throw new IllegalStateException();
26+
final Object value;
27+
28+
/** Not meant to be implemented externally. */
29+
private Notification(Object value) {
30+
this.value = value;
2631
}
2732

28-
@SuppressWarnings({ "rawtypes", "unchecked" })
29-
public static <T> Try<Optional<T>> complete() {
30-
return (Try)COMPLETE;
33+
/**
34+
* Returns true if this notification is an onComplete signal.
35+
* @return true if this notification is an onComplete signal
36+
*/
37+
public boolean isOnComplete() {
38+
return value == null;
3139
}
3240

33-
@SuppressWarnings({ "rawtypes", "unchecked" })
34-
public static <T> Try<Optional<T>> error(Throwable e) {
35-
return (Try)Try.ofError(e);
41+
/**
42+
* Returns true if this notification is an onError signal and
43+
* {@link #getError()} returns the contained Throwable.
44+
* @return true if this notification is an onError signal
45+
* @see #getError()
46+
*/
47+
public boolean isOnError() {
48+
return NotificationLite.isError(value);
3649
}
3750

38-
public static <T> Try<Optional<T>> next(T value) {
39-
Objects.requireNonNull(value, "value is null"); // TODO this coud instead return an error of NPE
40-
return Try.ofValue(Optional.of(value));
51+
/**
52+
* Returns true if this notification is an onNext signal and
53+
* {@link #getValue()} returns the contained value.
54+
* @return true if this notification is an onNext signal
55+
* @see #getValue()
56+
*/
57+
public boolean isOnNext() {
58+
Object o = value;
59+
return o != null && !NotificationLite.isError(o);
4160
}
4261

43-
public static <T> boolean isNext(Try<Optional<T>> notification) {
44-
if (notification.hasValue()) {
45-
return notification.value().isPresent();
62+
/**
63+
* Returns the contained value if this notification is an onNext
64+
* signal, null otherwise.
65+
* @return the value contained or null
66+
* @see #isOnNext()
67+
*/
68+
@SuppressWarnings("unchecked")
69+
public T getValue() {
70+
Object o = value;
71+
if (o != null && !NotificationLite.isError(o)) {
72+
return (T)value;
4673
}
47-
return false;
74+
return null;
75+
}
76+
77+
/**
78+
* Returns the container Throwable error if this notification is an onError
79+
* signal, null otherwise.
80+
* @return the Throwable error contained or null
81+
* @see #isOnError()
82+
*/
83+
public Throwable getError() {
84+
Object o = value;
85+
if (NotificationLite.isError(o)) {
86+
return NotificationLite.getError(o);
87+
}
88+
return null;
4889
}
4990

50-
public static <T> boolean isComplete(Try<Optional<T>> notification) {
51-
if (notification.hasValue()) {
52-
return !notification.value().isPresent();
91+
@Override
92+
public boolean equals(Object obj) {
93+
if (obj instanceof Notification) {
94+
Notification<?> n = (Notification<?>) obj;
95+
return Objects.equals(value, n.value);
5396
}
5497
return false;
5598
}
5699

57-
public static <T> boolean isError(Try<Optional<T>> notification) {
58-
return notification.hasError();
100+
@Override
101+
public int hashCode() {
102+
Object o = value;
103+
return o != null ? o.hashCode() : 0;
59104
}
60105

61-
public static <T> T getValue(Try<Optional<T>> notification) {
62-
if (notification.hasValue()) {
63-
return notification.value.get();
106+
@Override
107+
public String toString() {
108+
Object o = value;
109+
if (o == null) {
110+
return "OnCompleteNotification";
64111
}
65-
return null;
112+
if (NotificationLite.isError(o)) {
113+
return "OnErrorNotification[" + NotificationLite.getError(o) + "]";
114+
}
115+
return "OnNextNotification[" + value + "]";
116+
}
117+
118+
/**
119+
* Constructs an onNext notification containing the given value.
120+
* @param <T> the value type
121+
* @param value the value to carry around in the notification, not null
122+
* @return the new Notification instance
123+
* @throws NullPointerException if value is null
124+
*/
125+
public static <T> Notification<T> createOnNext(T value) {
126+
Objects.requireNonNull(value, "value is null");
127+
return new Notification<T>(value);
128+
}
129+
130+
/**
131+
* Constructs an onError notification containing the error.
132+
* @param <T> the value type
133+
* @param error the error Throwable to carry around in the notification, not null
134+
* @return the new Notification instance
135+
* @throws NullPointerException if error is null
136+
*/
137+
public static <T> Notification<T> createOnError(Throwable error) {
138+
Objects.requireNonNull(error, "error is null");
139+
return new Notification<T>(NotificationLite.error(error));
140+
}
141+
142+
/**
143+
* Returns the empty and stateless shared instance of a notification representing
144+
* an onComplete signal.
145+
* @param <T> the target value type
146+
* @return the shared Notification instance representing an onComplete signal
147+
*/
148+
@SuppressWarnings("unchecked")
149+
public static <T> Notification<T> createOnComplete() {
150+
return (Notification<T>)COMPLETE;
66151
}
152+
153+
/** The singleton instance for createOnComplete. */
154+
static final Notification<Object> COMPLETE = new Notification<Object>(null);
67155
}

src/main/java/io/reactivex/Observable.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -6043,7 +6043,7 @@ public final Observable<T> delaySubscription(long delay, TimeUnit unit, Schedule
60436043
@SchedulerSupport(SchedulerSupport.NONE)
60446044
public final <T2> Observable<T2> dematerialize() {
60456045
@SuppressWarnings("unchecked")
6046-
Observable<Try<Optional<T2>>> m = (Observable<Try<Optional<T2>>>)this;
6046+
Observable<Notification<T2>> m = (Observable<Notification<T2>>)this;
60476047
return new ObservableDematerialize<T2>(m);
60486048
}
60496049

@@ -6288,7 +6288,7 @@ private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Thro
62886288
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
62896289
*/
62906290
@SchedulerSupport(SchedulerSupport.NONE)
6291-
public final Observable<T> doOnEach(final Consumer<? super Try<Optional<T>>> onNotification) {
6291+
public final Observable<T> doOnEach(final Consumer<? super Notification<T>> onNotification) {
62926292
Objects.requireNonNull(onNotification, "consumer is null");
62936293
return doOnEach(
62946294
Functions.notificationOnNext(onNotification),
@@ -7576,7 +7576,7 @@ public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
75767576
* @see <a href="http://reactivex.io/documentation/operators/materialize-dematerialize.html">ReactiveX operators documentation: Materialize</a>
75777577
*/
75787578
@SchedulerSupport(SchedulerSupport.NONE)
7579-
public final Observable<Try<Optional<T>>> materialize() {
7579+
public final Observable<Notification<T>> materialize() {
75807580
return new ObservableMaterialize<T>(this);
75817581
}
75827582

src/main/java/io/reactivex/Optional.java

-83
This file was deleted.

0 commit comments

Comments
 (0)