diff --git a/src/main/java/org/bukkit/craftbukkit/util/AsynchronousExecutor.java b/src/main/java/org/bukkit/craftbukkit/util/AsynchronousExecutor.java new file mode 100644 index 0000000000..8f3e80e534 --- /dev/null +++ b/src/main/java/org/bukkit/craftbukkit/util/AsynchronousExecutor.java @@ -0,0 +1,294 @@ +package org.bukkit.craftbukkit.util; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +import org.apache.commons.lang.Validate; + +/** + * Executes tasks using a multi-stage process executor. Synchronous executions are via {@link AsynchronousExecutor#finishActive()} or the {@link AsynchronousExecutor#get(Object)} methods. + *
The type of parameter you provide to make the object that will be created. It should implement {@link Object#hashCode()} and {@link Object#equals(Object)} if you want to get the value early.
+ * @param {
+
+ public static interface CallBackProvider extends ThreadFactory {
+
+ /**
+ * Normally an asynchronous call, but can be synchronous
+ *
+ * @param parameter parameter object provided
+ * @return the created object
+ */
+ T callStage1(P parameter) throws E;
+
+ /**
+ * Synchronous call
+ *
+ * @param parameter parameter object provided
+ * @param object the previously created object
+ */
+ void callStage2(P parameter, T object) throws E;
+
+ /**
+ * Synchronous call, called multiple times, once per registered callback
+ *
+ * @param parameter parameter object provided
+ * @param object the previously created object
+ * @param callback the current callback to execute
+ */
+ void callStage3(P parameter, T object, C callback) throws E;
+ }
+
+ @SuppressWarnings("rawtypes")
+ static final AtomicIntegerFieldUpdater STATE_FIELD = AtomicIntegerFieldUpdater.newUpdater(AsynchronousExecutor.Task.class, "state");
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private static boolean set(AsynchronousExecutor.Task $this, int expected, int value) {
+ return STATE_FIELD.compareAndSet($this, expected, value);
+ }
+
+ class Task implements Runnable {
+ static final int PENDING = 0x0;
+ static final int STAGE_1_ASYNC = PENDING + 1;
+ static final int STAGE_1_SYNC = STAGE_1_ASYNC + 1;
+ static final int STAGE_1_COMPLETE = STAGE_1_SYNC + 1;
+ static final int FINISHED = STAGE_1_COMPLETE + 1;
+
+ volatile int state = PENDING;
+ final P parameter;
+ T object;
+ final List provider = AsynchronousExecutor.this.provider;
+ final P parameter = this.parameter;
+ final T object = this.object;
+
+ provider.callStage2(parameter, object);
+ for (C callback : callbacks) {
+ provider.callStage3(parameter, object, callback);
+ }
+ } finally {
+ tasks.remove(parameter);
+ state = FINISHED;
+ }
+ case FINISHED:
+ }
+ }
+ }
+
+ final CallBackProvider provider;
+ final Queue tasks = new HashMap ();
+ final ThreadPoolExecutor pool;
+
+ /**
+ * Uses a thread pool to pass executions to the provider.
+ * @see AsynchronousExecutor
+ */
+ public AsynchronousExecutor(final CallBackProvider provider, final int coreSize) {
+ Validate.notNull(provider, "Provider cannot be null");
+ this.provider = provider;
+
+ // We have an unbound queue size so do not need a max thread size
+ pool = new ThreadPoolExecutor(coreSize, Integer.MAX_VALUE, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue provider = this.provider;
+ final T object = skipQueue(provider, parameter);
+ for (C callback : callbacks) {
+ provider.callStage3(parameter, object, callback);
+ }
+ return object;
+ }
+
+ /**
+ * Processes a parameter as if it was in the queue, without ever passing to another thread.
+ */
+ public T getSkipQueue(P parameter, Iterable provider = this.provider;
+ final T object = skipQueue(provider, parameter);
+ for (C callback : callbacks) {
+ provider.callStage3(parameter, object, callback);
+ }
+ return object;
+ }
+
+ private static provider, P parameter) throws E {
+ T object = provider.callStage1(parameter);
+ provider.callStage2(parameter, object);
+ return object;
+ }
+
+ /**
+ * This is the 'heartbeat' that should be called synchronously to finish any pending tasks
+ */
+ public void finishActive() throws E {
+ final Queue