From 1356cd5caa5659f2615eef9d12f33f34a75fa7b6 Mon Sep 17 00:00:00 2001 From: Jordan Date: Tue, 6 Jun 2023 23:35:37 +0100 Subject: [PATCH] feat: implement a player-specific queue for clipboard IO tasks (#2267) * feat: implement a player-specific queue for clipboard IO tasks - Addresses #2222 (hopefully fixes) * Address comments * Add since --------- Co-authored-by: Alexander Brandes --- .../com/fastasyncworldedit/core/Fawe.java | 26 +++ .../clipboard/DiskOptimizedClipboard.java | 4 + .../util/task/KeyQueuedExecutorService.java | 172 ++++++++++++++++++ .../com/sk89q/worldedit/entity/Player.java | 13 +- 4 files changed, 211 insertions(+), 4 deletions(-) create mode 100644 worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/KeyQueuedExecutorService.java diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/Fawe.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/Fawe.java index 8b8e6c530..7f31c6519 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/Fawe.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/Fawe.java @@ -12,7 +12,9 @@ import com.fastasyncworldedit.core.util.RandomTextureUtil; import com.fastasyncworldedit.core.util.TaskManager; import com.fastasyncworldedit.core.util.TextureUtil; import com.fastasyncworldedit.core.util.WEManager; +import com.fastasyncworldedit.core.util.task.KeyQueuedExecutorService; import com.github.luben.zstd.Zstd; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.sk89q.worldedit.WorldEdit; import com.sk89q.worldedit.internal.util.LogManagerCompat; import net.jpountz.lz4.LZ4Factory; @@ -33,6 +35,9 @@ import java.lang.management.MemoryPoolMXBean; import java.lang.management.MemoryUsage; import java.util.Date; import java.util.List; +import java.util.UUID; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -86,6 +91,7 @@ public class Fawe { * The platform specific implementation. */ private final IFawe implementation; + private final KeyQueuedExecutorService clipboardExecutor; private FaweVersion version; private TextureUtil textures; private QueueHandler queueHandler; @@ -131,6 +137,15 @@ public class Fawe { }, 0); TaskManager.taskManager().repeat(timer, 1); + + clipboardExecutor = new KeyQueuedExecutorService<>(new ThreadPoolExecutor( + 1, + Settings.settings().QUEUE.PARALLEL_THREADS, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setNameFormat("fawe-clipboard-%d").build() + )); } /** @@ -428,4 +443,15 @@ public class Fawe { return this.thread = Thread.currentThread(); } + /** + * Gets the executor used for clipboard IO if clipboard on disk is enabled or null + * + * @return Executor used for clipboard IO if clipboard on disk is enabled or null + * @since TODO + */ + @Nullable + public KeyQueuedExecutorService getClipboardExecutor() { + return this.clipboardExecutor; + } + } diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/extent/clipboard/DiskOptimizedClipboard.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/extent/clipboard/DiskOptimizedClipboard.java index 08941d4a5..28dc52333 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/extent/clipboard/DiskOptimizedClipboard.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/extent/clipboard/DiskOptimizedClipboard.java @@ -156,7 +156,9 @@ public class DiskOptimizedClipboard extends LinearClipboard { /** * Load an existing file as a DiskOptimizedClipboard. The file MUST exist and MUST be created as a DiskOptimizedClipboard * with data written to it. + * @deprecated Will be made private, use {@link DiskOptimizedClipboard#loadFromFile(File)} */ + @Deprecated(forRemoval = true, since = "TODO") public DiskOptimizedClipboard(File file) { this(file, VERSION); } @@ -167,7 +169,9 @@ public class DiskOptimizedClipboard extends LinearClipboard { * * @param file File to read from * @param versionOverride An override version to allow loading of older clipboards if required + * @deprecated Will be made private, use {@link DiskOptimizedClipboard#loadFromFile(File)} */ + @Deprecated(forRemoval = true, since = "TODO") public DiskOptimizedClipboard(File file, int versionOverride) { super(readSize(file, versionOverride), BlockVector3.ZERO); headerSize = getHeaderSizeOverrideFromVersion(versionOverride); diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/KeyQueuedExecutorService.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/KeyQueuedExecutorService.java new file mode 100644 index 000000000..bfaafb071 --- /dev/null +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/KeyQueuedExecutorService.java @@ -0,0 +1,172 @@ +package com.fastasyncworldedit.core.util.task; + +import javax.annotation.Nonnull; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Executor service that queues tasks based on keys, executing tasks on a configurable {@link ThreadPoolExecutor} + * + * @param Key type + * @since TODO + */ +public class KeyQueuedExecutorService { + + private final ExecutorService parent; + private final Map keyQueue = new HashMap<>(); + + /** + * Create a new {@link KeyQueuedExecutorService} instance + * + * @param parent Parent {@link ExecutorService} to use for actual task completion + */ + public KeyQueuedExecutorService(ExecutorService parent) { + this.parent = parent; + } + + /** + * Delegates to {@link ThreadPoolExecutor#shutdown()} + */ + public void shutdown() { + parent.shutdown(); + } + + /** + * Delegates to {@link ThreadPoolExecutor#shutdownNow()} + */ + @Nonnull + public List shutdownNow() { + return parent.shutdownNow(); + } + + /** + * Delegates to {@link ThreadPoolExecutor#isShutdown()} + */ + public boolean isShutdown() { + return parent.isShutdown(); + } + + /** + * Delegates to {@link ThreadPoolExecutor#isTerminated()} + */ + public boolean isTerminated() { + return parent.isTerminated(); + } + + /** + * Delegates to {@link ThreadPoolExecutor#awaitTermination(long, TimeUnit)} + */ + public boolean awaitTermination(long timeout, @Nonnull TimeUnit unit) throws InterruptedException { + return parent.awaitTermination(timeout, unit); + } + + protected FutureTask newTaskFor(Runnable runnable, T value) { + return new FutureTask<>(runnable, value); + } + + protected FutureTask newTaskFor(Callable callable) { + return new FutureTask<>(callable); + } + + @Nonnull + public Future submit(@Nonnull K key, @Nonnull Callable task) { + FutureTask ftask = newTaskFor(task); + execute(key, ftask); + return ftask; + } + + @Nonnull + public Future submit(@Nonnull K key, @Nonnull Runnable task, T result) { + FutureTask ftask = newTaskFor(task, result); + execute(key, ftask); + return ftask; + } + + @Nonnull + public Future submit(@Nonnull K key, @Nonnull Runnable task) { + FutureTask ftask = newTaskFor(task, null); + execute(key, ftask); + return ftask; + } + + public void execute(@Nonnull K key, @Nonnull FutureTask command) { + synchronized (keyQueue) { + boolean triggerRun = false; + KeyRunner runner = keyQueue.get(key); + if (runner == null) { + runner = new KeyRunner(key); + keyQueue.put(key, runner); + triggerRun = true; + } + runner.add(command); + if (triggerRun) { + runner.triggerRun(); + } + } + } + + private final class KeyRunner { + + private final Queue> tasks = new ConcurrentLinkedQueue<>(); + private final K key; + + private KeyRunner(K key) { + this.key = key; + } + + void add(FutureTask task) { + if (!tasks.add(task)) { + throw new RejectedExecutionException(rejection()); + } + } + + void triggerRun() { + Runnable task = tasks.poll(); + if (task == null) { + throw new RejectedExecutionException(rejection()); + } + try { + run(task); + } catch (RejectedExecutionException e) { + synchronized (keyQueue) { + keyQueue.remove(key); + } + throw new RejectedExecutionException(rejection(), e); + } + } + + private void run(Runnable task) { + parent.execute(() -> { + task.run(); + Runnable next = tasks.poll(); + if (next == null) { + synchronized (keyQueue) { + next = tasks.poll(); + if (next == null) { + keyQueue.remove(key); + } + } + } + if (next != null) { + run(next); + } + }); + } + + private String rejection() { + return "Task for the key '" + key + "' rejected"; + } + + } + +} diff --git a/worldedit-core/src/main/java/com/sk89q/worldedit/entity/Player.java b/worldedit-core/src/main/java/com/sk89q/worldedit/entity/Player.java index 781b02ae2..8f5b5cb53 100644 --- a/worldedit-core/src/main/java/com/sk89q/worldedit/entity/Player.java +++ b/worldedit-core/src/main/java/com/sk89q/worldedit/entity/Player.java @@ -437,14 +437,16 @@ public interface Player extends Entity, Actor { } else { continue; } - WorldEdit.getInstance().getExecutorService().submit(() -> { + Fawe.instance().getClipboardExecutor().submit(getUniqueId(), () -> { doc.close(); // Ensure closed before deletion doc.getFile().delete(); }); } } - } else if (Settings.settings().CLIPBOARD.DELETE_ON_LOGOUT || Settings.settings().CLIPBOARD.USE_DISK) { - WorldEdit.getInstance().getExecutorService().submit(() -> session.setClipboard(null)); + } else if (Settings.settings().CLIPBOARD.USE_DISK) { + Fawe.instance().getClipboardExecutor().submit(getUniqueId(), () -> session.setClipboard(null)); + } else if (Settings.settings().CLIPBOARD.DELETE_ON_LOGOUT) { + session.setClipboard(null); } if (Settings.settings().HISTORY.DELETE_ON_LOGOUT) { session.clearHistory(); @@ -470,7 +472,10 @@ public interface Player extends Entity, Actor { } } catch (EmptyClipboardException ignored) { } - DiskOptimizedClipboard doc = DiskOptimizedClipboard.loadFromFile(file); + DiskOptimizedClipboard doc = Fawe.instance().getClipboardExecutor().submit( + getUniqueId(), + () -> DiskOptimizedClipboard.loadFromFile(file) + ).get(); Clipboard clip = doc.toClipboard(); ClipboardHolder holder = new ClipboardHolder(clip); session.setClipboard(holder);