From 9543adc776c1afcb81477e2113920f9746ea88a6 Mon Sep 17 00:00:00 2001 From: Jordan Date: Thu, 20 Jul 2023 16:58:01 +0100 Subject: [PATCH] Implement async notify queue that submits to a KeyQueuedExecutorService (#2334) --- .../queue/implementation/QueueHandler.java | 26 ++++++ .../core/util/task/AsyncNotifyKeyedQueue.java | 81 +++++++++++++++++++ .../core/util/task/AsyncNotifyQueue.java | 16 +--- .../platform/AbstractNonPlayerActor.java | 7 +- .../platform/AbstractPlayerActor.java | 6 +- 5 files changed, 114 insertions(+), 22 deletions(-) create mode 100644 worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyKeyedQueue.java diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java index 7f604a277..bc7f9ddec 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java @@ -27,6 +27,7 @@ import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.Future; @@ -52,6 +53,7 @@ public abstract class QueueHandler implements Trimable, Runnable { null, false ); + /** * Secondary queue should be used for "cleanup" tasks that are likely to be shorter in life than those submitted to the * primary queue. They may be IO-bound tasks. @@ -508,4 +510,28 @@ public abstract class QueueHandler implements Trimable, Runnable { return result; } + /** + * Primary queue should be used for tasks that are unlikely to wait on other tasks, IO, etc. (i.e. spend most of their + * time utilising CPU. + *

+ * Internal API usage only. + * + * @since TODO + */ + public ExecutorService getForkJoinPoolPrimary() { + return forkJoinPoolPrimary; + } + + /** + * Secondary queue should be used for "cleanup" tasks that are likely to be shorter in life than those submitted to the + * primary queue. They may be IO-bound tasks. + *

+ * Internal API usage only. + * + * @since TODO + */ + public ExecutorService getForkJoinPoolSecondary() { + return forkJoinPoolSecondary; + } + } diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyKeyedQueue.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyKeyedQueue.java new file mode 100644 index 000000000..9df6b197c --- /dev/null +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyKeyedQueue.java @@ -0,0 +1,81 @@ +package com.fastasyncworldedit.core.util.task; + +import com.fastasyncworldedit.core.configuration.Settings; + +import java.io.Closeable; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import java.util.function.Supplier; + +/** + * async queue that accepts a {@link Thread.UncaughtExceptionHandler} for exception handling per instance, delegating to a + * parent {@link KeyQueuedExecutorService}. + * + * @since TODO + */ +public class AsyncNotifyKeyedQueue implements Closeable { + + private static final KeyQueuedExecutorService QUEUE_SUBMISSIONS = new KeyQueuedExecutorService<>(new ForkJoinPool( + Settings.settings().QUEUE.PARALLEL_THREADS, + new FaweForkJoinWorkerThreadFactory("AsyncNotifyKeyedQueue - %s"), + null, + false + )); + + private final Thread.UncaughtExceptionHandler handler; + private final Supplier key; + private volatile boolean closed; + + /** + * New instance + * + * @param handler exception handler + * @param key supplier of UUID key + */ + public AsyncNotifyKeyedQueue(Thread.UncaughtExceptionHandler handler, Supplier key) { + this.handler = handler; + this.key = key; + } + + public Thread.UncaughtExceptionHandler getHandler() { + return handler; + } + + public Future run(Runnable task) { + return call(() -> { + task.run(); + return null; + }); + } + + public Future call(Callable task) { + Future[] self = new Future[1]; + Callable wrapped = () -> { + if (!closed) { + try { + return task.call(); + } catch (Throwable e) { + handler.uncaughtException(Thread.currentThread(), e); + } + } + if (self[0] != null) { + self[0].cancel(true); + } + return null; + }; + self[0] = QUEUE_SUBMISSIONS.submit(key.get(), wrapped); + return self[0]; + } + + @Override + public void close() { + closed = true; + } + + public boolean isClosed() { + return closed; + } + +} diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyQueue.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyQueue.java index 7cd91f87a..9cf9ddef2 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyQueue.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyQueue.java @@ -1,13 +1,9 @@ package com.fastasyncworldedit.core.util.task; import com.fastasyncworldedit.core.Fawe; -import com.fastasyncworldedit.core.configuration.Settings; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.Closeable; import java.util.concurrent.Callable; -import java.util.concurrent.Executors; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -15,13 +11,6 @@ import java.util.function.Supplier; public class AsyncNotifyQueue implements Closeable { - private static final ForkJoinPool QUEUE_SUBMISSIONS = new ForkJoinPool( - Settings.settings().QUEUE.PARALLEL_THREADS, - new FaweForkJoinWorkerThreadFactory("AsyncNotifyQueue - %s"), - null, - false - ); - private final Lock lock = new ReentrantLock(true); private final Thread.UncaughtExceptionHandler handler; private boolean closed; @@ -56,9 +45,6 @@ public class AsyncNotifyQueue implements Closeable { return task.call(); } catch (Throwable e) { handler.uncaughtException(Thread.currentThread(), e); - if (self[0] != null) { - self[0].cancel(true); - } } } } finally { @@ -70,7 +56,7 @@ public class AsyncNotifyQueue implements Closeable { } return null; }; - self[0] = QUEUE_SUBMISSIONS.submit(wrapped); + self[0] = Fawe.instance().getQueueHandler().async(wrapped); return self[0]; } diff --git a/worldedit-core/src/main/java/com/sk89q/worldedit/extension/platform/AbstractNonPlayerActor.java b/worldedit-core/src/main/java/com/sk89q/worldedit/extension/platform/AbstractNonPlayerActor.java index 0b8909198..8337a7327 100644 --- a/worldedit-core/src/main/java/com/sk89q/worldedit/extension/platform/AbstractNonPlayerActor.java +++ b/worldedit-core/src/main/java/com/sk89q/worldedit/extension/platform/AbstractNonPlayerActor.java @@ -19,10 +19,9 @@ package com.sk89q.worldedit.extension.platform; -import com.fastasyncworldedit.core.configuration.Caption; import com.fastasyncworldedit.core.internal.exception.FaweException; import com.fastasyncworldedit.core.util.TaskManager; -import com.fastasyncworldedit.core.util.task.AsyncNotifyQueue; +import com.fastasyncworldedit.core.util.task.AsyncNotifyKeyedQueue; import com.sk89q.worldedit.WorldEditException; import com.sk89q.worldedit.internal.cui.CUIEvent; import com.sk89q.worldedit.util.formatting.text.TextComponent; @@ -68,7 +67,7 @@ public abstract class AbstractNonPlayerActor implements Actor { // Queue for async tasks private final AtomicInteger runningCount = new AtomicInteger(); - private final AsyncNotifyQueue asyncNotifyQueue = new AsyncNotifyQueue((thread, throwable) -> { + private final AsyncNotifyKeyedQueue asyncNotifyQueue = new AsyncNotifyKeyedQueue((thread, throwable) -> { while (throwable.getCause() != null) { throwable = throwable.getCause(); } @@ -82,7 +81,7 @@ public abstract class AbstractNonPlayerActor implements Actor { throwable.printStackTrace(); } } - }); + }, this::getUniqueId); /** * Run a task either async, or on the current thread. diff --git a/worldedit-core/src/main/java/com/sk89q/worldedit/extension/platform/AbstractPlayerActor.java b/worldedit-core/src/main/java/com/sk89q/worldedit/extension/platform/AbstractPlayerActor.java index 6adf7236c..ae4eb2144 100644 --- a/worldedit-core/src/main/java/com/sk89q/worldedit/extension/platform/AbstractPlayerActor.java +++ b/worldedit-core/src/main/java/com/sk89q/worldedit/extension/platform/AbstractPlayerActor.java @@ -25,7 +25,7 @@ import com.fastasyncworldedit.core.math.MutableBlockVector3; import com.fastasyncworldedit.core.regions.FaweMaskManager; import com.fastasyncworldedit.core.util.TaskManager; import com.fastasyncworldedit.core.util.WEManager; -import com.fastasyncworldedit.core.util.task.AsyncNotifyQueue; +import com.fastasyncworldedit.core.util.task.AsyncNotifyKeyedQueue; import com.sk89q.worldedit.EditSession; import com.sk89q.worldedit.MaxChangedBlocksException; import com.sk89q.worldedit.WorldEdit; @@ -81,7 +81,7 @@ public abstract class AbstractPlayerActor implements Actor, Player, Cloneable { // Queue for async tasks private final AtomicInteger runningCount = new AtomicInteger(); - private final AsyncNotifyQueue asyncNotifyQueue = new AsyncNotifyQueue( + private final AsyncNotifyKeyedQueue asyncNotifyQueue = new AsyncNotifyKeyedQueue( (thread, throwable) -> { while (throwable.getCause() != null) { throwable = throwable.getCause(); @@ -96,7 +96,7 @@ public abstract class AbstractPlayerActor implements Actor, Player, Cloneable { throwable.printStackTrace(); } } - }); + }, this::getUniqueId); public AbstractPlayerActor(Map meta) { this.meta = meta;