Skip to content

Commit a4fb7da

Browse files
authored
2.x: test sync and missing operators (8/02) (#4273)
1 parent 9099f90 commit a4fb7da

11 files changed

+981
-170
lines changed

src/main/java/io/reactivex/Flowable.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.reactivex.internal.fuseable.*;
2929
import io.reactivex.internal.operators.flowable.*;
3030
import io.reactivex.internal.operators.flowable.FlowableConcatMap.ErrorMode;
31+
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
3132
import io.reactivex.internal.subscribers.flowable.*;
3233
import io.reactivex.internal.subscriptions.EmptySubscription;
3334
import io.reactivex.plugins.RxJavaPlugins;
@@ -941,14 +942,14 @@ public static <T> Flowable<T> merge(
941942
@SuppressWarnings({ "unchecked", "rawtypes" })
942943
@BackpressureSupport(BackpressureKind.FULL)
943944
@SchedulerSupport(SchedulerSupport.NONE)
944-
public static <T> Flowable<T> mergeDelayError(boolean delayErrors, Iterable<? extends Publisher<? extends T>> sources) {
945+
public static <T> Flowable<T> mergeDelayError(Iterable<? extends Publisher<? extends T>> sources) {
945946
return fromIterable(sources).flatMap((Function)Functions.identity(), true);
946947
}
947948

948949
@SuppressWarnings({ "unchecked", "rawtypes" })
949950
@BackpressureSupport(BackpressureKind.FULL)
950951
@SchedulerSupport(SchedulerSupport.NONE)
951-
public static <T> Flowable<T> mergeDelayError(int maxConcurrency, int bufferSize, Iterable<? extends Publisher<? extends T>> sources) {
952+
public static <T> Flowable<T> mergeDelayError(Iterable<? extends Publisher<? extends T>> sources, int maxConcurrency, int bufferSize) {
952953
return fromIterable(sources).flatMap((Function)Functions.identity(), true, maxConcurrency, bufferSize);
953954
}
954955

@@ -962,7 +963,7 @@ public static <T> Flowable<T> mergeDelayError(int maxConcurrency, int bufferSize
962963
@SuppressWarnings({ "unchecked", "rawtypes" })
963964
@BackpressureSupport(BackpressureKind.FULL)
964965
@SchedulerSupport(SchedulerSupport.NONE)
965-
public static <T> Flowable<T> mergeDelayError(int maxConcurrency, Iterable<? extends Publisher<? extends T>> sources) {
966+
public static <T> Flowable<T> mergeDelayError(Iterable<? extends Publisher<? extends T>> sources, int maxConcurrency) {
966967
return fromIterable(sources).flatMap((Function)Functions.identity(), true, maxConcurrency);
967968
}
968969

@@ -2130,7 +2131,7 @@ public final <R> Flowable<R> flatMap(
21302131
Objects.requireNonNull(onErrorMapper, "onErrorMapper is null");
21312132
Objects.requireNonNull(onCompleteSupplier, "onCompleteSupplier is null");
21322133
// FIXME run flatMap directly
2133-
return merge(new FlowableMapNotification<T, R>(this, onNextMapper, onErrorMapper, onCompleteSupplier));
2134+
return merge(new FlowableMapNotification<T, Publisher<? extends R>>(this, onNextMapper, onErrorMapper, onCompleteSupplier));
21342135
}
21352136

21362137
@BackpressureSupport(BackpressureKind.FULL)
@@ -2144,7 +2145,7 @@ public final <R> Flowable<R> flatMap(
21442145
Objects.requireNonNull(onErrorMapper, "onErrorMapper is null");
21452146
Objects.requireNonNull(onCompleteSupplier, "onCompleteSupplier is null");
21462147
// FIXME run flatMap directly
2147-
return merge(new FlowableMapNotification<T, R>(this, onNextMapper, onErrorMapper, onCompleteSupplier), maxConcurrency);
2148+
return merge(new FlowableMapNotification<T, Publisher<? extends R>>(this, onNextMapper, onErrorMapper, onCompleteSupplier), maxConcurrency);
21482149
}
21492150

21502151
@BackpressureSupport(BackpressureKind.FULL)
@@ -2584,6 +2585,12 @@ public final ConnectableFlowable<T> publish(int bufferSize) {
25842585
return FlowablePublish.create(this, bufferSize);
25852586
}
25862587

2588+
@BackpressureSupport(BackpressureKind.FULL)
2589+
@SchedulerSupport(SchedulerSupport.NONE)
2590+
public final Flowable<T> rebatchRequests(int bufferSize) {
2591+
return observeOn(ImmediateThinScheduler.INSTANCE, true, bufferSize);
2592+
}
2593+
25872594
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
25882595
@SchedulerSupport(SchedulerSupport.NONE)
25892596
public final Flowable<T> reduce(BiFunction<T, T, T> reducer) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,8 @@ Queue<U> getMainQueue() {
226226
void tryEmitScalar(U value) {
227227
if (get() == 0 && compareAndSet(0, 1)) {
228228
long r = requested.get();
229-
if (r != 0L) {
229+
Queue<U> q = queue;
230+
if (r != 0L && (q == null || q.isEmpty())) {
230231
actual.onNext(value);
231232
if (r != Long.MAX_VALUE) {
232233
requested.decrementAndGet();
@@ -237,7 +238,9 @@ void tryEmitScalar(U value) {
237238
s.request(scalarLimit);
238239
}
239240
} else {
240-
Queue<U> q = getMainQueue();
241+
if (q == null) {
242+
q = getMainQueue();
243+
}
241244
if (!q.offer(value)) {
242245
onError(new IllegalStateException("Scalar queue full?!"));
243246
return;
@@ -271,14 +274,17 @@ Queue<U> getInnerQueue(InnerSubscriber<T, U> inner) {
271274
void tryEmit(U value, InnerSubscriber<T, U> inner) {
272275
if (get() == 0 && compareAndSet(0, 1)) {
273276
long r = requested.get();
274-
if (r != 0L) {
277+
Queue<U> q = inner.queue;
278+
if (r != 0L && (q == null || q.isEmpty())) {
275279
actual.onNext(value);
276280
if (r != Long.MAX_VALUE) {
277281
requested.decrementAndGet();
278282
}
279283
inner.requestMore(1);
280284
} else {
281-
Queue<U> q = getInnerQueue(inner);
285+
if (q == null) {
286+
q = getInnerQueue(inner);
287+
}
282288
if (!q.offer(value)) {
283289
onError(new MissingBackpressureException("Inner queue full?!"));
284290
return;

src/main/java/io/reactivex/internal/operators/flowable/FlowableMapNotification.java

+22-22
Original file line numberDiff line numberDiff line change
@@ -22,27 +22,27 @@
2222
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2323
import io.reactivex.internal.util.BackpressureHelper;
2424

25-
public final class FlowableMapNotification<T, R> extends Flowable<Publisher<? extends R>>{
25+
public final class FlowableMapNotification<T, R> extends Flowable<R> {
2626

2727
final Publisher<T> source;
2828

29-
final Function<? super T, ? extends Publisher<? extends R>> onNextMapper;
30-
final Function<? super Throwable, ? extends Publisher<? extends R>> onErrorMapper;
31-
final Supplier<? extends Publisher<? extends R>> onCompleteSupplier;
29+
final Function<? super T, ? extends R> onNextMapper;
30+
final Function<? super Throwable, ? extends R> onErrorMapper;
31+
final Supplier<? extends R> onCompleteSupplier;
3232

3333
public FlowableMapNotification(
3434
Publisher<T> source,
35-
Function<? super T, ? extends Publisher<? extends R>> onNextMapper,
36-
Function<? super Throwable, ? extends Publisher<? extends R>> onErrorMapper,
37-
Supplier<? extends Publisher<? extends R>> onCompleteSupplier) {
35+
Function<? super T, ? extends R> onNextMapper,
36+
Function<? super Throwable, ? extends R> onErrorMapper,
37+
Supplier<? extends R> onCompleteSupplier) {
3838
this.source = source;
3939
this.onNextMapper = onNextMapper;
4040
this.onErrorMapper = onErrorMapper;
4141
this.onCompleteSupplier = onCompleteSupplier;
4242
}
4343

4444
@Override
45-
protected void subscribeActual(Subscriber<? super Publisher<? extends R>> s) {
45+
protected void subscribeActual(Subscriber<? super R> s) {
4646
source.subscribe(new MapNotificationSubscriber<T, R>(s, onNextMapper, onErrorMapper, onCompleteSupplier));
4747
}
4848

@@ -53,14 +53,14 @@ static final class MapNotificationSubscriber<T, R>
5353
/** */
5454
private static final long serialVersionUID = 2757120512858778108L;
5555

56-
final Subscriber<? super Publisher<? extends R>> actual;
57-
final Function<? super T, ? extends Publisher<? extends R>> onNextMapper;
58-
final Function<? super Throwable, ? extends Publisher<? extends R>> onErrorMapper;
59-
final Supplier<? extends Publisher<? extends R>> onCompleteSupplier;
56+
final Subscriber<? super R> actual;
57+
final Function<? super T, ? extends R> onNextMapper;
58+
final Function<? super Throwable, ? extends R> onErrorMapper;
59+
final Supplier<? extends R> onCompleteSupplier;
6060

6161
Subscription s;
6262

63-
Publisher<? extends R> value;
63+
R value;
6464

6565
volatile boolean done;
6666

@@ -71,10 +71,10 @@ static final class MapNotificationSubscriber<T, R>
7171
static final int HAS_REQUEST_NO_VALUE = 2;
7272
static final int HAS_REQUEST_HAS_VALUE = 3;
7373

74-
public MapNotificationSubscriber(Subscriber<? super Publisher<? extends R>> actual,
75-
Function<? super T, ? extends Publisher<? extends R>> onNextMapper,
76-
Function<? super Throwable, ? extends Publisher<? extends R>> onErrorMapper,
77-
Supplier<? extends Publisher<? extends R>> onCompleteSupplier) {
74+
public MapNotificationSubscriber(Subscriber<? super R> actual,
75+
Function<? super T, ? extends R> onNextMapper,
76+
Function<? super Throwable, ? extends R> onErrorMapper,
77+
Supplier<? extends R> onCompleteSupplier) {
7878
this.actual = actual;
7979
this.onNextMapper = onNextMapper;
8080
this.onErrorMapper = onErrorMapper;
@@ -91,7 +91,7 @@ public void onSubscribe(Subscription s) {
9191

9292
@Override
9393
public void onNext(T t) {
94-
Publisher<? extends R> p;
94+
R p;
9595

9696
try {
9797
p = onNextMapper.apply(t);
@@ -115,7 +115,7 @@ public void onNext(T t) {
115115

116116
@Override
117117
public void onError(Throwable t) {
118-
Publisher<? extends R> p;
118+
R p;
119119

120120
try {
121121
p = onErrorMapper.apply(t);
@@ -134,7 +134,7 @@ public void onError(Throwable t) {
134134

135135
@Override
136136
public void onComplete() {
137-
Publisher<? extends R> p;
137+
R p;
138138

139139
try {
140140
p = onCompleteSupplier.get();
@@ -152,7 +152,7 @@ public void onComplete() {
152152
}
153153

154154

155-
void tryEmit(Publisher<? extends R> p) {
155+
void tryEmit(R p) {
156156
long r = get();
157157
if (r != 0L) {
158158
actual.onNext(p);
@@ -197,7 +197,7 @@ public void request(long n) {
197197
} else
198198
if (s == NO_REQUEST_HAS_VALUE) {
199199
if (state.compareAndSet(NO_REQUEST_HAS_VALUE, HAS_REQUEST_HAS_VALUE)) {
200-
Publisher<? extends R> p = value;
200+
R p = value;
201201
value = null;
202202
actual.onNext(p);
203203
actual.onComplete();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.schedulers;
15+
16+
import java.util.concurrent.TimeUnit;
17+
18+
import io.reactivex.Scheduler;
19+
import io.reactivex.disposables.*;
20+
21+
/**
22+
* A Scheduler partially implementing the API by allowing only non-delayed, non-periodic
23+
* task execution on the current thread immediately.
24+
* <p>
25+
* Note that this doesn't support recursive scheduling and disposing the returned Disposable
26+
* has no effect (because when the schedule() method returns, the task has been already run).
27+
*/
28+
public final class ImmediateThinScheduler extends Scheduler {
29+
30+
/**
31+
* The singleton instance of the immediate (thin) scheduler.
32+
*/
33+
public static final Scheduler INSTANCE = new ImmediateThinScheduler();
34+
35+
static final Worker WORKER = new ImmediateThinWorker();
36+
37+
static final Disposable DISPOSED;
38+
39+
static {
40+
DISPOSED = Disposables.empty();
41+
DISPOSED.dispose();
42+
}
43+
44+
private ImmediateThinScheduler() {
45+
// singleton class
46+
}
47+
48+
@Override
49+
public Disposable scheduleDirect(Runnable run) {
50+
run.run();
51+
return DISPOSED;
52+
}
53+
54+
@Override
55+
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
56+
throw new UnsupportedOperationException("This scheduler doesn't support delayed execution");
57+
}
58+
59+
@Override
60+
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
61+
throw new UnsupportedOperationException("This scheduler doesn't support periodic execution");
62+
}
63+
64+
@Override
65+
public Worker createWorker() {
66+
return WORKER;
67+
}
68+
69+
static final class ImmediateThinWorker extends Worker {
70+
71+
@Override
72+
public void dispose() {
73+
// This worker is always stateless and won't track tasks
74+
}
75+
76+
@Override
77+
public boolean isDisposed() {
78+
return false; // dispose() has no effect
79+
}
80+
81+
@Override
82+
public Disposable schedule(Runnable run) {
83+
run.run();
84+
return DISPOSED;
85+
}
86+
87+
@Override
88+
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
89+
throw new UnsupportedOperationException("This scheduler doesn't support delayed execution");
90+
}
91+
92+
@Override
93+
public Disposable schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit) {
94+
throw new UnsupportedOperationException("This scheduler doesn't support periodic execution");
95+
}
96+
}
97+
}

src/test/java/io/reactivex/flowable/FlowableNullTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -528,23 +528,23 @@ public void mergeArrayOneIsNull() {
528528

529529
@Test(expected = NullPointerException.class)
530530
public void mergeDelayErrorIterableNull() {
531-
Flowable.mergeDelayError(128, 128, (Iterable<Publisher<Object>>)null);
531+
Flowable.mergeDelayError((Iterable<Publisher<Object>>)null, 128, 128);
532532
}
533533

534534
@Test(expected = NullPointerException.class)
535535
public void mergeDelayErrorIterableIteratorNull() {
536-
Flowable.mergeDelayError(128, 128, new Iterable<Publisher<Object>>() {
536+
Flowable.mergeDelayError(new Iterable<Publisher<Object>>() {
537537
@Override
538538
public Iterator<Publisher<Object>> iterator() {
539539
return null;
540540
}
541-
}).toBlocking().lastOption();
541+
}, 128, 128).toBlocking().lastOption();
542542
}
543543

544544
@SuppressWarnings("unchecked")
545545
@Test(expected = NullPointerException.class)
546546
public void mergeDelayErrorIterableOneIsNull() {
547-
Flowable.mergeDelayError(128, 128, Arrays.asList(just1, null)).toBlocking().lastOption();
547+
Flowable.mergeDelayError(Arrays.asList(just1, null), 128, 128).toBlocking().lastOption();
548548
}
549549

550550
@Test(expected = NullPointerException.class)

src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java

+29
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.reactivex.*;
2828
import io.reactivex.exceptions.TestException;
2929
import io.reactivex.functions.*;
30+
import io.reactivex.processors.PublishProcessor;
3031
import io.reactivex.schedulers.Schedulers;
3132
import io.reactivex.subscribers.TestSubscriber;
3233

@@ -649,4 +650,32 @@ public Flowable<Integer> apply(Integer v) {
649650
j += 2;
650651
}
651652
}
653+
654+
@Test
655+
public void castCrashUnsubscribes() {
656+
657+
PublishProcessor<Integer> ps = PublishProcessor.create();
658+
659+
TestSubscriber<Integer> ts = TestSubscriber.create();
660+
661+
ps.flatMap(new Function<Integer, Publisher<Integer>>() {
662+
@Override
663+
public Publisher<Integer> apply(Integer t) {
664+
throw new TestException();
665+
}
666+
}, new BiFunction<Integer, Integer, Integer>() {
667+
@Override
668+
public Integer apply(Integer t1, Integer t2) {
669+
return t1;
670+
}
671+
}).unsafeSubscribe(ts);
672+
673+
Assert.assertTrue("Not subscribed?", ps.hasSubscribers());
674+
675+
ps.onNext(1);
676+
677+
Assert.assertFalse("Subscribed?", ps.hasSubscribers());
678+
679+
ts.assertError(TestException.class);
680+
}
652681
}

0 commit comments

Comments
 (0)