Skip to content

Commit b8a3320

Browse files
abersnazeakarnokd
authored andcommitted
Add a maybe type as a lazy Optional type. (#4436)
1 parent b868a24 commit b8a3320

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+4451
-0
lines changed

src/main/java/io/reactivex/Flowable.java

+7
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.reactivex.internal.fuseable.*;
2828
import io.reactivex.internal.operators.completable.CompletableFromPublisher;
2929
import io.reactivex.internal.operators.flowable.*;
30+
import io.reactivex.internal.operators.maybe.MaybeFromPublisher;
3031
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
3132
import io.reactivex.internal.operators.single.SingleFromPublisher;
3233
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
@@ -13602,6 +13603,12 @@ public final Single<T> toSingle() {
1360213603
return RxJavaPlugins.onAssembly(new SingleFromPublisher<T>(this));
1360313604
}
1360413605

13606+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
13607+
@SchedulerSupport(SchedulerSupport.NONE)
13608+
public final Maybe<T> toMaybe() {
13609+
return new MaybeFromPublisher<T>(this);
13610+
}
13611+
1360513612
/**
1360613613
* Returns a Flowable that emits a list that contains the items emitted by the source Publisher, in a
1360713614
* sorted order. Each item emitted by the Publisher must implement {@link Comparable} with respect to all

src/main/java/io/reactivex/Maybe.java

+850
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import io.reactivex.disposables.Disposable;
17+
18+
public interface MaybeObserver<T> {
19+
void onSubscribe(Disposable d);
20+
21+
void onSuccess(T value);
22+
23+
void onComplete();
24+
25+
void onError(Throwable e);
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import io.reactivex.functions.Function;
17+
18+
public interface MaybeOperator<Downstream, Upstream> extends Function<MaybeObserver<? super Downstream>, MaybeObserver<? super Upstream>> {
19+
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex;
14+
15+
/**
16+
* Represents a basic {@link Single} source base interface,
17+
* consumable via an {@link SingleObserver}.
18+
* <p>
19+
* This class also serves the base type for custom operators wrapped into
20+
* Single via {@link Single#create(MaybeSource)}.
21+
*
22+
* @param <T> the element type
23+
* @since 2.0
24+
*/
25+
public interface MaybeSource<T> {
26+
27+
void subscribe(MaybeObserver<? super T> s);
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import io.reactivex.functions.Function;
17+
18+
public interface MaybeTransformer<Upstream, Downstream> extends Function<Maybe<Upstream>, MaybeSource<Downstream>> {
19+
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.functions;
15+
16+
/**
17+
* A functional interface (callback) that returns an Object value.
18+
*/
19+
public interface Supplier<R> {
20+
/**
21+
* Returns an Object value.
22+
* @return an Object value
23+
* @throws Exception on error
24+
*/
25+
R get() throws Exception; // NOPMD
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import java.util.concurrent.atomic.AtomicBoolean;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.disposables.*;
20+
import io.reactivex.plugins.RxJavaPlugins;
21+
22+
public final class MaybeAmbArray<T> extends Maybe<T> {
23+
24+
final MaybeSource<? extends T>[] sources;
25+
26+
public MaybeAmbArray(MaybeSource<? extends T>[] sources) {
27+
this.sources = sources;
28+
}
29+
30+
@Override
31+
protected void subscribeActual(final MaybeObserver<? super T> s) {
32+
33+
final AtomicBoolean once = new AtomicBoolean();
34+
final CompositeDisposable set = new CompositeDisposable();
35+
s.onSubscribe(set);
36+
37+
for (MaybeSource<? extends T> s1 : sources) {
38+
if (once.get()) {
39+
return;
40+
}
41+
42+
if (s1 == null) {
43+
set.dispose();
44+
Throwable e = new NullPointerException("One of the sources is null");
45+
if (once.compareAndSet(false, true)) {
46+
s.onError(e);
47+
} else {
48+
RxJavaPlugins.onError(e);
49+
}
50+
return;
51+
}
52+
53+
s1.subscribe(new MaybeObserver<T>() {
54+
55+
@Override
56+
public void onSubscribe(Disposable d) {
57+
set.add(d);
58+
}
59+
60+
@Override
61+
public void onSuccess(T value) {
62+
if (once.compareAndSet(false, true)) {
63+
s.onSuccess(value);
64+
}
65+
}
66+
67+
@Override
68+
public void onComplete() {
69+
if (once.compareAndSet(false, true)) {
70+
s.onComplete();
71+
}
72+
}
73+
74+
@Override
75+
public void onError(Throwable e) {
76+
if (once.compareAndSet(false, true)) {
77+
s.onError(e);
78+
} else {
79+
RxJavaPlugins.onError(e);
80+
}
81+
}
82+
83+
});
84+
}
85+
}
86+
87+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import java.util.*;
17+
import java.util.concurrent.atomic.AtomicBoolean;
18+
19+
import io.reactivex.*;
20+
import io.reactivex.disposables.*;
21+
import io.reactivex.plugins.RxJavaPlugins;
22+
23+
public final class MaybeAmbIterable<T> extends Maybe<T> {
24+
25+
final Iterable<? extends MaybeSource<? extends T>> sources;
26+
27+
public MaybeAmbIterable(Iterable<? extends MaybeSource<? extends T>> sources) {
28+
this.sources = sources;
29+
}
30+
31+
@Override
32+
protected void subscribeActual(final MaybeObserver<? super T> s) {
33+
final CompositeDisposable set = new CompositeDisposable();
34+
s.onSubscribe(set);
35+
36+
Iterator<? extends MaybeSource<? extends T>> iterator;
37+
38+
try {
39+
iterator = sources.iterator();
40+
} catch (Throwable e) {
41+
s.onError(e);
42+
return;
43+
}
44+
45+
if (iterator == null) {
46+
s.onError(new NullPointerException("The iterator returned is null"));
47+
return;
48+
}
49+
50+
final AtomicBoolean once = new AtomicBoolean();
51+
int c = 0;
52+
53+
for (;;) {
54+
if (once.get()) {
55+
return;
56+
}
57+
58+
boolean b;
59+
60+
try {
61+
b = iterator.hasNext();
62+
} catch (Throwable e) {
63+
s.onError(e);
64+
return;
65+
}
66+
67+
if (once.get()) {
68+
return;
69+
}
70+
71+
if (!b) {
72+
break;
73+
}
74+
75+
if (once.get()) {
76+
return;
77+
}
78+
79+
MaybeSource<? extends T> s1;
80+
81+
try {
82+
s1 = iterator.next();
83+
} catch (Throwable e) {
84+
set.dispose();
85+
s.onError(e);
86+
return;
87+
}
88+
89+
if (s1 == null) {
90+
set.dispose();
91+
s.onError(new NullPointerException("The single source returned by the iterator is null"));
92+
return;
93+
}
94+
95+
s1.subscribe(new MaybeObserver<T>() {
96+
97+
@Override
98+
public void onSubscribe(Disposable d) {
99+
set.add(d);
100+
}
101+
102+
@Override
103+
public void onSuccess(T value) {
104+
if (once.compareAndSet(false, true)) {
105+
s.onSuccess(value);
106+
}
107+
}
108+
109+
@Override
110+
public void onComplete() {
111+
if (once.compareAndSet(false, true)) {
112+
s.onComplete();
113+
}
114+
}
115+
116+
@Override
117+
public void onError(Throwable e) {
118+
if (once.compareAndSet(false, true)) {
119+
s.onError(e);
120+
} else {
121+
RxJavaPlugins.onError(e);
122+
}
123+
}
124+
125+
});
126+
c++;
127+
}
128+
129+
if (c == 0 && !set.isDisposed()) {
130+
s.onError(new NoSuchElementException());
131+
}
132+
}
133+
134+
}

0 commit comments

Comments
 (0)