Avoid many threads blocking on AbstractChangeSet#processSet (#2226)

Dieser Commit ist enthalten in:
Hannes Greule 2023-06-02 11:40:34 +02:00 committet von GitHub
Ursprung 03c4bafc45
Commit 7646a067eb
Es konnte kein GPG-Schlüssel zu dieser Signatur gefunden werden
GPG-Schlüssel-ID: 4AEE18F83AFDEB23
3 geänderte Dateien mit 64 neuen und 44 gelöschten Zeilen

Datei anzeigen

@ -39,19 +39,27 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
/**
* This batch processor writes changes to a concrete implementation.
* {@link #processSet(IChunk, IChunkGet, IChunkSet)} is synchronized to guarantee consistency.
* To avoid many blocking threads on this method, changes are enqueued in {@link #queue}.
* This allows to keep other threads free for other work.
*/
public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor {
private static final Logger LOGGER = LogManagerCompat.getLogger();
private final World world;
private final AtomicInteger lastException = new AtomicInteger();
protected AtomicInteger waitingCombined = new AtomicInteger(0);
protected AtomicInteger waitingAsync = new AtomicInteger(0);
protected boolean closed;
private final Semaphore workerSemaphore = new Semaphore(1, false);
private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
protected volatile boolean closed;
public AbstractChangeSet(World world) {
this.world = world;
@ -65,16 +73,11 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor {
if (closed) {
return;
}
waitingAsync.incrementAndGet();
TaskManager.taskManager().async(() -> {
waitingAsync.decrementAndGet();
synchronized (waitingAsync) {
waitingAsync.notifyAll();
}
try {
close();
} catch (IOException e) {
e.printStackTrace();
LOGGER.catching(e);
}
});
}
@ -82,20 +85,10 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor {
@Override
public void flush() {
try {
if (!Fawe.isMainThread()) {
while (waitingAsync.get() > 0) {
synchronized (waitingAsync) {
waitingAsync.wait(1000);
}
}
}
while (waitingCombined.get() > 0) {
synchronized (waitingCombined) {
waitingCombined.wait(1000);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
// drain with this thread too
drainQueue(true);
} catch (Exception e) {
LOGGER.catching(e);
}
}
@ -125,7 +118,7 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor {
}
@Override
public synchronized IChunkSet processSet(IChunk chunk, IChunkGet get, IChunkSet set) {
public final synchronized IChunkSet processSet(IChunk chunk, IChunkGet get, IChunkSet set) {
int bx = chunk.getX() << 4;
int bz = chunk.getZ() << 4;
@ -306,12 +299,12 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor {
BaseBlock to = change.getCurrent();
add(loc, from, to);
} catch (Exception e) {
e.printStackTrace();
LOGGER.catching(e);
}
}
public boolean isEmpty() {
return waitingCombined.get() == 0 && waitingAsync.get() == 0 && size() == 0;
return queue.isEmpty() && workerSemaphore.availablePermits() == 1 && size() == 0;
}
public void add(BlockVector3 loc, BaseBlock from, BaseBlock to) {
@ -353,7 +346,7 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor {
add(x, y, z, combinedFrom, combinedTo);
} catch (Exception e) {
e.printStackTrace();
LOGGER.catching(e);
}
}
@ -362,7 +355,6 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor {
}
public Future<?> addWriteTask(final Runnable writeTask, final boolean completeNow) {
AbstractChangeSet.this.waitingCombined.incrementAndGet();
Runnable wrappedTask = () -> {
try {
writeTask.run();
@ -372,25 +364,55 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor {
} else {
int hash = t.getMessage().hashCode();
if (lastException.getAndSet(hash) != hash) {
t.printStackTrace();
}
}
} finally {
if (AbstractChangeSet.this.waitingCombined.decrementAndGet() <= 0) {
synchronized (AbstractChangeSet.this.waitingAsync) {
AbstractChangeSet.this.waitingAsync.notifyAll();
}
synchronized (AbstractChangeSet.this.waitingCombined) {
AbstractChangeSet.this.waitingCombined.notifyAll();
LOGGER.catching(t);
}
}
}
};
if (completeNow) {
wrappedTask.run();
return Futures.immediateCancelledFuture();
return Futures.immediateVoidFuture();
} else {
return Fawe.instance().getQueueHandler().submit(wrappedTask);
CompletableFuture<?> task = new CompletableFuture<>();
queue.add(() -> {
wrappedTask.run();
task.complete(null);
});
// make sure changes are processed
triggerWorker();
return task;
}
}
private void triggerWorker() {
if (workerSemaphore.availablePermits() == 0) {
return; // fast path to avoid additional tasks: a worker is already draining the queue
}
// create a new worker to drain the current queue
Fawe.instance().getQueueHandler().submit(() -> drainQueue(false));
}
private void drainQueue(boolean ignoreRunningState) {
if (!workerSemaphore.tryAcquire()) {
if (ignoreRunningState) {
// ignoreRunningState means we want to block
// even if another thread is already draining
try {
workerSemaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
return; // another thread is draining the queue already, ignore
}
}
try {
Runnable next;
while ((next = queue.poll()) != null) { // process all tasks in the queue
next.run();
}
} finally {
workerSemaphore.release();
}
}

Datei anzeigen

@ -25,8 +25,6 @@ public class AbstractDelegateChangeSet extends AbstractChangeSet {
public AbstractDelegateChangeSet(AbstractChangeSet parent) {
super(parent.getWorld());
this.parent = parent;
this.waitingCombined = parent.waitingCombined;
this.waitingAsync = parent.waitingAsync;
}
public final AbstractChangeSet getParent() {

Datei anzeigen

@ -258,7 +258,7 @@ public abstract class FaweStreamChangeSet extends AbstractChangeSet {
if (blockSize > 0) {
return false;
}
if (waitingCombined.get() != 0 || waitingAsync.get() != 0) {
if (!super.isEmpty()) {
return false;
}
flush();