From 4f227badc20dc30b0f6d84b5349c8809481dcbb1 Mon Sep 17 00:00:00 2001 From: Andrew Steinborn Date: Sun, 14 May 2023 04:32:58 -0400 Subject: [PATCH] Reintroduce sync event execution to the Velocity event system This required a not-insubstantial number of bug fixes, since the sync support had bit-rotted somewhat. This PR also corrects a number of bugs. Finally. the per-plugin executor services are now used to execute all async event tasks. --- .../api/event/EventManager.java | 18 ++ .../velocitypowered/api/event/PostOrder.java | 2 +- .../velocitypowered/api/event/Subscribe.java | 34 ++-- .../velocitypowered/proxy/VelocityServer.java | 1 - .../proxy/command/VelocityCommandManager.java | 9 +- .../proxy/event/VelocityEventManager.java | 154 +++++++++++++----- .../proxy/command/CommandTestSuite.java | 11 -- .../proxy/event/EventTest.java | 146 ++++++++++++----- .../proxy/testutil/FakePluginManager.java | 29 ++-- 9 files changed, 292 insertions(+), 112 deletions(-) diff --git a/api/src/main/java/com/velocitypowered/api/event/EventManager.java b/api/src/main/java/com/velocitypowered/api/event/EventManager.java index 63cae6f32..b8702ab35 100644 --- a/api/src/main/java/com/velocitypowered/api/event/EventManager.java +++ b/api/src/main/java/com/velocitypowered/api/event/EventManager.java @@ -45,10 +45,28 @@ public interface EventManager { * @param postOrder the order in which events should be posted to the handler * @param handler the handler to register * @param the event type to handle + * @deprecated use {@link #register(Object, Class, short, EventHandler)} instead */ + @Deprecated void register(Object plugin, Class eventClass, PostOrder postOrder, EventHandler handler); + /** + * Requests that the specified {@code handler} listen for events and associate it with the {@code + * plugin}. + * + *

Note that this method will register a non-asynchronous listener by default. If you want to + * use an asynchronous event handler, return {@link EventTask#async(Runnable)} from the handler.

+ * + * @param plugin the plugin to associate with the handler + * @param eventClass the class for the event handler to register + * @param postOrder the relative order in which events should be posted to the handler + * @param handler the handler to register + * @param the event type to handle + */ + void register(Object plugin, Class eventClass, short postOrder, + EventHandler handler); + /** * Fires the specified event to the event bus asynchronously. This allows Velocity to continue * servicing connections while a plugin handles a potentially long-running operation such as a diff --git a/api/src/main/java/com/velocitypowered/api/event/PostOrder.java b/api/src/main/java/com/velocitypowered/api/event/PostOrder.java index dde8a4379..0d52ed2c0 100644 --- a/api/src/main/java/com/velocitypowered/api/event/PostOrder.java +++ b/api/src/main/java/com/velocitypowered/api/event/PostOrder.java @@ -12,6 +12,6 @@ package com.velocitypowered.api.event; */ public enum PostOrder { - FIRST, EARLY, NORMAL, LATE, LAST + FIRST, EARLY, NORMAL, LATE, LAST, CUSTOM } diff --git a/api/src/main/java/com/velocitypowered/api/event/Subscribe.java b/api/src/main/java/com/velocitypowered/api/event/Subscribe.java index bee71a3cb..abb96c949 100644 --- a/api/src/main/java/com/velocitypowered/api/event/Subscribe.java +++ b/api/src/main/java/com/velocitypowered/api/event/Subscribe.java @@ -22,24 +22,38 @@ public @interface Subscribe { /** * The order events will be posted to this listener. * + * @deprecated specify the order using {@link #priority()} instead * @return the order */ + @Deprecated PostOrder order() default PostOrder.NORMAL; /** - * Whether the handler must be called asynchronously. + * The priority of this event handler. Priorities are used to determine the order in which event + * handlers are called. The higher the priority, the earlier the event handler will be called. * - *

This option currently has no effect, but in the future it will. In Velocity 3.0.0, - * all event handlers run asynchronously by default. You are encouraged to determine whether or - * not to enable it now. This option is being provided as a migration aid.

+ *

Note that due to compatibility constraints, you must specify {@link PostOrder#CUSTOM} + * in order to use this field.

* - *

If this method returns {@code true}, the method is guaranteed to be executed - * asynchronously. Otherwise, the handler may be executed on the current thread or - * asynchronously. This still means you must consider thread-safety in your - * event listeners as the "current thread" can and will be different each time.

+ * @return the priority + */ + short priority() default Short.MIN_VALUE; + + /** + * Whether the handler must be called asynchronously. By default, all event handlers are called + * asynchronously. * - *

If any method handler targeting an event type is marked with {@code true}, then every - * handler targeting that event type will be executed asynchronously.

+ *

For performance (for instance, if you use {@link EventTask#withContinuation}), you can + * optionally specify false. This option will become {@code false} by default + * in a future release of Velocity.

+ * + *

If this is {@code true}, the method is guaranteed to be executed asynchronously. Otherwise, + * the handler may be executed on the current thread or asynchronously. This still means + * you must consider thread-safety in your event listeners as the "current thread" can + * and will be different each time.

+ * + *

Note that if any method handler targeting an event type is marked with {@code true}, then + * every handler targeting that event type will be executed asynchronously.

* * @return Requires async */ diff --git a/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java b/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java index be0c8de9b..2a362ccc9 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java @@ -540,7 +540,6 @@ public class VelocityServer implements ProxyServer, ForwardingAudience { eventManager.fire(new ProxyShutdownEvent()).join(); - timedOut = !eventManager.shutdown() || timedOut; timedOut = !scheduler.shutdown() || timedOut; if (timedOut) { diff --git a/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java b/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java index a4fab9321..24fc6a059 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java @@ -49,6 +49,8 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @@ -71,6 +73,7 @@ public class VelocityCommandManager implements CommandManager { private final SuggestionsProvider suggestionsProvider; private final CommandGraphInjector injector; private final Map commandMetas; + private final ExecutorService asyncExecutor; /** * Constructs a command manager. @@ -89,6 +92,7 @@ public class VelocityCommandManager implements CommandManager { this.suggestionsProvider = new SuggestionsProvider<>(this.dispatcher, this.lock.readLock()); this.injector = new CommandGraphInjector<>(this.dispatcher, this.lock.readLock()); this.commandMetas = new ConcurrentHashMap<>(); + this.asyncExecutor = ForkJoinPool.commonPool(); // TODO: remove entirely } public void setAnnounceProxyCommands(boolean announceProxyCommands) { @@ -266,7 +270,7 @@ public class VelocityCommandManager implements CommandManager { return false; } return executeImmediately0(source, commandResult.getCommand().orElse(event.getCommand())); - }, eventManager.getAsyncExecutor()); + }, asyncExecutor); } @Override @@ -275,8 +279,7 @@ public class VelocityCommandManager implements CommandManager { Preconditions.checkNotNull(source, "source"); Preconditions.checkNotNull(cmdLine, "cmdLine"); - return CompletableFuture.supplyAsync( - () -> executeImmediately0(source, cmdLine), eventManager.getAsyncExecutor()); + return CompletableFuture.supplyAsync(() -> executeImmediately0(source, cmdLine), asyncExecutor); } /** diff --git a/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java b/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java index f00c3877c..4c48068cb 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java @@ -25,7 +25,6 @@ import com.google.common.base.VerifyException; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; import com.google.common.reflect.TypeToken; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.velocitypowered.api.event.Continuation; import com.velocitypowered.api.event.EventHandler; import com.velocitypowered.api.event.EventManager; @@ -38,6 +37,7 @@ import com.velocitypowered.api.plugin.PluginManager; import com.velocitypowered.proxy.event.UntargetedEventHandler.EventTaskHandler; import com.velocitypowered.proxy.event.UntargetedEventHandler.VoidHandler; import com.velocitypowered.proxy.event.UntargetedEventHandler.WithContinuationHandler; +import com.velocitypowered.proxy.util.collect.Enum2IntMap; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; @@ -55,9 +55,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; @@ -76,6 +73,14 @@ import org.lanternpowered.lmbda.LambdaType; */ public class VelocityEventManager implements EventManager { + private static final Enum2IntMap POST_ORDER_MAP = new Enum2IntMap.Builder<>(PostOrder.class) + .put(PostOrder.FIRST, Short.MAX_VALUE - 1) + .put(PostOrder.EARLY, Short.MAX_VALUE / 2) + .put(PostOrder.NORMAL, 0) + .put(PostOrder.LATE, Short.MIN_VALUE / 2) + .put(PostOrder.LAST, Short.MIN_VALUE + 1) + .put(PostOrder.CUSTOM, 0) + .build(); private static final Logger logger = LogManager.getLogger(VelocityEventManager.class); private static final MethodHandles.Lookup methodHandlesLookup = MethodHandles.lookup(); @@ -87,9 +92,8 @@ public class VelocityEventManager implements EventManager { LambdaType.of(WithContinuationHandler.class); private static final Comparator handlerComparator = - Comparator.comparingInt(o -> o.order); + Collections.reverseOrder(Comparator.comparingInt(o -> o.order)); - private final ExecutorService asyncExecutor; private final PluginManager pluginManager; private final ListMultimap, HandlerRegistration> handlersByType = @@ -112,9 +116,6 @@ public class VelocityEventManager implements EventManager { */ public VelocityEventManager(final PluginManager pluginManager) { this.pluginManager = pluginManager; - this.asyncExecutor = Executors - .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder() - .setNameFormat("Velocity Async Event Executor - #%d").setDaemon(true).build()); } /** @@ -140,6 +141,7 @@ public class VelocityEventManager implements EventManager { final short order; final Class eventType; final EventHandler handler; + final AsyncType asyncType; /** * The instance of the {@link EventHandler} or the listener instance that was registered. @@ -147,20 +149,40 @@ public class VelocityEventManager implements EventManager { final Object instance; public HandlerRegistration(final PluginContainer plugin, final short order, - final Class eventType, final Object instance, final EventHandler handler) { + final Class eventType, final Object instance, final EventHandler handler, + final AsyncType asyncType) { this.plugin = plugin; this.order = order; this.eventType = eventType; this.instance = instance; this.handler = handler; + this.asyncType = asyncType; } } + enum AsyncType { + /** + * The event will never run async, everything is handled on the netty thread. + */ + NEVER, + /** + * The event will initially start on the thread calling the {@code fire} method, and possibly + * switch over to an async thread. + */ + SOMETIMES, + /** + * The complete event will be handled on an async thread. + */ + ALWAYS + } + static final class HandlersCache { + final AsyncType asyncType; final HandlerRegistration[] handlers; - HandlersCache(final HandlerRegistration[] handlers) { + HandlersCache(AsyncType asyncType, final HandlerRegistration[] handlers) { + this.asyncType = asyncType; this.handlers = handlers; } } @@ -183,7 +205,15 @@ public class VelocityEventManager implements EventManager { } baked.sort(handlerComparator); - return new HandlersCache(baked.toArray(new HandlerRegistration[0])); + + AsyncType asyncType = AsyncType.NEVER; + for (HandlerRegistration registration : baked) { + if (registration.asyncType.compareTo(asyncType) > 0) { + asyncType = registration.asyncType; + } + } + + return new HandlersCache(asyncType, baked.toArray(new HandlerRegistration[0])); } /** @@ -219,15 +249,17 @@ public class VelocityEventManager implements EventManager { static final class MethodHandlerInfo { final Method method; + final AsyncType asyncType; final @Nullable Class eventType; final short order; final @Nullable String errors; final @Nullable Class continuationType; - private MethodHandlerInfo(final Method method, final @Nullable Class eventType, - final short order, final @Nullable String errors, + private MethodHandlerInfo(final Method method, final AsyncType asyncType, + final @Nullable Class eventType, final short order, final @Nullable String errors, final @Nullable Class continuationType) { this.method = method; + this.asyncType = asyncType; this.eventType = eventType; this.order = order; this.errors = errors; @@ -291,17 +323,41 @@ public class VelocityEventManager implements EventManager { } } } + AsyncType asyncType = AsyncType.NEVER; + final Class returnType = method.getReturnType(); if (handlerAdapter == null) { - final Class returnType = method.getReturnType(); if (returnType != void.class && continuationType == Continuation.class) { errors.add("method return type must be void if a continuation parameter is provided"); } else if (returnType != void.class && returnType != EventTask.class) { - errors.add("method return type must be void or EventTask"); + errors.add("method return type must be void, AsyncTask, " + + "EventTask.Basic or EventTask.WithContinuation"); + } else if (returnType == EventTask.class) { + // technically, for compatibility, we *should* assume that the method must be invoked + // async, however, from examining some publicly-available plugins, developers did + // generally follow the contract and returned an EventTask only if they wanted this + // behavior. enable it for them. + asyncType = AsyncType.SOMETIMES; } + } else { + // for custom handlers, we always expect a return type of EventTask. this feature appears + // to have not been used in the wild AFAIK, so it gets the new behavior by default + asyncType = AsyncType.SOMETIMES; + } + + if (paramCount == 1 && returnType == void.class && subscribe.async()) { + // these are almost always a dead giveaway of a plugin that will need its handlers + // run async, so unless we're told otherwise, we'll assume that's the case + asyncType = AsyncType.ALWAYS; + } + + final short order; + if (subscribe.order() == PostOrder.CUSTOM) { + order = subscribe.priority(); + } else { + order = (short) POST_ORDER_MAP.get(subscribe.order()); } - final short order = (short) subscribe.order().ordinal(); final String errorsJoined = errors.isEmpty() ? null : String.join(",", errors); - collected.put(key, new MethodHandlerInfo(method, eventType, order, errorsJoined, + collected.put(key, new MethodHandlerInfo(method, asyncType, eventType, order, errorsJoined, continuationType)); } final Class superclass = targetClass.getSuperclass(); @@ -340,12 +396,29 @@ public class VelocityEventManager implements EventManager { @SuppressWarnings("unchecked") public void register(final Object plugin, final Class eventClass, final PostOrder order, final EventHandler handler) { + if (order == PostOrder.CUSTOM) { + throw new IllegalArgumentException( + "This method does not support custom post orders. Use the overload with short instead." + ); + } + register(plugin, eventClass, (short) POST_ORDER_MAP.get(order), handler, AsyncType.ALWAYS); + } + + @Override + public void register(Object plugin, Class eventClass, short postOrder, + EventHandler handler) { + register(plugin, eventClass, postOrder, handler, AsyncType.SOMETIMES); + } + + private void register(Object plugin, Class eventClass, short postOrder, + EventHandler handler, AsyncType asyncType) { final PluginContainer pluginContainer = pluginManager.ensurePluginContainer(plugin); requireNonNull(eventClass, "eventClass"); requireNonNull(handler, "handler"); final HandlerRegistration registration = new HandlerRegistration(pluginContainer, - (short) order.ordinal(), eventClass, handler, (EventHandler) handler); + postOrder, eventClass, handler, (EventHandler) handler, + AsyncType.ALWAYS); register(Collections.singletonList(registration)); } @@ -375,7 +448,7 @@ public class VelocityEventManager implements EventManager { final EventHandler handler = untargetedHandler.buildHandler(listener); registrations.add(new HandlerRegistration(pluginContainer, info.order, - info.eventType, listener, handler)); + info.eventType, listener, handler, info.asyncType)); } register(registrations); @@ -462,10 +535,13 @@ public class VelocityEventManager implements EventManager { private void fire(final @Nullable CompletableFuture future, final E event, final HandlersCache handlersCache) { - // In Velocity 1.1.0, all events were fired asynchronously. As Velocity 3.0.0 is intended to be - // largely (albeit not 100%) compatible with 1.1.x, we also fire events async. This behavior - // will go away in Velocity Polymer. - asyncExecutor.execute(() -> fire(future, event, 0, true, handlersCache.handlers)); + final HandlerRegistration registration = handlersCache.handlers[0]; + if (registration.asyncType == AsyncType.ALWAYS) { + registration.plugin.getExecutorService().execute( + () -> fire(future, event, 0, true, handlersCache.handlers)); + } else { + fire(future, event, 0, false, handlersCache.handlers); + } } private static final int TASK_STATE_DEFAULT = 0; @@ -494,6 +570,7 @@ public class VelocityEventManager implements EventManager { private final @Nullable CompletableFuture future; private final boolean currentlyAsync; private final E event; + private final Thread firedOnThread; // This field is modified via a VarHandle, so this field is used and cannot be final. @SuppressWarnings({"UnusedVariable", "FieldMayBeFinal", "FieldCanBeLocal"}) @@ -516,6 +593,7 @@ public class VelocityEventManager implements EventManager { this.event = event; this.index = index; this.currentlyAsync = currentlyAsync; + this.firedOnThread = Thread.currentThread(); } @Override @@ -526,8 +604,8 @@ public class VelocityEventManager implements EventManager { } /** - * Executes the task and returns whether the next one should be executed immediately after this - * one without scheduling. + * Executes the task and returns whether the next handler should be executed immediately + * after this one, without additional scheduling. */ boolean execute() { state = TASK_STATE_EXECUTING; @@ -569,7 +647,18 @@ public class VelocityEventManager implements EventManager { } if (!CONTINUATION_TASK_STATE.compareAndSet( this, TASK_STATE_EXECUTING, TASK_STATE_CONTINUE_IMMEDIATELY)) { - asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations)); + // We established earlier that registrations[index + 1] is a valid index. + // If we are remaining in the same thread for the next handler, fire + // the next event immediately, else fire it within the executor service + // of the plugin with the next handler. + final HandlerRegistration next = registrations[index + 1]; + final Thread currentThread = Thread.currentThread(); + if (currentThread == firedOnThread && next.asyncType != AsyncType.ALWAYS) { + fire(future, event, index + 1, currentlyAsync, registrations); + } else { + next.plugin.getExecutorService().execute(() -> + fire(future, event, index + 1, true, registrations)); + } } } @@ -595,7 +684,7 @@ public class VelocityEventManager implements EventManager { continue; } } else { - asyncExecutor.execute(continuationTask); + registration.plugin.getExecutorService().execute(continuationTask); } // fire will continue in another thread once the async task is // executed and the continuation is resumed @@ -615,13 +704,4 @@ public class VelocityEventManager implements EventManager { logger.error("Couldn't pass {} to {} {}", registration.eventType.getSimpleName(), pluginDescription.getId(), pluginDescription.getVersion().orElse(""), t); } - - public boolean shutdown() throws InterruptedException { - asyncExecutor.shutdown(); - return asyncExecutor.awaitTermination(10, TimeUnit.SECONDS); - } - - public ExecutorService getAsyncExecutor() { - return asyncExecutor; - } } \ No newline at end of file diff --git a/proxy/src/test/java/com/velocitypowered/proxy/command/CommandTestSuite.java b/proxy/src/test/java/com/velocitypowered/proxy/command/CommandTestSuite.java index 9bd202fa4..c8e17d0f8 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/command/CommandTestSuite.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/command/CommandTestSuite.java @@ -31,7 +31,6 @@ import com.velocitypowered.proxy.event.MockEventManager; import com.velocitypowered.proxy.event.VelocityEventManager; import java.util.Arrays; import java.util.Collection; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -47,16 +46,6 @@ abstract class CommandTestSuite { eventManager = new MockEventManager(); } - @AfterAll - static void afterAll() { - try { - eventManager.shutdown(); - eventManager = null; - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - @BeforeEach void setUp() { this.manager = new VelocityCommandManager(eventManager); diff --git a/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java b/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java index e05df200f..fc817f4db 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java @@ -41,20 +41,24 @@ import org.junit.jupiter.api.TestInstance; public class EventTest { public static final String CONTINUATION_TEST_THREAD_NAME = "Continuation test thread"; - private final VelocityEventManager eventManager = - new VelocityEventManager(new FakePluginManager()); + private final FakePluginManager pluginManager = new FakePluginManager(); + private final VelocityEventManager eventManager = new VelocityEventManager(pluginManager); @AfterAll void shutdown() throws Exception { - eventManager.shutdown(); + pluginManager.shutdown(); } static final class TestEvent { } + static void assertSyncThread(final Thread thread) { + assertEquals(Thread.currentThread(), thread); + } + static void assertAsyncThread(final Thread thread) { - assertTrue(thread.getName().contains("Velocity Async Event Executor")); + assertTrue(thread.getName().contains("Test Async Thread")); } static void assertContinuationThread(final Thread thread) { @@ -90,6 +94,7 @@ public class EventTest { eventManager.fire(new TestEvent()).get(); } finally { eventManager.unregisterListeners(FakePluginManager.PLUGIN_A); + eventManager.unregisterListeners(FakePluginManager.PLUGIN_B); } // Check that the order is A < B < C. @@ -119,6 +124,7 @@ public class EventTest { eventManager.fire(new TestEvent()).get(); } finally { eventManager.unregisterListeners(FakePluginManager.PLUGIN_A); + eventManager.unregisterListeners(FakePluginManager.PLUGIN_B); } // Check that the order is A < B < C. @@ -126,6 +132,26 @@ public class EventTest { assertTrue(listener2Invoked.get() < listener3Invoked.get(), "Listener C invoked before B!"); } + @Test + void testAlwaysSync() throws Exception { + final AlwaysSyncListener listener = new AlwaysSyncListener(); + handleMethodListener(listener); + assertSyncThread(listener.thread); + assertEquals(1, listener.result); + } + + static final class AlwaysSyncListener { + + @MonotonicNonNull Thread thread; + int result; + + @Subscribe(async = false) + void sync(TestEvent event) { + result++; + thread = Thread.currentThread(); + } + } + @Test void testAlwaysAsync() throws Exception { final AlwaysAsyncListener listener = new AlwaysAsyncListener(); @@ -143,7 +169,7 @@ public class EventTest { @MonotonicNonNull Thread threadC; int result; - @Subscribe + @Subscribe(async = true, order = PostOrder.EARLY) void firstAsync(TestEvent event) { result++; threadA = Thread.currentThread(); @@ -155,50 +181,93 @@ public class EventTest { return EventTask.async(() -> result++); } - @Subscribe + @Subscribe(order = PostOrder.LATE) void thirdAsync(TestEvent event) { result++; threadC = Thread.currentThread(); } } + @Test + void testSometimesAsync() throws Exception { + final SometimesAsyncListener listener = new SometimesAsyncListener(); + handleMethodListener(listener); + assertSyncThread(listener.threadA); + assertSyncThread(listener.threadB); + assertAsyncThread(listener.threadC); + assertAsyncThread(listener.threadD); + assertEquals(3, listener.result); + } + + static final class SometimesAsyncListener { + + @MonotonicNonNull Thread threadA; + @MonotonicNonNull Thread threadB; + @MonotonicNonNull Thread threadC; + @MonotonicNonNull Thread threadD; + int result; + + @Subscribe(order = PostOrder.EARLY, async = false) + void notAsync(TestEvent event) { + result++; + threadA = Thread.currentThread(); + } + + @Subscribe + EventTask notAsyncUntilTask(TestEvent event) { + threadB = Thread.currentThread(); + return EventTask.async(() -> { + threadC = Thread.currentThread(); + result++; + }); + } + + @Subscribe(order = PostOrder.LATE, async = false) + void stillAsyncAfterTask(TestEvent event) { + threadD = Thread.currentThread(); + result++; + } + } + @Test void testContinuation() throws Exception { final ContinuationListener listener = new ContinuationListener(); handleMethodListener(listener); - assertAsyncThread(listener.thread1); - assertAsyncThread(listener.thread2); - assertContinuationThread(listener.thread2Custom); - assertAsyncThread(listener.thread3); + assertSyncThread(listener.threadA); + assertSyncThread(listener.threadB); + assertAsyncThread(listener.threadC); assertEquals(2, listener.value.get()); } static final class ContinuationListener { - @MonotonicNonNull Thread thread1; - @MonotonicNonNull Thread thread2; - @MonotonicNonNull Thread thread2Custom; - @MonotonicNonNull Thread thread3; + @MonotonicNonNull Thread threadA; + @MonotonicNonNull Thread threadB; + @MonotonicNonNull Thread threadC; final AtomicInteger value = new AtomicInteger(); @Subscribe(order = PostOrder.EARLY) EventTask continuation(TestEvent event) { - thread1 = Thread.currentThread(); + threadA = Thread.currentThread(); return EventTask.withContinuation(continuation -> { value.incrementAndGet(); - thread2 = Thread.currentThread(); + threadB = Thread.currentThread(); new Thread(() -> { - thread2Custom = Thread.currentThread(); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } value.incrementAndGet(); continuation.resume(); - }, CONTINUATION_TEST_THREAD_NAME).start(); + }).start(); }); } @Subscribe(order = PostOrder.LATE) void afterContinuation(TestEvent event) { - thread3 = Thread.currentThread(); + threadC = Thread.currentThread(); } } @@ -207,9 +276,9 @@ public class EventTest { final ResumeContinuationImmediatelyListener listener = new ResumeContinuationImmediatelyListener(); handleMethodListener(listener); - assertAsyncThread(listener.threadA); - assertAsyncThread(listener.threadB); - assertAsyncThread(listener.threadC); + assertSyncThread(listener.threadA); + assertSyncThread(listener.threadB); + assertSyncThread(listener.threadC); assertEquals(2, listener.result); } @@ -241,42 +310,44 @@ public class EventTest { void testContinuationParameter() throws Exception { final ContinuationParameterListener listener = new ContinuationParameterListener(); handleMethodListener(listener); - assertAsyncThread(listener.thread1); - assertAsyncThread(listener.thread2); - assertContinuationThread(listener.thread2Custom); - assertAsyncThread(listener.thread3); + assertSyncThread(listener.threadA); + assertSyncThread(listener.threadB); + assertAsyncThread(listener.threadC); assertEquals(3, listener.result.get()); } static final class ContinuationParameterListener { - @MonotonicNonNull Thread thread1; - @MonotonicNonNull Thread thread2; - @MonotonicNonNull Thread thread2Custom; - @MonotonicNonNull Thread thread3; + @MonotonicNonNull Thread threadA; + @MonotonicNonNull Thread threadB; + @MonotonicNonNull Thread threadC; final AtomicInteger result = new AtomicInteger(); @Subscribe void resume(TestEvent event, Continuation continuation) { - thread1 = Thread.currentThread(); + threadA = Thread.currentThread(); result.incrementAndGet(); continuation.resume(); } @Subscribe(order = PostOrder.LATE) void resumeFromCustomThread(TestEvent event, Continuation continuation) { - thread2 = Thread.currentThread(); + threadB = Thread.currentThread(); new Thread(() -> { - thread2Custom = Thread.currentThread(); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } result.incrementAndGet(); continuation.resume(); - }, CONTINUATION_TEST_THREAD_NAME).start(); + }).start(); } @Subscribe(order = PostOrder.LAST) void afterCustomThread(TestEvent event, Continuation continuation) { - thread3 = Thread.currentThread(); + threadC = Thread.currentThread(); result.incrementAndGet(); continuation.resume(); } @@ -328,8 +399,7 @@ public class EventTest { + "the second is the fancy continuation"); } }, - new TypeToken>() { - }, + new TypeToken>() {}, invokeFunction -> (instance, event) -> EventTask.withContinuation(continuation -> invokeFunction.accept(instance, event, new FancyContinuationImpl(continuation)) @@ -349,4 +419,4 @@ public class EventTest { continuation.resume(); } } -} \ No newline at end of file +} diff --git a/proxy/src/test/java/com/velocitypowered/proxy/testutil/FakePluginManager.java b/proxy/src/test/java/com/velocitypowered/proxy/testutil/FakePluginManager.java index 04ba3867d..b1bc1af72 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/testutil/FakePluginManager.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/testutil/FakePluginManager.java @@ -18,6 +18,7 @@ package com.velocitypowered.proxy.testutil; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.velocitypowered.api.plugin.PluginContainer; import com.velocitypowered.api.plugin.PluginDescription; import com.velocitypowered.api.plugin.PluginManager; @@ -25,7 +26,7 @@ import java.nio.file.Path; import java.util.Collection; import java.util.Optional; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Executors; import org.checkerframework.checker.nullness.qual.NonNull; /** @@ -36,15 +37,19 @@ public class FakePluginManager implements PluginManager { public static final Object PLUGIN_A = new Object(); public static final Object PLUGIN_B = new Object(); - private static final PluginContainer PC_A = new FakePluginContainer("a", PLUGIN_A); - private static final PluginContainer PC_B = new FakePluginContainer("b", PLUGIN_B); + private final PluginContainer containerA = new FakePluginContainer("a", PLUGIN_A); + private final PluginContainer containerB = new FakePluginContainer("b", PLUGIN_B); + + private ExecutorService service = Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("Test Async Thread").setDaemon(true).build() + ); @Override public @NonNull Optional fromInstance(@NonNull Object instance) { if (instance == PLUGIN_A) { - return Optional.of(PC_A); + return Optional.of(containerA); } else if (instance == PLUGIN_B) { - return Optional.of(PC_B); + return Optional.of(containerB); } else { return Optional.empty(); } @@ -54,9 +59,9 @@ public class FakePluginManager implements PluginManager { public @NonNull Optional getPlugin(@NonNull String id) { switch (id) { case "a": - return Optional.of(PC_A); + return Optional.of(containerA); case "b": - return Optional.of(PC_B); + return Optional.of(containerB); default: return Optional.empty(); } @@ -64,7 +69,7 @@ public class FakePluginManager implements PluginManager { @Override public @NonNull Collection getPlugins() { - return ImmutableList.of(PC_A, PC_B); + return ImmutableList.of(containerA, containerB); } @Override @@ -77,16 +82,18 @@ public class FakePluginManager implements PluginManager { throw new UnsupportedOperationException(); } - private static class FakePluginContainer implements PluginContainer { + public void shutdown() { + this.service.shutdownNow(); + } + + private class FakePluginContainer implements PluginContainer { private final String id; private final Object instance; - private final ExecutorService service; private FakePluginContainer(String id, Object instance) { this.id = id; this.instance = instance; - this.service = ForkJoinPool.commonPool(); } @Override