Skip to content

Commit 3c47643

Browse files
committed
Basic Subscribers and RxJavaPlugin
1 parent 90a1b90 commit 3c47643

File tree

7 files changed

+463
-26
lines changed

7 files changed

+463
-26
lines changed

src/main/java/io/reactivex/Observable.java

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222

2323
import io.reactivex.internal.operators.*;
2424
import io.reactivex.internal.subscriptions.EmptySubscription;
25+
import io.reactivex.plugins.RxJavaPlugins;
26+
import io.reactivex.subscribers.SafeSubscriber;
2527

2628
public class Observable<T> implements Publisher<T> {
2729
final Publisher<T> onSubscribe;
@@ -36,23 +38,43 @@ protected Observable(Publisher<T> onSubscribe) {
3638
}
3739

3840
public static <T> Observable<T> create(Publisher<T> onSubscribe) {
39-
// TODO plugin wrapping
41+
onSubscribe = RxJavaPlugins.onCreate(onSubscribe);
4042
return new Observable<>(onSubscribe);
4143
}
4244

43-
@Override
44-
public final void subscribe(Subscriber<? super T> s) {
45+
private void subscribeActual(Subscriber<? super T> s) {
4546
Objects.requireNonNull(s);
4647
try {
48+
s = RxJavaPlugins.onSubscribe(s);
49+
4750
onSubscribe.subscribe(s);
4851
} catch (NullPointerException e) {
4952
throw e;
5053
} catch (Throwable e) {
51-
// TODO throw if fatal
52-
// TODO plugin error handler
54+
// TODO throw if fatal?
5355
// can't call onError because no way to know if a Subscription has been set or not
5456
// can't call onSubscribe because the call might have set a Subscription already
55-
e.printStackTrace();
57+
RxJavaPlugins.onError(e);
58+
}
59+
}
60+
61+
// TODO decide if safe subscription or unsafe should be the default
62+
@Override
63+
public final void subscribe(Subscriber<? super T> s) {
64+
subscribeActual(s);
65+
}
66+
67+
// TODO decide if safe subscription or unsafe should be the default
68+
public final void unsafeSubscribe(Subscriber<? super T> s) {
69+
subscribeActual(s);
70+
}
71+
72+
// TODO decide if safe subscription or unsafe should be the default
73+
public final void safeSubscribe(Subscriber<? super T> s) {
74+
if (s instanceof SafeSubscriber) {
75+
subscribeActual(s);
76+
} else {
77+
subscribeActual(new SafeSubscriber<>(s));
5678
}
5779
}
5880

@@ -71,16 +93,17 @@ public final <R> Observable<R> lift(Operator<? extends R, ? super T> lifter) {
7193
return create(su -> {
7294
try {
7395
Subscriber<? super T> st = lifter.apply(su);
74-
// TODO plugin wrapping
96+
97+
st = RxJavaPlugins.onSubscribe(st);
98+
7599
onSubscribe.subscribe(st);
76100
} catch (NullPointerException e) {
77101
throw e;
78102
} catch (Throwable e) {
79-
// TODO throw if fatal
80-
// TODO plugin error handler
103+
// TODO throw if fatal?
81104
// can't call onError because no way to know if a Subscription has been set or not
82105
// can't call onSubscribe because the call might have set a Subscription already
83-
e.printStackTrace();
106+
RxJavaPlugins.onError(e);
84107
}
85108
});
86109
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.plugins;
14+
15+
import org.reactivestreams.*;
16+
17+
public final class RxJavaPlugins {
18+
private RxJavaPlugins() {
19+
throw new IllegalStateException("No instances!");
20+
}
21+
22+
/**
23+
* Called when an undeliverable error occurs.
24+
* @param error the error to report
25+
*/
26+
public static void onError(Throwable error) {
27+
// TODO dispatch to the appropriate plugin
28+
if (error != null) {
29+
error.printStackTrace();
30+
} else {
31+
new NullPointerException().printStackTrace();
32+
}
33+
}
34+
35+
/**
36+
* Called when a subscriber subscribes to an observable.
37+
* @param subscriber
38+
* @return
39+
*/
40+
public static <T> Subscriber<T> onSubscribe(Subscriber<T> subscriber) {
41+
// TODO dispatch to the appropriate plugin
42+
return subscriber;
43+
}
44+
45+
/**
46+
* Called when an Observable is created.
47+
* @param publisher
48+
* @return
49+
*/
50+
public static <T> Publisher<T> onCreate(Publisher<T> publisher) {
51+
// TODO dispatch to the appropriate plugin
52+
return publisher;
53+
}
54+
}

src/main/java/io/reactivex/subjects/AsyncSubject.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.reactivestreams.*;
2020

2121
import io.reactivex.internal.util.*;
22+
import io.reactivex.plugins.RxJavaPlugins;
2223

2324
/**
2425
* A Subject that emits the very last value followed by a completion event or the received error to Subscribers.
@@ -307,8 +308,7 @@ public void setError(Throwable e) {
307308
@Override
308309
public void request(long n) {
309310
if (n <= 0) {
310-
// TODO report error to plugin
311-
new IllegalArgumentException("n > 0 required but it was " + n).printStackTrace();
311+
RxJavaPlugins.onError(new IllegalArgumentException("n > 0 required but it was " + n));
312312
return;
313313
}
314314
for (;;) {

src/main/java/io/reactivex/subjects/PublishSubject.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.reactivestreams.*;
1919

2020
import io.reactivex.internal.util.*;
21+
import io.reactivex.plugins.RxJavaPlugins;
2122

2223
/**
2324
* A Subject that multicasts events to Subscribers that are currently subscribed to it.
@@ -311,8 +312,7 @@ public void onComplete() {
311312
public void request(long n) {
312313
if (n <= 0) {
313314
// can't really call onError here because request could be async in respect to other onXXX calls
314-
// TODO report error to plugins
315-
new IllegalStateException("n > 0 required but it was " + n).printStackTrace();
315+
RxJavaPlugins.onError(new IllegalStateException("n > 0 required but it was " + n));
316316
return;
317317
}
318318
BackpressureHelper.add(this, n);

src/main/java/io/reactivex/subjects/SerializedSubject.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.reactivestreams.Subscription;
1717

1818
import io.reactivex.internal.util.*;
19+
import io.reactivex.plugins.RxJavaPlugins;
1920

2021
/**
2122
* Serializes calls to the Subscriber methods.
@@ -94,25 +95,32 @@ public void onNext(T t) {
9495
@Override
9596
public void onError(Throwable t) {
9697
if (done) {
97-
// TODO perhaps report the exception to plugins
98+
RxJavaPlugins.onError(t);
9899
return;
99100
}
101+
boolean reportError;
100102
synchronized (this) {
101103
if (done) {
102-
// TODO perhaps report the exception to plugins, outside the sync block of course
104+
reportError = true;
103105
return;
104-
}
105-
done = true;
106-
if (emitting) {
107-
AppendOnlyLinkedArrayList<Object> q = queue;
108-
if (q == null) {
109-
q = new AppendOnlyLinkedArrayList<>(4);
110-
queue = q;
106+
} else {
107+
done = true;
108+
if (emitting) {
109+
AppendOnlyLinkedArrayList<Object> q = queue;
110+
if (q == null) {
111+
q = new AppendOnlyLinkedArrayList<>(4);
112+
queue = q;
113+
}
114+
q.setFirst(NotificationLite.error(t));
115+
return;
111116
}
112-
q.setFirst(NotificationLite.error(t));
113-
return;
117+
reportError = false;
118+
emitting = true;
114119
}
115-
emitting = true;
120+
}
121+
if (reportError) {
122+
RxJavaPlugins.onError(t);
123+
return;
116124
}
117125
actual.onError(t);
118126
}

0 commit comments

Comments
 (0)