Mirror von
https://github.com/PaperMC/Velocity.git
synchronisiert 2024-11-16 21:10:30 +01:00
Reduce some context switches if a continuation is resumed during the event task.
When a continuation was called during the execution of an event task, and not from an asynchronous thread, continue on the thread where the event task was executed instead of scheduling an async task to do it later. Can happen when resuming because no async work needed to be done, and for some reason returning a null EventTask isn't an option.
Dieser Commit ist enthalten in:
Ursprung
1b551a8e1c
Commit
e1d21c0c27
@ -16,6 +16,7 @@ import com.velocitypowered.api.event.Subscribe;
|
||||
import com.velocitypowered.api.plugin.PluginContainer;
|
||||
import com.velocitypowered.api.plugin.PluginManager;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.lang.invoke.VarHandle;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.util.ArrayList;
|
||||
@ -32,7 +33,6 @@ import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Predicate;
|
||||
@ -415,6 +415,106 @@ public class VelocityEventManager implements EventManager {
|
||||
}
|
||||
}
|
||||
|
||||
private static final int TASK_STATE_DEFAULT = 0;
|
||||
private static final int TASK_STATE_EXECUTING = 1;
|
||||
private static final int TASK_STATE_CONTINUE_IMMEDIATELY = 2;
|
||||
|
||||
private static final VarHandle CONTINUATION_TASK_RESUMED;
|
||||
private static final VarHandle CONTINUATION_TASK_STATE;
|
||||
|
||||
static {
|
||||
try {
|
||||
CONTINUATION_TASK_RESUMED = MethodHandles.lookup()
|
||||
.findVarHandle(ContinuationTask.class, "resumed", boolean.class);
|
||||
CONTINUATION_TASK_STATE = MethodHandles.lookup()
|
||||
.findVarHandle(ContinuationTask.class, "state", int.class);
|
||||
} catch (final ReflectiveOperationException e) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
final class ContinuationTask<E> implements Continuation, Runnable {
|
||||
|
||||
private final EventTask.WithContinuation task;
|
||||
private final int index;
|
||||
private final HandlerRegistration[] registrations;
|
||||
private final @Nullable CompletableFuture<E> future;
|
||||
private final boolean currentlyAsync;
|
||||
private final E event;
|
||||
|
||||
private volatile int state = TASK_STATE_DEFAULT;
|
||||
private volatile boolean resumed = false;
|
||||
|
||||
private ContinuationTask(
|
||||
final EventTask.WithContinuation task,
|
||||
final HandlerRegistration[] registrations,
|
||||
final @Nullable CompletableFuture<E> future,
|
||||
final E event,
|
||||
final int index,
|
||||
final boolean currentlyAsync) {
|
||||
this.task = task;
|
||||
this.registrations = registrations;
|
||||
this.future = future;
|
||||
this.event = event;
|
||||
this.index = index;
|
||||
this.currentlyAsync = currentlyAsync;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (execute()) {
|
||||
fire(future, event, index + 1, currentlyAsync, registrations);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the task and returns whether the next one should be executed
|
||||
* immediately after this one without scheduling.
|
||||
*/
|
||||
boolean execute() {
|
||||
state = TASK_STATE_EXECUTING;
|
||||
try {
|
||||
task.run(this);
|
||||
} catch (final Throwable t) {
|
||||
resumeWithException(t);
|
||||
}
|
||||
return !CONTINUATION_TASK_STATE.compareAndSet(
|
||||
this, TASK_STATE_EXECUTING, TASK_STATE_DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resume() {
|
||||
resume(null);
|
||||
}
|
||||
|
||||
void resume(final @Nullable Throwable exception) {
|
||||
// Only allow the continuation to be resumed once
|
||||
if (!CONTINUATION_TASK_RESUMED.compareAndSet(this, false, true)) {
|
||||
throw new IllegalStateException("The continuation can only be resumed once.");
|
||||
}
|
||||
final HandlerRegistration registration = registrations[index];
|
||||
if (exception != null) {
|
||||
logHandlerException(registration, exception);
|
||||
}
|
||||
if (index + 1 == registrations.length) {
|
||||
// Optimization: don't schedule a task just to complete the future
|
||||
if (future != null) {
|
||||
future.complete(event);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (!CONTINUATION_TASK_STATE.compareAndSet(
|
||||
this, TASK_STATE_EXECUTING, TASK_STATE_CONTINUE_IMMEDIATELY)) {
|
||||
asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeWithException(final Throwable exception) {
|
||||
resume(requireNonNull(exception, "exception"));
|
||||
}
|
||||
}
|
||||
|
||||
private <E> void fire(final @Nullable CompletableFuture<E> future, final E event,
|
||||
final int offset, final boolean currentlyAsync, final HandlerRegistration[] registrations) {
|
||||
for (int i = offset; i < registrations.length; i++) {
|
||||
@ -427,49 +527,14 @@ public class VelocityEventManager implements EventManager {
|
||||
if (eventTask instanceof EventTask.WithContinuation) {
|
||||
final EventTask.WithContinuation withContinuation =
|
||||
(EventTask.WithContinuation) eventTask;
|
||||
final int index = i;
|
||||
final Continuation continuation = new Continuation() {
|
||||
private final AtomicBoolean resumed = new AtomicBoolean();
|
||||
|
||||
@Override
|
||||
public void resume() {
|
||||
resume(null);
|
||||
}
|
||||
|
||||
void resume(final @Nullable Throwable exception) {
|
||||
// Only allow the continuation to be resumed once
|
||||
if (!resumed.compareAndSet(false, true)) {
|
||||
throw new IllegalStateException("The continuation can only be resumed once.");
|
||||
}
|
||||
if (exception != null) {
|
||||
logHandlerException(registration, exception);
|
||||
}
|
||||
if (index + 1 == registrations.length) {
|
||||
// Optimization: don't schedule a task just to complete the future
|
||||
if (future != null) {
|
||||
future.complete(event);
|
||||
}
|
||||
return;
|
||||
}
|
||||
asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeWithException(final Throwable exception) {
|
||||
resume(requireNonNull(exception, "exception"));
|
||||
}
|
||||
};
|
||||
final Runnable task = () -> {
|
||||
try {
|
||||
withContinuation.run(continuation);
|
||||
} catch (final Throwable t) {
|
||||
continuation.resumeWithException(t);
|
||||
}
|
||||
};
|
||||
final ContinuationTask<E> continuationTask = new ContinuationTask<>(withContinuation,
|
||||
registrations, future, event, i, currentlyAsync);
|
||||
if (currentlyAsync || !eventTask.requiresAsync()) {
|
||||
task.run();
|
||||
if (continuationTask.execute()) {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
asyncExecutor.execute(task);
|
||||
asyncExecutor.execute(continuationTask);
|
||||
}
|
||||
// fire will continue in another thread once the async task is
|
||||
// executed and the continuation is resumed
|
||||
|
@ -223,4 +223,38 @@ public class EventTest {
|
||||
threadC = Thread.currentThread();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testResumeContinuationImmediately() throws Exception {
|
||||
final ResumeContinuationImmediatelyListener listener = new ResumeContinuationImmediatelyListener();
|
||||
handleMethodListener(listener);
|
||||
assertSyncThread(listener.threadA);
|
||||
assertSyncThread(listener.threadB);
|
||||
assertSyncThread(listener.threadC);
|
||||
assertEquals(2, listener.result);
|
||||
}
|
||||
|
||||
static final class ResumeContinuationImmediatelyListener {
|
||||
|
||||
Thread threadA;
|
||||
Thread threadB;
|
||||
Thread threadC;
|
||||
int result;
|
||||
|
||||
@Subscribe(order = PostOrder.EARLY)
|
||||
EventTask continuation(TestEvent event) {
|
||||
threadA = Thread.currentThread();
|
||||
return EventTask.withContinuation(continuation -> {
|
||||
threadB = Thread.currentThread();
|
||||
result++;
|
||||
continuation.resume();
|
||||
});
|
||||
}
|
||||
|
||||
@Subscribe(order = PostOrder.LATE)
|
||||
void afterContinuation(TestEvent event) {
|
||||
threadC = Thread.currentThread();
|
||||
result++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Laden…
In neuem Issue referenzieren
Einen Benutzer sperren