Skip to content

Commit 307f97b

Browse files
authored
2.x: option to fail for using blockingX on the computation scheduler (#5020)
* 2.x: option to fail for using blockingX on the computation scheduler * Increase sleep time in XFlatMapTest * Add a custom RxJavaPlugins callback onBeforeBlocking
1 parent f53e029 commit 307f97b

23 files changed

+913
-43
lines changed

src/main/java/io/reactivex/internal/observers/BlockingBaseObserver.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import io.reactivex.Observer;
1818
import io.reactivex.disposables.Disposable;
19-
import io.reactivex.internal.util.ExceptionHelper;
19+
import io.reactivex.internal.util.*;
2020

2121
public abstract class BlockingBaseObserver<T> extends CountDownLatch
2222
implements Observer<T>, Disposable {
@@ -67,6 +67,7 @@ public final boolean isDisposed() {
6767
public final T blockingGet() {
6868
if (getCount() != 0) {
6969
try {
70+
BlockingHelper.verifyNonBlocking();
7071
await();
7172
} catch (InterruptedException ex) {
7273
dispose();

src/main/java/io/reactivex/internal/observers/BlockingMultiObserver.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import io.reactivex.*;
1919
import io.reactivex.disposables.Disposable;
20-
import io.reactivex.internal.util.ExceptionHelper;
20+
import io.reactivex.internal.util.*;
2121

2222
/**
2323
* A combined Observer that awaits the success or error signal via a CountDownLatch.
@@ -79,6 +79,7 @@ public void onComplete() {
7979
public T blockingGet() {
8080
if (getCount() != 0) {
8181
try {
82+
BlockingHelper.verifyNonBlocking();
8283
await();
8384
} catch (InterruptedException ex) {
8485
dispose();
@@ -101,6 +102,7 @@ public T blockingGet() {
101102
public T blockingGet(T defaultValue) {
102103
if (getCount() != 0) {
103104
try {
105+
BlockingHelper.verifyNonBlocking();
104106
await();
105107
} catch (InterruptedException ex) {
106108
dispose();
@@ -123,6 +125,7 @@ public T blockingGet(T defaultValue) {
123125
public Throwable blockingGetError() {
124126
if (getCount() != 0) {
125127
try {
128+
BlockingHelper.verifyNonBlocking();
126129
await();
127130
} catch (InterruptedException ex) {
128131
dispose();
@@ -142,6 +145,7 @@ public Throwable blockingGetError() {
142145
public Throwable blockingGetError(long timeout, TimeUnit unit) {
143146
if (getCount() != 0) {
144147
try {
148+
BlockingHelper.verifyNonBlocking();
145149
if (!await(timeout, unit)) {
146150
dispose();
147151
throw ExceptionHelper.wrapOrThrow(new TimeoutException());
@@ -164,6 +168,7 @@ public Throwable blockingGetError(long timeout, TimeUnit unit) {
164168
public boolean blockingAwait(long timeout, TimeUnit unit) {
165169
if (getCount() != 0) {
166170
try {
171+
BlockingHelper.verifyNonBlocking();
167172
if (!await(timeout, unit)) {
168173
dispose();
169174
return false;

src/main/java/io/reactivex/internal/observers/FutureObserver.java

+3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.reactivex.Observer;
2121
import io.reactivex.disposables.Disposable;
2222
import io.reactivex.internal.disposables.DisposableHelper;
23+
import io.reactivex.internal.util.BlockingHelper;
2324
import io.reactivex.plugins.RxJavaPlugins;
2425

2526
/**
@@ -72,6 +73,7 @@ public boolean isDone() {
7273
@Override
7374
public T get() throws InterruptedException, ExecutionException {
7475
if (getCount() != 0) {
76+
BlockingHelper.verifyNonBlocking();
7577
await();
7678
}
7779

@@ -88,6 +90,7 @@ public T get() throws InterruptedException, ExecutionException {
8890
@Override
8991
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
9092
if (getCount() != 0) {
93+
BlockingHelper.verifyNonBlocking();
9194
if (!await(timeout, unit)) {
9295
throw new TimeoutException();
9396
}

src/main/java/io/reactivex/internal/observers/FutureSingleObserver.java

+3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.reactivex.SingleObserver;
2020
import io.reactivex.disposables.Disposable;
2121
import io.reactivex.internal.disposables.DisposableHelper;
22+
import io.reactivex.internal.util.BlockingHelper;
2223
import io.reactivex.plugins.RxJavaPlugins;
2324

2425
/**
@@ -71,6 +72,7 @@ public boolean isDone() {
7172
@Override
7273
public T get() throws InterruptedException, ExecutionException {
7374
if (getCount() != 0) {
75+
BlockingHelper.verifyNonBlocking();
7476
await();
7577
}
7678

@@ -87,6 +89,7 @@ public T get() throws InterruptedException, ExecutionException {
8789
@Override
8890
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
8991
if (getCount() != 0) {
92+
BlockingHelper.verifyNonBlocking();
9093
if (!await(timeout, unit)) {
9194
throw new TimeoutException();
9295
}

src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.reactivex.exceptions.MissingBackpressureException;
2424
import io.reactivex.internal.queue.SpscArrayQueue;
2525
import io.reactivex.internal.subscriptions.SubscriptionHelper;
26-
import io.reactivex.internal.util.ExceptionHelper;
26+
import io.reactivex.internal.util.*;
2727

2828
public final class BlockingFlowableIterable<T> implements Iterable<T> {
2929
final Publisher<? extends T> source;
@@ -86,6 +86,7 @@ public boolean hasNext() {
8686
}
8787
}
8888
if (empty) {
89+
BlockingHelper.verifyNonBlocking();
8990
lock.lock();
9091
try {
9192
while (!done && queue.isEmpty()) {

src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableLatest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.reactivestreams.Publisher;
2121

2222
import io.reactivex.*;
23-
import io.reactivex.internal.util.ExceptionHelper;
23+
import io.reactivex.internal.util.*;
2424
import io.reactivex.plugins.RxJavaPlugins;
2525
import io.reactivex.subscribers.DisposableSubscriber;
2626

@@ -79,6 +79,7 @@ public boolean hasNext() {
7979
if (iteratorNotification == null || iteratorNotification.isOnNext()) {
8080
if (iteratorNotification == null) {
8181
try {
82+
BlockingHelper.verifyNonBlocking();
8283
notify.acquire();
8384
} catch (InterruptedException ex) {
8485
dispose();

src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableNext.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.reactivestreams.Publisher;
2121

2222
import io.reactivex.*;
23-
import io.reactivex.internal.util.ExceptionHelper;
23+
import io.reactivex.internal.util.*;
2424
import io.reactivex.plugins.RxJavaPlugins;
2525
import io.reactivex.subscribers.DisposableSubscriber;
2626

@@ -165,6 +165,7 @@ public void onNext(Notification<T> args) {
165165

166166
public Notification<T> takeNext() throws InterruptedException {
167167
setWaiting();
168+
BlockingHelper.verifyNonBlocking();
168169
return buf.take();
169170
}
170171
void setWaiting() {

src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public static <T> void subscribe(Publisher<? extends T> o, Subscriber<? super T>
5757
if (bs.isCancelled()) {
5858
break;
5959
}
60+
BlockingHelper.verifyNonBlocking();
6061
v = queue.take();
6162
}
6263
if (bs.isCancelled()) {

src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterable.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.reactivex.disposables.Disposable;
2222
import io.reactivex.internal.disposables.DisposableHelper;
2323
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
24-
import io.reactivex.internal.util.ExceptionHelper;
24+
import io.reactivex.internal.util.*;
2525

2626
public final class BlockingObservableIterable<T> implements Iterable<T> {
2727
final ObservableSource<? extends T> source;
@@ -78,6 +78,7 @@ public boolean hasNext() {
7878
}
7979
if (empty) {
8080
try {
81+
BlockingHelper.verifyNonBlocking();
8182
lock.lock();
8283
try {
8384
while (!done && queue.isEmpty()) {

src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import io.reactivex.*;
2121
import io.reactivex.Observable;
22-
import io.reactivex.internal.util.ExceptionHelper;
22+
import io.reactivex.internal.util.*;
2323
import io.reactivex.observers.DisposableObserver;
2424
import io.reactivex.plugins.RxJavaPlugins;
2525

@@ -79,6 +79,7 @@ public boolean hasNext() {
7979
}
8080
if (iteratorNotification == null) {
8181
try {
82+
BlockingHelper.verifyNonBlocking();
8283
notify.acquire();
8384
} catch (InterruptedException ex) {
8485
dispose();

src/main/java/io/reactivex/internal/operators/observable/BlockingObservableNext.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import java.util.concurrent.atomic.AtomicInteger;
1919

2020
import io.reactivex.*;
21-
import io.reactivex.internal.util.ExceptionHelper;
21+
import io.reactivex.internal.util.*;
2222
import io.reactivex.observers.DisposableObserver;
2323
import io.reactivex.plugins.RxJavaPlugins;
2424

@@ -162,6 +162,7 @@ public void onNext(Notification<T> args) {
162162

163163
public Notification<T> takeNext() throws InterruptedException {
164164
setWaiting();
165+
BlockingHelper.verifyNonBlocking();
165166
return buf.take();
166167
}
167168
void setWaiting() {

src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public final class ComputationScheduler extends Scheduler {
5656
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
5757
Integer.getInteger(KEY_COMPUTATION_PRIORITY, Thread.NORM_PRIORITY)));
5858

59-
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
59+
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority, true);
6060

6161
NONE = new FixedSchedulerPool(0, THREAD_FACTORY);
6262
NONE.shutdown();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.schedulers;
15+
16+
/**
17+
* Marker interface to indicate blocking is not recommended while running
18+
* on a Scheduler with a thread type implementing it.
19+
*/
20+
public interface NonBlockingThread {
21+
22+
}

src/main/java/io/reactivex/internal/schedulers/RxThreadFactory.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,22 @@ public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
2828

2929
final int priority;
3030

31+
final boolean nonBlocking;
32+
3133
// static volatile boolean CREATE_TRACE;
3234

3335
public RxThreadFactory(String prefix) {
34-
this(prefix, Thread.NORM_PRIORITY);
36+
this(prefix, Thread.NORM_PRIORITY, false);
3537
}
3638

3739
public RxThreadFactory(String prefix, int priority) {
40+
this(prefix, priority, false);
41+
}
42+
43+
public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
3844
this.prefix = prefix;
3945
this.priority = priority;
46+
this.nonBlocking = nonBlocking;
4047
}
4148

4249
@Override
@@ -63,7 +70,8 @@ public Thread newThread(Runnable r) {
6370
// }
6471
// }
6572

66-
Thread t = new Thread(r, nameBuilder.toString());
73+
String name = nameBuilder.toString();
74+
Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
6775
t.setPriority(priority);
6876
t.setDaemon(true);
6977
return t;
@@ -73,4 +81,10 @@ public Thread newThread(Runnable r) {
7381
public String toString() {
7482
return "RxThreadFactory[" + prefix + "]";
7583
}
84+
85+
static final class RxCustomThread extends Thread implements NonBlockingThread {
86+
RxCustomThread(Runnable run, String name) {
87+
super(run, name);
88+
}
89+
}
7690
}

src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public final class SingleScheduler extends Scheduler {
4444
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
4545
Integer.getInteger(KEY_SINGLE_PRIORITY, Thread.NORM_PRIORITY)));
4646

47-
SINGLE_THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
47+
SINGLE_THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority, true);
4848
}
4949

5050
public SingleScheduler() {

src/main/java/io/reactivex/internal/subscribers/BlockingBaseSubscriber.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import org.reactivestreams.*;
1818

1919
import io.reactivex.internal.subscriptions.SubscriptionHelper;
20-
import io.reactivex.internal.util.ExceptionHelper;
20+
import io.reactivex.internal.util.*;
2121

2222
public abstract class BlockingBaseSubscriber<T> extends CountDownLatch
2323
implements Subscriber<T> {
@@ -60,6 +60,7 @@ public final void onComplete() {
6060
public final T blockingGet() {
6161
if (getCount() != 0) {
6262
try {
63+
BlockingHelper.verifyNonBlocking();
6364
await();
6465
} catch (InterruptedException ex) {
6566
Subscription s = this.s;

src/main/java/io/reactivex/internal/subscribers/FutureSubscriber.java

+3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.reactivestreams.*;
2121

2222
import io.reactivex.internal.subscriptions.SubscriptionHelper;
23+
import io.reactivex.internal.util.BlockingHelper;
2324
import io.reactivex.plugins.RxJavaPlugins;
2425

2526
/**
@@ -72,6 +73,7 @@ public boolean isDone() {
7273
@Override
7374
public T get() throws InterruptedException, ExecutionException {
7475
if (getCount() != 0) {
76+
BlockingHelper.verifyNonBlocking();
7577
await();
7678
}
7779

@@ -88,6 +90,7 @@ public T get() throws InterruptedException, ExecutionException {
8890
@Override
8991
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
9092
if (getCount() != 0) {
93+
BlockingHelper.verifyNonBlocking();
9194
if (!await(timeout, unit)) {
9295
throw new TimeoutException();
9396
}

src/main/java/io/reactivex/internal/util/BlockingHelper.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import java.util.concurrent.CountDownLatch;
1717

1818
import io.reactivex.disposables.Disposable;
19+
import io.reactivex.internal.schedulers.NonBlockingThread;
20+
import io.reactivex.plugins.RxJavaPlugins;
1921

2022
/**
2123
* Utility methods for helping common blocking operations.
@@ -34,6 +36,7 @@ public static void awaitForComplete(CountDownLatch latch, Disposable subscriptio
3436
}
3537
// block until the subscription completes and then return
3638
try {
39+
verifyNonBlocking();
3740
latch.await();
3841
} catch (InterruptedException e) {
3942
subscription.dispose();
@@ -45,5 +48,16 @@ public static void awaitForComplete(CountDownLatch latch, Disposable subscriptio
4548
}
4649
}
4750

48-
51+
/**
52+
* Checks if the {@code failOnNonBlockingScheduler} plugin setting is enabled and the current
53+
* thread is a Scheduler sensitive to blocking operators.
54+
* @throws IllegalStateException if the {@code failOnNonBlockingScheduler} and the current thread is sensitive to blocking
55+
*/
56+
public static void verifyNonBlocking() {
57+
if (RxJavaPlugins.isFailOnNonBlockingScheduler()
58+
&& (Thread.currentThread() instanceof NonBlockingThread
59+
|| RxJavaPlugins.onBeforeBlocking())) {
60+
throw new IllegalStateException("Attempt to block on a Scheduler " + Thread.currentThread().getName() + " that doesn't support blocking operators as they may lead to deadlock");
61+
}
62+
}
4963
}

0 commit comments

Comments
 (0)