From 29d7cc711b44f15d7357fc948bc305bdcb024dbc Mon Sep 17 00:00:00 2001 From: Wesley Wolfe Date: Wed, 12 Dec 2012 03:54:11 -0600 Subject: [PATCH] Add utility class AsynchronousExecutor This class is a general purpose task execution system, that uses stages to separate processing blocks for asynchronous and synchronous executions. --- .../util/AsynchronousExecutor.java | 294 ++++++++++++++++++ 1 file changed, 294 insertions(+) create mode 100644 src/main/java/org/bukkit/craftbukkit/util/AsynchronousExecutor.java diff --git a/src/main/java/org/bukkit/craftbukkit/util/AsynchronousExecutor.java b/src/main/java/org/bukkit/craftbukkit/util/AsynchronousExecutor.java new file mode 100644 index 0000000000..8f3e80e534 --- /dev/null +++ b/src/main/java/org/bukkit/craftbukkit/util/AsynchronousExecutor.java @@ -0,0 +1,294 @@ +package org.bukkit.craftbukkit.util; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +import org.apache.commons.lang.Validate; + +/** + * Executes tasks using a multi-stage process executor. Synchronous executions are via {@link AsynchronousExecutor#finishActive()} or the {@link AsynchronousExecutor#get(Object)} methods. + *
  • Stage 1 creates the object from a parameter, and is usually called asynchronously. + *
  • Stage 2 takes the parameter and object from stage 1 and does any synchronous processing to prepare it. + *
  • Stage 3 takes the parameter and object from stage 1, as well as a callback that was registered, and performs any synchronous calculations. + * + * @param

    The type of parameter you provide to make the object that will be created. It should implement {@link Object#hashCode()} and {@link Object#equals(Object)} if you want to get the value early. + * @param The type of object you provide. This is created in stage 1, and passed to stage 2, 3, and returned if get() is called. + * @param The type of callback you provide. You may register many of these to be passed to the provider in stage 3, one at a time. + * @param A type of exception you may throw and expect to be handled by the main thread + * @author Wesley Wolfe (c) 2012 + */ +public final class AsynchronousExecutor { + + public static interface CallBackProvider extends ThreadFactory { + + /** + * Normally an asynchronous call, but can be synchronous + * + * @param parameter parameter object provided + * @return the created object + */ + T callStage1(P parameter) throws E; + + /** + * Synchronous call + * + * @param parameter parameter object provided + * @param object the previously created object + */ + void callStage2(P parameter, T object) throws E; + + /** + * Synchronous call, called multiple times, once per registered callback + * + * @param parameter parameter object provided + * @param object the previously created object + * @param callback the current callback to execute + */ + void callStage3(P parameter, T object, C callback) throws E; + } + + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater STATE_FIELD = AtomicIntegerFieldUpdater.newUpdater(AsynchronousExecutor.Task.class, "state"); + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private static boolean set(AsynchronousExecutor.Task $this, int expected, int value) { + return STATE_FIELD.compareAndSet($this, expected, value); + } + + class Task implements Runnable { + static final int PENDING = 0x0; + static final int STAGE_1_ASYNC = PENDING + 1; + static final int STAGE_1_SYNC = STAGE_1_ASYNC + 1; + static final int STAGE_1_COMPLETE = STAGE_1_SYNC + 1; + static final int FINISHED = STAGE_1_COMPLETE + 1; + + volatile int state = PENDING; + final P parameter; + T object; + final List callbacks = new LinkedList(); + E t = null; + + Task(final P parameter) { + this.parameter = parameter; + } + + public void run() { + if (initAsync()) { + finished.add(this); + } + } + + boolean initAsync() { + if (set(this, PENDING, STAGE_1_ASYNC)) { + boolean ret = true; + + try { + init(); + } finally { + if (set(this, STAGE_1_ASYNC, STAGE_1_COMPLETE)) { + // No one is/will be waiting + } else { + // We know that the sync thread will be waiting + synchronized (this) { + if (state != STAGE_1_SYNC) { + // They beat us to the synchronized block + this.notifyAll(); + } else { + // We beat them to the synchronized block + } + state = STAGE_1_COMPLETE; // They're already synchronized, atomic locks are not needed + } + // We want to return false, because we know a synchronous task already handled the finish() + ret = false; // Don't return inside finally; VERY bad practice. + } + } + + return ret; + } else { + return false; + } + } + + void initSync() { + if (set(this, PENDING, STAGE_1_COMPLETE)) { + // If we succeed that variable switch, good as done + init(); + } else if (set(this, STAGE_1_ASYNC, STAGE_1_SYNC)) { + // Async thread is running, but this shouldn't be likely; we need to sync to wait on them because of it. + synchronized (this) { + if (set(this, STAGE_1_SYNC, PENDING)) { // They might NOT synchronized yet, atomic lock IS needed + // We are the first into the lock + while (state != STAGE_1_COMPLETE) { + try { + this.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Unable to handle interruption on " + parameter, e); + } + } + } else { + // They beat us to the synchronized block + } + } + } else { + // Async thread is not pending, the more likely situation for a task not pending + } + } + + @SuppressWarnings("unchecked") + void init() { + try { + object = provider.callStage1(parameter); + } catch (final Throwable t) { + this.t = (E) t; + } + } + + T get() throws E { + initSync(); + finish(); + return object; + } + + void finish() throws E { + switch (state) { + default: + case PENDING: + case STAGE_1_ASYNC: + case STAGE_1_SYNC: + throw new IllegalStateException("Attempting to finish unprepared(" + state + ") task(" + parameter + ")"); + case STAGE_1_COMPLETE: + try { + if (t != null) { + throw t; + } + + final CallBackProvider provider = AsynchronousExecutor.this.provider; + final P parameter = this.parameter; + final T object = this.object; + + provider.callStage2(parameter, object); + for (C callback : callbacks) { + provider.callStage3(parameter, object, callback); + } + } finally { + tasks.remove(parameter); + state = FINISHED; + } + case FINISHED: + } + } + } + + final CallBackProvider provider; + final Queue finished = new ConcurrentLinkedQueue(); + final Map tasks = new HashMap(); + final ThreadPoolExecutor pool; + + /** + * Uses a thread pool to pass executions to the provider. + * @see AsynchronousExecutor + */ + public AsynchronousExecutor(final CallBackProvider provider, final int coreSize) { + Validate.notNull(provider, "Provider cannot be null"); + this.provider = provider; + + // We have an unbound queue size so do not need a max thread size + pool = new ThreadPoolExecutor(coreSize, Integer.MAX_VALUE, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue(), provider); + } + + /** + * Adds a callback to the parameter provided, adding parameter to the queue if needed. + * This should always be synchronous. + */ + public void add(P parameter, C callback) { + Task task = tasks.get(parameter); + if (task == null) { + tasks.put(parameter, task = new Task(parameter)); + pool.execute(task); + } + task.callbacks.add(callback); + } + + /** + * This method attempts to skip the waiting period for said parameter. + * This should always be synchronous. + * @throws IllegalStateException if the parameter is not in the queue anymore, or sometimes if called from asynchronous thread + */ + public T get(P parameter) throws E, IllegalStateException { + final Task task = tasks.get(parameter); + if (task == null) { + throw new IllegalStateException("Unknown " + parameter); + } + return task.get(); + } + + /** + * Processes a parameter as if it was in the queue, without ever passing to another thread. + */ + public T getSkipQueue(P parameter) throws E { + return skipQueue(provider, parameter); + } + + /** + * Processes a parameter as if it was in the queue, without ever passing to another thread. + */ + public T getSkipQueue(P parameter, C callback) throws E { + final T object = skipQueue(provider, parameter); + provider.callStage3(parameter, object, callback); + return object; + } + + /** + * Processes a parameter as if it was in the queue, without ever passing to another thread. + */ + public T getSkipQueue(P parameter, C...callbacks) throws E { + final CallBackProvider provider = this.provider; + final T object = skipQueue(provider, parameter); + for (C callback : callbacks) { + provider.callStage3(parameter, object, callback); + } + return object; + } + + /** + * Processes a parameter as if it was in the queue, without ever passing to another thread. + */ + public T getSkipQueue(P parameter, Iterable callbacks) throws E { + final CallBackProvider provider = this.provider; + final T object = skipQueue(provider, parameter); + for (C callback : callbacks) { + provider.callStage3(parameter, object, callback); + } + return object; + } + + private static T skipQueue(CallBackProvider provider, P parameter) throws E { + T object = provider.callStage1(parameter); + provider.callStage2(parameter, object); + return object; + } + + /** + * This is the 'heartbeat' that should be called synchronously to finish any pending tasks + */ + public void finishActive() throws E { + final Queue finished = this.finished; + while (!finished.isEmpty()) { + finished.poll().finish(); + } + } + + public void setActiveThreads(final int coreSize) { + pool.setCorePoolSize(coreSize); + } +}