Skip to content

Commit 487a0ba

Browse files
authored
2.x: test sync + cleanup (#4204)
* 2.x: test sync + cleanup * Adjust header copyright year * Disable PMD; eats to much memory and prone to travis kill-9 * Add missing headers * Trace scheduler leak * Fix a scheduler leak in AbstractSchedulerTests
1 parent 0705001 commit 487a0ba

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+3291
-612
lines changed

.travis.yml

+8-5
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
language: java
22
jdk:
3-
- oraclejdk8
3+
- oraclejdk7
44

55
# force upgrade Java8 as per https://github.com/travis-ci/travis-ci/issues/4042 (fixes compilation issue)
6-
addons:
7-
apt:
8-
packages:
9-
- oracle-java8-installer
6+
#addons:
7+
# apt:
8+
# packages:
9+
# - oracle-java8-installer
10+
11+
# prevent travis running gradle assemble; let the build script do it anyway
12+
install: true
1013

1114
sudo: false
1215
# as per http://blog.travis-ci.com/2014-12-17-faster-builds-with-container-based-infrastructure/

build.gradle

+57-34
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,56 @@
11
buildscript {
22
repositories { jcenter() }
3-
dependencies { classpath 'com.netflix.nebula:gradle-rxjava-project-plugin:4.0.0' }
3+
dependencies {
4+
classpath 'com.netflix.nebula:gradle-rxjava-project-plugin:4.0.0'
5+
classpath 'ru.vyarus:gradle-animalsniffer-plugin:1.1.0'
6+
}
47
}
58

69
group = 'io.reactivex.rxjava2'
710

811
description = 'RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.'
912

1013
apply plugin: 'java'
11-
apply plugin: 'pmd'
14+
// apply plugin: 'pmd'
1215
apply plugin: 'findbugs'
1316
apply plugin: 'jacoco'
17+
apply plugin: 'ru.vyarus.animalsniffer'
1418
apply plugin: 'nebula.rxjava-project'
1519

1620
sourceCompatibility = JavaVersion.VERSION_1_6
1721
targetCompatibility = JavaVersion.VERSION_1_6
1822

1923
dependencies {
24+
signature 'org.codehaus.mojo.signature:java16:1.1@signature'
25+
2026
compile 'org.reactivestreams:reactive-streams:1.0.0'
27+
2128
testCompile 'junit:junit:4.12'
2229
testCompile 'org.mockito:mockito-core:1.10.19'
2330

2431
perfCompile 'org.openjdk.jmh:jmh-core:1.11.3'
2532
perfCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.11.3'
26-
// perfCompile 'org.reactivex:rxjava:1.0.14'
2733
}
2834

2935
javadoc {
30-
exclude "**/io/reactivex/internal/**"
36+
exclude "**/rx/internal/**"
37+
exclude "**/test/**"
38+
exclude "**/perf/**"
39+
options {
40+
windowTitle = "RxJava Javadoc ${project.version}"
41+
}
42+
// Clear the following options to make the docs consistent with the old format
43+
options.addStringOption('top').value = ''
44+
options.addStringOption('doctitle').value = ''
45+
options.addStringOption('header').value = ''
46+
if (JavaVersion.current().isJava7()) {
47+
// "./gradle/stylesheet.css" only supports Java 7
48+
options.addStringOption('stylesheetfile', rootProject.file('./gradle/stylesheet.css').toString())
49+
}
50+
}
51+
52+
animalsniffer {
53+
annotation = 'io.reactivex.internal.util.SuppressAnimalSniffer'
3154
}
3255

3356
// support for snapshot/final releases with the various branches RxJava uses
@@ -41,7 +64,7 @@ if (project.hasProperty('release.useLastTag')) {
4164
}
4265

4366
test {
44-
maxHeapSize = "2g"
67+
maxHeapSize = "1200m"
4568

4669
if (System.getenv('CI') == null) {
4770
maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1
@@ -61,42 +84,41 @@ jacocoTestReport {
6184

6285
build.dependsOn jacocoTestReport
6386

64-
pmd {
65-
toolVersion = '5.4.2'
66-
ignoreFailures = true
67-
sourceSets = [sourceSets.main]
68-
ruleSets = []
69-
ruleSetFiles = files('pmd.xml')
87+
// pmd {
88+
// toolVersion = '5.4.2'
89+
// ignoreFailures = true
90+
// sourceSets = [sourceSets.main]
91+
// ruleSets = []
92+
// ruleSetFiles = files('pmd.xml')
93+
// }
7094

71-
}
95+
// pmdMain {
96+
// reports {
97+
// html.enabled = true
98+
// xml.enabled = true
99+
// }
100+
// }
72101

73-
pmdMain {
74-
reports {
75-
html.enabled = true
76-
xml.enabled = true
77-
}
78-
}
102+
// build.dependsOn pmdMain
79103

80-
build.dependsOn pmdMain
104+
// task pmdPrint(dependsOn: 'pmdMain') << {
105+
// File file = new File('build/reports/pmd/main.xml')
106+
// if (file.exists()) {
81107

82-
task pmdPrint(dependsOn: 'pmdMain') << {
83-
File file = new File('build/reports/pmd/main.xml')
84-
if (file.exists()) {
108+
// println("Listing first 100 PMD violations")
85109

86-
println("Listing first 100 PMD violations")
110+
// file.eachLine { line, count ->
111+
// if (count <= 100) {
112+
// println(line)
113+
// }
114+
// }
87115

88-
file.eachLine { line, count ->
89-
if (count <= 100) {
90-
println(line)
91-
}
92-
}
116+
// } else {
117+
// println("PMD file not found.")
118+
// }
119+
// }
93120

94-
} else {
95-
println("PMD file not found.")
96-
}
97-
}
98-
99-
build.dependsOn pmdPrint
121+
// build.dependsOn pmdPrint
100122

101123
findbugs {
102124
ignoreFailures true
@@ -112,3 +134,4 @@ findbugsMain {
112134
xml.enabled = true
113135
}
114136
}
137+

src/main/java/io/reactivex/AsyncEmitter.java

+13
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
114
package io.reactivex;
215

316
import io.reactivex.disposables.Disposable;

src/main/java/io/reactivex/Flowable.java

+57-34
Original file line numberDiff line numberDiff line change
@@ -93,59 +93,82 @@ public static int bufferSize() {
9393
return BUFFER_SIZE;
9494
}
9595

96-
@BackpressureSupport(BackpressureKind.FULL)
9796
@SchedulerSupport(SchedulerSupport.NONE)
98-
public static <T, R> Flowable<R> combineLatest(Function<? super Object[], ? extends R> combiner, boolean delayError, int bufferSize, Publisher<? extends T>... sources) {
99-
return combineLatest(sources, combiner, delayError, bufferSize);
97+
@BackpressureSupport(BackpressureKind.FULL)
98+
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<Object[], ? extends R> combiner) {
99+
return combineLatest(sources, combiner, bufferSize());
100100
}
101101

102-
@BackpressureSupport(BackpressureKind.FULL)
103102
@SchedulerSupport(SchedulerSupport.NONE)
104-
public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<? super Object[], ? extends R> combiner) {
105-
return combineLatest(sources, combiner, false, bufferSize());
103+
@BackpressureSupport(BackpressureKind.FULL)
104+
public static <T, R> Flowable<R> combineLatest(Function<Object[], ? extends R> combiner, Publisher<? extends T>... sources) {
105+
return combineLatest(sources, combiner, bufferSize());
106106
}
107107

108-
@BackpressureSupport(BackpressureKind.FULL)
109108
@SchedulerSupport(SchedulerSupport.NONE)
110-
public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<? super Object[], ? extends R> combiner, boolean delayError) {
111-
return combineLatest(sources, combiner, delayError, bufferSize());
109+
@BackpressureSupport(BackpressureKind.FULL)
110+
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<Object[], ? extends R> combiner, int bufferSize) {
111+
Objects.requireNonNull(sources, "sources is null");
112+
Objects.requireNonNull(combiner, "combiner is null");
113+
validateBufferSize(bufferSize);
114+
if (sources.length == 0) {
115+
return empty();
116+
}
117+
return new FlowableCombineLatest<T, R>(sources, combiner, bufferSize, false);
112118
}
113119

120+
@SchedulerSupport(SchedulerSupport.NONE)
114121
@BackpressureSupport(BackpressureKind.FULL)
122+
public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<Object[], ? extends R> combiner) {
123+
return combineLatest(sources, combiner, bufferSize());
124+
}
125+
115126
@SchedulerSupport(SchedulerSupport.NONE)
116-
public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<? super Object[], ? extends R> combiner, boolean delayError, int bufferSize) {
127+
@BackpressureSupport(BackpressureKind.FULL)
128+
public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<Object[], ? extends R> combiner, int bufferSize) {
117129
Objects.requireNonNull(sources, "sources is null");
118130
Objects.requireNonNull(combiner, "combiner is null");
119131
validateBufferSize(bufferSize);
120-
121-
// the queue holds a pair of values so we need to double the capacity
122-
int s = bufferSize << 1;
123-
return new FlowableCombineLatest<T, R>(null, sources, combiner, s, delayError);
132+
return new FlowableCombineLatest<T, R>(sources, combiner, bufferSize, false);
124133
}
125134

126-
@BackpressureSupport(BackpressureKind.FULL)
127135
@SchedulerSupport(SchedulerSupport.NONE)
128-
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super Object[], ? extends R> combiner) {
129-
return combineLatest(sources, combiner, false, bufferSize());
136+
@BackpressureSupport(BackpressureKind.FULL)
137+
public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[] sources, Function<Object[], ? extends R> combiner) {
138+
return combineLatestDelayError(sources, combiner, bufferSize());
130139
}
131140

132-
@BackpressureSupport(BackpressureKind.FULL)
133141
@SchedulerSupport(SchedulerSupport.NONE)
134-
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super Object[], ? extends R> combiner, boolean delayError) {
135-
return combineLatest(sources, combiner, delayError, bufferSize());
142+
@BackpressureSupport(BackpressureKind.FULL)
143+
public static <T, R> Flowable<R> combineLatestDelayError(Function<Object[], ? extends R> combiner, Publisher<? extends T>... sources) {
144+
return combineLatestDelayError(sources, combiner, bufferSize());
136145
}
137146

138-
@BackpressureSupport(BackpressureKind.FULL)
139147
@SchedulerSupport(SchedulerSupport.NONE)
140-
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super Object[], ? extends R> combiner, boolean delayError, int bufferSize) {
141-
validateBufferSize(bufferSize);
148+
@BackpressureSupport(BackpressureKind.FULL)
149+
public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[] sources, Function<Object[], ? extends R> combiner, int bufferSize) {
150+
Objects.requireNonNull(sources, "sources is null");
142151
Objects.requireNonNull(combiner, "combiner is null");
152+
validateBufferSize(bufferSize);
143153
if (sources.length == 0) {
144154
return empty();
145155
}
146-
// the queue holds a pair of values so we need to double the capacity
147-
int s = bufferSize << 1;
148-
return new FlowableCombineLatest<T, R>(sources, null, combiner, s, delayError);
156+
return new FlowableCombineLatest<T, R>(sources, combiner, bufferSize, true);
157+
}
158+
159+
@SchedulerSupport(SchedulerSupport.NONE)
160+
@BackpressureSupport(BackpressureKind.FULL)
161+
public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publisher<? extends T>> sources, Function<Object[], ? extends R> combiner) {
162+
return combineLatestDelayError(sources, combiner, bufferSize());
163+
}
164+
165+
@SchedulerSupport(SchedulerSupport.NONE)
166+
@BackpressureSupport(BackpressureKind.FULL)
167+
public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publisher<? extends T>> sources, Function<Object[], ? extends R> combiner, int bufferSize) {
168+
Objects.requireNonNull(sources, "sources is null");
169+
Objects.requireNonNull(combiner, "combiner is null");
170+
validateBufferSize(bufferSize);
171+
return new FlowableCombineLatest<T, R>(sources, combiner, bufferSize, true);
149172
}
150173

151174
@SuppressWarnings("unchecked")
@@ -155,7 +178,7 @@ public static <T1, T2, R> Flowable<R> combineLatest(
155178
Publisher<? extends T1> p1, Publisher<? extends T2> p2,
156179
BiFunction<? super T1, ? super T2, ? extends R> combiner) {
157180
Function<Object[], R> f = Functions.toFunction(combiner);
158-
return combineLatest(f, false, bufferSize(), p1, p2);
181+
return combineLatest(f, p1, p2);
159182
}
160183

161184
@SuppressWarnings("unchecked")
@@ -165,7 +188,7 @@ public static <T1, T2, T3, R> Flowable<R> combineLatest(
165188
Publisher<? extends T1> p1, Publisher<? extends T2> p2,
166189
Publisher<? extends T3> p3,
167190
Function3<? super T1, ? super T2, ? super T3, ? extends R> combiner) {
168-
return combineLatest(Functions.toFunction(combiner), false, bufferSize(), p1, p2, p3);
191+
return combineLatest(Functions.toFunction(combiner), p1, p2, p3);
169192
}
170193

171194
@SuppressWarnings("unchecked")
@@ -175,7 +198,7 @@ public static <T1, T2, T3, T4, R> Flowable<R> combineLatest(
175198
Publisher<? extends T1> p1, Publisher<? extends T2> p2,
176199
Publisher<? extends T3> p3, Publisher<? extends T4> p4,
177200
Function4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> combiner) {
178-
return combineLatest(Functions.toFunction(combiner), false, bufferSize(), p1, p2, p3, p4);
201+
return combineLatest(Functions.toFunction(combiner), p1, p2, p3, p4);
179202
}
180203

181204
@SuppressWarnings("unchecked")
@@ -186,7 +209,7 @@ public static <T1, T2, T3, T4, T5, R> Flowable<R> combineLatest(
186209
Publisher<? extends T3> p3, Publisher<? extends T4> p4,
187210
Publisher<? extends T5> p5,
188211
Function5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> combiner) {
189-
return combineLatest(Functions.toFunction(combiner), false, bufferSize(), p1, p2, p3, p4, p5);
212+
return combineLatest(Functions.toFunction(combiner), p1, p2, p3, p4, p5);
190213
}
191214

192215
@SuppressWarnings("unchecked")
@@ -197,7 +220,7 @@ public static <T1, T2, T3, T4, T5, T6, R> Flowable<R> combineLatest(
197220
Publisher<? extends T3> p3, Publisher<? extends T4> p4,
198221
Publisher<? extends T5> p5, Publisher<? extends T6> p6,
199222
Function6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> combiner) {
200-
return combineLatest(Functions.toFunction(combiner), false, bufferSize(), p1, p2, p3, p4, p5, p6);
223+
return combineLatest(Functions.toFunction(combiner), p1, p2, p3, p4, p5, p6);
201224
}
202225

203226
@SuppressWarnings("unchecked")
@@ -209,7 +232,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, R> Flowable<R> combineLatest(
209232
Publisher<? extends T5> p5, Publisher<? extends T6> p6,
210233
Publisher<? extends T7> p7,
211234
Function7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> combiner) {
212-
return combineLatest(Functions.toFunction(combiner), false, bufferSize(), p1, p2, p3, p4, p5, p6, p7);
235+
return combineLatest(Functions.toFunction(combiner), p1, p2, p3, p4, p5, p6, p7);
213236
}
214237

215238
@SuppressWarnings("unchecked")
@@ -221,7 +244,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Flowable<R> combineLatest(
221244
Publisher<? extends T5> p5, Publisher<? extends T6> p6,
222245
Publisher<? extends T7> p7, Publisher<? extends T8> p8,
223246
Function8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> combiner) {
224-
return combineLatest(Functions.toFunction(combiner), false, bufferSize(), p1, p2, p3, p4, p5, p6, p7, p8);
247+
return combineLatest(Functions.toFunction(combiner), p1, p2, p3, p4, p5, p6, p7, p8);
225248
}
226249

227250
@SuppressWarnings("unchecked")
@@ -234,7 +257,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Flowable<R> combineLatest(
234257
Publisher<? extends T7> p7, Publisher<? extends T8> p8,
235258
Publisher<? extends T9> p9,
236259
Function9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> combiner) {
237-
return combineLatest(Functions.toFunction(combiner), false, bufferSize(), p1, p2, p3, p4, p5, p6, p7, p8, p9);
260+
return combineLatest(Functions.toFunction(combiner), p1, p2, p3, p4, p5, p6, p7, p8, p9);
238261
}
239262

240263
@SuppressWarnings({ "unchecked", "rawtypes" })

src/main/java/io/reactivex/Observable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -2637,7 +2637,7 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T
26372637
public final void subscribe(Observer<? super T> observer) {
26382638
Objects.requireNonNull(observer, "observer is null");
26392639

2640-
// TODO plugin wrappings
2640+
observer = RxJavaPlugins.onSubscribe(this, observer);
26412641

26422642
subscribeActual(observer);
26432643
}

src/main/java/io/reactivex/Single.java

+8
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,10 @@ public final void subscribe(Subscriber<? super T> s) {
824824
toFlowable().subscribe(s);
825825
}
826826

827+
public final void subscribe(Observer<? super T> s) {
828+
toObservable().subscribe(s);
829+
}
830+
827831
public final Single<T> subscribeOn(final Scheduler scheduler) {
828832
Objects.requireNonNull(scheduler, "scheduler is null");
829833
return new SingleSubscribeOn<T>(this, scheduler);
@@ -861,6 +865,10 @@ public final Flowable<T> toFlowable() {
861865
return new SingleToFlowable<T>(this);
862866
}
863867

868+
public final Observable<T> toObservable() {
869+
return new SingleToObservable<T>(this);
870+
}
871+
864872
public final void unsafeSubscribe(Subscriber<? super T> s) {
865873
toFlowable().unsafeSubscribe(s);
866874
}

0 commit comments

Comments
 (0)