diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncTask.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncTask.java new file mode 100644 index 0000000000..6901e4eed8 --- /dev/null +++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncTask.java @@ -0,0 +1,98 @@ +package org.bukkit.craftbukkit.scheduler; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; + +import org.apache.commons.lang.UnhandledException; +import org.bukkit.plugin.Plugin; +import org.bukkit.scheduler.BukkitWorker; + + +class CraftAsyncTask extends CraftTask { + + private final LinkedList workers = new LinkedList(); + private final Map runners; + + CraftAsyncTask(final Map runners, final Plugin plugin, final Runnable task, final int id, final long delay) { + super(plugin, task, id, delay); + this.runners = runners; + } + + @Override + public boolean isSync() { + return false; + } + + @Override + public void run() { + final Thread thread = Thread.currentThread(); + synchronized(workers) { + if (getPeriod() == -2) { + // Never continue running after cancelled. + // Checking this with the lock is important! + return; + } + workers.add( + new BukkitWorker() { + public Thread getThread() { + return thread; + } + + public int getTaskId() { + return CraftAsyncTask.this.getTaskId(); + } + + public Plugin getOwner() { + return CraftAsyncTask.this.getOwner(); + } + }); + } + Throwable thrown = null; + try { + super.run(); + } catch (final Throwable t) { + thrown = t; + throw new UnhandledException( + String.format( + "Plugin %s generated an exception while executing task %s", + getOwner().getDescription().getFullName(), + getTaskId()), + thrown); + } finally { + // Cleanup is important for any async task, otherwise ghost tasks are everywhere + synchronized(workers) { + try { + final Iterator workers = this.workers.iterator(); + boolean removed = false; + while (workers.hasNext()) { + if (workers.next().getThread() == thread) { + workers.remove(); + removed = true; // Don't throw exception + break; + } + } + if (!removed) { + throw new IllegalStateException( + String.format( + "Unable to remove worker %s on task %s for %s", + thread.getName(), + getTaskId(), + getOwner().getDescription().getFullName()), + thrown); // We don't want to lose the original exception, if any + } + } finally { + if (getPeriod() < 0 && workers.isEmpty()) { + // At this spot, we know we are the final async task being executed! + // Because we have the lock, nothing else is running or will run because delay < 0 + runners.remove(getTaskId()); + } + } + } + } + } + + LinkedList getWorkers() { + return workers; + } +} diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftFuture.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftFuture.java index c30d00a7ba..de96ec9b7c 100644 --- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftFuture.java +++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftFuture.java @@ -7,100 +7,88 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -public class CraftFuture implements Runnable, Future { +import org.bukkit.plugin.Plugin; + +class CraftFuture extends CraftTask implements Future { - private final CraftScheduler craftScheduler; private final Callable callable; - private final ObjectContainer returnStore = new ObjectContainer(); - private boolean done = false; - private boolean running = false; - private boolean cancelled = false; - private Exception e = null; - private int taskId = -1; + private T value; + private Exception exception = null; - CraftFuture(CraftScheduler craftScheduler, Callable callable) { + CraftFuture(final Callable callable, final Plugin plugin, final int id) { + super(plugin, null, id, -1l); this.callable = callable; - this.craftScheduler = craftScheduler; } - public void run() { - synchronized (this) { - if (cancelled) { - return; - } - running = true; - } - try { - returnStore.setObject(callable.call()); - } catch (Exception e) { - this.e = e; - } - synchronized (this) { - running = false; - done = true; - this.notify(); - } - } - - public T get() throws InterruptedException, ExecutionException { - try { - return get(0L, TimeUnit.MILLISECONDS); - } catch (TimeoutException te) {} - return null; - } - - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - synchronized (this) { - if (isDone()) { - return getResult(); - } - this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit)); - return getResult(); - } - } - - public T getResult() throws ExecutionException { - if (cancelled) { - throw new CancellationException(); - } - if (e != null) { - throw new ExecutionException(e); - } - return returnStore.getObject(); - } - - public boolean isDone() { - synchronized (this) { - return done; + public synchronized boolean cancel(final boolean mayInterruptIfRunning) { + if (getPeriod() != -1l) { + return false; } + setPeriod(-2l); + return true; } public boolean isCancelled() { - synchronized (this) { - return cancelled; + return getPeriod() == -2l; + } + + public boolean isDone() { + final long period = this.getPeriod(); + return period != -1l && period != -3l; + } + + public T get() throws CancellationException, InterruptedException, ExecutionException { + try { + return get(0, TimeUnit.MILLISECONDS); + } catch (final TimeoutException e) { + throw new Error(e); } } - public boolean cancel(boolean mayInterruptIfRunning) { - synchronized (this) { - if (cancelled) { - return false; + public synchronized T get(long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + timeout = unit.toMillis(timeout); + long period = this.getPeriod(); + while (true) { + if (period == -1l || period == -3l) { + this.wait(unit.toMillis(timeout)); + period = this.getPeriod(); + if (period == -1l || period == -3l) { + if (timeout == 0l) { + continue; + } + throw new TimeoutException(); + } } - cancelled = true; - if (taskId != -1) { - craftScheduler.cancelTask(taskId); + if (period == -2l) { + throw new CancellationException(); } - if (!running && !done) { - return true; - } else { - return false; + if (period == -4l) { + if (exception == null) { + return value; + } + throw new ExecutionException(exception); } + throw new IllegalStateException("Expected " + -1l + " to " + -4l + ", got " + period); } } - public void setTaskId(int taskId) { + @Override + public void run() { synchronized (this) { - this.taskId = taskId; + if (getPeriod() == -2l) { + return; + } + setPeriod(-3l); + } + try { + value = callable.call(); + } catch (final Exception e) { + exception = e; + } finally { + synchronized (this) { + setPeriod(-4l); + this.notifyAll(); + } } } } diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java index e2ecd6cf85..6b70a78bc1 100644 --- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java +++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java @@ -1,385 +1,421 @@ package org.bukkit.craftbukkit.scheduler; import java.util.ArrayList; +import java.util.Comparator; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; -import java.util.TreeMap; +import java.util.PriorityQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; -import java.util.logging.Logger; +import org.apache.commons.lang.Validate; import org.bukkit.plugin.Plugin; import org.bukkit.scheduler.BukkitScheduler; import org.bukkit.scheduler.BukkitTask; import org.bukkit.scheduler.BukkitWorker; -public class CraftScheduler implements BukkitScheduler, Runnable { +/** + * The fundamental concepts for this implementation: + *
  • Main thread owns {@link #head} and {@link #currentTick}, but it may be read from any thread
  • + *
  • Main thread exclusively controls {@link #temp} and {@link #pending}. + * They are never to be accessed outside of the main thread; alternatives exist to prevent locking.
  • + *
  • {@link #head} to {@link #tail} act as a linked list/queue, with 1 consumer and infinite producers. + * Adding to the tail is atomic and very efficient; utility method is {@link #handle(CraftTask, long)} or {@link #addTask(CraftTask)}.
  • + *
  • Changing the period on a task is delicate. + * Any future task needs to notify waiting threads. + * Async tasks must be synchronized to make sure that any thread that's finishing will remove itself from {@link #runners}. + * Another utility method is provided for this, {@link #cancelTask(CraftTask)}
  • + *
  • {@link #runners} provides a moderately up-to-date view of active tasks. + * If the linked head to tail set is read, all remaining tasks that were active at the time execution started will be located in runners.
  • + *
  • Async tasks are responsible for removing themselves from runners
  • + *
  • Sync tasks are only to be removed from runners on the main thread when coupled with a removal from pending and temp.
  • + *
  • Most of the design in this scheduler relies on queuing special tasks to perform any data changes on the main thread. + * When executed from inside a synchronous method, the scheduler will be updated before next execution by virtue of the frequent {@link #parsePending()} calls.
  • + */ +public class CraftScheduler implements BukkitScheduler { - private static final Logger logger = Logger.getLogger("Minecraft"); + /** + * Counter for IDs. Order doesn't matter, only uniqueness. + */ + private final AtomicInteger ids = new AtomicInteger(1); + /** + * Current head of linked-list. This reference is always stale, {@link CraftTask#next} is the live reference. + */ + private volatile CraftTask head = new CraftTask(); + /** + * Tail of a linked-list. AtomicReference only matters when adding to queue + */ + private final AtomicReference tail = new AtomicReference(head); + /** + * Main thread logic only + */ + private final PriorityQueue pending = new PriorityQueue(10, + new Comparator() { + public int compare(final CraftTask o1, final CraftTask o2) { + return (int) (o1.getNextRun() - o2.getNextRun()); + } + }); + /** + * Main thread logic only + */ + private final List temp = new ArrayList(); + /** + * These are tasks that are currently active. It's provided for 'viewing' the current state. + */ + private final ConcurrentHashMap runners = new ConcurrentHashMap(); + private volatile int currentTick = -1; + private final Executor executor = Executors.newCachedThreadPool(); - private final CraftThreadManager craftThreadManager = new CraftThreadManager(); - - private final LinkedList mainThreadQueue = new LinkedList(); - private final LinkedList syncedTasks = new LinkedList(); - - private final TreeMap schedulerQueue = new TreeMap(); - - private Long currentTick = 0L; - - // This lock locks the mainThreadQueue and the currentTick value - private final Lock mainThreadLock = new ReentrantLock(); - private final Lock syncedTasksLock = new ReentrantLock(); - - public CraftScheduler() { - Thread t = new Thread(this); - t.start(); + public int scheduleSyncDelayedTask(final Plugin plugin, final Runnable task) { + return this.scheduleSyncDelayedTask(plugin, task, 0l); } - public void run() { + public int scheduleAsyncDelayedTask(final Plugin plugin, final Runnable task) { + return this.scheduleAsyncDelayedTask(plugin, task, 0l); + } - while (true) { - boolean stop = false; - long firstTick = -1; - long currentTick = -1; - CraftTask first = null; - do { - synchronized (schedulerQueue) { - first = null; - if (!schedulerQueue.isEmpty()) { - first = schedulerQueue.firstKey(); - if (first != null) { - currentTick = getCurrentTick(); + public int scheduleSyncDelayedTask(final Plugin plugin, final Runnable task, final long delay) { + return this.scheduleSyncRepeatingTask(plugin, task, delay, -1l); + } - firstTick = first.getExecutionTick(); + public int scheduleAsyncDelayedTask(final Plugin plugin, final Runnable task, final long delay) { + return this.scheduleAsyncRepeatingTask(plugin, task, delay, -1l); + } - if (currentTick >= firstTick) { - schedulerQueue.remove(first); - processTask(first); - if (first.getPeriod() >= 0) { - first.updateExecution(); - schedulerQueue.put(first, first.isSync()); + public int scheduleSyncRepeatingTask(final Plugin plugin, final Runnable runnable, long delay, long period) { + validate(plugin, runnable); + if (delay < 0l) { + delay = 0; + } + if (period == 0l) { + period = 1l; + } else if (period < -1l) { + period = -1l; + } + return handle(new CraftTask(plugin, runnable, nextId(), period), delay); + } + + public int scheduleAsyncRepeatingTask(final Plugin plugin, final Runnable runnable, long delay, long period) { + validate(plugin, runnable); + if (delay < 0l) { + delay = 0; + } + if (period == 0l) { + period = 1l; + } else if (period < -1l) { + period = -1l; + } + return handle(new CraftAsyncTask(runners, plugin, runnable, nextId(), period), delay); + } + + public Future callSyncMethod(final Plugin plugin, final Callable task) { + validate(plugin, task); + final CraftFuture future = new CraftFuture(task, plugin, nextId()); + handle(future, 0l); + return future; + } + + public void cancelTask(final int taskId) { + if (taskId <= 0) { + return; + } + CraftTask task = runners.get(taskId); + if (task != null) { + cancelTask(task); + } + task = new CraftTask( + new Runnable() { + public void run() { + if (!check(CraftScheduler.this.temp)) { + check(CraftScheduler.this.pending); + } + } + private boolean check(final Iterable collection) { + final Iterator tasks = collection.iterator(); + while (tasks.hasNext()) { + final CraftTask task = tasks.next(); + if (task.getTaskId() == taskId) { + cancelTask(task); + tasks.remove(); + if (task.isSync()) { + runners.remove(taskId); } - } else { - stop = true; + return true; } - } else { - stop = true; } - } else { - stop = true; - } - } - } while (!stop); - - long sleepTime = 0; - if (first == null) { - sleepTime = 60000L; - } else { - currentTick = getCurrentTick(); - sleepTime = (firstTick - currentTick) * 50 + 25; + return false; + }}); + handle(task, 0l); + for (CraftTask taskPending = head.getNext(); taskPending != null; taskPending = taskPending.getNext()) { + if (taskPending == task) { + return; } - - if (sleepTime < 50L) { - sleepTime = 50L; - } else if (sleepTime > 60000L) { - sleepTime = 60000L; - } - - synchronized (schedulerQueue) { - try { - schedulerQueue.wait(sleepTime); - } catch (InterruptedException ie) {} + if (taskPending.getTaskId() == taskId) { + cancelTask(taskPending); } } } - void processTask(CraftTask task) { - if (task.isSync()) { - addToMainThreadQueue(task); - } else { - craftThreadManager.executeTask(task.getTask(), task.getOwner(), task.getIdNumber()); - } - } - - // If the main thread cannot obtain the lock, it doesn't wait - public void mainThreadHeartbeat(long currentTick) { - if (syncedTasksLock.tryLock()) { - try { - if (mainThreadLock.tryLock()) { - try { - this.currentTick = currentTick; - while (!mainThreadQueue.isEmpty()) { - syncedTasks.addLast(mainThreadQueue.removeFirst()); - } - } finally { - mainThreadLock.unlock(); + public void cancelTasks(final Plugin plugin) { + Validate.notNull(plugin, "Cannot cancel tasks of null plugin"); + final CraftTask task = new CraftTask( + new Runnable() { + public void run() { + check(CraftScheduler.this.pending); + check(CraftScheduler.this.temp); } - } - long breakTime = System.currentTimeMillis() + 35; // max time spent in loop = 35ms - while (!syncedTasks.isEmpty() && System.currentTimeMillis() <= breakTime) { - CraftTask task = syncedTasks.removeFirst(); - try { - task.getTask().run(); - } catch (Throwable t) { - // Bad plugin! - logger.log(Level.WARNING, "Task of '" + task.getOwner().getDescription().getName() + "' generated an exception", t); - synchronized (schedulerQueue) { - schedulerQueue.remove(task); + void check(final Iterable collection) { + final Iterator tasks = collection.iterator(); + while (tasks.hasNext()) { + final CraftTask task = tasks.next(); + if (task.getOwner().equals(plugin)) { + cancelTask(task); + tasks.remove(); + if (task.isSync()) { + runners.remove(task.getTaskId()); + } + break; + } } } - } - } finally { - syncedTasksLock.unlock(); + }); + handle(task, 0l); + for (CraftTask taskPending = head.getNext(); taskPending != null; taskPending = taskPending.getNext()) { + if (taskPending == task) { + return; + } + if (taskPending.getTaskId() != -1 && taskPending.getOwner().equals(plugin)) { + cancelTask(taskPending); } } - } - - long getCurrentTick() { - mainThreadLock.lock(); - long tempTick = 0; - try { - tempTick = currentTick; - } finally { - mainThreadLock.unlock(); - } - return tempTick; - } - - void addToMainThreadQueue(CraftTask task) { - mainThreadLock.lock(); - try { - mainThreadQueue.addLast(task); - } finally { - mainThreadLock.unlock(); - } - } - - void wipeSyncedTasks() { - syncedTasksLock.lock(); - try { - syncedTasks.clear(); - } finally { - syncedTasksLock.unlock(); - } - } - - void wipeMainThreadQueue() { - mainThreadLock.lock(); - try { - mainThreadQueue.clear(); - } finally { - mainThreadLock.unlock(); - } - } - - public int scheduleSyncDelayedTask(Plugin plugin, Runnable task, long delay) { - return scheduleSyncRepeatingTask(plugin, task, delay, -1); - } - - public int scheduleSyncDelayedTask(Plugin plugin, Runnable task) { - return scheduleSyncDelayedTask(plugin, task, 0L); - } - - public int scheduleSyncRepeatingTask(Plugin plugin, Runnable task, long delay, long period) { - if (plugin == null) { - throw new IllegalArgumentException("Plugin cannot be null"); - } - if (task == null) { - throw new IllegalArgumentException("Task cannot be null"); - } - if (delay < 0) { - throw new IllegalArgumentException("Delay cannot be less than 0"); - } - - CraftTask newTask = new CraftTask(plugin, task, true, getCurrentTick() + delay, period); - - synchronized (schedulerQueue) { - schedulerQueue.put(newTask, true); - schedulerQueue.notify(); - } - return newTask.getIdNumber(); - } - - public int scheduleAsyncDelayedTask(Plugin plugin, Runnable task, long delay) { - return scheduleAsyncRepeatingTask(plugin, task, delay, -1); - } - - public int scheduleAsyncDelayedTask(Plugin plugin, Runnable task) { - return scheduleAsyncDelayedTask(plugin, task, 0L); - } - - public int scheduleAsyncRepeatingTask(Plugin plugin, Runnable task, long delay, long period) { - if (plugin == null) { - throw new IllegalArgumentException("Plugin cannot be null"); - } - if (task == null) { - throw new IllegalArgumentException("Task cannot be null"); - } - if (delay < 0) { - throw new IllegalArgumentException("Delay cannot be less than 0"); - } - - CraftTask newTask = new CraftTask(plugin, task, false, getCurrentTick() + delay, period); - - synchronized (schedulerQueue) { - schedulerQueue.put(newTask, false); - schedulerQueue.notify(); - } - return newTask.getIdNumber(); - } - - public Future callSyncMethod(Plugin plugin, Callable task) { - CraftFuture craftFuture = new CraftFuture(this, task); - synchronized (craftFuture) { - int taskId = scheduleSyncDelayedTask(plugin, craftFuture); - craftFuture.setTaskId(taskId); - } - return craftFuture; - } - - public void cancelTask(int taskId) { - syncedTasksLock.lock(); - try { - synchronized (schedulerQueue) { - mainThreadLock.lock(); - try { - Iterator itr = schedulerQueue.keySet().iterator(); - while (itr.hasNext()) { - CraftTask current = itr.next(); - if (current.getIdNumber() == taskId) { - itr.remove(); - } - } - itr = mainThreadQueue.iterator(); - while (itr.hasNext()) { - CraftTask current = itr.next(); - if (current.getIdNumber() == taskId) { - itr.remove(); - } - } - itr = syncedTasks.iterator(); - while (itr.hasNext()) { - CraftTask current = itr.next(); - if (current.getIdNumber() == taskId) { - itr.remove(); - } - } - } finally { - mainThreadLock.unlock(); - } + for (CraftTask runner : runners.values()) { + if (runner.getOwner().equals(plugin)) { + cancelTask(runner); } - } finally { - syncedTasksLock.unlock(); } - - craftThreadManager.interruptTask(taskId); - } - - public void cancelTasks(Plugin plugin) { - syncedTasksLock.lock(); - try { - synchronized (schedulerQueue) { - mainThreadLock.lock(); - try { - Iterator itr = schedulerQueue.keySet().iterator(); - while (itr.hasNext()) { - CraftTask current = itr.next(); - if (current.getOwner().equals(plugin)) { - itr.remove(); - } - } - itr = mainThreadQueue.iterator(); - while (itr.hasNext()) { - CraftTask current = itr.next(); - if (current.getOwner().equals(plugin)) { - itr.remove(); - } - } - itr = syncedTasks.iterator(); - while (itr.hasNext()) { - CraftTask current = itr.next(); - if (current.getOwner().equals(plugin)) { - itr.remove(); - } - } - } finally { - mainThreadLock.unlock(); - } - } - } finally { - syncedTasksLock.unlock(); - } - - craftThreadManager.interruptTasks(plugin); } public void cancelAllTasks() { - synchronized (schedulerQueue) { - schedulerQueue.clear(); - } - wipeMainThreadQueue(); - wipeSyncedTasks(); - - craftThreadManager.interruptAllTasks(); - } - - public boolean isCurrentlyRunning(int taskId) { - return craftThreadManager.isAlive(taskId); - } - - public boolean isQueued(int taskId) { - synchronized (schedulerQueue) { - Iterator itr = schedulerQueue.keySet().iterator(); - while (itr.hasNext()) { - CraftTask current = itr.next(); - if (current.getIdNumber() == taskId) { - return true; - } + final CraftTask task = new CraftTask( + new Runnable() { + public void run() { + Iterator it = CraftScheduler.this.runners.values().iterator(); + while (it.hasNext()) { + CraftTask task = it.next(); + cancelTask(task); + if (task.isSync()) { + it.remove(); + } + } + CraftScheduler.this.pending.clear(); + CraftScheduler.this.temp.clear(); + } + }); + handle(task, 0l); + for (CraftTask taskPending = head.getNext(); taskPending != null; taskPending = taskPending.getNext()) { + if (taskPending == task) { + break; } + cancelTask(taskPending); + } + for (CraftTask runner : runners.values()) { + cancelTask(runner); + } + } + + public boolean isCurrentlyRunning(final int taskId) { + final CraftTask task = runners.get(taskId); + if (task == null || task.isSync()) { return false; } + final CraftAsyncTask asyncTask = (CraftAsyncTask) task; + synchronized (asyncTask.getWorkers()) { + return asyncTask.getWorkers().isEmpty(); + } + } + + public boolean isQueued(final int taskId) { + if (taskId <= 0) { + return false; + } + for (CraftTask task = head.getNext(); task != null; task = task.getNext()) { + if (task.getTaskId() == taskId) { + return task.getPeriod() >= -1l; // The task will run + } + } + CraftTask task = runners.get(taskId); + return task != null && task.getPeriod() >= -1l; } public List getActiveWorkers() { - synchronized (craftThreadManager.workers) { - List workerList = new ArrayList(craftThreadManager.workers.size()); - Iterator itr = craftThreadManager.workers.iterator(); - - while (itr.hasNext()) { - workerList.add((BukkitWorker) itr.next()); + final ArrayList workers = new ArrayList(); + for (final CraftTask taskObj : runners.values()) { + // Iterator will be a best-effort (may fail to grab very new values) if called from an async thread + if (taskObj.isSync()) { + continue; + } + final CraftAsyncTask task = (CraftAsyncTask) taskObj; + synchronized (task.getWorkers()) { + // This will never have an issue with stale threads; it's state-safe + workers.addAll(task.getWorkers()); } - return workerList; } + return workers; } public List getPendingTasks() { - List taskList = null; - syncedTasksLock.lock(); - try { - synchronized (schedulerQueue) { - mainThreadLock.lock(); - try { - taskList = new ArrayList(mainThreadQueue.size() + syncedTasks.size() + schedulerQueue.size()); - taskList.addAll(mainThreadQueue); - taskList.addAll(syncedTasks); - taskList.addAll(schedulerQueue.keySet()); - } finally { - mainThreadLock.unlock(); - } + final ArrayList truePending = new ArrayList(); + for (CraftTask task = head.getNext(); task != null; task = task.getNext()) { + if (task.getTaskId() != -1) { + // -1 is special code + truePending.add(task); } - } finally { - syncedTasksLock.unlock(); } - List newTaskList = new ArrayList(taskList.size()); - for (CraftTask craftTask : taskList) { - newTaskList.add((BukkitTask) craftTask); + final ArrayList pending = new ArrayList(); + final Iterator it = runners.values().iterator(); + while (it.hasNext()) { + final CraftTask task = it.next(); + if (task.getPeriod() >= -1l) { + pending.add(task); + } } - return newTaskList; + + for (final CraftTask task : truePending) { + if (task.getPeriod() >= -1l && !pending.contains(task)) { + pending.add(task); + } + } + return pending; } + /** + * This method is designed to never block or wait for locks; an immediate execution of all current tasks. + */ + public void mainThreadHeartbeat(final int currentTick) { + this.currentTick = currentTick; + final List temp = this.temp; + parsePending(); + while (isReady(currentTick)) { + final CraftTask task = pending.remove(); + if (task.getPeriod() < -1l) { + if (task.isSync()) { + runners.remove(task.getTaskId(), task); + } + parsePending(); + continue; + } + if (task.isSync()) { + try { + task.run(); + } catch (final Throwable throwable) { + task.getOwner().getLogger().log( + Level.WARNING, + String.format( + "Task #%s for %s generated an exception", + task.getTaskId(), + task.getOwner().getDescription().getFullName()), + throwable); + } + parsePending(); + } else { + executor.execute(task); + // We don't need to parse pending + // (async tasks must live with race-conditions if they attempt to cancel between these few lines of code) + } + final long period = task.getPeriod(); // State consistency + if (period > 0) { + task.setNextRun(currentTick + period); + temp.add(task); + } else if (task.isSync()) { + runners.remove(task.getTaskId()); + } + } + pending.addAll(temp); + temp.clear(); + } + + private void addTask(final CraftTask task) { + final AtomicReference tail = this.tail; + CraftTask tailTask = tail.get(); + while (!tail.compareAndSet(tailTask, task)) { + tailTask = tail.get(); + } + tailTask.setNext(task); + } + + private int handle(final CraftTask task, final long delay) { + task.setNextRun(currentTick + delay); + addTask(task); + return task.getTaskId(); + } + + private static void validate(final Plugin plugin, final Object task) { + Validate.notNull(plugin, "Plugin cannot be null"); + Validate.notNull(task, "Task cannot be null"); + } + + private int nextId() { + return ids.incrementAndGet(); + } + + private void parsePending() { + CraftTask head = this.head; + CraftTask task = head.getNext(); + CraftTask lastTask = head; + for (; task != null; task = (lastTask = task).getNext()) { + if (task.getTaskId() == -1) { + task.run(); + } else if (task.getPeriod() >= -1l) { + pending.add(task); + runners.put(task.getTaskId(), task); + } + } + // We split this because of the way things are ordered for all of the async calls in CraftScheduler + // (it prevents race-conditions) + for (task = head; task != lastTask; task = head) { + head = task.getNext(); + task.setNext(null); + } + this.head = lastTask; + } + + private boolean isReady(final int currentTick) { + return !pending.isEmpty() && pending.peek().getNextRun() <= currentTick; + } + + /** + * This method is important to make sure the code is consistent everywhere. + * Synchronizing is needed for future and async to prevent race conditions, + * main thread or otherwise. + * @return True if cancelled + */ + private boolean cancelTask(final CraftTask task) { + if (task.isSync()) { + if (task instanceof CraftFuture) { + synchronized (task) { + if (task.getPeriod() != -1l) { + return false; + } + // This needs to be set INSIDE of the synchronized block + task.setPeriod(-2l); + task.notifyAll(); + } + } else { + task.setPeriod(-2l); + } + } else { + synchronized (((CraftAsyncTask) task).getWorkers()) { + // Synchronizing here prevents race condition for a completing task + task.setPeriod(-2l); + } + } + return true; + } } diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java index 2304719e99..8e56766d2a 100644 --- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java +++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java @@ -1,112 +1,78 @@ package org.bukkit.craftbukkit.scheduler; -import java.lang.Comparable; - import org.bukkit.plugin.Plugin; import org.bukkit.scheduler.BukkitTask; -public class CraftTask implements Comparable, BukkitTask { +class CraftTask implements BukkitTask, Runnable { + + private volatile CraftTask next = null; + /** + * -1 means no repeating
    + * -2 means cancel
    + * -3 means processing for Future
    + * -4 means done for Future
    + * Never 0
    + * >0 means number of ticks to wait between each execution + */ + private volatile long period; + private long nextRun; private final Runnable task; - private final boolean syncTask; - private long executionTick; - private final long period; - private final Plugin owner; - private final int idNumber; + private final Plugin plugin; + private final int id; - private static Integer idCounter = 1; - private static Object idCounterSync = new Object(); - - CraftTask(Plugin owner, Runnable task, boolean syncTask) { - this(owner, task, syncTask, -1, -1); + CraftTask() { + this(null, null, -1, -1); } - CraftTask(Plugin owner, Runnable task, boolean syncTask, long executionTick) { - this(owner, task, syncTask, executionTick, -1); + CraftTask(final Runnable task) { + this(null, task, -1, -1); } - CraftTask(Plugin owner, Runnable task, boolean syncTask, long executionTick, long period) { + CraftTask(final Plugin plugin, final Runnable task, final int id, final long period) { + this.plugin = plugin; this.task = task; - this.syncTask = syncTask; - this.executionTick = executionTick; + this.id = id; this.period = period; - this.owner = owner; - this.idNumber = CraftTask.getNextId(); } - static int getNextId() { - synchronized (idCounterSync) { - idCounter++; - return idCounter; - } + public final int getTaskId() { + return id; } - Runnable getTask() { - return task; + public final Plugin getOwner() { + return plugin; } public boolean isSync() { - return syncTask; + return true; } - long getExecutionTick() { - return executionTick; + public void run() { + task.run(); } long getPeriod() { return period; } - public Plugin getOwner() { - return owner; + void setPeriod(long period) { + this.period = period; } - void updateExecution() { - executionTick += period; + long getNextRun() { + return nextRun; } - public int getTaskId() { - return getIdNumber(); + void setNextRun(long nextRun) { + this.nextRun = nextRun; } - int getIdNumber() { - return idNumber; + CraftTask getNext() { + return next; } - public int compareTo(Object other) { - if (!(other instanceof CraftTask)) { - return 0; - } else { - CraftTask o = (CraftTask) other; - long timeDiff = executionTick - o.getExecutionTick(); - if (timeDiff > 0) { - return 1; - } else if (timeDiff < 0) { - return -1; - } else { - CraftTask otherCraftTask = (CraftTask) other; - return getIdNumber() - otherCraftTask.getIdNumber(); - } - } - } - - @Override - public boolean equals(Object other) { - - if (other == null) { - return false; - } - - if (!(other instanceof CraftTask)) { - return false; - } - - CraftTask otherCraftTask = (CraftTask) other; - return otherCraftTask.getIdNumber() == getIdNumber(); - } - - @Override - public int hashCode() { - return getIdNumber(); + void setNext(CraftTask next) { + this.next = next; } } diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftThreadManager.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftThreadManager.java deleted file mode 100644 index 29defcdaaf..0000000000 --- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftThreadManager.java +++ /dev/null @@ -1,68 +0,0 @@ -package org.bukkit.craftbukkit.scheduler; - -import java.util.HashSet; -import java.util.Iterator; - -import org.bukkit.plugin.Plugin; - -public class CraftThreadManager { - - final HashSet workers = new HashSet(); - - void executeTask(Runnable task, Plugin owner, int taskId) { - - CraftWorker craftWorker = new CraftWorker(this, task, owner, taskId); - synchronized (workers) { - workers.add(craftWorker); - } - - } - - void interruptTask(int taskId) { - synchronized (workers) { - Iterator itr = workers.iterator(); - while (itr.hasNext()) { - CraftWorker craftWorker = itr.next(); - if (craftWorker.getTaskId() == taskId) { - craftWorker.interrupt(); - } - } - } - } - - void interruptTasks(Plugin owner) { - synchronized (workers) { - Iterator itr = workers.iterator(); - while (itr.hasNext()) { - CraftWorker craftWorker = itr.next(); - if (craftWorker.getOwner().equals(owner)) { - craftWorker.interrupt(); - } - } - } - } - - void interruptAllTasks() { - synchronized (workers) { - Iterator itr = workers.iterator(); - while (itr.hasNext()) { - CraftWorker craftWorker = itr.next(); - craftWorker.interrupt(); - } - } - } - - boolean isAlive(int taskId) { - synchronized (workers) { - Iterator itr = workers.iterator(); - while (itr.hasNext()) { - CraftWorker craftWorker = itr.next(); - if (craftWorker.getTaskId() == taskId) { - return craftWorker.isAlive(); - } - } - // didn't find it, so it must have been removed - return false; - } - } -} diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftWorker.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftWorker.java deleted file mode 100644 index 94aa411c45..0000000000 --- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftWorker.java +++ /dev/null @@ -1,90 +0,0 @@ -package org.bukkit.craftbukkit.scheduler; - -import org.bukkit.plugin.Plugin; -import org.bukkit.scheduler.BukkitWorker; - -public class CraftWorker implements Runnable, BukkitWorker { - - private static int hashIdCounter = 1; - private static Object hashIdCounterSync = new Object(); - - private final int hashId; - - private final Plugin owner; - private final int taskId; - - private final Thread t; - private final CraftThreadManager parent; - - private final Runnable task; - - CraftWorker(CraftThreadManager parent, Runnable task, Plugin owner, int taskId) { - this.parent = parent; - this.taskId = taskId; - this.task = task; - this.owner = owner; - this.hashId = CraftWorker.getNextHashId(); - t = new Thread(this); - t.start(); - } - - public void run() { - - try { - task.run(); - } catch (Exception e) { - e.printStackTrace(); - } - - synchronized (parent.workers) { - parent.workers.remove(this); - } - - } - - public int getTaskId() { - return taskId; - } - - public Plugin getOwner() { - return owner; - } - - public Thread getThread() { - return t; - } - - public void interrupt() { - t.interrupt(); - } - - public boolean isAlive() { - return t.isAlive(); - } - - private static int getNextHashId() { - synchronized (hashIdCounterSync) { - return hashIdCounter++; - } - } - - @Override - public int hashCode() { - return hashId; - } - - @Override - public boolean equals(Object other) { - if (other == null) { - return false; - } - - if (!(other instanceof CraftWorker)) { - return false; - } - - CraftWorker otherCraftWorker = (CraftWorker) other; - return otherCraftWorker.hashCode() == hashId; - } - -} diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/ObjectContainer.java b/src/main/java/org/bukkit/craftbukkit/scheduler/ObjectContainer.java deleted file mode 100644 index c0fa37cb11..0000000000 --- a/src/main/java/org/bukkit/craftbukkit/scheduler/ObjectContainer.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.bukkit.craftbukkit.scheduler; - -public class ObjectContainer { - - T object; - - public void setObject(T object) { - this.object = object; - } - - public T getObject() { - return object; - } - -}