diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java
index 7f604a277..bc7f9ddec 100644
--- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java
+++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java
@@ -27,6 +27,7 @@ import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
@@ -52,6 +53,7 @@ public abstract class QueueHandler implements Trimable, Runnable {
null,
false
);
+
/**
* Secondary queue should be used for "cleanup" tasks that are likely to be shorter in life than those submitted to the
* primary queue. They may be IO-bound tasks.
@@ -508,4 +510,28 @@ public abstract class QueueHandler implements Trimable, Runnable {
return result;
}
+ /**
+ * Primary queue should be used for tasks that are unlikely to wait on other tasks, IO, etc. (i.e. spend most of their
+ * time utilising CPU.
+ *
+ * Internal API usage only.
+ *
+ * @since TODO
+ */
+ public ExecutorService getForkJoinPoolPrimary() {
+ return forkJoinPoolPrimary;
+ }
+
+ /**
+ * Secondary queue should be used for "cleanup" tasks that are likely to be shorter in life than those submitted to the
+ * primary queue. They may be IO-bound tasks.
+ *
+ * Internal API usage only.
+ *
+ * @since TODO
+ */
+ public ExecutorService getForkJoinPoolSecondary() {
+ return forkJoinPoolSecondary;
+ }
+
}
diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyKeyedQueue.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyKeyedQueue.java
new file mode 100644
index 000000000..9df6b197c
--- /dev/null
+++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyKeyedQueue.java
@@ -0,0 +1,81 @@
+package com.fastasyncworldedit.core.util.task;
+
+import com.fastasyncworldedit.core.configuration.Settings;
+
+import java.io.Closeable;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
+
+/**
+ * async queue that accepts a {@link Thread.UncaughtExceptionHandler} for exception handling per instance, delegating to a
+ * parent {@link KeyQueuedExecutorService}.
+ *
+ * @since TODO
+ */
+public class AsyncNotifyKeyedQueue implements Closeable {
+
+ private static final KeyQueuedExecutorService QUEUE_SUBMISSIONS = new KeyQueuedExecutorService<>(new ForkJoinPool(
+ Settings.settings().QUEUE.PARALLEL_THREADS,
+ new FaweForkJoinWorkerThreadFactory("AsyncNotifyKeyedQueue - %s"),
+ null,
+ false
+ ));
+
+ private final Thread.UncaughtExceptionHandler handler;
+ private final Supplier key;
+ private volatile boolean closed;
+
+ /**
+ * New instance
+ *
+ * @param handler exception handler
+ * @param key supplier of UUID key
+ */
+ public AsyncNotifyKeyedQueue(Thread.UncaughtExceptionHandler handler, Supplier key) {
+ this.handler = handler;
+ this.key = key;
+ }
+
+ public Thread.UncaughtExceptionHandler getHandler() {
+ return handler;
+ }
+
+ public Future run(Runnable task) {
+ return call(() -> {
+ task.run();
+ return null;
+ });
+ }
+
+ public Future call(Callable task) {
+ Future[] self = new Future[1];
+ Callable wrapped = () -> {
+ if (!closed) {
+ try {
+ return task.call();
+ } catch (Throwable e) {
+ handler.uncaughtException(Thread.currentThread(), e);
+ }
+ }
+ if (self[0] != null) {
+ self[0].cancel(true);
+ }
+ return null;
+ };
+ self[0] = QUEUE_SUBMISSIONS.submit(key.get(), wrapped);
+ return self[0];
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+}
diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyQueue.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyQueue.java
index 7cd91f87a..9cf9ddef2 100644
--- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyQueue.java
+++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyQueue.java
@@ -1,13 +1,9 @@
package com.fastasyncworldedit.core.util.task;
import com.fastasyncworldedit.core.Fawe;
-import com.fastasyncworldedit.core.configuration.Settings;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -15,13 +11,6 @@ import java.util.function.Supplier;
public class AsyncNotifyQueue implements Closeable {
- private static final ForkJoinPool QUEUE_SUBMISSIONS = new ForkJoinPool(
- Settings.settings().QUEUE.PARALLEL_THREADS,
- new FaweForkJoinWorkerThreadFactory("AsyncNotifyQueue - %s"),
- null,
- false
- );
-
private final Lock lock = new ReentrantLock(true);
private final Thread.UncaughtExceptionHandler handler;
private boolean closed;
@@ -56,9 +45,6 @@ public class AsyncNotifyQueue implements Closeable {
return task.call();
} catch (Throwable e) {
handler.uncaughtException(Thread.currentThread(), e);
- if (self[0] != null) {
- self[0].cancel(true);
- }
}
}
} finally {
@@ -70,7 +56,7 @@ public class AsyncNotifyQueue implements Closeable {
}
return null;
};
- self[0] = QUEUE_SUBMISSIONS.submit(wrapped);
+ self[0] = Fawe.instance().getQueueHandler().async(wrapped);
return self[0];
}
diff --git a/worldedit-core/src/main/java/com/sk89q/worldedit/extension/platform/AbstractNonPlayerActor.java b/worldedit-core/src/main/java/com/sk89q/worldedit/extension/platform/AbstractNonPlayerActor.java
index 0b8909198..8337a7327 100644
--- a/worldedit-core/src/main/java/com/sk89q/worldedit/extension/platform/AbstractNonPlayerActor.java
+++ b/worldedit-core/src/main/java/com/sk89q/worldedit/extension/platform/AbstractNonPlayerActor.java
@@ -19,10 +19,9 @@
package com.sk89q.worldedit.extension.platform;
-import com.fastasyncworldedit.core.configuration.Caption;
import com.fastasyncworldedit.core.internal.exception.FaweException;
import com.fastasyncworldedit.core.util.TaskManager;
-import com.fastasyncworldedit.core.util.task.AsyncNotifyQueue;
+import com.fastasyncworldedit.core.util.task.AsyncNotifyKeyedQueue;
import com.sk89q.worldedit.WorldEditException;
import com.sk89q.worldedit.internal.cui.CUIEvent;
import com.sk89q.worldedit.util.formatting.text.TextComponent;
@@ -68,7 +67,7 @@ public abstract class AbstractNonPlayerActor implements Actor {
// Queue for async tasks
private final AtomicInteger runningCount = new AtomicInteger();
- private final AsyncNotifyQueue asyncNotifyQueue = new AsyncNotifyQueue((thread, throwable) -> {
+ private final AsyncNotifyKeyedQueue asyncNotifyQueue = new AsyncNotifyKeyedQueue((thread, throwable) -> {
while (throwable.getCause() != null) {
throwable = throwable.getCause();
}
@@ -82,7 +81,7 @@ public abstract class AbstractNonPlayerActor implements Actor {
throwable.printStackTrace();
}
}
- });
+ }, this::getUniqueId);
/**
* Run a task either async, or on the current thread.
diff --git a/worldedit-core/src/main/java/com/sk89q/worldedit/extension/platform/AbstractPlayerActor.java b/worldedit-core/src/main/java/com/sk89q/worldedit/extension/platform/AbstractPlayerActor.java
index 6adf7236c..ae4eb2144 100644
--- a/worldedit-core/src/main/java/com/sk89q/worldedit/extension/platform/AbstractPlayerActor.java
+++ b/worldedit-core/src/main/java/com/sk89q/worldedit/extension/platform/AbstractPlayerActor.java
@@ -25,7 +25,7 @@ import com.fastasyncworldedit.core.math.MutableBlockVector3;
import com.fastasyncworldedit.core.regions.FaweMaskManager;
import com.fastasyncworldedit.core.util.TaskManager;
import com.fastasyncworldedit.core.util.WEManager;
-import com.fastasyncworldedit.core.util.task.AsyncNotifyQueue;
+import com.fastasyncworldedit.core.util.task.AsyncNotifyKeyedQueue;
import com.sk89q.worldedit.EditSession;
import com.sk89q.worldedit.MaxChangedBlocksException;
import com.sk89q.worldedit.WorldEdit;
@@ -81,7 +81,7 @@ public abstract class AbstractPlayerActor implements Actor, Player, Cloneable {
// Queue for async tasks
private final AtomicInteger runningCount = new AtomicInteger();
- private final AsyncNotifyQueue asyncNotifyQueue = new AsyncNotifyQueue(
+ private final AsyncNotifyKeyedQueue asyncNotifyQueue = new AsyncNotifyKeyedQueue(
(thread, throwable) -> {
while (throwable.getCause() != null) {
throwable = throwable.getCause();
@@ -96,7 +96,7 @@ public abstract class AbstractPlayerActor implements Actor, Player, Cloneable {
throwable.printStackTrace();
}
}
- });
+ }, this::getUniqueId);
public AbstractPlayerActor(Map meta) {
this.meta = meta;