diff --git a/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java b/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java index ee2069fd0..f5f7ca518 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java @@ -16,6 +16,7 @@ import com.velocitypowered.api.event.Subscribe; import com.velocitypowered.api.plugin.PluginContainer; import com.velocitypowered.api.plugin.PluginManager; import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.ArrayList; @@ -32,7 +33,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; @@ -415,6 +415,112 @@ public class VelocityEventManager implements EventManager { } } + private static final int TASK_STATE_DEFAULT = 0; + private static final int TASK_STATE_EXECUTING = 1; + private static final int TASK_STATE_CONTINUE_IMMEDIATELY = 2; + + private static final VarHandle CONTINUATION_TASK_RESUMED; + private static final VarHandle CONTINUATION_TASK_STATE; + + static { + try { + CONTINUATION_TASK_RESUMED = MethodHandles.lookup() + .findVarHandle(ContinuationTask.class, "resumed", boolean.class); + CONTINUATION_TASK_STATE = MethodHandles.lookup() + .findVarHandle(ContinuationTask.class, "state", int.class); + } catch (final ReflectiveOperationException e) { + throw new IllegalStateException(); + } + } + + final class ContinuationTask implements Continuation, Runnable { + + private final EventTask.WithContinuation task; + private final int index; + private final HandlerRegistration[] registrations; + private final @Nullable CompletableFuture future; + private final boolean currentlyAsync; + private final E event; + + private volatile int state = TASK_STATE_DEFAULT; + private volatile boolean resumed = false; + + private ContinuationTask( + final EventTask.WithContinuation task, + final HandlerRegistration[] registrations, + final @Nullable CompletableFuture future, + final E event, + final int index, + final boolean currentlyAsync) { + this.task = task; + this.registrations = registrations; + this.future = future; + this.event = event; + this.index = index; + this.currentlyAsync = currentlyAsync; + } + + @Override + public void run() { + if (execute()) { + fire(future, event, index + 1, currentlyAsync, registrations); + } + } + + /** + * Executes the task and returns whether the next one should be executed + * immediately after this one without scheduling. + */ + boolean execute() { + state = TASK_STATE_EXECUTING; + try { + task.run(this); + } catch (final Throwable t) { + // validateOnlyOnce false here so don't get an exception if the + // continuation was resumed before + resume(t, false); + } + return !CONTINUATION_TASK_STATE.compareAndSet( + this, TASK_STATE_EXECUTING, TASK_STATE_DEFAULT); + } + + @Override + public void resume() { + resume(null, true); + } + + void resume(final @Nullable Throwable exception, final boolean validateOnlyOnce) { + final boolean changed = CONTINUATION_TASK_RESUMED.compareAndSet(this, false, true); + // Only allow the continuation to be resumed once + if (!changed && validateOnlyOnce) { + throw new IllegalStateException("The continuation can only be resumed once."); + } + final HandlerRegistration registration = registrations[index]; + if (exception != null) { + logHandlerException(registration, exception); + } + if (!changed) { + return; + } + if (index + 1 == registrations.length) { + // Optimization: don't schedule a task just to complete the future + if (future != null) { + future.complete(event); + } + return; + } + if (!CONTINUATION_TASK_STATE.compareAndSet( + this, TASK_STATE_EXECUTING, TASK_STATE_CONTINUE_IMMEDIATELY)) { + asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations)); + } + } + + @Override + public void resumeWithException(final Throwable exception) { + resume(requireNonNull(exception, "exception"), true); + } + } + private void fire(final @Nullable CompletableFuture future, final E event, final int offset, final boolean currentlyAsync, final HandlerRegistration[] registrations) { for (int i = offset; i < registrations.length; i++) { @@ -427,49 +533,14 @@ public class VelocityEventManager implements EventManager { if (eventTask instanceof EventTask.WithContinuation) { final EventTask.WithContinuation withContinuation = (EventTask.WithContinuation) eventTask; - final int index = i; - final Continuation continuation = new Continuation() { - private final AtomicBoolean resumed = new AtomicBoolean(); - - @Override - public void resume() { - resume(null); - } - - void resume(final @Nullable Throwable exception) { - // Only allow the continuation to be resumed once - if (!resumed.compareAndSet(false, true)) { - throw new IllegalStateException("The continuation can only be resumed once."); - } - if (exception != null) { - logHandlerException(registration, exception); - } - if (index + 1 == registrations.length) { - // Optimization: don't schedule a task just to complete the future - if (future != null) { - future.complete(event); - } - return; - } - asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations)); - } - - @Override - public void resumeWithException(final Throwable exception) { - resume(requireNonNull(exception, "exception")); - } - }; - final Runnable task = () -> { - try { - withContinuation.run(continuation); - } catch (final Throwable t) { - continuation.resumeWithException(t); - } - }; + final ContinuationTask continuationTask = new ContinuationTask<>(withContinuation, + registrations, future, event, i, currentlyAsync); if (currentlyAsync || !eventTask.requiresAsync()) { - task.run(); + if (continuationTask.execute()) { + continue; + } } else { - asyncExecutor.execute(task); + asyncExecutor.execute(continuationTask); } // fire will continue in another thread once the async task is // executed and the continuation is resumed diff --git a/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java b/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java index 93d71e052..ae7590701 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java @@ -223,4 +223,38 @@ public class EventTest { threadC = Thread.currentThread(); } } + + @Test + void testResumeContinuationImmediately() throws Exception { + final ResumeContinuationImmediatelyListener listener = new ResumeContinuationImmediatelyListener(); + handleMethodListener(listener); + assertSyncThread(listener.threadA); + assertSyncThread(listener.threadB); + assertSyncThread(listener.threadC); + assertEquals(2, listener.result); + } + + static final class ResumeContinuationImmediatelyListener { + + Thread threadA; + Thread threadB; + Thread threadC; + int result; + + @Subscribe(order = PostOrder.EARLY) + EventTask continuation(TestEvent event) { + threadA = Thread.currentThread(); + return EventTask.withContinuation(continuation -> { + threadB = Thread.currentThread(); + result++; + continuation.resume(); + }); + } + + @Subscribe(order = PostOrder.LATE) + void afterContinuation(TestEvent event) { + threadC = Thread.currentThread(); + result++; + } + } }