Skip to content

Commit c60a21a

Browse files
authored
2.x: add most relevant ~100 operators' Reactive-Streams TCK tests (#4538)
1 parent 988cf23 commit c60a21a

File tree

101 files changed

+3288
-12
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

101 files changed

+3288
-12
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOn.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717

1818
import org.reactivestreams.*;
1919

20-
import io.reactivex.*;
20+
import io.reactivex.Scheduler;
2121
import io.reactivex.internal.subscriptions.SubscriptionHelper;
22+
import io.reactivex.plugins.RxJavaPlugins;
2223

2324
public final class FlowableUnsubscribeOn<T> extends AbstractFlowableWithUpstream<T, T> {
2425
final Scheduler scheduler;
@@ -56,17 +57,25 @@ public void onSubscribe(Subscription s) {
5657

5758
@Override
5859
public void onNext(T t) {
59-
actual.onNext(t);
60+
if (!get()) {
61+
actual.onNext(t);
62+
}
6063
}
6164

6265
@Override
6366
public void onError(Throwable t) {
67+
if (get()) {
68+
RxJavaPlugins.onError(t);
69+
return;
70+
}
6471
actual.onError(t);
6572
}
6673

6774
@Override
6875
public void onComplete() {
69-
actual.onComplete();
76+
if (!get()) {
77+
actual.onComplete();
78+
}
7079
}
7180

7281
@Override

src/main/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOn.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.reactivex.*;
1919
import io.reactivex.disposables.Disposable;
2020
import io.reactivex.internal.disposables.DisposableHelper;
21+
import io.reactivex.plugins.RxJavaPlugins;
2122

2223
public final class ObservableUnsubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
2324
final Scheduler scheduler;
@@ -55,17 +56,25 @@ public void onSubscribe(Disposable s) {
5556

5657
@Override
5758
public void onNext(T t) {
58-
actual.onNext(t);
59+
if (!get()) {
60+
actual.onNext(t);
61+
}
5962
}
6063

6164
@Override
6265
public void onError(Throwable t) {
66+
if (get()) {
67+
RxJavaPlugins.onError(t);
68+
return;
69+
}
6370
actual.onError(t);
6471
}
6572

6673
@Override
6774
public void onComplete() {
68-
actual.onComplete();
75+
if (!get()) {
76+
actual.onComplete();
77+
}
6978
}
7079

7180
@Override

src/test/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOnTest.java

+13
Original file line numberDiff line numberDiff line change
@@ -182,4 +182,17 @@ public Thread getThread() {
182182
}
183183

184184
}
185+
186+
@Test
187+
public void takeHalf() {
188+
int elements = 1024;
189+
Flowable.range(0, elements * 2).unsubscribeOn(Schedulers.single())
190+
.take(elements)
191+
.test()
192+
.awaitDone(5, TimeUnit.SECONDS)
193+
.assertValueCount(elements)
194+
.assertComplete()
195+
.assertNoErrors()
196+
.assertSubscribed();
197+
}
185198
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.tck;
15+
16+
import org.reactivestreams.Publisher;
17+
import org.testng.annotations.Test;
18+
19+
import io.reactivex.Flowable;
20+
import io.reactivex.functions.Predicate;
21+
22+
@Test
23+
public class AllTckTest extends BaseTck<Boolean> {
24+
25+
@Override
26+
public Publisher<Boolean> createPublisher(final long elements) {
27+
return FlowableTck.wrap(
28+
Flowable.range(1, 1000).all(new Predicate<Integer>() {
29+
@Override
30+
public boolean test(Integer e) throws Exception {
31+
return e < 800;
32+
}
33+
})
34+
);
35+
}
36+
37+
@Override
38+
public long maxElementsFromPublisher() {
39+
return 1;
40+
}
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.tck;
15+
16+
import org.reactivestreams.Publisher;
17+
import org.testng.annotations.Test;
18+
19+
import io.reactivex.Flowable;
20+
21+
@Test
22+
public class AmbArrayTckTest extends BaseTck<Long> {
23+
24+
@SuppressWarnings("unchecked")
25+
@Override
26+
public Publisher<Long> createPublisher(long elements) {
27+
return FlowableTck.wrap(
28+
Flowable.ambArray(
29+
Flowable.fromIterable(iterate(elements)),
30+
Flowable.<Long>never()
31+
)
32+
);
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.tck;
15+
16+
import java.util.Arrays;
17+
18+
import org.reactivestreams.Publisher;
19+
import org.testng.annotations.Test;
20+
21+
import io.reactivex.Flowable;
22+
23+
@Test
24+
public class AmbTckTest extends BaseTck<Long> {
25+
26+
@SuppressWarnings("unchecked")
27+
@Override
28+
public Publisher<Long> createPublisher(long elements) {
29+
return FlowableTck.wrap(
30+
Flowable.amb(Arrays.asList(
31+
Flowable.fromIterable(iterate(elements)),
32+
Flowable.<Long>never()
33+
)
34+
)
35+
);
36+
}
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.tck;
15+
16+
import org.reactivestreams.Publisher;
17+
import org.testng.annotations.Test;
18+
19+
import io.reactivex.Flowable;
20+
import io.reactivex.functions.Predicate;
21+
22+
@Test
23+
public class AnyTckTest extends BaseTck<Boolean> {
24+
25+
@Override
26+
public Publisher<Boolean> createPublisher(final long elements) {
27+
return FlowableTck.wrap(
28+
Flowable.range(1, 1000).any(new Predicate<Integer>() {
29+
@Override
30+
public boolean test(Integer e) throws Exception {
31+
return e == 500;
32+
}
33+
})
34+
);
35+
}
36+
37+
@Override
38+
public long maxElementsFromPublisher() {
39+
return 1;
40+
}
41+
}

src/test/java/io/reactivex/tck/BaseTck.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,27 @@
2525
/**
2626
* Base abstract class for Flowable verifications, contains support for creating
2727
* Iterable range of values
28+
*
29+
* @param <T> the element type
2830
*/
2931
@Test
30-
public abstract class BaseTck extends PublisherVerification<Long> {
32+
public abstract class BaseTck<T> extends PublisherVerification<T> {
3133

3234
public BaseTck() {
33-
super(new TestEnvironment(300L));
35+
super(new TestEnvironment(25L));
3436
}
3537

3638
@Override
37-
public Publisher<Long> createFailedPublisher() {
39+
public Publisher<T> createFailedPublisher() {
3840
return Flowable.error(new TestException());
3941
}
4042

43+
44+
@Override
45+
public long maxElementsFromPublisher() {
46+
return 1024;
47+
}
48+
4149
/**
4250
* Creates an Iterable with the specified number of elements or an infinite one if
4351
* elements > Integer.MAX_VALUE
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.tck;
15+
16+
import java.util.List;
17+
18+
import org.reactivestreams.Publisher;
19+
import org.testng.annotations.Test;
20+
21+
import io.reactivex.Flowable;
22+
23+
@Test
24+
public class BufferBoundaryTckTest extends BaseTck<List<Long>> {
25+
26+
@Override
27+
public Publisher<List<Long>> createPublisher(long elements) {
28+
return FlowableTck.wrap(
29+
Flowable.fromIterable(iterate(elements))
30+
.buffer(Flowable.just(1).concatWith(Flowable.<Integer>never()))
31+
.onBackpressureLatest()
32+
);
33+
}
34+
35+
@Override
36+
public long maxElementsFromPublisher() {
37+
return 1;
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.tck;
15+
16+
import java.util.List;
17+
18+
import org.reactivestreams.Publisher;
19+
import org.testng.annotations.Test;
20+
21+
import io.reactivex.Flowable;
22+
23+
@Test
24+
public class BufferExactSizeTckTest extends BaseTck<List<Long>> {
25+
26+
@Override
27+
public Publisher<List<Long>> createPublisher(long elements) {
28+
return FlowableTck.wrap(
29+
Flowable.fromIterable(iterate(elements * 2))
30+
.buffer(2)
31+
);
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.tck;
15+
16+
import org.reactivestreams.Publisher;
17+
import org.testng.annotations.Test;
18+
19+
import io.reactivex.Flowable;
20+
21+
@Test
22+
public class CacheTckTest extends BaseTck<Long> {
23+
24+
@Override
25+
public Publisher<Long> createPublisher(long elements) {
26+
return FlowableTck.wrap(
27+
Flowable.fromIterable(iterate(elements)).cache()
28+
);
29+
}
30+
}

0 commit comments

Comments
 (0)