Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SwtScheduler that honors the Scheduler/Worker contracts #1

Merged
merged 1 commit into from
May 3, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 208 additions & 73 deletions src/com/diffplug/common/swt/SwtExec.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,27 @@
*/
package com.diffplug.common.swt;

import java.util.*;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.Supplier;

import org.eclipse.swt.SWT;
import org.eclipse.swt.widgets.Display;
import org.eclipse.swt.widgets.Widget;
import org.eclipse.swt.widgets.*;

import rx.*;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
import rx.subscriptions.*;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;

import com.diffplug.common.base.Box.Nullable;
import com.diffplug.common.base.Unhandled;
import com.diffplug.common.rx.Rx;
import com.diffplug.common.rx.RxSubscriber;
import com.diffplug.common.rx.*;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

/**
Expand Down Expand Up @@ -326,58 +312,7 @@ public Scheduler getRxScheduler() {

private SwtExec(Display display) {
this.display = display;
this.scheduler = new Scheduler() {
@Override
public Worker createWorker() {
return new Worker() {
private BooleanSubscription workerSub = BooleanSubscription.create();

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
if (delayTime <= 0) {
return schedule(action);
} else {
ScheduledFuture<?> future = SwtExec.this.schedule(() -> {
if (!workerSub.isUnsubscribed()) {
action.call();
}
}, delayTime, unit);
Subscription sub = Subscriptions.create(() -> {
future.cancel(true);
});
return sub;
}
}

@Override
public Subscription schedule(Action0 action) {
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
BooleanSubscription sub = BooleanSubscription.create();
execute(() -> {
if (!sub.isUnsubscribed() && !workerSub.isUnsubscribed()) {
action.call();
}
});
return sub;
}

@Override
public void unsubscribe() {
workerSub.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return workerSub.isUnsubscribed();
}
};
}
};
this.scheduler = new SwtScheduler(this);
this.rxExecutor = Rx.on(this, scheduler);
}

Expand Down Expand Up @@ -673,4 +608,204 @@ public boolean isDone() {
return runnableFuture.isDone();
}
}

/** Scheduler that runs tasks on Swt's event dispatch thread. */
static final class SwtScheduler extends Scheduler {
final SwtExec exec;

public SwtScheduler(SwtExec exec) {
this.exec = exec;
}

@Override
public Worker createWorker() {
return new SwtWorker(exec);
}

static final class SwtWorker extends Scheduler.Worker {
final SwtExec exec;

volatile boolean unsubscribed;

/** Set of active tasks, guarded by this. */
Set<SwtScheduledAction> tasks;

public SwtWorker(SwtExec exec) {
this.exec = exec;
this.tasks = new HashSet<>();
}

@Override
public void unsubscribe() {
if (unsubscribed) {
return;
}
unsubscribed = true;

Set<SwtScheduledAction> set;
synchronized (this) {
set = tasks;
tasks = null;
}

if (set != null) {
for (SwtScheduledAction a : set) {
a.cancelFuture();
}
}
}

void remove(SwtScheduledAction a) {
if (unsubscribed) {
return;
}
synchronized (this) {
if (unsubscribed) {
return;
}

tasks.remove(a);
}
}

@Override
public boolean isUnsubscribed() {
return unsubscribed;
}

@Override
public Subscription schedule(Action0 action) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}

SwtScheduledAction a = new SwtScheduledAction(action, this);

synchronized (this) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}

tasks.add(a);
}

exec.execute(a);

if (unsubscribed) {
a.cancel();
return Subscriptions.unsubscribed();
}

return a;
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}

SwtScheduledAction a = new SwtScheduledAction(action, this);

synchronized (this) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}

tasks.add(a);
}

Future<?> f = exec.schedule(a, delayTime, unit);

if (unsubscribed) {
a.cancel();
f.cancel(true);
return Subscriptions.unsubscribed();
}

a.setFuture(f);

return a;
}

/**
* Represents a cancellable asynchronous Runnable that wraps an action
* and manages the associated Worker lifecycle.
*/
static final class SwtScheduledAction implements Runnable, Subscription {
final Action0 action;

final SwtWorker parent;

volatile Future<?> future;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<SwtScheduledAction, Future> FUTURE = AtomicReferenceFieldUpdater.newUpdater(SwtScheduledAction.class, Future.class, "future");

static final Future<?> CANCELLED = new FutureTask<>(() -> {}, null);

static final Future<?> FINISHED = new FutureTask<>(() -> {}, null);

volatile int state;
static final AtomicIntegerFieldUpdater<SwtScheduledAction> STATE = AtomicIntegerFieldUpdater.newUpdater(SwtScheduledAction.class, "state");

static final int STATE_ACTIVE = 0;
static final int STATE_FINISHED = 1;
static final int STATE_CANCELLED = 2;

public SwtScheduledAction(Action0 action, SwtWorker parent) {
this.action = action;
this.parent = parent;
}

@Override
public void run() {
if (!parent.unsubscribed && state == STATE_ACTIVE) {
try {
action.call();
} finally {
FUTURE.lazySet(this, FINISHED);
if (STATE.compareAndSet(this, STATE_ACTIVE, STATE_FINISHED)) {
parent.remove(this);
}
}
}
}

@Override
public boolean isUnsubscribed() {
return state != STATE_ACTIVE;
}

@Override
public void unsubscribe() {
if (STATE.compareAndSet(this, STATE_ACTIVE, STATE_CANCELLED)) {
parent.remove(this);
}
cancelFuture();
}

void setFuture(Future<?> f) {
if (FUTURE.compareAndSet(this, null, f)) {
if (future != FINISHED) {
f.cancel(true);
}
}
}

void cancelFuture() {
Future<?> f = future;
if (f != CANCELLED && f != FINISHED) {
f = FUTURE.getAndSet(this, CANCELLED);
if (f != null && f != CANCELLED && f != FINISHED) {
f.cancel(true);
}
}
}

void cancel() {
state = STATE_CANCELLED;
}
}
}
}
}