Skip to content

Commit 292dc62

Browse files
authored
3.x: Add fair mode overload to Schedulers.from(Executor) (#6744)
1 parent e912237 commit 292dc62

File tree

5 files changed

+655
-105
lines changed

5 files changed

+655
-105
lines changed

src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java

+39-3
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,23 @@ public final class ExecutorScheduler extends Scheduler {
3333

3434
final boolean interruptibleWorker;
3535

36+
final boolean fair;
37+
3638
@NonNull
3739
final Executor executor;
3840

3941
static final Scheduler HELPER = Schedulers.single();
4042

41-
public ExecutorScheduler(@NonNull Executor executor, boolean interruptibleWorker) {
43+
public ExecutorScheduler(@NonNull Executor executor, boolean interruptibleWorker, boolean fair) {
4244
this.executor = executor;
4345
this.interruptibleWorker = interruptibleWorker;
46+
this.fair = fair;
4447
}
4548

4649
@NonNull
4750
@Override
4851
public Worker createWorker() {
49-
return new ExecutorWorker(executor, interruptibleWorker);
52+
return new ExecutorWorker(executor, interruptibleWorker, fair);
5053
}
5154

5255
@NonNull
@@ -123,6 +126,8 @@ public static final class ExecutorWorker extends Scheduler.Worker implements Run
123126

124127
final boolean interruptibleWorker;
125128

129+
final boolean fair;
130+
126131
final Executor executor;
127132

128133
final MpscLinkedQueue<Runnable> queue;
@@ -133,10 +138,11 @@ public static final class ExecutorWorker extends Scheduler.Worker implements Run
133138

134139
final CompositeDisposable tasks = new CompositeDisposable();
135140

136-
public ExecutorWorker(Executor executor, boolean interruptibleWorker) {
141+
public ExecutorWorker(Executor executor, boolean interruptibleWorker, boolean fair) {
137142
this.executor = executor;
138143
this.queue = new MpscLinkedQueue<Runnable>();
139144
this.interruptibleWorker = interruptibleWorker;
145+
this.fair = fair;
140146
}
141147

142148
@NonNull
@@ -236,6 +242,36 @@ public boolean isDisposed() {
236242

237243
@Override
238244
public void run() {
245+
if (fair) {
246+
runFair();
247+
} else {
248+
runEager();
249+
}
250+
}
251+
252+
void runFair() {
253+
final MpscLinkedQueue<Runnable> q = queue;
254+
if (disposed) {
255+
q.clear();
256+
return;
257+
}
258+
259+
Runnable run = q.poll();
260+
if (run != null) {
261+
run.run();
262+
}
263+
264+
if (disposed) {
265+
q.clear();
266+
return;
267+
}
268+
269+
if (wip.decrementAndGet() != 0) {
270+
executor.execute(this);
271+
}
272+
}
273+
274+
void runEager() {
239275
int missed = 1;
240276
final MpscLinkedQueue<Runnable> q = queue;
241277
for (;;) {

src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java

+79-2
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,8 @@ public static Scheduler single() {
321321
* non-delayed tasks as it can, which may result in a longer than expected occupation of a
322322
* thread of the given backing Executor. In other terms, it does not allow per-Runnable fairness
323323
* in case the worker runs on a shared underlying thread of the Executor.
324+
* See {@link #from(Executor, boolean, boolean)} to create a wrapper that uses the underlying Executor
325+
* more fairly.
324326
* <p>
325327
* Starting, stopping and restarting this scheduler is not supported (no-op) and the provided
326328
* executor's lifecycle must be managed externally:
@@ -346,10 +348,11 @@ public static Scheduler single() {
346348
* @param executor
347349
* the executor to wrap
348350
* @return the new Scheduler wrapping the Executor
351+
* @see #from(Executor, boolean, boolean)
349352
*/
350353
@NonNull
351354
public static Scheduler from(@NonNull Executor executor) {
352-
return new ExecutorScheduler(executor, false);
355+
return new ExecutorScheduler(executor, false, false);
353356
}
354357

355358
/**
@@ -382,6 +385,8 @@ public static Scheduler from(@NonNull Executor executor) {
382385
* non-delayed tasks as it can, which may result in a longer than expected occupation of a
383386
* thread of the given backing Executor. In other terms, it does not allow per-Runnable fairness
384387
* in case the worker runs on a shared underlying thread of the Executor.
388+
* See {@link #from(Executor, boolean, boolean)} to create a wrapper that uses the underlying Executor
389+
* more fairly.
385390
* <p>
386391
* Starting, stopping and restarting this scheduler is not supported (no-op) and the provided
387392
* executor's lifecycle must be managed externally:
@@ -411,10 +416,82 @@ public static Scheduler from(@NonNull Executor executor) {
411416
* be interrupted when the task is disposed.
412417
* @return the new Scheduler wrapping the Executor
413418
* @since 3.0.0
419+
* @see #from(Executor, boolean, boolean)
414420
*/
415421
@NonNull
416422
public static Scheduler from(@NonNull Executor executor, boolean interruptibleWorker) {
417-
return new ExecutorScheduler(executor, interruptibleWorker);
423+
return new ExecutorScheduler(executor, interruptibleWorker, false);
424+
}
425+
426+
/**
427+
* Wraps an {@link Executor} into a new Scheduler instance and delegates {@code schedule()}
428+
* calls to it.
429+
* <p>
430+
* The tasks scheduled by the returned {@link Scheduler} and its {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker}
431+
* can be optionally interrupted.
432+
* <p>
433+
* If the provided executor doesn't support any of the more specific standard Java executor
434+
* APIs, tasks scheduled with a time delay or periodically will use the
435+
* {@link #single()} scheduler for the timed waiting
436+
* before posting the actual task to the given executor.
437+
* <p>
438+
* If the provided executor supports the standard Java {@link ExecutorService} API,
439+
* canceling tasks scheduled by this scheduler can be cancelled/interrupted by calling
440+
* {@link io.reactivex.rxjava3.disposables.Disposable#dispose()}. In addition, tasks scheduled with
441+
* a time delay or periodically will use the {@link #single()} scheduler for the timed waiting
442+
* before posting the actual task to the given executor.
443+
* <p>
444+
* If the provided executor supports the standard Java {@link ScheduledExecutorService} API,
445+
* canceling tasks scheduled by this scheduler can be cancelled/interrupted by calling
446+
* {@link io.reactivex.rxjava3.disposables.Disposable#dispose()}. In addition, tasks scheduled with
447+
* a time delay or periodically will use the provided executor. Note, however, if the provided
448+
* {@code ScheduledExecutorService} instance is not single threaded, tasks scheduled
449+
* with a time delay close to each other may end up executing in different order than
450+
* the original schedule() call was issued. This limitation may be lifted in a future patch.
451+
* <p>
452+
* The implementation of the Worker of this wrapper Scheduler can operate in both eager (non-fair) and
453+
* fair modes depending on the specified parameter. In <em>eager</em> mode, it will execute as many
454+
* non-delayed tasks as it can, which may result in a longer than expected occupation of a
455+
* thread of the given backing Executor. In other terms, it does not allow per-Runnable fairness
456+
* in case the worker runs on a shared underlying thread of the Executor. In <em>fair</em> mode,
457+
* non-delayed tasks will still be executed in a FIFO and non-overlapping manner, but after each task,
458+
* the execution for the next task is rescheduled with the same underlying Executor, allowing interleaving
459+
* from both the same Scheduler or other external usages of the underlying Executor.
460+
* <p>
461+
* Starting, stopping and restarting this scheduler is not supported (no-op) and the provided
462+
* executor's lifecycle must be managed externally:
463+
* <pre><code>
464+
* ExecutorService exec = Executors.newSingleThreadedExecutor();
465+
* try {
466+
* Scheduler scheduler = Schedulers.from(exec, true, true);
467+
* Flowable.just(1)
468+
* .subscribeOn(scheduler)
469+
* .map(v -&gt; v + 1)
470+
* .observeOn(scheduler)
471+
* .blockingSubscribe(System.out::println);
472+
* } finally {
473+
* exec.shutdown();
474+
* }
475+
* </code></pre>
476+
* <p>
477+
* This type of scheduler is less sensitive to leaking {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker} instances, although
478+
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
479+
* execute those tasks "unexpectedly".
480+
* <p>
481+
* Note that this method returns a new {@link Scheduler} instance, even for the same {@link Executor} instance.
482+
* @param executor
483+
* the executor to wrap
484+
* @param interruptibleWorker if {@code true} the tasks submitted to the {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker} will
485+
* be interrupted when the task is disposed.
486+
* @param fair if {@code true} tasks submitted to the will be executed by the underlying {@link Executor} one after the other, still
487+
* in a FIFO and non-overlapping manner, but allows interleaving with other tasks submitted to the underlying {@code Executor}.
488+
* If {@code false}, the underlying FIFO scheme will execute as many tasks as it can before giving up the underlying {@code Executor} thread.
489+
* @return the new Scheduler wrapping the Executor
490+
* @since 3.0.0
491+
*/
492+
@NonNull
493+
public static Scheduler from(@NonNull Executor executor, boolean interruptibleWorker, boolean fair) {
494+
return new ExecutorScheduler(executor, interruptibleWorker, fair);
418495
}
419496

420497
/**

0 commit comments

Comments
 (0)