Skip to content

Commit 476a69f

Browse files
authored
2.x: prevent tasks to self interrupt on the standard schedulers (#5207)
1 parent cd91a9f commit 476a69f

19 files changed

+924
-41
lines changed

src/main/java/io/reactivex/Scheduler.java

+25-6
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@
1515

1616
import java.util.concurrent.TimeUnit;
1717

18-
import io.reactivex.annotations.Experimental;
19-
import io.reactivex.annotations.NonNull;
18+
import io.reactivex.annotations.*;
2019
import io.reactivex.disposables.Disposable;
2120
import io.reactivex.exceptions.Exceptions;
2221
import io.reactivex.functions.Function;
2322
import io.reactivex.internal.disposables.*;
24-
import io.reactivex.internal.schedulers.SchedulerWhen;
23+
import io.reactivex.internal.schedulers.*;
2524
import io.reactivex.internal.util.ExceptionHelper;
2625
import io.reactivex.plugins.RxJavaPlugins;
2726

@@ -131,9 +130,11 @@ public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull Tim
131130

132131
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
133132

134-
w.schedule(new DisposeTask(decoratedRun, w), delay, unit);
133+
DisposeTask task = new DisposeTask(decoratedRun, w);
135134

136-
return w;
135+
w.schedule(task, delay, unit);
136+
137+
return task;
137138
}
138139

139140
/**
@@ -432,22 +433,40 @@ public boolean isDisposed() {
432433
}
433434
}
434435

435-
static final class DisposeTask implements Runnable {
436+
static final class DisposeTask implements Runnable, Disposable {
436437
final Runnable decoratedRun;
437438
final Worker w;
438439

440+
Thread runner;
441+
439442
DisposeTask(Runnable decoratedRun, Worker w) {
440443
this.decoratedRun = decoratedRun;
441444
this.w = w;
442445
}
443446

444447
@Override
445448
public void run() {
449+
runner = Thread.currentThread();
446450
try {
447451
decoratedRun.run();
448452
} finally {
453+
dispose();
454+
runner = null;
455+
}
456+
}
457+
458+
@Override
459+
public void dispose() {
460+
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
461+
((NewThreadWorker)w).shutdown();
462+
} else {
449463
w.dispose();
450464
}
451465
}
466+
467+
@Override
468+
public boolean isDisposed() {
469+
return w.isDisposed();
470+
}
452471
}
453472
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.reactivex.internal.schedulers;
18+
19+
import java.util.concurrent.*;
20+
import java.util.concurrent.atomic.AtomicReference;
21+
22+
import io.reactivex.disposables.Disposable;
23+
import io.reactivex.internal.functions.Functions;
24+
25+
/**
26+
* Base functionality for direct tasks that manage a runnable and cancellation/completion.
27+
* @since 2.0.8
28+
*/
29+
abstract class AbstractDirectTask
30+
extends AtomicReference<Future<?>>
31+
implements Disposable {
32+
33+
private static final long serialVersionUID = 1811839108042568751L;
34+
35+
protected final Runnable runnable;
36+
37+
protected Thread runner;
38+
39+
protected static final FutureTask<Void> FINISHED = new FutureTask<Void>(Functions.EMPTY_RUNNABLE, null);
40+
41+
protected static final FutureTask<Void> DISPOSED = new FutureTask<Void>(Functions.EMPTY_RUNNABLE, null);
42+
43+
public AbstractDirectTask(Runnable runnable) {
44+
this.runnable = runnable;
45+
}
46+
47+
@Override
48+
public final void dispose() {
49+
Future<?> f = get();
50+
if (f != FINISHED && f != DISPOSED) {
51+
if (compareAndSet(f, DISPOSED)) {
52+
if (f != null) {
53+
f.cancel(runner != Thread.currentThread());
54+
}
55+
}
56+
}
57+
}
58+
59+
@Override
60+
public final boolean isDisposed() {
61+
Future<?> f = get();
62+
return f == FINISHED || f == DISPOSED;
63+
}
64+
65+
public final void setFuture(Future<?> future) {
66+
for (;;) {
67+
Future<?> f = get();
68+
if (f == FINISHED) {
69+
break;
70+
}
71+
if (f == DISPOSED) {
72+
future.cancel(runner != Thread.currentThread());
73+
break;
74+
}
75+
if (compareAndSet(f, future)) {
76+
break;
77+
}
78+
}
79+
}
80+
}

src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@ public Disposable scheduleDirect(@NonNull Runnable run) {
5151
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
5252
try {
5353
if (executor instanceof ExecutorService) {
54-
Future<?> f = ((ExecutorService)executor).submit(decoratedRun);
55-
return Disposables.fromFuture(f);
54+
ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun);
55+
Future<?> f = ((ExecutorService)executor).submit(task);
56+
task.setFuture(f);
57+
return task;
5658
}
5759

5860
BooleanRunnable br = new BooleanRunnable(decoratedRun);
@@ -70,8 +72,10 @@ public Disposable scheduleDirect(@NonNull Runnable run, final long delay, final
7072
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
7173
if (executor instanceof ScheduledExecutorService) {
7274
try {
73-
Future<?> f = ((ScheduledExecutorService)executor).schedule(decoratedRun, delay, unit);
74-
return Disposables.fromFuture(f);
75+
ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun);
76+
Future<?> f = ((ScheduledExecutorService)executor).schedule(task, delay, unit);
77+
task.setFuture(f);
78+
return task;
7579
} catch (RejectedExecutionException ex) {
7680
RxJavaPlugins.onError(ex);
7781
return EmptyDisposable.INSTANCE;
@@ -93,8 +97,10 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
9397
if (executor instanceof ScheduledExecutorService) {
9498
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
9599
try {
96-
Future<?> f = ((ScheduledExecutorService)executor).scheduleAtFixedRate(decoratedRun, initialDelay, period, unit);
97-
return Disposables.fromFuture(f);
100+
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
101+
Future<?> f = ((ScheduledExecutorService)executor).scheduleAtFixedRate(task, initialDelay, period, unit);
102+
task.setFuture(f);
103+
return task;
98104
} catch (RejectedExecutionException ex) {
99105
RxJavaPlugins.onError(ex);
100106
return EmptyDisposable.INSTANCE;

src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java

+20-8
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,16 @@ public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonN
6060
* @return the ScheduledRunnable instance
6161
*/
6262
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
63-
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
63+
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
6464
try {
6565
Future<?> f;
66-
if (delayTime <= 0) {
67-
f = executor.submit(decoratedRun);
66+
if (delayTime <= 0L) {
67+
f = executor.submit(task);
6868
} else {
69-
f = executor.schedule(decoratedRun, delayTime, unit);
69+
f = executor.schedule(task, delayTime, unit);
7070
}
71-
return Disposables.fromFuture(f);
71+
task.setFuture(f);
72+
return task;
7273
} catch (RejectedExecutionException ex) {
7374
RxJavaPlugins.onError(ex);
7475
return EmptyDisposable.INSTANCE;
@@ -85,10 +86,11 @@ public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit un
8586
* @return the ScheduledRunnable instance
8687
*/
8788
public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDelay, long period, TimeUnit unit) {
88-
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
89+
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(RxJavaPlugins.onSchedule(run));
8990
try {
90-
Future<?> f = executor.scheduleAtFixedRate(decoratedRun, initialDelay, period, unit);
91-
return Disposables.fromFuture(f);
91+
Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
92+
task.setFuture(f);
93+
return task;
9294
} catch (RejectedExecutionException ex) {
9395
RxJavaPlugins.onError(ex);
9496
return EmptyDisposable.INSTANCE;
@@ -145,6 +147,16 @@ public void dispose() {
145147
}
146148
}
147149

150+
/**
151+
* Shuts down the underlying executor in a non-interrupting fashion.
152+
*/
153+
public void shutdown() {
154+
if (!disposed) {
155+
disposed = true;
156+
executor.shutdown();
157+
}
158+
}
159+
148160
@Override
149161
public boolean isDisposed() {
150162
return disposed;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.reactivex.internal.schedulers;
18+
19+
import io.reactivex.plugins.RxJavaPlugins;
20+
21+
/**
22+
* A Callable to be submitted to an ExecutorService that runs a Runnable
23+
* action periodically and manages completion/cancellation.
24+
* @since 2.0.8
25+
*/
26+
public final class ScheduledDirectPeriodicTask extends AbstractDirectTask implements Runnable {
27+
28+
private static final long serialVersionUID = 1811839108042568751L;
29+
30+
public ScheduledDirectPeriodicTask(Runnable runnable) {
31+
super(runnable);
32+
}
33+
34+
@Override
35+
public void run() {
36+
runner = Thread.currentThread();
37+
try {
38+
try {
39+
runnable.run();
40+
} catch (Throwable ex) {
41+
lazySet(FINISHED);
42+
RxJavaPlugins.onError(ex);
43+
}
44+
} finally {
45+
runner = null;
46+
}
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.reactivex.internal.schedulers;
18+
19+
import java.util.concurrent.Callable;
20+
21+
/**
22+
* A Callable to be submitted to an ExecutorService that runs a Runnable
23+
* action and manages completion/cancellation.
24+
* @since 2.0.8
25+
*/
26+
public final class ScheduledDirectTask extends AbstractDirectTask implements Callable<Void> {
27+
28+
private static final long serialVersionUID = 1811839108042568751L;
29+
30+
public ScheduledDirectTask(Runnable runnable) {
31+
super(runnable);
32+
}
33+
34+
@Override
35+
public Void call() throws Exception {
36+
runner = Thread.currentThread();
37+
try {
38+
runnable.run();
39+
} finally {
40+
lazySet(FINISHED);
41+
runner = null;
42+
}
43+
return null;
44+
}
45+
}

src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public final class ScheduledRunnable extends AtomicReferenceArray<Object>
3232

3333
static final int PARENT_INDEX = 0;
3434
static final int FUTURE_INDEX = 1;
35+
static final int THREAD_INDEX = 2;
3536

3637
/**
3738
* Creates a ScheduledRunnable by wrapping the given action and setting
@@ -40,7 +41,7 @@ public final class ScheduledRunnable extends AtomicReferenceArray<Object>
4041
* @param parent the parent tracking container or null if none
4142
*/
4243
public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
43-
super(2);
44+
super(3);
4445
this.actual = actual;
4546
this.lazySet(0, parent);
4647
}
@@ -54,6 +55,7 @@ public Object call() {
5455

5556
@Override
5657
public void run() {
58+
lazySet(THREAD_INDEX, Thread.currentThread());
5759
try {
5860
try {
5961
actual.run();
@@ -62,6 +64,7 @@ public void run() {
6264
RxJavaPlugins.onError(e);
6365
}
6466
} finally {
67+
lazySet(THREAD_INDEX, null);
6568
Object o = get(PARENT_INDEX);
6669
if (o != DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) {
6770
((DisposableContainer)o).delete(this);
@@ -83,7 +86,7 @@ public void setFuture(Future<?> f) {
8386
return;
8487
}
8588
if (o == DISPOSED) {
86-
f.cancel(true);
89+
f.cancel(get(THREAD_INDEX) != Thread.currentThread());
8790
return;
8891
}
8992
if (compareAndSet(FUTURE_INDEX, o, f)) {
@@ -101,7 +104,7 @@ public void dispose() {
101104
}
102105
if (compareAndSet(FUTURE_INDEX, o, DISPOSED)) {
103106
if (o != null) {
104-
((Future<?>)o).cancel(true);
107+
((Future<?>)o).cancel(get(THREAD_INDEX) != Thread.currentThread());
105108
}
106109
break;
107110
}

0 commit comments

Comments
 (0)