From 7646a067eb09e5999f4b0cf6f43fe7bd10efbbb7 Mon Sep 17 00:00:00 2001 From: Hannes Greule Date: Fri, 2 Jun 2023 11:40:34 +0200 Subject: [PATCH] Avoid many threads blocking on AbstractChangeSet#processSet (#2226) --- .../history/changeset/AbstractChangeSet.java | 104 +++++++++++------- .../changeset/AbstractDelegateChangeSet.java | 2 - .../changeset/FaweStreamChangeSet.java | 2 +- 3 files changed, 64 insertions(+), 44 deletions(-) diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractChangeSet.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractChangeSet.java index e52ef0cba..b06fc7926 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractChangeSet.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractChangeSet.java @@ -39,19 +39,27 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; +/** + * This batch processor writes changes to a concrete implementation. + * {@link #processSet(IChunk, IChunkGet, IChunkSet)} is synchronized to guarantee consistency. + * To avoid many blocking threads on this method, changes are enqueued in {@link #queue}. + * This allows to keep other threads free for other work. + */ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor { private static final Logger LOGGER = LogManagerCompat.getLogger(); private final World world; private final AtomicInteger lastException = new AtomicInteger(); - protected AtomicInteger waitingCombined = new AtomicInteger(0); - protected AtomicInteger waitingAsync = new AtomicInteger(0); - - protected boolean closed; + private final Semaphore workerSemaphore = new Semaphore(1, false); + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + protected volatile boolean closed; public AbstractChangeSet(World world) { this.world = world; @@ -65,16 +73,11 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor { if (closed) { return; } - waitingAsync.incrementAndGet(); TaskManager.taskManager().async(() -> { - waitingAsync.decrementAndGet(); - synchronized (waitingAsync) { - waitingAsync.notifyAll(); - } try { close(); } catch (IOException e) { - e.printStackTrace(); + LOGGER.catching(e); } }); } @@ -82,20 +85,10 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor { @Override public void flush() { try { - if (!Fawe.isMainThread()) { - while (waitingAsync.get() > 0) { - synchronized (waitingAsync) { - waitingAsync.wait(1000); - } - } - } - while (waitingCombined.get() > 0) { - synchronized (waitingCombined) { - waitingCombined.wait(1000); - } - } - } catch (InterruptedException e) { - e.printStackTrace(); + // drain with this thread too + drainQueue(true); + } catch (Exception e) { + LOGGER.catching(e); } } @@ -125,7 +118,7 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor { } @Override - public synchronized IChunkSet processSet(IChunk chunk, IChunkGet get, IChunkSet set) { + public final synchronized IChunkSet processSet(IChunk chunk, IChunkGet get, IChunkSet set) { int bx = chunk.getX() << 4; int bz = chunk.getZ() << 4; @@ -306,12 +299,12 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor { BaseBlock to = change.getCurrent(); add(loc, from, to); } catch (Exception e) { - e.printStackTrace(); + LOGGER.catching(e); } } public boolean isEmpty() { - return waitingCombined.get() == 0 && waitingAsync.get() == 0 && size() == 0; + return queue.isEmpty() && workerSemaphore.availablePermits() == 1 && size() == 0; } public void add(BlockVector3 loc, BaseBlock from, BaseBlock to) { @@ -353,7 +346,7 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor { add(x, y, z, combinedFrom, combinedTo); } catch (Exception e) { - e.printStackTrace(); + LOGGER.catching(e); } } @@ -362,7 +355,6 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor { } public Future addWriteTask(final Runnable writeTask, final boolean completeNow) { - AbstractChangeSet.this.waitingCombined.incrementAndGet(); Runnable wrappedTask = () -> { try { writeTask.run(); @@ -372,25 +364,55 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor { } else { int hash = t.getMessage().hashCode(); if (lastException.getAndSet(hash) != hash) { - t.printStackTrace(); - } - } - } finally { - if (AbstractChangeSet.this.waitingCombined.decrementAndGet() <= 0) { - synchronized (AbstractChangeSet.this.waitingAsync) { - AbstractChangeSet.this.waitingAsync.notifyAll(); - } - synchronized (AbstractChangeSet.this.waitingCombined) { - AbstractChangeSet.this.waitingCombined.notifyAll(); + LOGGER.catching(t); } } } }; if (completeNow) { wrappedTask.run(); - return Futures.immediateCancelledFuture(); + return Futures.immediateVoidFuture(); } else { - return Fawe.instance().getQueueHandler().submit(wrappedTask); + CompletableFuture task = new CompletableFuture<>(); + queue.add(() -> { + wrappedTask.run(); + task.complete(null); + }); + // make sure changes are processed + triggerWorker(); + return task; + } + } + + private void triggerWorker() { + if (workerSemaphore.availablePermits() == 0) { + return; // fast path to avoid additional tasks: a worker is already draining the queue + } + // create a new worker to drain the current queue + Fawe.instance().getQueueHandler().submit(() -> drainQueue(false)); + } + + private void drainQueue(boolean ignoreRunningState) { + if (!workerSemaphore.tryAcquire()) { + if (ignoreRunningState) { + // ignoreRunningState means we want to block + // even if another thread is already draining + try { + workerSemaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else { + return; // another thread is draining the queue already, ignore + } + } + try { + Runnable next; + while ((next = queue.poll()) != null) { // process all tasks in the queue + next.run(); + } + } finally { + workerSemaphore.release(); } } diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractDelegateChangeSet.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractDelegateChangeSet.java index ad3173fd6..195a5fdbd 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractDelegateChangeSet.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractDelegateChangeSet.java @@ -25,8 +25,6 @@ public class AbstractDelegateChangeSet extends AbstractChangeSet { public AbstractDelegateChangeSet(AbstractChangeSet parent) { super(parent.getWorld()); this.parent = parent; - this.waitingCombined = parent.waitingCombined; - this.waitingAsync = parent.waitingAsync; } public final AbstractChangeSet getParent() { diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/FaweStreamChangeSet.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/FaweStreamChangeSet.java index c75a163f7..c2ae362ae 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/FaweStreamChangeSet.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/FaweStreamChangeSet.java @@ -258,7 +258,7 @@ public abstract class FaweStreamChangeSet extends AbstractChangeSet { if (blockSize > 0) { return false; } - if (waitingCombined.get() != 0 || waitingAsync.get() != 0) { + if (!super.isEmpty()) { return false; } flush();