Skip to content

Commit 5328eaf

Browse files
author
akarnokd
committed
Operator materialize, dematerialize, minor notification fix.
1 parent 640e3c1 commit 5328eaf

File tree

6 files changed

+266
-25
lines changed

6 files changed

+266
-25
lines changed

src/main/java/io/reactivex/Notification.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,6 @@ private Notification() {
2424
}
2525

2626
static final Try<Optional<?>> COMPLETE = Try.ofValue(Optional.empty());
27-
static final Try<Optional<?>> NULL = Try.ofValue(Optional.ofNullable(null));
28-
29-
/**
30-
* Returns a null notification.
31-
*
32-
* <p>Note that null values are generally forbidden.
33-
* Check the operator documentation to see if a null notification is
34-
* accepted or not.
35-
*
36-
* @return the null notification instance
37-
*/
38-
39-
@SuppressWarnings({ "rawtypes", "unchecked" })
40-
public static <T> Try<Optional<T>> ofNull() {
41-
return (Try)NULL; // because generics
42-
}
4327

4428
@SuppressWarnings({ "rawtypes", "unchecked" })
4529
public static <T> Try<Optional<T>> complete() {

src/main/java/io/reactivex/Observable.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1145,15 +1145,13 @@ public final Observable<Timed<T>> timestamp(TimeUnit unit, Scheduler scheduler)
11451145
}
11461146

11471147
public final Observable<Try<Optional<T>>> materialize() {
1148-
// TODO implement
1149-
throw new UnsupportedOperationException();
1148+
return lift(OperatorMaterialize.instance());
11501149
}
11511150

1152-
// @SuppressWarnings("unchecked")
11531151
public final Observable<T> dematerialize() {
1154-
// Observable<Try<Optional<T>>> m = (Observable<Try<Optional<T>>>)this;
1155-
// TODO implement
1156-
throw new UnsupportedOperationException();
1152+
@SuppressWarnings("unchecked")
1153+
Observable<Try<Optional<T>>> m = (Observable<Try<Optional<T>>>)this;
1154+
return m.lift(OperatorDematerialize.instance());
11571155
}
11581156

11591157
/**
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators;
15+
16+
import java.util.Optional;
17+
18+
import org.reactivestreams.*;
19+
20+
import io.reactivex.Observable.Operator;
21+
import io.reactivex.plugins.RxJavaPlugins;
22+
import io.reactivex.Try;
23+
24+
public enum OperatorDematerialize implements Operator<Object, Try<Optional<Object>>> {
25+
INSTANCE;
26+
27+
@SuppressWarnings({ "rawtypes", "unchecked" })
28+
public static <T> Operator<T, Try<Optional<T>>> instance() {
29+
return (Operator)INSTANCE;
30+
}
31+
32+
@Override
33+
public Subscriber<? super Try<Optional<Object>>> apply(Subscriber<? super Object> t) {
34+
return new DematerializeSubscriber<>(t);
35+
}
36+
37+
static final class DematerializeSubscriber<T> implements Subscriber<Try<Optional<T>>> {
38+
final Subscriber<? super T> actual;
39+
40+
boolean done;
41+
42+
public DematerializeSubscriber(Subscriber<? super T> actual) {
43+
this.actual = actual;
44+
}
45+
46+
@Override
47+
public void onSubscribe(Subscription s) {
48+
actual.onSubscribe(s);
49+
}
50+
51+
@Override
52+
public void onNext(Try<Optional<T>> t) {
53+
if (done) {
54+
return;
55+
}
56+
if (t.hasError()) {
57+
onError(t.error());
58+
} else {
59+
Optional<T> o = t.value();
60+
if (o.isPresent()) {
61+
actual.onNext(o.get());
62+
} else {
63+
onComplete();
64+
}
65+
}
66+
}
67+
68+
@Override
69+
public void onError(Throwable t) {
70+
if (done) {
71+
RxJavaPlugins.onError(t);
72+
return;
73+
}
74+
done = true;
75+
}
76+
@Override
77+
public void onComplete() {
78+
if (done) {
79+
return;
80+
}
81+
done = true;
82+
actual.onComplete();
83+
}
84+
}
85+
}
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators;
15+
16+
import java.util.Optional;
17+
import java.util.concurrent.atomic.*;
18+
19+
import org.reactivestreams.*;
20+
21+
import io.reactivex.*;
22+
import io.reactivex.Observable.Operator;
23+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
24+
import io.reactivex.internal.util.BackpressureHelper;
25+
26+
public enum OperatorMaterialize implements Operator<Try<Optional<Object>>, Object> {
27+
INSTANCE;
28+
29+
@SuppressWarnings({ "rawtypes", "unchecked" })
30+
public static <T> Operator<Try<Optional<T>>, T> instance() {
31+
return (Operator)INSTANCE;
32+
}
33+
34+
@Override
35+
public Subscriber<? super Object> apply(Subscriber<? super Try<Optional<Object>>> t) {
36+
return new OnErrorReturnSubscriber<>(t);
37+
}
38+
39+
static final class OnErrorReturnSubscriber<T> extends AtomicLong implements Subscriber<T>, Subscription {
40+
/** */
41+
private static final long serialVersionUID = -3740826063558713822L;
42+
final Subscriber<? super Try<Optional<T>>> actual;
43+
44+
Subscription s;
45+
46+
volatile int state;
47+
@SuppressWarnings("rawtypes")
48+
static final AtomicIntegerFieldUpdater<OnErrorReturnSubscriber> STATE =
49+
AtomicIntegerFieldUpdater.newUpdater(OnErrorReturnSubscriber.class, "state");
50+
Try<Optional<T>> value;
51+
52+
volatile boolean done;
53+
54+
static final int NO_REQUEST_NO_VALUE = 0;
55+
static final int NO_REQUEST_HAS_VALUE = 1;
56+
static final int HAS_REQUEST_NO_VALUE = 2;
57+
static final int HAS_REQUEST_HAS_VALUE = 3;
58+
59+
public OnErrorReturnSubscriber(Subscriber<? super Try<Optional<T>>> actual) {
60+
this.actual = actual;
61+
}
62+
63+
@Override
64+
public void onSubscribe(Subscription s) {
65+
if (SubscriptionHelper.validateSubscription(this.s, s)) {
66+
return;
67+
}
68+
this.s = s;
69+
actual.onSubscribe(this);
70+
}
71+
72+
@Override
73+
public void onNext(T t) {
74+
actual.onNext(Notification.next(t));
75+
76+
if (get() != Long.MAX_VALUE) {
77+
decrementAndGet();
78+
}
79+
}
80+
81+
void tryEmit(Try<Optional<T>> v) {
82+
if (get() != 0L) {
83+
STATE.lazySet(this, HAS_REQUEST_HAS_VALUE);
84+
actual.onNext(v);
85+
actual.onComplete();
86+
} else {
87+
for (;;) {
88+
int s = state;
89+
if (s == HAS_REQUEST_NO_VALUE) {
90+
if (STATE.compareAndSet(this, s, HAS_REQUEST_HAS_VALUE)) {
91+
actual.onNext(v);
92+
actual.onComplete();
93+
return;
94+
}
95+
} else
96+
if (s == NO_REQUEST_HAS_VALUE) {
97+
return;
98+
} else
99+
if (s == HAS_REQUEST_HAS_VALUE) {
100+
value = null;
101+
return;
102+
} else {
103+
value = v;
104+
if (STATE.compareAndSet(this, s, NO_REQUEST_HAS_VALUE)) {
105+
return;
106+
}
107+
}
108+
}
109+
}
110+
}
111+
112+
@Override
113+
public void onError(Throwable t) {
114+
done = true;
115+
116+
Try<Optional<T>> v = Notification.error(t);
117+
118+
tryEmit(v);
119+
}
120+
121+
@Override
122+
public void onComplete() {
123+
done = true;
124+
125+
Try<Optional<T>> v = Notification.complete();
126+
127+
tryEmit(v);
128+
}
129+
130+
@Override
131+
public void request(long n) {
132+
if (SubscriptionHelper.validateRequest(n)) {
133+
return;
134+
}
135+
if (BackpressureHelper.add(this, n) == 0) {
136+
if (done) {
137+
for (;;) {
138+
int s = state;
139+
if (s == NO_REQUEST_HAS_VALUE) {
140+
if (STATE.compareAndSet(this, s, HAS_REQUEST_HAS_VALUE)) {
141+
Try<Optional<T>> v = value;
142+
value = null;
143+
actual.onNext(v);
144+
actual.onComplete();
145+
return;
146+
}
147+
} else
148+
if (s == HAS_REQUEST_NO_VALUE || s == HAS_REQUEST_HAS_VALUE) {
149+
return;
150+
} else
151+
if (STATE.compareAndSet(this, s, HAS_REQUEST_NO_VALUE)) {
152+
return;
153+
}
154+
}
155+
}
156+
}
157+
}
158+
159+
@Override
160+
public void cancel() {
161+
STATE.lazySet(this, HAS_REQUEST_HAS_VALUE);
162+
s.cancel();
163+
}
164+
}
165+
}

src/main/java/io/reactivex/internal/operators/PublisherRedo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public void subscribe(Subscriber<? super T> s) {
5353
s.onSubscribe(parent.arbiter);
5454

5555
// trigger first subscription
56-
parent.handle(Notification.ofNull());
56+
parent.handle(Notification.complete());
5757
}
5858

5959
static final class RedoSubscriber<T> extends AtomicBoolean implements Subscriber<T> {

src/main/java/io/reactivex/internal/subscribers/ToNotificationSubscriber.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,34 @@
1919
import org.reactivestreams.*;
2020

2121
import io.reactivex.*;
22+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2223

2324
public final class ToNotificationSubscriber<T> implements Subscriber<T> {
2425
final Consumer<? super Try<Optional<Object>>> consumer;
2526

27+
Subscription s;
28+
2629
public ToNotificationSubscriber(Consumer<? super Try<Optional<Object>>> consumer) {
2730
this.consumer = consumer;
2831
}
2932

3033
@Override
3134
public void onSubscribe(Subscription s) {
35+
if (SubscriptionHelper.validateSubscription(this.s, s)) {
36+
return;
37+
}
38+
this.s = s;
3239
s.request(Long.MAX_VALUE);
3340
}
3441

3542
@Override
3643
public void onNext(T t) {
3744
if (t == null) {
38-
consumer.accept(Notification.ofNull());
45+
s.cancel();
46+
onError(new NullPointerException());
47+
} else {
48+
consumer.accept(Try.ofValue(Optional.of(t)));
3949
}
40-
consumer.accept(Try.ofValue(Optional.of(t)));
4150
}
4251

4352
@Override

0 commit comments

Comments
 (0)