Skip to content

Commit db0bd71

Browse files
authored
3.x: Add Single.ofType (#6876)
1 parent 8c85f5a commit db0bd71

File tree

4 files changed

+147
-3
lines changed

4 files changed

+147
-3
lines changed

src/main/java/io/reactivex/rxjava3/core/Maybe.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -3472,7 +3472,7 @@ public final Maybe<T> doOnSuccess(@NonNull Consumer<? super T> onSuccess) {
34723472
* Filters the success item of the {@code Maybe} via a predicate function and emitting it if the predicate
34733473
* returns {@code true}, completing otherwise.
34743474
* <p>
3475-
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/filter.png" alt="">
3475+
* <img width="640" height="291" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.filter.png" alt="">
34763476
* <dl>
34773477
* <dt><b>Scheduler:</b></dt>
34783478
* <dd>{@code filter} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -4098,10 +4098,10 @@ public final Maybe<T> observeOn(@NonNull Scheduler scheduler) {
40984098
}
40994099

41004100
/**
4101-
* Filters the items emitted by a {@code Maybe}, only emitting its success value if that
4101+
* Filters the items emitted by the current {@code Maybe}, only emitting its success value if that
41024102
* is an instance of the supplied {@link Class}.
41034103
* <p>
4104-
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ofClass.png" alt="">
4104+
* <img width="640" height="291" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.ofType.png" alt="">
41054105
* <dl>
41064106
* <dt><b>Scheduler:</b></dt>
41074107
* <dd>{@code ofType} does not operate by default on a particular {@link Scheduler}.</dd>

src/main/java/io/reactivex/rxjava3/core/Single.java

+24
Original file line numberDiff line numberDiff line change
@@ -3439,6 +3439,30 @@ public final Single<Boolean> contains(@NonNull Object item, @NonNull BiPredicate
34393439
public final Flowable<T> mergeWith(@NonNull SingleSource<? extends T> other) {
34403440
return merge(this, other);
34413441
}
3442+
/**
3443+
* Filters the items emitted by the current {@code Single}, only emitting its success value if that
3444+
* is an instance of the supplied {@link Class}.
3445+
* <p>
3446+
* <img width="640" height="399" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.ofType.png" alt="">
3447+
* <dl>
3448+
* <dt><b>Scheduler:</b></dt>
3449+
* <dd>{@code ofType} does not operate by default on a particular {@link Scheduler}.</dd>
3450+
* </dl>
3451+
*
3452+
* @param <U> the output type
3453+
* @param clazz
3454+
* the class type to filter the items emitted by the current {@code Single}
3455+
* @return the new {@link Maybe} instance
3456+
* @throws NullPointerException if {@code clazz} is {@code null}
3457+
* @see <a href="http://reactivex.io/documentation/operators/filter.html">ReactiveX operators documentation: Filter</a>
3458+
*/
3459+
@CheckReturnValue
3460+
@NonNull
3461+
@SchedulerSupport(SchedulerSupport.NONE)
3462+
public final <U> Maybe<U> ofType(@NonNull Class<U> clazz) {
3463+
Objects.requireNonNull(clazz, "clazz is null");
3464+
return filter(Functions.isInstanceOf(clazz)).cast(clazz);
3465+
}
34423466

34433467
/**
34443468
* Signals the success item or the terminal signals of the current {@code Single} on the specified {@link Scheduler},
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.operators.single;
15+
16+
import org.junit.Test;
17+
18+
import io.reactivex.rxjava3.core.*;
19+
import io.reactivex.rxjava3.exceptions.TestException;
20+
import io.reactivex.rxjava3.functions.Function;
21+
import io.reactivex.rxjava3.observers.TestObserver;
22+
import io.reactivex.rxjava3.processors.PublishProcessor;
23+
import io.reactivex.rxjava3.testsupport.TestHelper;
24+
25+
public class SingleOfTypeTest extends RxJavaTest {
26+
27+
@Test
28+
public void normal() {
29+
Single.just(1).ofType(Integer.class)
30+
.test()
31+
.assertResult(1);
32+
}
33+
34+
@Test
35+
public void normalDowncast() {
36+
TestObserver<Number> to = Single.just(1)
37+
.ofType(Number.class)
38+
.test();
39+
// don't make this fluent, target type required!
40+
to.assertResult((Number)1);
41+
}
42+
43+
@Test
44+
public void notInstance() {
45+
TestObserver<String> to = Single.just(1)
46+
.ofType(String.class)
47+
.test();
48+
// don't make this fluent, target type required!
49+
to.assertResult();
50+
}
51+
52+
@Test
53+
public void error() {
54+
TestObserver<Number> to = Single.<Integer>error(new TestException())
55+
.ofType(Number.class)
56+
.test();
57+
// don't make this fluent, target type required!
58+
to.assertFailure(TestException.class);
59+
}
60+
61+
@Test
62+
public void errorNotInstance() {
63+
TestObserver<String> to = Single.<Integer>error(new TestException())
64+
.ofType(String.class)
65+
.test();
66+
// don't make this fluent, target type required!
67+
to.assertFailure(TestException.class);
68+
}
69+
70+
@Test
71+
public void dispose() {
72+
TestHelper.checkDisposedSingleToMaybe(new Function<Single<Object>, Maybe<Object>>() {
73+
@Override
74+
public Maybe<Object> apply(Single<Object> m) throws Exception {
75+
return m.ofType(Object.class);
76+
}
77+
});
78+
}
79+
80+
@Test
81+
public void isDisposed() {
82+
PublishProcessor<Integer> pp = PublishProcessor.create();
83+
84+
TestHelper.checkDisposed(pp.singleElement().ofType(Object.class));
85+
}
86+
87+
@Test
88+
public void doubleOnSubscribe() {
89+
TestHelper.checkDoubleOnSubscribeSingleToMaybe(new Function<Single<Object>, Maybe<Object>>() {
90+
@Override
91+
public Maybe<Object> apply(Single<Object> f) throws Exception {
92+
return f.ofType(Object.class);
93+
}
94+
});
95+
}
96+
}

src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java

+24
Original file line numberDiff line numberDiff line change
@@ -2248,6 +2248,30 @@ public static <T, U> void checkDisposedMaybeToSingle(Function<Maybe<T>, ? extend
22482248
assertFalse(pp.hasSubscribers());
22492249
}
22502250

2251+
/**
2252+
* Check if the operator applied to a Maybe source propagates dispose properly.
2253+
* @param <T> the source value type
2254+
* @param <U> the output value type
2255+
* @param composer the function to apply an operator to the provided Maybe source
2256+
*/
2257+
public static <T, U> void checkDisposedSingleToMaybe(Function<Single<T>, ? extends MaybeSource<U>> composer) {
2258+
PublishProcessor<T> pp = PublishProcessor.create();
2259+
2260+
TestSubscriber<U> ts = new TestSubscriber<>();
2261+
2262+
try {
2263+
new MaybeToFlowable<>(composer.apply(pp.singleOrError())).subscribe(ts);
2264+
} catch (Throwable ex) {
2265+
throw ExceptionHelper.wrapOrThrow(ex);
2266+
}
2267+
2268+
assertTrue(pp.hasSubscribers());
2269+
2270+
ts.cancel();
2271+
2272+
assertFalse(pp.hasSubscribers());
2273+
}
2274+
22512275
/**
22522276
* Check if the TestSubscriber has a CompositeException with the specified class
22532277
* of Throwables in the given order.

0 commit comments

Comments
 (0)