Skip to content

Commit f31aed3

Browse files
authored
Support for scheduled release of threads in Io Scheduler (#7162)
1 parent 947b05f commit f31aed3

File tree

3 files changed

+95
-4
lines changed

3 files changed

+95
-4
lines changed

src/main/java/io/reactivex/internal/schedulers/IoScheduler.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ public final class IoScheduler extends Scheduler {
4848
/** The name of the system property for setting the thread priority for this Scheduler. */
4949
private static final String KEY_IO_PRIORITY = "rx2.io-priority";
5050

51+
/** The name of the system property for setting the release behaviour for this Scheduler. */
52+
private static final String KEY_SCHEDULED_RELEASE = "rx2.io-scheduled-release";
53+
static boolean USE_SCHEDULED_RELEASE;
54+
5155
static final CachedWorkerPool NONE;
5256

5357
static {
@@ -63,6 +67,8 @@ public final class IoScheduler extends Scheduler {
6367

6468
EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);
6569

70+
USE_SCHEDULED_RELEASE = Boolean.getBoolean(KEY_SCHEDULED_RELEASE);
71+
6672
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
6773
NONE.shutdown();
6874
}
@@ -200,7 +206,7 @@ public int size() {
200206
return pool.get().allWorkers.size();
201207
}
202208

203-
static final class EventLoopWorker extends Scheduler.Worker {
209+
static final class EventLoopWorker extends Scheduler.Worker implements Runnable {
204210
private final CompositeDisposable tasks;
205211
private final CachedWorkerPool pool;
206212
private final ThreadWorker threadWorker;
@@ -217,12 +223,20 @@ static final class EventLoopWorker extends Scheduler.Worker {
217223
public void dispose() {
218224
if (once.compareAndSet(false, true)) {
219225
tasks.dispose();
220-
221-
// releasing the pool should be the last action
222-
pool.release(threadWorker);
226+
if (USE_SCHEDULED_RELEASE) {
227+
threadWorker.scheduleActual(this, 0, TimeUnit.NANOSECONDS, null);
228+
} else {
229+
// releasing the pool should be the last action
230+
pool.release(threadWorker);
231+
}
223232
}
224233
}
225234

235+
@Override
236+
public void run() {
237+
pool.release(threadWorker);
238+
}
239+
226240
@Override
227241
public boolean isDisposed() {
228242
return once.get();

src/main/java/io/reactivex/schedulers/Schedulers.java

+19
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
* <ul>
3232
* <li>{@code rx2.io-keep-alive-time} (long): sets the keep-alive time of the {@link #io()} Scheduler workers, default is {@link IoScheduler#KEEP_ALIVE_TIME_DEFAULT}</li>
3333
* <li>{@code rx2.io-priority} (int): sets the thread priority of the {@link #io()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
34+
* <li>{@code rx2.io-scheduled-release} (boolean): {@code true} sets the worker release mode of the
35+
* {@link #io()} Scheduler to <em>scheduled</em>, default is {@code false} for <em>eager</em> mode.</li>
3436
* <li>{@code rx2.computation-threads} (int): sets the number of threads in the {@link #computation()} Scheduler, default is the number of available CPUs</li>
3537
* <li>{@code rx2.computation-priority} (int): sets the thread priority of the {@link #computation()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
3638
* <li>{@code rx2.newthread-priority} (int): sets the thread priority of the {@link #newThread()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
@@ -113,6 +115,8 @@ private Schedulers() {
113115
* <ul>
114116
* <li>{@code rx2.computation-threads} (int): sets the number of threads in the {@link #computation()} Scheduler, default is the number of available CPUs</li>
115117
* <li>{@code rx2.computation-priority} (int): sets the thread priority of the {@link #computation()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
118+
* <li>{@code rx2.io-scheduled-release} (boolean): {@code true} sets the worker release mode of the
119+
* {@code #io()} Scheduler to <em>scheduled</em>, default is {@code false} for <em>eager</em> mode.</li>
116120
* </ul>
117121
* <p>
118122
* The default value of this scheduler can be overridden at initialization time via the
@@ -129,6 +133,21 @@ private Schedulers() {
129133
* <p>Operators on the base reactive classes that use this scheduler are marked with the
130134
* &#64;{@link io.reactivex.annotations.SchedulerSupport SchedulerSupport}({@link io.reactivex.annotations.SchedulerSupport#COMPUTATION COMPUTATION})
131135
* annotation.
136+
* <p>
137+
* When the {@link Scheduler.Worker} is disposed, the underlying worker can be released to the cached worker pool in two modes:
138+
* <ul>
139+
* <li>In <em>eager</em> mode (default), the underlying worker is returned immediately to the cached worker pool
140+
* and can be reused much quicker by operators. The drawback is that if the currently running task doesn't
141+
* respond to interruption in time or at all, this may lead to delays or deadlock with the reuse use of the
142+
* underlying worker.
143+
* </li>
144+
* <li>In <em>scheduled</em> mode (enabled via the system parameter {@code rx2.io-scheduled-release}
145+
* set to {@code true}), the underlying worker is returned to the cached worker pool only after the currently running task
146+
* has finished. This can help prevent premature reuse of the underlying worker and likely won't lead to delays or
147+
* deadlock with such reuses. The drawback is that the delay in release may lead to an excess amount of underlying
148+
* workers being created.
149+
* </li>
150+
* </ul>
132151
* @return a {@link Scheduler} meant for computation-bound work
133152
*/
134153
@NonNull
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.reactivex.internal.schedulers;
18+
19+
import io.reactivex.Completable;
20+
import io.reactivex.Flowable;
21+
import io.reactivex.annotations.NonNull;
22+
import io.reactivex.functions.Function;
23+
import io.reactivex.schedulers.Schedulers;
24+
import org.junit.Test;
25+
26+
import java.util.concurrent.TimeUnit;
27+
28+
public class IoScheduledReleaseTest {
29+
30+
/* This test will be stuck in a deadlock if IoScheduler.USE_SCHEDULED_RELEASE is not set */
31+
@Test
32+
public void scheduledRelease() {
33+
boolean savedScheduledRelease = IoScheduler.USE_SCHEDULED_RELEASE;
34+
IoScheduler.USE_SCHEDULED_RELEASE = true;
35+
try {
36+
Flowable.just("item")
37+
.observeOn(Schedulers.io())
38+
.firstOrError()
39+
.map(new Function<String, String>() {
40+
@Override
41+
public String apply(@NonNull final String item) throws Exception {
42+
for (int i = 0; i < 50; i++) {
43+
Completable.complete()
44+
.observeOn(Schedulers.io())
45+
.blockingAwait();
46+
}
47+
return "Done";
48+
}
49+
})
50+
.ignoreElement()
51+
.test()
52+
.awaitDone(5, TimeUnit.SECONDS)
53+
.assertComplete();
54+
} finally {
55+
IoScheduler.USE_SCHEDULED_RELEASE = savedScheduledRelease;
56+
}
57+
}
58+
}

0 commit comments

Comments
 (0)