diff --git a/src/com/diffplug/common/swt/SwtExec.java b/src/com/diffplug/common/swt/SwtExec.java index d31bc870..4d4d6a9e 100644 --- a/src/com/diffplug/common/swt/SwtExec.java +++ b/src/com/diffplug/common/swt/SwtExec.java @@ -15,32 +15,19 @@ */ package com.diffplug.common.swt; +import java.util.*; import java.util.List; -import java.util.Objects; -import java.util.concurrent.AbstractExecutorService; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.Delayed; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RunnableFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import java.util.function.Supplier; import org.eclipse.swt.SWT; -import org.eclipse.swt.widgets.Display; -import org.eclipse.swt.widgets.Widget; +import org.eclipse.swt.widgets.*; +import rx.*; import rx.Observable; -import rx.Scheduler; -import rx.Subscription; import rx.functions.Action0; -import rx.subscriptions.BooleanSubscription; -import rx.subscriptions.Subscriptions; +import rx.subscriptions.*; import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; @@ -48,8 +35,7 @@ import com.diffplug.common.base.Box.Nullable; import com.diffplug.common.base.Unhandled; -import com.diffplug.common.rx.Rx; -import com.diffplug.common.rx.RxSubscriber; +import com.diffplug.common.rx.*; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; /** @@ -326,58 +312,7 @@ public Scheduler getRxScheduler() { private SwtExec(Display display) { this.display = display; - this.scheduler = new Scheduler() { - @Override - public Worker createWorker() { - return new Worker() { - private BooleanSubscription workerSub = BooleanSubscription.create(); - - @Override - public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { - if (isUnsubscribed()) { - return Subscriptions.unsubscribed(); - } - if (delayTime <= 0) { - return schedule(action); - } else { - ScheduledFuture future = SwtExec.this.schedule(() -> { - if (!workerSub.isUnsubscribed()) { - action.call(); - } - }, delayTime, unit); - Subscription sub = Subscriptions.create(() -> { - future.cancel(true); - }); - return sub; - } - } - - @Override - public Subscription schedule(Action0 action) { - if (isUnsubscribed()) { - return Subscriptions.unsubscribed(); - } - BooleanSubscription sub = BooleanSubscription.create(); - execute(() -> { - if (!sub.isUnsubscribed() && !workerSub.isUnsubscribed()) { - action.call(); - } - }); - return sub; - } - - @Override - public void unsubscribe() { - workerSub.unsubscribe(); - } - - @Override - public boolean isUnsubscribed() { - return workerSub.isUnsubscribed(); - } - }; - } - }; + this.scheduler = new SwtScheduler(this); this.rxExecutor = Rx.on(this, scheduler); } @@ -673,4 +608,204 @@ public boolean isDone() { return runnableFuture.isDone(); } } + + /** Scheduler that runs tasks on Swt's event dispatch thread. */ + static final class SwtScheduler extends Scheduler { + final SwtExec exec; + + public SwtScheduler(SwtExec exec) { + this.exec = exec; + } + + @Override + public Worker createWorker() { + return new SwtWorker(exec); + } + + static final class SwtWorker extends Scheduler.Worker { + final SwtExec exec; + + volatile boolean unsubscribed; + + /** Set of active tasks, guarded by this. */ + Set tasks; + + public SwtWorker(SwtExec exec) { + this.exec = exec; + this.tasks = new HashSet<>(); + } + + @Override + public void unsubscribe() { + if (unsubscribed) { + return; + } + unsubscribed = true; + + Set set; + synchronized (this) { + set = tasks; + tasks = null; + } + + if (set != null) { + for (SwtScheduledAction a : set) { + a.cancelFuture(); + } + } + } + + void remove(SwtScheduledAction a) { + if (unsubscribed) { + return; + } + synchronized (this) { + if (unsubscribed) { + return; + } + + tasks.remove(a); + } + } + + @Override + public boolean isUnsubscribed() { + return unsubscribed; + } + + @Override + public Subscription schedule(Action0 action) { + if (unsubscribed) { + return Subscriptions.unsubscribed(); + } + + SwtScheduledAction a = new SwtScheduledAction(action, this); + + synchronized (this) { + if (unsubscribed) { + return Subscriptions.unsubscribed(); + } + + tasks.add(a); + } + + exec.execute(a); + + if (unsubscribed) { + a.cancel(); + return Subscriptions.unsubscribed(); + } + + return a; + } + + @Override + public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { + if (unsubscribed) { + return Subscriptions.unsubscribed(); + } + + SwtScheduledAction a = new SwtScheduledAction(action, this); + + synchronized (this) { + if (unsubscribed) { + return Subscriptions.unsubscribed(); + } + + tasks.add(a); + } + + Future f = exec.schedule(a, delayTime, unit); + + if (unsubscribed) { + a.cancel(); + f.cancel(true); + return Subscriptions.unsubscribed(); + } + + a.setFuture(f); + + return a; + } + + /** + * Represents a cancellable asynchronous Runnable that wraps an action + * and manages the associated Worker lifecycle. + */ + static final class SwtScheduledAction implements Runnable, Subscription { + final Action0 action; + + final SwtWorker parent; + + volatile Future future; + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater FUTURE = AtomicReferenceFieldUpdater.newUpdater(SwtScheduledAction.class, Future.class, "future"); + + static final Future CANCELLED = new FutureTask<>(() -> {}, null); + + static final Future FINISHED = new FutureTask<>(() -> {}, null); + + volatile int state; + static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(SwtScheduledAction.class, "state"); + + static final int STATE_ACTIVE = 0; + static final int STATE_FINISHED = 1; + static final int STATE_CANCELLED = 2; + + public SwtScheduledAction(Action0 action, SwtWorker parent) { + this.action = action; + this.parent = parent; + } + + @Override + public void run() { + if (!parent.unsubscribed && state == STATE_ACTIVE) { + try { + action.call(); + } finally { + FUTURE.lazySet(this, FINISHED); + if (STATE.compareAndSet(this, STATE_ACTIVE, STATE_FINISHED)) { + parent.remove(this); + } + } + } + } + + @Override + public boolean isUnsubscribed() { + return state != STATE_ACTIVE; + } + + @Override + public void unsubscribe() { + if (STATE.compareAndSet(this, STATE_ACTIVE, STATE_CANCELLED)) { + parent.remove(this); + } + cancelFuture(); + } + + void setFuture(Future f) { + if (FUTURE.compareAndSet(this, null, f)) { + if (future != FINISHED) { + f.cancel(true); + } + } + } + + void cancelFuture() { + Future f = future; + if (f != CANCELLED && f != FINISHED) { + f = FUTURE.getAndSet(this, CANCELLED); + if (f != null && f != CANCELLED && f != FINISHED) { + f.cancel(true); + } + } + } + + void cancel() { + state = STATE_CANCELLED; + } + } + } + } }