Skip to content

Commit eb2d610

Browse files
committed
2.x: platform-aware purge/removeOnCancelPolicy management.
1 parent aa400d1 commit eb2d610

12 files changed

+476
-124
lines changed

src/main/java/io/reactivex/internal/schedulers/IOScheduler.java

-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ private static final class CachedWorkerPool {
6161
Future<?> task = null;
6262
if (unit != null) {
6363
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
64-
((ScheduledThreadPoolExecutor)evictor).setRemoveOnCancelPolicy(true);
6564
try {
6665
task = evictor.scheduleWithFixedDelay(
6766
new Runnable() {

src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java

+9-10
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,7 @@ public class NewThreadWorker extends Scheduler.Worker implements Disposable {
3232

3333
/* package */
3434
public NewThreadWorker(ThreadFactory threadFactory) {
35-
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
36-
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
37-
if (exec instanceof ScheduledThreadPoolExecutor) {
38-
((ScheduledThreadPoolExecutor)exec).setRemoveOnCancelPolicy(true);
39-
}
35+
ScheduledExecutorService exec = SchedulerPoolHelper.create(threadFactory);
4036
executor = exec;
4137
}
4238

@@ -59,7 +55,7 @@ public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit)
5955
* @param run
6056
* @param delayTime
6157
* @param unit
62-
* @return
58+
* @return a Disposable that let's the caller cancel the scheduled runnable
6359
*/
6460
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
6561
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
@@ -84,7 +80,7 @@ public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit un
8480
* @param initialDelay
8581
* @param period
8682
* @param unit
87-
* @return
83+
* @return a Disposable that let's the caller cancel the scheduled runnable
8884
*/
8985
public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDelay, long period, TimeUnit unit) {
9086
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
@@ -103,12 +99,15 @@ public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDel
10399
* on the underlying ScheduledExecutorService.
104100
* <p>If the schedule has been rejected, the ScheduledRunnable.wasScheduled will return
105101
* false.
106-
* @param action
102+
* @param run
107103
* @param delayTime
108104
* @param unit
109-
* @return
105+
* @param parent the parent tracking structure to add the created ScheduledRunnable instance before
106+
* it gets scheduled.
107+
* @return a Disposable that let's the caller cancel the scheduled runnable
110108
*/
111-
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, CompositeResource<Disposable> parent) {
109+
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit,
110+
CompositeResource<Disposable> parent) {
112111
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
113112

114113
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.schedulers;
15+
16+
import java.lang.reflect.*;
17+
import java.util.*;
18+
import java.util.concurrent.*;
19+
import java.util.concurrent.atomic.AtomicReference;
20+
21+
import io.reactivex.internal.util.Exceptions;
22+
23+
/**
24+
* Manages the purging of cancelled and delayed tasks considering platform specifics.
25+
*/
26+
public enum SchedulerPoolHelper {
27+
;
28+
29+
/** Key to force purging instead of using removeOnCancelPolicy if available. */
30+
private static final String FORCE_PURGE_KEY = "rx2.scheduler.purge-force";
31+
/**
32+
* Force periodic purging instead of removeOnCancelPolicy even if available.
33+
* Default {@code false}.
34+
*/
35+
private static volatile boolean FORCE_PURGE;
36+
37+
/** Key to the purge frequency parameter in milliseconds. */
38+
private static final String PURGE_FREQUENCY_KEY = "rx2.scheduler.purge-frequency";
39+
/** The purge frequency in milliseconds. */
40+
private static volatile int PURGE_FREQUENCY;
41+
/**
42+
* Holds onto the ScheduledExecutorService that periodically purges all known
43+
* ScheduledThreadPoolExecutors in POOLS.
44+
*/
45+
static final AtomicReference<ScheduledExecutorService> PURGE_THREAD;
46+
/**
47+
* Holds onto the created ScheduledThreadPoolExecutors by this helper.
48+
*/
49+
static final Map<ScheduledThreadPoolExecutor, ScheduledExecutorService> POOLS;
50+
51+
/**
52+
* The reflective method used for setting the removeOnCancelPolicy (JDK 6 safe way).
53+
*/
54+
static final Method SET_REMOVE_ON_CANCEL_POLICY;
55+
56+
static final ThreadFactory PURGE_THREAD_FACTORY;
57+
/**
58+
* Initializes the static fields and figures out the settings for purging.
59+
*/
60+
static {
61+
PURGE_THREAD = new AtomicReference<>();
62+
POOLS = new ConcurrentHashMap<>();
63+
PURGE_THREAD_FACTORY = new RxThreadFactory("RxSchedulerPurge-");
64+
65+
Properties props = System.getProperties();
66+
67+
boolean forcePurgeValue = false;
68+
Method removeOnCancelMethod = null;
69+
70+
// this is necessary because force is turned on and off by tests on desktop
71+
try {
72+
removeOnCancelMethod = ScheduledThreadPoolExecutor.class.getMethod("setRemoveOnCancelPolicy", Boolean.TYPE);
73+
} catch (NoSuchMethodException | SecurityException e) {
74+
// if not present, no problem
75+
forcePurgeValue = true;
76+
}
77+
78+
if (!forcePurgeValue && props.containsKey(FORCE_PURGE_KEY)) {
79+
forcePurgeValue = Boolean.getBoolean(FORCE_PURGE_KEY);
80+
}
81+
82+
PURGE_FREQUENCY = Integer.getInteger(PURGE_FREQUENCY_KEY, 2000);
83+
84+
FORCE_PURGE = forcePurgeValue;
85+
SET_REMOVE_ON_CANCEL_POLICY = removeOnCancelMethod;
86+
start();
87+
}
88+
89+
/**
90+
* Returns the status of the force-purge settings.
91+
* @return the force purge settings
92+
*/
93+
public static boolean forcePurge() {
94+
return FORCE_PURGE;
95+
}
96+
97+
/**
98+
* Sets the force-purge settings.
99+
* <p>Note that enabling or disabling the force-purge by itself doesn't apply to
100+
* existing schedulers and they have to be restarted.
101+
* @param force the new force state
102+
*/
103+
/* test */
104+
public static void forcePurge(boolean force) {
105+
FORCE_PURGE = force;
106+
}
107+
108+
/**
109+
* Returns purge frequency in milliseconds.
110+
* @return purge frequency in milliseconds
111+
*/
112+
public static int purgeFrequency() {
113+
return PURGE_FREQUENCY;
114+
}
115+
116+
/**
117+
* Returns true if the platform supports removeOnCancelPolicy.
118+
* @return true if the platform supports removeOnCancelPolicy.
119+
*/
120+
public static boolean isRemoveOnCancelPolicySupported() {
121+
return SET_REMOVE_ON_CANCEL_POLICY != null;
122+
}
123+
124+
/**
125+
* Creates a single threaded ScheduledExecutorService and wires up all
126+
* necessary purging or removeOnCancelPolicy settings with it.
127+
* @param factory the thread factory to use
128+
* @return the created ScheduledExecutorService
129+
* @throws IllegalStateException if force-purge is not enabled yet the platform doesn't support removeOnCancelPolicy;
130+
* or Executors.newScheduledThreadPool doesn't return a ScheduledThreadPoolExecutor.
131+
*/
132+
public static ScheduledExecutorService create(ThreadFactory factory) {
133+
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
134+
if (FORCE_PURGE) {
135+
if (exec instanceof ScheduledThreadPoolExecutor) {
136+
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
137+
POOLS.put(e, e);
138+
} else {
139+
throw new IllegalStateException("The Executors.newScheduledThreadPool didn't return a ScheduledThreadPoolExecutor.");
140+
}
141+
} else {
142+
Method m = SET_REMOVE_ON_CANCEL_POLICY;
143+
if (m == null) {
144+
throw new IllegalStateException("The ScheduledThreadPoolExecutor doesn't support the removeOnCancelPolicy and purging is not enabled.");
145+
}
146+
try {
147+
m.invoke(exec, true);
148+
} catch (IllegalAccessException | InvocationTargetException e) {
149+
Exceptions.propagate(e);
150+
}
151+
}
152+
153+
return exec;
154+
}
155+
156+
/**
157+
* Starts the purge thread and the periodic purging.
158+
*/
159+
public static void start() {
160+
// if purge is not enabled don't do anything
161+
if (!FORCE_PURGE) {
162+
return;
163+
}
164+
for (;;) {
165+
ScheduledExecutorService curr = PURGE_THREAD.get();
166+
if (curr != null) {
167+
return;
168+
}
169+
ScheduledExecutorService next = Executors.newScheduledThreadPool(1, PURGE_THREAD_FACTORY);
170+
if (PURGE_THREAD.compareAndSet(null, next)) {
171+
172+
next.scheduleAtFixedRate(SchedulerPoolHelper::doPurge,
173+
PURGE_FREQUENCY, PURGE_FREQUENCY, TimeUnit.MILLISECONDS);
174+
175+
return;
176+
} else {
177+
next.shutdownNow();
178+
}
179+
}
180+
}
181+
182+
/**
183+
* Shuts down the purge thread and forgets the known ScheduledExecutorServices.
184+
* <p>Note that this stops purging the known ScheduledExecutorServices which may be shut
185+
* down as well to appreciate a new forcePurge state.
186+
*/
187+
public static void shutdown() {
188+
shutdown(true);
189+
}
190+
/**
191+
* Shuts down the purge thread and clears the known ScheduledExecutorServices from POOLS when
192+
* requested.
193+
* <p>Note that this stops purging the known ScheduledExecutorServices which may be shut
194+
* down as well to appreciate a new forcePurge state.
195+
* @param clear if true, the helper forgets all associated ScheduledExecutorServices
196+
*/
197+
public static void shutdown(boolean clear) {
198+
for (;;) {
199+
ScheduledExecutorService curr = PURGE_THREAD.get();
200+
if (curr == null) {
201+
return;
202+
}
203+
if (PURGE_THREAD.compareAndSet(curr, null)) {
204+
curr.shutdownNow();
205+
if (clear) {
206+
POOLS.clear();
207+
}
208+
return;
209+
}
210+
}
211+
}
212+
213+
/**
214+
* Loops through the known ScheduledExecutors and removes the ones that were shut down
215+
* and purges the others
216+
*/
217+
static void doPurge() {
218+
try {
219+
for (ScheduledThreadPoolExecutor e : new ArrayList<>(POOLS.keySet())) {
220+
if (e.isShutdown()) {
221+
POOLS.remove(e);
222+
} else {
223+
e.purge();
224+
}
225+
}
226+
} catch (Throwable ex) {
227+
// ignoring any error, just in case
228+
}
229+
}
230+
231+
/**
232+
* Purges all known ScheduledExecutorServices immediately on the purge thread.
233+
*/
234+
public static void purgeAsync() {
235+
ScheduledExecutorService exec = PURGE_THREAD.get();
236+
if (exec != null) {
237+
try {
238+
exec.submit(SchedulerPoolHelper::doPurge);
239+
} catch (RejectedExecutionException ex) {
240+
// ignored, we are in shutdown
241+
}
242+
}
243+
}
244+
}

src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ public SingleScheduler() {
3838
}
3939

4040
static ScheduledExecutorService createExecutor() {
41-
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, new RxThreadFactory("RxSingleScheduler-"));
42-
((ScheduledThreadPoolExecutor)exec).setRemoveOnCancelPolicy(true);
41+
ScheduledExecutorService exec = SchedulerPoolHelper.create(new RxThreadFactory("RxSingleScheduler-"));
4342
return exec;
4443
}
4544

src/main/java/io/reactivex/schedulers/Schedulers.java

+13
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,28 @@ public static Scheduler from(Executor executor) {
9494
return new ExecutorScheduler(executor);
9595
}
9696

97+
/**
98+
* Shuts down all standard schedulers: computation, io, newThread, single and trampoline.
99+
* <p>The method is threadsafe and idempotent with respect to other calls to it until the
100+
* {@link #start()} method is called.
101+
* <p>Note that this may cut streams in half and they may end up hanging indefinitely.
102+
* Make sure you cancel all outstanding streams before you shut down the standard schedulers.
103+
* <p>Schedulers created from Executors via {@link #from(Executor)} are not affected.
104+
*/
97105
public static void shutdown() {
98106
computation().shutdown();
99107
io().shutdown();
100108
newThread().shutdown();
101109
single().shutdown();
102110
trampoline().shutdown();
111+
SchedulerPoolHelper.shutdown();
103112
}
104113

114+
/**
115+
* Starts up all standard schedulers: computation, io, newThread, single and trampoline.
116+
*/
105117
public static void start() {
118+
SchedulerPoolHelper.start();
106119
computation().start();
107120
io().start();
108121
newThread().start();

src/test/java/io/reactivex/internal/operators/OperatorWindowWithSizeTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ public void accept(Integer t1) {
182182
})
183183
.observeOn(Schedulers.computation())
184184
.window(5, 4)
185-
.take(2))
185+
.take(2), 128)
186186
.subscribe(ts);
187187
ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
188188
ts.assertTerminated();

src/test/java/io/reactivex/internal/operators/nbp/NbpOperatorWindowWithSizeTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ public void accept(Integer t1) {
178178
})
179179
.observeOn(Schedulers.computation())
180180
.window(5, 4)
181-
.take(2))
181+
.take(2), 128)
182182
.subscribe(ts);
183183
ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
184184
ts.assertTerminated();

src/test/java/io/reactivex/schedulers/CachedThreadSchedulerTest.java

+2-14
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.junit.*;
2121

2222
import io.reactivex.*;
23-
import io.reactivex.Scheduler.Worker;
2423

2524
public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {
2625

@@ -66,20 +65,9 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru
6665
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
6766
}
6867

69-
@Test(timeout = 30000)
68+
@Test(timeout = 90000)
7069
public void testCancelledTaskRetention() throws InterruptedException {
71-
Worker w = Schedulers.io().createWorker();
72-
try {
73-
ExecutorSchedulerTest.testCancelledRetention(w, false);
74-
} finally {
75-
w.dispose();
76-
}
77-
w = Schedulers.io().createWorker();
78-
try {
79-
ExecutorSchedulerTest.testCancelledRetention(w, true);
80-
} finally {
81-
w.dispose();
82-
}
70+
SchedulerRetentionTest.testCancellationRetention(Schedulers.io(), true);
8371
}
8472

8573
}

0 commit comments

Comments
 (0)