geforkt von Mirrors/FastAsyncWorldEdit
chore: improve queue documentation and submit history to better queue (#2266)
Dieser Commit ist enthalten in:
Ursprung
b7719d17bd
Commit
84872cf9a2
@ -3,8 +3,6 @@ package com.fastasyncworldedit.bukkit.adapter;
|
||||
import co.aikar.timings.Timings;
|
||||
import com.fastasyncworldedit.bukkit.listener.ChunkListener;
|
||||
import com.fastasyncworldedit.core.queue.implementation.QueueHandler;
|
||||
import com.sk89q.worldedit.internal.util.LogManagerCompat;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
@ -31,7 +29,7 @@ public class BukkitQueueHandler extends QueueHandler {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startSet(boolean parallel) {
|
||||
public void startUnsafe(boolean parallel) {
|
||||
ChunkListener.physicsFreeze = true;
|
||||
if (parallel) {
|
||||
try {
|
||||
@ -51,7 +49,7 @@ public class BukkitQueueHandler extends QueueHandler {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void endSet(boolean parallel) {
|
||||
public void endUnsafe(boolean parallel) {
|
||||
ChunkListener.physicsFreeze = false;
|
||||
if (parallel) {
|
||||
try {
|
||||
|
@ -389,7 +389,7 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor {
|
||||
return; // fast path to avoid additional tasks: a worker is already draining the queue
|
||||
}
|
||||
// create a new worker to drain the current queue
|
||||
Fawe.instance().getQueueHandler().submit(() -> drainQueue(false));
|
||||
Fawe.instance().getQueueHandler().async(() -> drainQueue(false));
|
||||
}
|
||||
|
||||
private void drainQueue(boolean ignoreRunningState) {
|
||||
|
@ -14,6 +14,7 @@ import com.fastasyncworldedit.core.queue.implementation.chunk.ChunkCache;
|
||||
import com.fastasyncworldedit.core.util.MemUtil;
|
||||
import com.fastasyncworldedit.core.util.TaskManager;
|
||||
import com.fastasyncworldedit.core.util.collection.CleanableThreadLocal;
|
||||
import com.fastasyncworldedit.core.util.task.FaweForkJoinWorkerThreadFactory;
|
||||
import com.fastasyncworldedit.core.wrappers.WorldWrapper;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.sk89q.worldedit.world.World;
|
||||
@ -39,10 +40,41 @@ import java.util.function.Supplier;
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public abstract class QueueHandler implements Trimable, Runnable {
|
||||
|
||||
private final ForkJoinPool forkJoinPoolPrimary = new ForkJoinPool();
|
||||
private final ForkJoinPool forkJoinPoolSecondary = new ForkJoinPool();
|
||||
private static final int PROCESSORS = Runtime.getRuntime().availableProcessors();
|
||||
|
||||
/**
|
||||
* Primary queue should be used for tasks that are unlikely to wait on other tasks, IO, etc. (i.e. spend most of their
|
||||
* time utilising CPU.
|
||||
*/
|
||||
private final ForkJoinPool forkJoinPoolPrimary = new ForkJoinPool(
|
||||
PROCESSORS,
|
||||
new FaweForkJoinWorkerThreadFactory("FAWE Fork Join Pool Primary - %s"),
|
||||
null,
|
||||
false
|
||||
);
|
||||
/**
|
||||
* Secondary queue should be used for "cleanup" tasks that are likely to be shorter in life than those submitted to the
|
||||
* primary queue. They may be IO-bound tasks.
|
||||
*/
|
||||
private final ForkJoinPool forkJoinPoolSecondary = new ForkJoinPool(
|
||||
PROCESSORS,
|
||||
new FaweForkJoinWorkerThreadFactory("FAWE Fork Join Pool Secondary - %s"),
|
||||
null,
|
||||
false
|
||||
);
|
||||
/**
|
||||
* Main "work-horse" queue for FAWE. Handles chunk submission (and chunk submission alone). Blocking in order to forcibly
|
||||
* prevent overworking/over-submission of chunk process tasks.
|
||||
*/
|
||||
private final ThreadPoolExecutor blockingExecutor = FaweCache.INSTANCE.newBlockingExecutor();
|
||||
/**
|
||||
* Queue for tasks to be completed on the main thread. These take priority of tasks submitted to syncWhenFree queue
|
||||
*/
|
||||
private final ConcurrentLinkedQueue<FutureTask> syncTasks = new ConcurrentLinkedQueue<>();
|
||||
/**
|
||||
* Queue for tasks to be completed on the main thread. These are completed only if and when there is time left in a tick
|
||||
* after completing all tasks in the syncTasks queue
|
||||
*/
|
||||
private final ConcurrentLinkedQueue<FutureTask> syncWhenFree = new ConcurrentLinkedQueue<>();
|
||||
|
||||
private final Map<World, WeakReference<IChunkCache<IChunkGet>>> chunkGetCache = new HashMap<>();
|
||||
@ -54,7 +86,7 @@ public abstract class QueueHandler implements Trimable, Runnable {
|
||||
private long last;
|
||||
private long allocate = 50;
|
||||
|
||||
public QueueHandler() {
|
||||
protected QueueHandler() {
|
||||
TaskManager.taskManager().repeat(this, 1);
|
||||
}
|
||||
|
||||
@ -80,6 +112,12 @@ public abstract class QueueHandler implements Trimable, Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get if the {@code blockingExecutor} is saturated with tasks or not. Under-utilisation implies the queue has space for
|
||||
* more submissions.
|
||||
*
|
||||
* @return true if {@code blockingExecutor} is not saturated with tasks
|
||||
*/
|
||||
public boolean isUnderutilized() {
|
||||
return blockingExecutor.getActiveCount() < blockingExecutor.getMaximumPoolSize();
|
||||
}
|
||||
@ -125,6 +163,10 @@ public abstract class QueueHandler implements Trimable, Runnable {
|
||||
} while (System.currentTimeMillis() - start < currentAllocate);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated For removal without replacement.
|
||||
*/
|
||||
@Deprecated(forRemoval = true, since = "TODO")
|
||||
public <T extends Future<T>> void complete(Future<T> task) {
|
||||
try {
|
||||
while (task != null) {
|
||||
@ -135,49 +177,140 @@ public abstract class QueueHandler implements Trimable, Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Complete a task in the {@code forkJoinPoolSecondary} queue. Secondary queue should be used for "cleanup" tasks that are
|
||||
* likely to be shorter in life than those submitted to the primary queue. They may be IO-bound tasks.
|
||||
*
|
||||
* @param run Runnable to run
|
||||
* @param value Value to return when done
|
||||
* @param <T> Value type
|
||||
* @return Future for submitted task
|
||||
*/
|
||||
public <T> Future<T> async(Runnable run, T value) {
|
||||
return forkJoinPoolSecondary.submit(run, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Complete a task in the {@code forkJoinPoolSecondary} queue. Secondary queue should be used for "cleanup" tasks that are
|
||||
* likely to be shorter in life than those submitted to the primary queue. They may be IO-bound tasks.
|
||||
*
|
||||
* @param run Runnable to run
|
||||
* @return Future for submitted task
|
||||
*/
|
||||
public Future<?> async(Runnable run) {
|
||||
return forkJoinPoolSecondary.submit(run);
|
||||
}
|
||||
|
||||
/**
|
||||
* Complete a task in the {@code forkJoinPoolSecondary} queue. Secondary queue should be used for "cleanup" tasks that are
|
||||
* likely to be shorter in life than those submitted to the primary queue. They may be IO-bound tasks.
|
||||
*
|
||||
* @param call Callable to run
|
||||
* @param <T> Return value type
|
||||
* @return Future for submitted task
|
||||
*/
|
||||
public <T> Future<T> async(Callable<T> call) {
|
||||
return forkJoinPoolSecondary.submit(call);
|
||||
}
|
||||
|
||||
public ForkJoinTask submit(Runnable call) {
|
||||
return forkJoinPoolPrimary.submit(call);
|
||||
/**
|
||||
* Complete a task in the {@code forkJoinPoolPrimary} queue. Primary queue should be used for tasks that are unlikely to
|
||||
* wait on other tasks, IO, etc. (i.e. spend most of their time utilising CPU.
|
||||
*
|
||||
* @param run Task to run
|
||||
* @return {@link ForkJoinTask} representing task being run
|
||||
*/
|
||||
public ForkJoinTask submit(Runnable run) {
|
||||
return forkJoinPoolPrimary.submit(run);
|
||||
}
|
||||
|
||||
/**
|
||||
* Submit a task to be run on the main thread. Does not guarantee to be run on the next tick as FAWE will only operate to
|
||||
* maintain approx. 18 tps.
|
||||
*
|
||||
* @param run Task to run
|
||||
* @param <T> Value type
|
||||
* @return Future representing task
|
||||
*/
|
||||
public <T> Future<T> sync(Runnable run) {
|
||||
return sync(run, syncTasks);
|
||||
}
|
||||
|
||||
/**
|
||||
* Submit a task to be run on the main thread. Does not guarantee to be run on the next tick as FAWE will only operate to
|
||||
* maintain approx. 18 tps.
|
||||
*
|
||||
* @param call Task to run
|
||||
* @param <T> Value type
|
||||
* @return Future representing task
|
||||
*/
|
||||
public <T> Future<T> sync(Callable<T> call) throws Exception {
|
||||
return sync(call, syncTasks);
|
||||
}
|
||||
|
||||
public <T> Future<T> sync(Supplier<T> call) {
|
||||
return sync(call, syncTasks);
|
||||
/**
|
||||
* Submit a task to be run on the main thread. Does not guarantee to be run on the next tick as FAWE will only operate to
|
||||
* maintain approx. 18 tps.
|
||||
*
|
||||
* @param supplier Task to run
|
||||
* @param <T> Value type
|
||||
* @return Future representing task
|
||||
*/
|
||||
public <T> Future<T> sync(Supplier<T> supplier) {
|
||||
return sync(supplier, syncTasks);
|
||||
}
|
||||
|
||||
// Lower priority sync task (runs only when there are no other tasks)
|
||||
/**
|
||||
* Submit a task to be run on the main thread. Does not guarantee to be run on the next tick as FAWE will only operate to
|
||||
* maintain approx. 18 tps. Takes lower priority than tasks submitted via any {@code QueueHandler#sync} method. Completed
|
||||
* only if and when there is time left in a tick after completing all sync tasks submitted using the aforementioned methods.
|
||||
*
|
||||
* @param run Task to run
|
||||
* @param value Value to return when done
|
||||
* @param <T> Value type
|
||||
* @return Future representing task
|
||||
*/
|
||||
public <T> Future<T> syncWhenFree(Runnable run, T value) {
|
||||
return sync(run, value, syncWhenFree);
|
||||
}
|
||||
|
||||
/**
|
||||
* Submit a task to be run on the main thread. Does not guarantee to be run on the next tick as FAWE will only operate to
|
||||
* maintain approx. 18 tps. Takes lower priority than tasks submitted via any {@code QueueHandler#sync} method. Completed
|
||||
* only if and when there is time left in a tick after completing all sync tasks submitted using the aforementioned methods.
|
||||
*
|
||||
* @param run Task to run
|
||||
* @param <T> Value type
|
||||
* @return Future representing task
|
||||
*/
|
||||
public <T> Future<T> syncWhenFree(Runnable run) {
|
||||
return sync(run, syncWhenFree);
|
||||
}
|
||||
|
||||
/**
|
||||
* Submit a task to be run on the main thread. Does not guarantee to be run on the next tick as FAWE will only operate to
|
||||
* maintain approx. 18 tps. Takes lower priority than tasks submitted via any {@code QueueHandler#sync} method. Completed
|
||||
* only if and when there is time left in a tick after completing all sync tasks submitted using the aforementioned methods.
|
||||
*
|
||||
* @param call Task to run
|
||||
* @param <T> Value type
|
||||
* @return Future representing task
|
||||
*/
|
||||
public <T> Future<T> syncWhenFree(Callable<T> call) throws Exception {
|
||||
return sync(call, syncWhenFree);
|
||||
}
|
||||
|
||||
public <T> Future<T> syncWhenFree(Supplier<T> call) {
|
||||
return sync(call, syncWhenFree);
|
||||
/**
|
||||
* Submit a task to be run on the main thread. Does not guarantee to be run on the next tick as FAWE will only operate to
|
||||
* maintain approx. 18 tps. Takes lower priority than tasks submitted via any {@code QueueHandler#sync} method. Completed
|
||||
* only if and when there is time left in a tick after completing all sync tasks submitted using the aforementioned methods.
|
||||
*
|
||||
* @param supplier Task to run
|
||||
* @param <T> Value type
|
||||
* @return Future representing task
|
||||
*/
|
||||
public <T> Future<T> syncWhenFree(Supplier<T> supplier) {
|
||||
return sync(supplier, syncWhenFree);
|
||||
}
|
||||
|
||||
private <T> Future<T> sync(Runnable run, T value, Queue<FutureTask> queue) {
|
||||
@ -228,6 +361,15 @@ public abstract class QueueHandler implements Trimable, Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal use only. Specifically for submitting {@link IQueueChunk} for "processing" an edit. Submits to the blocking
|
||||
* executor, the main "work-horse" queue for FAWE. Handles chunk submission (and chunk submission alone). Blocking in order
|
||||
* to forcibly prevent overworking/over-submission of chunk process tasks.
|
||||
*
|
||||
* @param chunk chunk
|
||||
* @param <T>
|
||||
* @return Future representing task
|
||||
*/
|
||||
public <T extends Future<T>> T submit(IQueueChunk<T> chunk) {
|
||||
// if (MemUtil.isMemoryFree()) { TODO NOT IMPLEMENTED - optimize this
|
||||
// return (T) forkJoinPoolSecondary.submit(chunk);
|
||||
@ -259,6 +401,9 @@ public abstract class QueueHandler implements Trimable, Runnable {
|
||||
return new SingleThreadQueueExtent();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the current thread's {@link IQueueExtent} instance in the queue pool to null.
|
||||
*/
|
||||
public void unCache() {
|
||||
queuePool.set(null);
|
||||
}
|
||||
@ -271,14 +416,58 @@ public abstract class QueueHandler implements Trimable, Runnable {
|
||||
return queue;
|
||||
}
|
||||
|
||||
public abstract void startSet(boolean parallel);
|
||||
/**
|
||||
* Indicate a "set" task is being started.
|
||||
*
|
||||
* @param parallel if the "set" being started is parallel/async
|
||||
* @deprecated To be replaced by better-named {@link QueueHandler#startUnsafe(boolean)} )}
|
||||
*/
|
||||
@Deprecated(forRemoval = true, since = "TODO")
|
||||
public void startSet(boolean parallel) {
|
||||
startUnsafe(parallel);
|
||||
}
|
||||
|
||||
public abstract void endSet(boolean parallel);
|
||||
|
||||
/**
|
||||
* Indicate a "set" task is ending.
|
||||
*
|
||||
* @param parallel if the "set" being started is parallel/async
|
||||
* @deprecated To be replaced by better-named {@link QueueHandler#endUnsafe(boolean)} )}
|
||||
*/
|
||||
@Deprecated(forRemoval = true, since = "TODO")
|
||||
public void endSet(boolean parallel) {
|
||||
startUnsafe(parallel);
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicate an unsafe task is starting. Physics are frozen, async catchers disabled, etc. for the duration of the task
|
||||
*
|
||||
* @param parallel If the task is being run async and/or in parallel
|
||||
*/
|
||||
public abstract void startUnsafe(boolean parallel);
|
||||
|
||||
/**
|
||||
* Indicate a/the unsafe task submitted after a {@link QueueHandler#startUnsafe(boolean)} call has ended.
|
||||
*
|
||||
* @param parallel If the task was being run async and/or in parallel
|
||||
*/
|
||||
public abstract void endUnsafe(boolean parallel);
|
||||
|
||||
/**
|
||||
* Create a new queue for a given world.
|
||||
*/
|
||||
public IQueueExtent<IQueueChunk> getQueue(World world) {
|
||||
return getQueue(world, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new queue for a given world.
|
||||
*
|
||||
* @param world World to create queue for
|
||||
* @param processor existing processor to set to queue or null
|
||||
* @param postProcessor existing post-processor to set to queue or null
|
||||
* @return New queue for given world
|
||||
*/
|
||||
public IQueueExtent<IQueueChunk> getQueue(World world, IBatchProcessor processor, IBatchProcessor postProcessor) {
|
||||
final IQueueExtent<IQueueChunk> queue = pool();
|
||||
IChunkCache<IChunkGet> cacheGet = getOrCreateWorldCache(world);
|
||||
@ -293,6 +482,12 @@ public abstract class QueueHandler implements Trimable, Runnable {
|
||||
return queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Trims each chunk GET cache
|
||||
*
|
||||
* @param aggressive if each chunk GET cache should be trimmed aggressively
|
||||
* @return true if all chunk GET caches could be trimmed
|
||||
*/
|
||||
@Override
|
||||
public boolean trim(boolean aggressive) {
|
||||
boolean result = true;
|
||||
|
@ -157,13 +157,13 @@ public abstract class TaskManager {
|
||||
*/
|
||||
public void runUnsafe(Runnable run) {
|
||||
QueueHandler queue = Fawe.instance().getQueueHandler();
|
||||
queue.startSet(true);
|
||||
queue.startUnsafe(Fawe.isMainThread());
|
||||
try {
|
||||
run.run();
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
queue.endSet(true);
|
||||
queue.endUnsafe(Fawe.isMainThread());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,9 +1,13 @@
|
||||
package com.fastasyncworldedit.core.util.task;
|
||||
|
||||
import com.fastasyncworldedit.core.Fawe;
|
||||
import com.fastasyncworldedit.core.configuration.Settings;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
@ -11,6 +15,13 @@ import java.util.function.Supplier;
|
||||
|
||||
public class AsyncNotifyQueue implements Closeable {
|
||||
|
||||
private static final ForkJoinPool QUEUE_SUBMISSIONS = new ForkJoinPool(
|
||||
Settings.settings().QUEUE.PARALLEL_THREADS,
|
||||
new FaweForkJoinWorkerThreadFactory("AsyncNotifyQueue - %s"),
|
||||
null,
|
||||
false
|
||||
);
|
||||
|
||||
private final Lock lock = new ReentrantLock(true);
|
||||
private final Thread.UncaughtExceptionHandler handler;
|
||||
private boolean closed;
|
||||
@ -59,7 +70,7 @@ public class AsyncNotifyQueue implements Closeable {
|
||||
}
|
||||
return null;
|
||||
};
|
||||
self[0] = Fawe.instance().getQueueHandler().async(wrapped);
|
||||
self[0] = QUEUE_SUBMISSIONS.submit(wrapped);
|
||||
return self[0];
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,21 @@
|
||||
package com.fastasyncworldedit.core.util.task;
|
||||
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.ForkJoinWorkerThread;
|
||||
|
||||
public class FaweForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
|
||||
|
||||
private final String nameFormat;
|
||||
|
||||
public FaweForkJoinWorkerThreadFactory(String nameFormat) {
|
||||
this.nameFormat = nameFormat;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
|
||||
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
|
||||
worker.setName(String.format(nameFormat, worker.getPoolIndex()));
|
||||
return worker;
|
||||
}
|
||||
|
||||
}
|
Laden…
In neuem Issue referenzieren
Einen Benutzer sperren