Skip to content

Commit 486f35d

Browse files
committed
2.x: operator test map, materialize
- OperatorMulticastTest is empty because it is not supported - Fixed headers - increased wating time in RefCount test testConnectUnsubscriberRaceCondition - fixed bugs in toFuture, materialize
1 parent 8224f8b commit 486f35d

11 files changed

+787
-27
lines changed

src/main/java/io/reactivex/Notification.java

+25
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,29 @@ public static <T> Try<Optional<T>> next(T value) {
3939
Objects.requireNonNull(value); // TODO this coud instead return an error of NPE
4040
return Try.ofValue(Optional.of(value));
4141
}
42+
43+
public static <T> boolean isNext(Try<Optional<T>> notification) {
44+
if (notification.hasValue()) {
45+
return notification.value().isPresent();
46+
}
47+
return false;
48+
}
49+
50+
public static <T> boolean isComplete(Try<Optional<T>> notification) {
51+
if (notification.hasValue()) {
52+
return !notification.value().isPresent();
53+
}
54+
return false;
55+
}
56+
57+
public static <T> boolean isError(Try<Optional<T>> notification) {
58+
return notification.hasError();
59+
}
60+
61+
public static <T> T getValue(Try<Optional<T>> notification) {
62+
if (notification.hasValue()) {
63+
return notification.value.get();
64+
}
65+
return null;
66+
}
4267
}

src/main/java/io/reactivex/internal/operators/OperatorMaterialize.java

+19-22
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ void tryEmit(Try<Optional<T>> v) {
101101
return;
102102
} else {
103103
value = v;
104+
done = true;
104105
if (STATE.compareAndSet(this, s, NO_REQUEST_HAS_VALUE)) {
105106
return;
106107
}
@@ -111,17 +112,13 @@ void tryEmit(Try<Optional<T>> v) {
111112

112113
@Override
113114
public void onError(Throwable t) {
114-
done = true;
115-
116115
Try<Optional<T>> v = Notification.error(t);
117116

118117
tryEmit(v);
119118
}
120119

121120
@Override
122121
public void onComplete() {
123-
done = true;
124-
125122
Try<Optional<T>> v = Notification.complete();
126123

127124
tryEmit(v);
@@ -132,28 +129,28 @@ public void request(long n) {
132129
if (SubscriptionHelper.validateRequest(n)) {
133130
return;
134131
}
135-
s.request(n);
136-
if (BackpressureHelper.add(this, n) == 0) {
137-
if (done) {
138-
for (;;) {
139-
int s = state;
140-
if (s == NO_REQUEST_HAS_VALUE) {
141-
if (STATE.compareAndSet(this, s, HAS_REQUEST_HAS_VALUE)) {
142-
Try<Optional<T>> v = value;
143-
value = null;
144-
actual.onNext(v);
145-
actual.onComplete();
146-
return;
147-
}
148-
} else
149-
if (s == HAS_REQUEST_NO_VALUE || s == HAS_REQUEST_HAS_VALUE) {
150-
return;
151-
} else
152-
if (STATE.compareAndSet(this, s, HAS_REQUEST_NO_VALUE)) {
132+
BackpressureHelper.add(this, n);
133+
if (done) {
134+
for (;;) {
135+
int s = state;
136+
if (s == NO_REQUEST_HAS_VALUE) {
137+
if (STATE.compareAndSet(this, s, HAS_REQUEST_HAS_VALUE)) {
138+
Try<Optional<T>> v = value;
139+
value = null;
140+
actual.onNext(v);
141+
actual.onComplete();
153142
return;
154143
}
144+
} else
145+
if (s == HAS_REQUEST_NO_VALUE || s == HAS_REQUEST_HAS_VALUE) {
146+
return;
147+
} else
148+
if (STATE.compareAndSet(this, s, HAS_REQUEST_NO_VALUE)) {
149+
return;
155150
}
156151
}
152+
} else {
153+
s.request(n);
157154
}
158155
}
159156

src/main/java/io/reactivex/internal/operators/OperatorTakeLastOne.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import io.reactivex.Observable.Operator;
2020
import io.reactivex.internal.subscriptions.SubscriptionHelper;
21-
import io.reactivex.plugins.RxJavaPlugins;
2221

2322
public enum OperatorTakeLastOne implements Operator<Object, Object> {
2423
INSTANCE
@@ -93,8 +92,7 @@ public void onComplete() {
9392

9493
@Override
9594
public void request(long n) {
96-
if (n <= 0) {
97-
RxJavaPlugins.onError(new IllegalArgumentException("n > 0 required but it was " + n));
95+
if (SubscriptionHelper.validateRequest(n)) {
9896
return;
9997
}
10098
for (;;) {

src/main/java/io/reactivex/observables/BlockingObservable.java

+1
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ protected void onStart() {
226226
f.whenComplete((v, e) -> {
227227
cancel();
228228
});
229+
request(Long.MAX_VALUE);
229230
}
230231

231232
@Override

src/test/java/io/reactivex/internal/operators/OnSubscribeRefCountTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ public void testConnectUnsubscribe() throws InterruptedException {
194194

195195
@Test
196196
public void testConnectUnsubscribeRaceConditionLoop() throws InterruptedException {
197-
for (int i = 0; i < 1000; i++) {
197+
for (int i = 0; i < 100; i++) {
198198
testConnectUnsubscribeRaceCondition();
199199
}
200200
}
@@ -221,7 +221,7 @@ public void testConnectUnsubscribeRaceCondition() throws InterruptedException {
221221
s.dispose();
222222
// this generally will mean it won't even subscribe as it is already unsubscribed by the time connect() gets scheduled
223223
// give time to the counter to update
224-
Thread.sleep(5);
224+
Thread.sleep(10);
225225
// either we subscribed and then unsubscribed, or we didn't ever even subscribe
226226
assertEquals(0, subUnsubCount.get());
227227

src/test/java/io/reactivex/internal/operators/OperatorIgnoreElementsTest.java

+13
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
114
package io.reactivex.internal.operators;
215

316
import static org.junit.Assert.*;

src/test/java/io/reactivex/internal/operators/OperatorLastTest.java

+13
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
114
package io.reactivex.internal.operators;
215

316
import static org.junit.Assert.assertEquals;

src/test/java/io/reactivex/internal/operators/OperatorMapNotificationTest.java

+13
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
114
package io.reactivex.internal.operators;
215

316
import java.util.function.*;

0 commit comments

Comments
 (0)