-
Notifications
You must be signed in to change notification settings - Fork 7.6k
/
Copy pathBlockingObservableNext.java
193 lines (169 loc) · 6.61 KB
/
BlockingObservableNext.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.internal.operators.observable;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import io.reactivex.Observable;
import io.reactivex.Optional;
import io.reactivex.Try;
import io.reactivex.internal.subscribers.observable.DisposableObserver;
import io.reactivex.internal.util.Exceptions;
/**
* Returns an Iterable that blocks until the Observable emits another item, then returns that item.
* <p>
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.next.png" alt="">
*/
public enum BlockingObservableNext {
;
/**
 * Wraps an {@code Observable} in an {@code Iterable} whose iterators block until the
 * {@code Observable} emits another item and then hand that item out.
 *
 * @param <T> the value type
 * @param items
 *            the {@code Observable} to observe
 * @return an {@code Iterable} that behaves like a blocking version of {@code items}
 */
public static <T> Iterable<T> next(final Observable<? extends T> items) {
    return new Iterable<T>() {
        @Override
        public Iterator<T> iterator() {
            // A fresh observer/iterator pair is built on every call.
            return new NextIterator<T>(items, new NextObserver<T>());
        }
    };
}
// Kept non-private: the test needs to access the observer.waiting flag non-blockingly.
/**
 * Iterator that subscribes to the source on first use and then blocks in
 * {@code observer.takeNext()} until the next notification is available.
 * <p>
 * An iterator is not expected to be shared between threads, so the state below is
 * accessed without synchronization.
 *
 * @param <T> the value type
 */
/* private */static final class NextIterator<T> implements Iterator<T> {
    private final NextObserver<T> observer;
    private final Observable<? extends T> items;
    /** The most recently received value; only meaningful while {@code isNextConsumed} is false. */
    private T next;
    /** Turns false once a completion or error notification has been received. */
    private boolean hasNext = true;
    /** True when {@code next} has already been handed out (or nothing was fetched yet). */
    private boolean isNextConsumed = true;
    /** The first failure seen; once set, every later call rethrows it. */
    private Throwable error;
    /** Whether the source has been subscribed to. */
    private boolean started;

    private NextIterator(Observable<? extends T> items, NextObserver<T> observer) {
        this.items = items;
        this.observer = observer;
    }

    /**
     * Reports whether another value is available, blocking for the next notification
     * when no unconsumed value is cached.
     *
     * @return true if a value is ready to be returned by {@link #next()},
     *         false once the source has terminated
     */
    @Override
    public boolean hasNext() {
        if (error != null) {
            // A failure was already reported once; report it again.
            throw Exceptions.propagate(error);
        }
        if (!hasNext) {
            // The source has already terminated.
            return false;
        }
        // Either a fetched value is still pending, or we have to wait for a new one.
        return !isNextConsumed || moveToNext();
    }

    /**
     * Subscribes on first use, then waits for the next notification and updates the
     * iterator state from it.
     *
     * @return true if a value was received, false if the source completed
     */
    private boolean moveToNext() {
        try {
            if (!started) {
                started = true;
                // Flag the observer as waiting before subscribing.
                observer.setWaiting(1);
                @SuppressWarnings("unchecked")
                Observable<T> source = (Observable<T>)items;
                source.materialize().subscribe(observer);
            }

            Try<Optional<T>> signal = observer.takeNext();
            if (isOnNext(signal)) {
                isNextConsumed = false;
                next = signal.value().get();
                return true;
            }

            // Any non-value notification is terminal: from here on hasNext() stays false.
            hasNext = false;
            if (!isOnComplete(signal)) {
                if (!signal.hasError()) {
                    throw new IllegalStateException("Should not reach here");
                }
                error = signal.error();
                throw Exceptions.propagate(error);
            }
            return false;
        } catch (InterruptedException e) {
            // Stop observing, restore the interrupt flag and remember the failure.
            observer.dispose();
            Thread.currentThread().interrupt();
            error = e;
            throw Exceptions.propagate(error);
        }
    }

    /**
     * Returns the next value, blocking until one is available.
     *
     * @return the next value emitted by the source
     * @throws NoSuchElementException if the source has terminated
     */
    @Override
    public T next() {
        if (error != null) {
            // A failure was already reported once; report it again.
            throw Exceptions.propagate(error);
        }
        if (!hasNext()) {
            throw new NoSuchElementException("No more elements");
        }
        isNextConsumed = true;
        return next;
    }

    /**
     * Always fails: this iterator is read only.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void remove() {
        throw new UnsupportedOperationException("Read only iterator");
    }
}
/**
* Observer of materialized notifications that hands at most one notification at a time
* to a blocking consumer through a single-slot queue.
* <p>
* A value notification is only accepted while the {@code waiting} flag is 1; any
* notification for which {@code isOnNext} is false is always accepted.
*
* @param <T> the value type
*/
static final class NextObserver<T> extends DisposableObserver<Try<Optional<T>>> {
// Hand-off buffer with a capacity of exactly one notification.
private final BlockingQueue<Try<Optional<T>>> buf = new ArrayBlockingQueue<Try<Optional<T>>>(1);
// 1 while a consumer has asked for the next notification, reset to 0 by onNext.
// Package-visible rather than private (the comment on NextIterator mentions test access).
final AtomicInteger waiting = new AtomicInteger();
@Override
public void onComplete() {
// ignore
// NOTE(review): presumably completion arrives through onNext because the iterator
// subscribes via materialize() - confirm.
}
@Override
public void onError(Throwable e) {
// ignore
// NOTE(review): presumably errors arrive through onNext because the iterator
// subscribes via materialize() - confirm.
}
/**
* Stores the notification for the consumer if one is waiting, or if the notification
* is not a value notification; otherwise the notification is dropped.
*
* @param args the materialized notification
*/
@Override
public void onNext(Try<Optional<T>> args) {
// getAndSet(0) atomically reads and clears the waiting flag, so a single request
// lets through at most one value notification.
if (waiting.getAndSet(0) == 1 || !isOnNext(args)) {
Try<Optional<T>> toOffer = args;
// The queue holds one element: keep retrying until the offer succeeds,
// emptying the slot whenever it is found occupied.
while (!buf.offer(toOffer)) {
Try<Optional<T>> concurrentItem = buf.poll();
// in case if we won race condition with onComplete/onError method
// If the removed element was a non-value notification, re-offer it instead of
// the current one so that it is not lost; a removed value is discarded.
if (concurrentItem != null && !isOnNext(concurrentItem)) {
toOffer = concurrentItem;
}
}
}
}
/**
* Marks the consumer as waiting and blocks until a notification is available.
*
* @return the next stored notification
* @throws InterruptedException if the calling thread is interrupted while waiting
*/
public Try<Optional<T>> takeNext() throws InterruptedException {
setWaiting(1);
return buf.take();
}
/**
* Overwrites the waiting flag.
*
* @param value the new flag value; callers in this file pass 1
*/
void setWaiting(int value) {
waiting.set(value);
}
}
/**
 * Tells whether the notification carries an item, i.e. it has a value and that
 * value is present.
 *
 * @param <T> the value type
 * @param notification the notification to inspect
 * @return true if the notification holds a present value
 */
static <T> boolean isOnNext(Try<Optional<T>> notification) {
    if (!notification.hasValue()) {
        return false;
    }
    return notification.value().isPresent();
}
/**
 * Tells whether the notification signals completion, i.e. it has a value but that
 * value is not present.
 *
 * @param <T> the value type
 * @param notification the notification to inspect
 * @return true if the notification holds an absent value
 */
static <T> boolean isOnComplete(Try<Optional<T>> notification) {
    if (!notification.hasValue()) {
        return false;
    }
    return !notification.value().isPresent();
}
}