diff --git a/api/build.gradle b/api/build.gradle index 6b8bf79c9..342cc71b9 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -9,8 +9,8 @@ apply from: '../gradle/publish.gradle' apply plugin: 'com.github.johnrengelman.shadow' java { - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = JavaVersion.VERSION_1_8 + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 } sourceSets { @@ -19,11 +19,6 @@ sourceSets { } } -java { - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = JavaVersion.VERSION_1_8 -} - dependencies { api 'com.google.code.gson:gson:2.8.6' api "com.google.guava:guava:${guavaVersion}" diff --git a/api/src/main/java/com/velocitypowered/api/event/Continuation.java b/api/src/main/java/com/velocitypowered/api/event/Continuation.java new file mode 100644 index 000000000..412bf04ca --- /dev/null +++ b/api/src/main/java/com/velocitypowered/api/event/Continuation.java @@ -0,0 +1,18 @@ +package com.velocitypowered.api.event; + +/** + * Represents a continuation of a paused event handler. Any of the resume methods + * may only be called once otherwise an {@link IllegalStateException} is expected. + */ +public interface Continuation { + + /** + * Resumes the continuation. + */ + void resume(); + + /** + * Resumes the continuation after the executed task failed. + */ + void resumeWithException(Throwable exception); +} diff --git a/api/src/main/java/com/velocitypowered/api/event/EventHandler.java b/api/src/main/java/com/velocitypowered/api/event/EventHandler.java index f6005cdf2..bef9d23e4 100644 --- a/api/src/main/java/com/velocitypowered/api/event/EventHandler.java +++ b/api/src/main/java/com/velocitypowered/api/event/EventHandler.java @@ -7,5 +7,5 @@ package com.velocitypowered.api.event; @FunctionalInterface public interface EventHandler { - void execute(E event); + EventTask execute(E event); } diff --git a/api/src/main/java/com/velocitypowered/api/event/EventTask.java b/api/src/main/java/com/velocitypowered/api/event/EventTask.java new file mode 100644 index 000000000..1c4000e97 --- /dev/null +++ b/api/src/main/java/com/velocitypowered/api/event/EventTask.java @@ -0,0 +1,181 @@ +package com.velocitypowered.api.event; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +/** + * Represents a task that can be returned by a {@link EventHandler} which allows event handling to + * be suspended and resumed at a later time, and executing event handlers completely or partially + * asynchronously. + * + *

By default will all event handlers be executed on the thread the event was posted, using + * event tasks this behavior can be altered.

+ */ +public abstract class EventTask { + + EventTask() { + } + + /** + * Whether this {@link EventTask} is required to be called asynchronously. + * + *

If this method returns {@code true}, the event task is guaranteed to be executed + * asynchronously from the current thread. Otherwise, the event task may be executed on the + * current thread or asynchronously.

+ * + * @return Requires async + */ + public abstract boolean requiresAsync(); + + /** + * Represents a basic {@link EventTask}. The execution of the event handlers will resume after + * this basic task is executed using {@link #run()}. + */ + public abstract static class Basic extends EventTask { + + /** + * Runs the task. + */ + public abstract void run(); + } + + /** + * Represents an {@link EventTask} which receives a {@link Continuation} through + * {@link #run(Continuation)}. The continuation must be notified when the task is + * completed, either with {@link Continuation#resume()} if the task was successful or + * {@link Continuation#resumeWithException(Throwable)} if an exception occurred. + * + *

The {@link Continuation} may only be resumed once, or an + * {@link IllegalStateException} is expected.

+ * + *

The {@link Continuation} doesn't need to be notified during the execution of + * {@link #run(Continuation)}, this can happen at a later point in time and from another + * thread.

+ */ + public abstract static class WithContinuation extends EventTask { + + /** + * Runs this async task with the given continuation. + * + * @param continuation The continuation + */ + public abstract void run(Continuation continuation); + } + + /** + * Creates a basic {@link EventTask} from the given {@link Runnable}. The task isn't guaranteed + * to be executed asynchronously ({@link #requiresAsync()} always returns {@code false}). + * + * @param task The task + * @return The event task + */ + public static EventTask.Basic of(final Runnable task) { + requireNonNull(task, "task"); + return new Basic() { + + @Override + public void run() { + task.run(); + } + + @Override + public boolean requiresAsync() { + return false; + } + }; + } + + /** + * Creates a basic async {@link EventTask} from the given {@link Runnable}. The task is guaranteed + * to be executed asynchronously ({@link #requiresAsync()} always returns {@code true}). + * + * @param task The task + * @return The async event task + */ + public static EventTask.Basic async(final Runnable task) { + requireNonNull(task, "task"); + return new Basic() { + + @Override + public void run() { + task.run(); + } + + @Override + public boolean requiresAsync() { + return true; + } + }; + } + + /** + * Creates an continuation based {@link EventTask} from the given {@link Consumer}. The task isn't + * guaranteed to be executed asynchronously ({@link #requiresAsync()} always returns + * {@code false}). + * + * @param task The task to execute + * @return The event task + */ + public static EventTask.WithContinuation withContinuation( + final Consumer task) { + requireNonNull(task, "task"); + return new WithContinuation() { + + @Override + public void run(final Continuation continuation) { + task.accept(continuation); + } + + @Override + public boolean requiresAsync() { + return false; + } + }; + } + + /** + * Creates an async continuation based {@link EventTask} from the given {@link Consumer}. The task + * is guaranteed to be executed asynchronously ({@link #requiresAsync()} always returns + * {@code false}). + * + * @param task The task to execute + * @return The event task + */ + public static EventTask.WithContinuation asyncWithContinuation( + final Consumer task) { + requireNonNull(task, "task"); + return new WithContinuation() { + + @Override + public void run(final Continuation continuation) { + task.accept(continuation); + } + + @Override + public boolean requiresAsync() { + return true; + } + }; + } + + /** + * Creates an continuation based {@link EventTask} for the given {@link CompletableFuture}. The + * continuation will be notified once the given future is completed. + * + * @param future The task to wait for + * @return The event task + */ + public static EventTask.WithContinuation resumeWhenComplete( + final CompletableFuture future) { + requireNonNull(future, "future"); + return withContinuation(continuation -> future.whenComplete((result, cause) -> { + if (cause != null) { + continuation.resumeWithException(cause); + } else { + continuation.resume(); + } + })); + } +} diff --git a/api/src/main/java/com/velocitypowered/api/event/Subscribe.java b/api/src/main/java/com/velocitypowered/api/event/Subscribe.java index 35637f260..bcdc57a6f 100644 --- a/api/src/main/java/com/velocitypowered/api/event/Subscribe.java +++ b/api/src/main/java/com/velocitypowered/api/event/Subscribe.java @@ -13,10 +13,23 @@ import java.lang.annotation.Target; public @interface Subscribe { /** - * The order events will be posted to this listener. + * The order events will be posted to this handler. * * @return the order */ short order() default PostOrder.NORMAL; + /** + * Whether the handler is required to be called asynchronously. + * + *

If this method returns {@code true}, the method is guaranteed to be executed + * asynchronously from the current thread. Otherwise, the handler may be executed on the + * current thread or asynchronously.

+ * + *

If any method handler targeting an event type is marked with {@code true}, then every + * handler targeting that event type will be executed asynchronously.

+ * + * @return Requires async + */ + boolean async() default false; } diff --git a/proxy/build.gradle b/proxy/build.gradle index e7dbaf011..d4a34e027 100644 --- a/proxy/build.gradle +++ b/proxy/build.gradle @@ -9,8 +9,8 @@ apply from: '../gradle/checkstyle.gradle' apply plugin: 'com.github.johnrengelman.shadow' java { - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = JavaVersion.VERSION_1_8 + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 } test { @@ -69,7 +69,6 @@ dependencies { runtimeOnly 'com.lmax:disruptor:3.4.2' // Async loggers implementation 'it.unimi.dsi:fastutil:8.4.1' - implementation 'net.kyori:event-method-asm:4.0.0-SNAPSHOT' implementation 'net.kyori:adventure-nbt:4.0.0-SNAPSHOT' implementation 'org.asynchttpclient:async-http-client:2.12.1' @@ -78,6 +77,10 @@ dependencies { implementation 'com.electronwill.night-config:toml:3.6.3' + implementation 'org.lanternpowered:lmbda:2.0.0-SNAPSHOT' + + implementation 'com.github.ben-manes.caffeine:caffeine:2.8.8' + compileOnly 'com.github.spotbugs:spotbugs-annotations:4.1.2' testImplementation "org.junit.jupiter:junit-jupiter-api:${junitVersion}" @@ -129,3 +132,7 @@ shadowJar { artifacts { archives shadowJar } + +test { + useJUnitPlatform() +} diff --git a/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java b/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java index e4af1869f..444e695f9 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java @@ -8,6 +8,7 @@ import com.google.gson.GsonBuilder; import com.velocitypowered.api.event.EventManager; import com.velocitypowered.api.event.lifecycle.ProxyInitializeEvent; import com.velocitypowered.api.event.lifecycle.ProxyReloadEvent; +import com.velocitypowered.api.event.lifecycle.ProxyShutdownEvent; import com.velocitypowered.api.network.ProtocolVersion; import com.velocitypowered.api.plugin.PluginContainer; import com.velocitypowered.api.plugin.PluginManager; @@ -26,11 +27,11 @@ import com.velocitypowered.proxy.command.builtin.VelocityCommand; import com.velocitypowered.proxy.config.VelocityConfiguration; import com.velocitypowered.proxy.connection.client.ConnectedPlayer; import com.velocitypowered.proxy.console.VelocityConsole; +import com.velocitypowered.proxy.event.VelocityEventManager; import com.velocitypowered.proxy.network.ConnectionManager; import com.velocitypowered.proxy.network.ProtocolUtils; import com.velocitypowered.proxy.network.serialization.FaviconSerializer; import com.velocitypowered.proxy.network.serialization.GameProfileSerializer; -import com.velocitypowered.proxy.plugin.VelocityEventManager; import com.velocitypowered.proxy.plugin.VelocityPluginManager; import com.velocitypowered.proxy.scheduler.VelocityScheduler; import com.velocitypowered.proxy.server.ServerMap; @@ -422,7 +423,14 @@ public class VelocityServer implements ProxyServer, ForwardingAudience { logger.error("Exception while tearing down player connections", e); } - eventManager.fireShutdownEvent(); + try { + eventManager.fire(new ProxyShutdownEvent()).get(10, TimeUnit.SECONDS); + } catch (TimeoutException e) { + timedOut = true; + } catch (ExecutionException e) { + timedOut = true; + logger.error("Exception while firing the shutdown event", e); + } timedOut = !eventManager.shutdown() || timedOut; timedOut = !scheduler.shutdown() || timedOut; diff --git a/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java b/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java index a21b93bfa..d3e4ea445 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java @@ -17,7 +17,7 @@ import com.velocitypowered.api.command.RawCommand; import com.velocitypowered.api.command.SimpleCommand; import com.velocitypowered.api.event.command.CommandExecuteEvent; import com.velocitypowered.api.event.command.CommandExecuteEvent.CommandResult; -import com.velocitypowered.proxy.plugin.VelocityEventManager; +import com.velocitypowered.proxy.event.VelocityEventManager; import com.velocitypowered.proxy.util.BrigadierUtils; import java.util.Iterator; import java.util.List; @@ -147,7 +147,7 @@ public class VelocityCommandManager implements CommandManager { return false; } return executeImmediately0(source, commandResult.getCommand().orElse(event.getCommand())); - }, eventManager.getService()); + }, eventManager.getAsyncExecutor()); } @Override @@ -157,7 +157,7 @@ public class VelocityCommandManager implements CommandManager { Preconditions.checkNotNull(cmdLine, "cmdLine"); return CompletableFuture.supplyAsync( - () -> executeImmediately0(source, cmdLine), eventManager.getService()); + () -> executeImmediately0(source, cmdLine), eventManager.getAsyncExecutor()); } /** diff --git a/proxy/src/main/java/com/velocitypowered/proxy/event/UntargetedEventHandler.java b/proxy/src/main/java/com/velocitypowered/proxy/event/UntargetedEventHandler.java new file mode 100644 index 000000000..f4ae56fb1 --- /dev/null +++ b/proxy/src/main/java/com/velocitypowered/proxy/event/UntargetedEventHandler.java @@ -0,0 +1,19 @@ +package com.velocitypowered.proxy.event; + +import com.velocitypowered.api.event.EventTask; + +public interface UntargetedEventHandler { + + EventTask execute(Object targetInstance, Object event); + + interface Void extends UntargetedEventHandler { + + @Override + default EventTask execute(final Object targetInstance, final Object event) { + executeVoid(targetInstance, event); + return null; + } + + void executeVoid(Object targetInstance, Object event); + } +} diff --git a/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java b/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java new file mode 100644 index 000000000..ee2069fd0 --- /dev/null +++ b/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java @@ -0,0 +1,521 @@ +package com.velocitypowered.proxy.event; + +import static java.util.Objects.requireNonNull; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.reflect.TypeToken; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.velocitypowered.api.event.Continuation; +import com.velocitypowered.api.event.EventHandler; +import com.velocitypowered.api.event.EventManager; +import com.velocitypowered.api.event.EventTask; +import com.velocitypowered.api.event.Subscribe; +import com.velocitypowered.api.plugin.PluginContainer; +import com.velocitypowered.api.plugin.PluginManager; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.lanternpowered.lmbda.LambdaFactory; +import org.lanternpowered.lmbda.LambdaType; + +public class VelocityEventManager implements EventManager { + + private static final Logger logger = LogManager.getLogger(VelocityEventManager.class); + + private static final MethodHandles.Lookup methodHandlesLookup = MethodHandles.lookup(); + private static final LambdaType untargetedHandlerType = + LambdaType.of(UntargetedEventHandler.class); + private static final LambdaType untargetedVoidHandlerType = + LambdaType.of(UntargetedEventHandler.Void.class); + + private static final Comparator handlerComparator = + Comparator.comparingInt(o -> o.order); + + private final ExecutorService asyncExecutor; + private final PluginManager pluginManager; + + private final Multimap, HandlerRegistration> handlersByType = HashMultimap.create(); + private final LoadingCache, @Nullable HandlersCache> handlersCache = + Caffeine.newBuilder().build(this::bakeHandlers); + + private final LoadingCache untargetedMethodHandlers = + Caffeine.newBuilder().weakValues().build(this::buildUntargetedMethodHandler); + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** + * Initializes the Velocity event manager. + * + * @param pluginManager a reference to the Velocity plugin manager + */ + public VelocityEventManager(final PluginManager pluginManager) { + this.pluginManager = pluginManager; + this.asyncExecutor = Executors + .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder() + .setNameFormat("Velocity Async Event Executor - #%d").setDaemon(true).build()); + } + + /** + * Represents the registration of a single {@link EventHandler}. + */ + static final class HandlerRegistration { + + final PluginContainer plugin; + final short order; + final Class eventType; + final EventHandler handler; + final AsyncType asyncType; + + /** + * The instance of the {@link EventHandler} or the listener + * instance that was registered. + */ + final Object instance; + + public HandlerRegistration(final PluginContainer plugin, final short order, + final Class eventType, final Object instance, final EventHandler handler, + final AsyncType asyncType) { + this.plugin = plugin; + this.order = order; + this.eventType = eventType; + this.instance = instance; + this.handler = handler; + this.asyncType = asyncType; + } + } + + enum AsyncType { + /** + * The complete event will be handled on an async thread. + */ + ALWAYS, + /** + * The event will initially start on the netty thread, and possibly + * switch over to an async thread. + */ + SOMETIMES, + /** + * The event will never run async, everything is handled on + * the netty thread. + */ + NEVER + } + + static final class HandlersCache { + + final AsyncType asyncType; + final HandlerRegistration[] handlers; + + HandlersCache(final HandlerRegistration[] handlers, final AsyncType asyncType) { + this.asyncType = asyncType; + this.handlers = handlers; + } + } + + private static List> getEventTypes(final Class eventType) { + return TypeToken.of(eventType).getTypes().rawTypes().stream() + .filter(type -> type != Object.class) + .collect(Collectors.toList()); + } + + private @Nullable HandlersCache bakeHandlers(final Class eventType) { + final List baked = new ArrayList<>(); + final List> types = getEventTypes(eventType); + + lock.readLock().lock(); + try { + for (final Class type : types) { + baked.addAll(handlersByType.get(type)); + } + } finally { + lock.readLock().unlock(); + } + + if (baked.isEmpty()) { + return null; + } + + baked.sort(handlerComparator); + + final AsyncType asyncType; + if (baked.stream().anyMatch(reg -> reg.asyncType == AsyncType.ALWAYS)) { + asyncType = AsyncType.ALWAYS; + } else if (baked.stream().anyMatch(reg -> reg.asyncType == AsyncType.SOMETIMES)) { + asyncType = AsyncType.SOMETIMES; + } else { + asyncType = AsyncType.NEVER; + } + + return new HandlersCache(baked.toArray(new HandlerRegistration[0]), asyncType); + } + + /** + * Creates an {@link UntargetedEventHandler} for the given {@link Method}. This essentially + * implements the {@link UntargetedEventHandler} (or the no async task variant) to invoke the + * target method. The implemented class is defined in the same package as the declaring class. + * The {@link UntargetedEventHandler} interface must be public so the implementation can access + * it. + * + * @param method The method to generate an untargeted handler for + * @return The untargeted handler + */ + private UntargetedEventHandler buildUntargetedMethodHandler(final Method method) + throws IllegalAccessException { + final MethodHandles.Lookup lookup = MethodHandles.privateLookupIn( + method.getDeclaringClass(), methodHandlesLookup); + final LambdaType type; + if (EventTask.class.isAssignableFrom(method.getReturnType())) { + type = untargetedHandlerType; + } else { + type = untargetedVoidHandlerType; + } + return LambdaFactory.create(type.defineClassesWith(lookup), lookup.unreflect(method)); + } + + static final class MethodHandlerInfo { + + final Method method; + final AsyncType asyncType; + final @Nullable Class eventType; + final short order; + final @Nullable String errors; + + private MethodHandlerInfo(final Method method, final AsyncType asyncType, + final @Nullable Class eventType, final short order, final @Nullable String errors) { + this.method = method; + this.asyncType = asyncType; + this.eventType = eventType; + this.order = order; + this.errors = errors; + } + } + + private void collectMethods(final Class targetClass, + final Map collected) { + for (final Method method : targetClass.getDeclaredMethods()) { + final Subscribe subscribe = method.getAnnotation(Subscribe.class); + if (subscribe == null) { + continue; + } + String key = method.getName() + + "(" + + Arrays.stream(method.getParameterTypes()) + .map(Class::getName) + .collect(Collectors.joining(",")) + + ")"; + if (Modifier.isPrivate(method.getModifiers())) { + key = targetClass.getName() + "$" + key; + } + if (collected.containsKey(key)) { + continue; + } + final Set errors = new HashSet<>(); + if (Modifier.isStatic(method.getModifiers())) { + errors.add("method must not be static"); + } + if (Modifier.isAbstract(method.getModifiers())) { + errors.add("method must not be abstract"); + } + Class eventType = null; + if (method.getParameterCount() != 1) { + errors.add("method must have a single parameter which is the event"); + } else { + eventType = method.getParameterTypes()[0]; + } + AsyncType asyncType = AsyncType.NEVER; + final Class returnType = method.getReturnType(); + if (returnType != void.class + && returnType != EventTask.class + && returnType != EventTask.Basic.class + && returnType != EventTask.WithContinuation.class) { + errors.add("method return type must be void, AsyncTask, " + + "AsyncTask.Basic or AsyncTask.WithContinuation"); + } else if (returnType == EventTask.class + || returnType == EventTask.Basic.class + || returnType == EventTask.WithContinuation.class) { + asyncType = AsyncType.SOMETIMES; + } + if (subscribe.async()) { + asyncType = AsyncType.ALWAYS; + } + final short order = subscribe.order(); + final String errorsJoined = errors.isEmpty() ? null : String.join(",", errors); + collected.put(key, new MethodHandlerInfo(method, asyncType, eventType, order, errorsJoined)); + } + final Class superclass = targetClass.getSuperclass(); + if (superclass != Object.class) { + collectMethods(superclass, collected); + } + } + + private PluginContainer ensurePlugin(final Object plugin) { + requireNonNull(plugin, "plugin"); + return pluginManager.fromInstance(plugin) + .orElseThrow(() -> new IllegalArgumentException("Specified plugin is not loaded")); + } + + private void register(final List registrations) { + lock.writeLock().lock(); + try { + for (final HandlerRegistration registration : registrations) { + handlersByType.put(registration.eventType, registration); + } + } finally { + lock.writeLock().unlock(); + } + // Invalidate all the affected event subtypes + handlersCache.invalidateAll(registrations.stream() + .flatMap(registration -> getEventTypes(registration.eventType).stream()) + .distinct() + .collect(Collectors.toList())); + } + + @Override + public void register(final Object plugin, final Object listener) { + requireNonNull(listener, "listener"); + final PluginContainer pluginContainer = ensurePlugin(plugin); + if (plugin == listener) { + throw new IllegalArgumentException("The plugin main instance is automatically registered."); + } + + final Class targetClass = listener.getClass(); + final Map collected = new HashMap<>(); + collectMethods(targetClass, collected); + + final List registrations = new ArrayList<>(); + for (final MethodHandlerInfo info : collected.values()) { + if (info.errors != null) { + logger.info("Invalid listener method {} in {}: {}", + info.method.getName(), info.method.getDeclaringClass().getName(), info.errors); + continue; + } + final UntargetedEventHandler untargetedHandler = + untargetedMethodHandlers.get(info.method); + assert untargetedHandler != null; + final EventHandler handler = event -> untargetedHandler.execute(listener, event); + registrations.add(new HandlerRegistration(pluginContainer, info.order, + info.eventType, listener, handler, info.asyncType)); + } + + register(registrations); + } + + @Override + @SuppressWarnings("unchecked") + public void register(final Object plugin, final Class eventClass, + final short order, final EventHandler handler) { + final PluginContainer pluginContainer = ensurePlugin(plugin); + requireNonNull(eventClass, "eventClass"); + requireNonNull(handler, "handler"); + + final HandlerRegistration registration = new HandlerRegistration(pluginContainer, order, + eventClass, handler, (EventHandler) handler, AsyncType.SOMETIMES); + register(Collections.singletonList(registration)); + } + + @Override + public void unregisterListeners(final Object plugin) { + final PluginContainer pluginContainer = ensurePlugin(plugin); + unregisterIf(registration -> registration.plugin == pluginContainer); + } + + @Override + public void unregisterListener(final Object plugin, final Object handler) { + final PluginContainer pluginContainer = ensurePlugin(plugin); + requireNonNull(handler, "handler"); + unregisterIf(registration -> + registration.plugin == pluginContainer && registration.handler == handler); + } + + @Override + public void unregister(final Object plugin, final EventHandler handler) { + unregisterListener(plugin, handler); + } + + private void unregisterIf(final Predicate predicate) { + final List removed = new ArrayList<>(); + lock.writeLock().lock(); + try { + final Iterator it = handlersByType.values().iterator(); + while (it.hasNext()) { + final HandlerRegistration registration = it.next(); + if (predicate.test(registration)) { + it.remove(); + removed.add(registration); + } + } + } finally { + lock.writeLock().unlock(); + } + + // Invalidate all the affected event subtypes + handlersCache.invalidateAll(removed.stream() + .flatMap(registration -> getEventTypes(registration.eventType).stream()) + .distinct() + .collect(Collectors.toList())); + } + + @Override + public void fireAndForget(final Object event) { + requireNonNull(event, "event"); + final HandlersCache handlersCache = this.handlersCache.get(event.getClass()); + if (handlersCache == null) { + // Optimization: nobody's listening. + return; + } + fire(null, event, handlersCache); + } + + @Override + public CompletableFuture fire(final E event) { + requireNonNull(event, "event"); + final HandlersCache handlersCache = this.handlersCache.get(event.getClass()); + if (handlersCache == null) { + // Optimization: nobody's listening. + return CompletableFuture.completedFuture(event); + } + final CompletableFuture future = new CompletableFuture<>(); + fire(future, event, handlersCache); + return future; + } + + private void fire(final @Nullable CompletableFuture future, + final E event, final HandlersCache handlersCache) { + if (handlersCache.asyncType == AsyncType.ALWAYS) { + // We already know that the event needs to be handled async, so + // execute it asynchronously from the start + asyncExecutor.execute(() -> fire(future, event, 0, true, handlersCache.handlers)); + } else { + fire(future, event, 0, false, handlersCache.handlers); + } + } + + private void fire(final @Nullable CompletableFuture future, final E event, + final int offset, final boolean currentlyAsync, final HandlerRegistration[] registrations) { + for (int i = offset; i < registrations.length; i++) { + final HandlerRegistration registration = registrations[i]; + try { + final EventTask eventTask = registration.handler.execute(event); + if (eventTask == null) { + continue; + } + if (eventTask instanceof EventTask.WithContinuation) { + final EventTask.WithContinuation withContinuation = + (EventTask.WithContinuation) eventTask; + final int index = i; + final Continuation continuation = new Continuation() { + private final AtomicBoolean resumed = new AtomicBoolean(); + + @Override + public void resume() { + resume(null); + } + + void resume(final @Nullable Throwable exception) { + // Only allow the continuation to be resumed once + if (!resumed.compareAndSet(false, true)) { + throw new IllegalStateException("The continuation can only be resumed once."); + } + if (exception != null) { + logHandlerException(registration, exception); + } + if (index + 1 == registrations.length) { + // Optimization: don't schedule a task just to complete the future + if (future != null) { + future.complete(event); + } + return; + } + asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations)); + } + + @Override + public void resumeWithException(final Throwable exception) { + resume(requireNonNull(exception, "exception")); + } + }; + final Runnable task = () -> { + try { + withContinuation.run(continuation); + } catch (final Throwable t) { + continuation.resumeWithException(t); + } + }; + if (currentlyAsync || !eventTask.requiresAsync()) { + task.run(); + } else { + asyncExecutor.execute(task); + } + // fire will continue in another thread once the async task is + // executed and the continuation is resumed + return; + } else { + final EventTask.Basic basic = (EventTask.Basic) eventTask; + if (currentlyAsync || !basic.requiresAsync()) { + // We are already async or we don't need async, so we can just run the + // task and continue with the next handler + basic.run(); + } else { + final int index = i; + // We are not yet in an async context, so the async task needs to be scheduled + // to the async executor, the event handling will continue on an async thread. + asyncExecutor.execute(() -> { + try { + basic.run(); + } catch (final Throwable t) { + logHandlerException(registration, t); + } + fire(future, event, index + 1, true, registrations); + }); + return; // fire will continue in another thread once the async task is completed + } + } + } catch (final Throwable t) { + logHandlerException(registration, t); + } + } + if (future != null) { + future.complete(event); + } + } + + private static void logHandlerException( + final HandlerRegistration registration, final Throwable t) { + logger.error("Couldn't pass {} to {}", registration.eventType.getSimpleName(), + registration.plugin.getDescription().getId(), t); + } + + public boolean shutdown() throws InterruptedException { + asyncExecutor.shutdown(); + return asyncExecutor.awaitTermination(10, TimeUnit.SECONDS); + } + + public ExecutorService getAsyncExecutor() { + return asyncExecutor; + } +} diff --git a/proxy/src/main/java/com/velocitypowered/proxy/plugin/VelocityEventManager.java b/proxy/src/main/java/com/velocitypowered/proxy/plugin/VelocityEventManager.java deleted file mode 100644 index 8a7e2b53d..000000000 --- a/proxy/src/main/java/com/velocitypowered/proxy/plugin/VelocityEventManager.java +++ /dev/null @@ -1,229 +0,0 @@ -package com.velocitypowered.proxy.plugin; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Multimaps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.velocitypowered.api.event.EventHandler; -import com.velocitypowered.api.event.EventManager; -import com.velocitypowered.api.event.Subscribe; -import com.velocitypowered.api.event.lifecycle.ProxyShutdownEvent; -import com.velocitypowered.api.plugin.PluginManager; -import java.lang.reflect.Method; -import java.net.URL; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.ArrayList; -import java.util.Collection; -import java.util.IdentityHashMap; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import net.kyori.event.EventSubscriber; -import net.kyori.event.PostResult; -import net.kyori.event.SimpleEventBus; -import net.kyori.event.method.MethodScanner; -import net.kyori.event.method.MethodSubscriptionAdapter; -import net.kyori.event.method.SimpleMethodSubscriptionAdapter; -import net.kyori.event.method.asm.ASMEventExecutorFactory; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.checkerframework.checker.nullness.qual.NonNull; - -public class VelocityEventManager implements EventManager { - - private static final Logger logger = LogManager.getLogger(VelocityEventManager.class); - - private final ListMultimap registeredListenersByPlugin = Multimaps - .synchronizedListMultimap(Multimaps.newListMultimap(new IdentityHashMap<>(), ArrayList::new)); - private final ListMultimap> registeredHandlersByPlugin = Multimaps - .synchronizedListMultimap(Multimaps.newListMultimap(new IdentityHashMap<>(), ArrayList::new)); - private final SimpleEventBus bus; - private final MethodSubscriptionAdapter methodAdapter; - private final ExecutorService service; - private final PluginManager pluginManager; - - /** - * Initializes the Velocity event manager. - * - * @param pluginManager a reference to the Velocity plugin manager - */ - public VelocityEventManager(PluginManager pluginManager) { - // Expose the event executors to the plugins - required in order for the generated ASM classes - // to work. - PluginClassLoader cl = AccessController.doPrivileged( - (PrivilegedAction) () -> new PluginClassLoader(new URL[0])); - cl.addToClassloaders(); - - // Initialize the event bus. - this.bus = new SimpleEventBus(Object.class) { - @Override - protected boolean shouldPost(@NonNull Object event, @NonNull EventSubscriber subscriber) { - // Velocity doesn't use Cancellable or generic events, so we can skip those checks. - return true; - } - }; - this.methodAdapter = new SimpleMethodSubscriptionAdapter<>(bus, - new ASMEventExecutorFactory<>(cl), - new VelocityMethodScanner()); - this.pluginManager = pluginManager; - this.service = Executors - .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder() - .setNameFormat("Velocity Event Executor - #%d").setDaemon(true).build()); - } - - private void ensurePlugin(Object plugin) { - Preconditions.checkNotNull(plugin, "plugin"); - Preconditions.checkArgument(pluginManager.fromInstance(plugin).isPresent(), - "Specified plugin is not loaded"); - } - - @Override - public void register(Object plugin, Object listener) { - ensurePlugin(plugin); - Preconditions.checkNotNull(listener, "listener"); - if (plugin == listener && registeredListenersByPlugin.containsEntry(plugin, plugin)) { - throw new IllegalArgumentException("The plugin main instance is automatically registered."); - } - - registeredListenersByPlugin.put(plugin, listener); - methodAdapter.register(listener); - } - - @Override - @SuppressWarnings("type.argument.type.incompatible") - public void register(Object plugin, Class eventClass, short postOrder, - EventHandler handler) { - ensurePlugin(plugin); - Preconditions.checkNotNull(eventClass, "eventClass"); - Preconditions.checkNotNull(handler, "listener"); - - registeredHandlersByPlugin.put(plugin, handler); - bus.register(eventClass, new KyoriToVelocityHandler<>(handler, postOrder)); - } - - @Override - public CompletableFuture fire(E event) { - if (event == null) { - throw new NullPointerException("event"); - } - if (!bus.hasSubscribers(event.getClass())) { - // Optimization: nobody's listening. - return CompletableFuture.completedFuture(event); - } - - return CompletableFuture.supplyAsync(() -> { - fireEvent(event); - return event; - }, service); - } - - @Override - public void fireAndForget(Object event) { - if (event == null) { - throw new NullPointerException("event"); - } - if (!bus.hasSubscribers(event.getClass())) { - // Optimization: nobody's listening. - return; - } - service.execute(() -> fireEvent(event)); - } - - private void fireEvent(Object event) { - PostResult result = bus.post(event); - if (!result.exceptions().isEmpty()) { - logger.error("Some errors occurred whilst posting event {}.", event); - int i = 0; - for (Throwable exception : result.exceptions().values()) { - logger.error("#{}: \n", ++i, exception); - } - } - } - - private void unregisterHandler(EventHandler handler) { - bus.unregister(s -> s instanceof KyoriToVelocityHandler - && ((KyoriToVelocityHandler) s).handler == handler); - } - - @Override - public void unregisterListeners(Object plugin) { - ensurePlugin(plugin); - Collection listeners = registeredListenersByPlugin.removeAll(plugin); - listeners.forEach(methodAdapter::unregister); - Collection> handlers = registeredHandlersByPlugin.removeAll(plugin); - handlers.forEach(this::unregisterHandler); - } - - @Override - public void unregisterListener(Object plugin, Object listener) { - ensurePlugin(plugin); - Preconditions.checkNotNull(listener, "listener"); - if (registeredListenersByPlugin.remove(plugin, listener)) { - methodAdapter.unregister(listener); - } - } - - @Override - public void unregister(Object plugin, EventHandler handler) { - ensurePlugin(plugin); - Preconditions.checkNotNull(handler, "listener"); - if (registeredHandlersByPlugin.remove(plugin, handler)) { - unregisterHandler(handler); - } - } - - public boolean shutdown() throws InterruptedException { - service.shutdown(); - return service.awaitTermination(10, TimeUnit.SECONDS); - } - - public void fireShutdownEvent() { - // We shut down the proxy already, so the fact this executes in the main thread is irrelevant. - fireEvent(new ProxyShutdownEvent()); - } - - public ExecutorService getService() { - return service; - } - - private static class VelocityMethodScanner implements MethodScanner { - - @Override - public boolean shouldRegister(@NonNull Object listener, @NonNull Method method) { - return method.isAnnotationPresent(Subscribe.class); - } - - @Override - public int postOrder(@NonNull Object listener, @NonNull Method method) { - return method.getAnnotation(Subscribe.class).order(); - } - - @Override - public boolean consumeCancelledEvents(@NonNull Object listener, @NonNull Method method) { - return true; - } - } - - private static class KyoriToVelocityHandler implements EventSubscriber { - - private final EventHandler handler; - private final short postOrder; - - private KyoriToVelocityHandler(EventHandler handler, short postOrder) { - this.handler = handler; - this.postOrder = postOrder; - } - - @Override - public void invoke(@NonNull E event) { - handler.execute(event); - } - - @Override - public int postOrder() { - return postOrder; - } - } -} diff --git a/proxy/src/test/java/com/velocitypowered/proxy/command/CommandManagerTests.java b/proxy/src/test/java/com/velocitypowered/proxy/command/CommandManagerTests.java index 729bb17ca..7f964d1c8 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/command/CommandManagerTests.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/command/CommandManagerTests.java @@ -21,8 +21,8 @@ import com.velocitypowered.api.command.CommandMeta; import com.velocitypowered.api.command.CommandSource; import com.velocitypowered.api.command.RawCommand; import com.velocitypowered.api.command.SimpleCommand; -import com.velocitypowered.proxy.plugin.MockEventManager; -import com.velocitypowered.proxy.plugin.VelocityEventManager; +import com.velocitypowered.proxy.event.MockEventManager; +import com.velocitypowered.proxy.event.VelocityEventManager; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; diff --git a/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java b/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java new file mode 100644 index 000000000..93d71e052 --- /dev/null +++ b/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java @@ -0,0 +1,226 @@ +package com.velocitypowered.proxy.event; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.velocitypowered.api.event.EventTask; +import com.velocitypowered.api.event.PostOrder; +import com.velocitypowered.api.event.Subscribe; +import com.velocitypowered.proxy.testutil.FakePluginManager; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class EventTest { + + private final VelocityEventManager eventManager = + new VelocityEventManager(new FakePluginManager()); + + @AfterAll + void shutdown() throws Exception { + eventManager.shutdown(); + } + + static final class TestEvent { + } + + static void assertAsyncThread(final Thread thread) { + assertTrue(thread.getName().contains("Velocity Async Event Executor")); + } + + static void assertSyncThread(final Thread thread) { + assertEquals(Thread.currentThread(), thread); + } + + private void handleMethodListener(final Object listener) throws Exception { + eventManager.register(FakePluginManager.PLUGIN_A, listener); + try { + eventManager.fire(new TestEvent()).get(); + } finally { + eventManager.unregisterListeners(FakePluginManager.PLUGIN_A); + } + } + + @Test + void testAlwaysSync() throws Exception { + final AlwaysSyncListener listener = new AlwaysSyncListener(); + handleMethodListener(listener); + assertSyncThread(listener.thread); + assertEquals(1, listener.result); + } + + static final class AlwaysSyncListener { + + Thread thread; + int result; + + @Subscribe + void sync(TestEvent event) { + result++; + thread = Thread.currentThread(); + } + } + + @Test + void testAlwaysAsync() throws Exception { + final AlwaysAsyncListener listener = new AlwaysAsyncListener(); + handleMethodListener(listener); + assertAsyncThread(listener.threadA); + assertAsyncThread(listener.threadB); + assertAsyncThread(listener.threadC); + assertEquals(3, listener.result); + } + + static final class AlwaysAsyncListener { + + Thread threadA; + Thread threadB; + Thread threadC; + int result; + + @Subscribe(async = true) + void async0(TestEvent event) { + result++; + threadA = Thread.currentThread(); + } + + @Subscribe + EventTask async1(TestEvent event) { + threadB = Thread.currentThread(); + return EventTask.async(() -> result++); + } + + @Subscribe + void async2(TestEvent event) { + result++; + threadC = Thread.currentThread(); + } + } + + @Test + void testSometimesAsync() throws Exception { + final SometimesAsyncListener listener = new SometimesAsyncListener(); + handleMethodListener(listener); + assertSyncThread(listener.threadA); + assertSyncThread(listener.threadB); + assertAsyncThread(listener.threadC); + assertAsyncThread(listener.threadD); + assertEquals(3, listener.result); + } + + static final class SometimesAsyncListener { + + Thread threadA; + Thread threadB; + Thread threadC; + Thread threadD; + int result; + + @Subscribe(order = PostOrder.EARLY) + void notAsync(TestEvent event) { + result++; + threadA = Thread.currentThread(); + } + + @Subscribe + EventTask notAsyncUntilTask(TestEvent event) { + threadB = Thread.currentThread(); + return EventTask.async(() -> { + threadC = Thread.currentThread(); + result++; + }); + } + + @Subscribe(order = PostOrder.LATE) + void stillAsyncAfterTask(TestEvent event) { + threadD = Thread.currentThread(); + result++; + } + } + + @Test + void testContinuation() throws Exception { + final ContinuationListener listener = new ContinuationListener(); + handleMethodListener(listener); + assertSyncThread(listener.threadA); + assertSyncThread(listener.threadB); + assertAsyncThread(listener.threadC); + assertEquals(2, listener.value.get()); + } + + static final class ContinuationListener { + + Thread threadA; + Thread threadB; + Thread threadC; + + final AtomicInteger value = new AtomicInteger(); + + @Subscribe(order = PostOrder.EARLY) + EventTask continuation(TestEvent event) { + threadA = Thread.currentThread(); + return EventTask.withContinuation(continuation -> { + value.incrementAndGet(); + threadB = Thread.currentThread(); + new Thread(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + value.incrementAndGet(); + continuation.resume(); + }).start(); + }); + } + + @Subscribe(order = PostOrder.LATE) + void afterContinuation(TestEvent event) { + threadC = Thread.currentThread(); + } + } + + @Test + void testAsyncContinuation() throws Exception { + final AsyncContinuationListener listener = new AsyncContinuationListener(); + handleMethodListener(listener); + assertSyncThread(listener.threadA); + assertAsyncThread(listener.threadB); + assertAsyncThread(listener.threadC); + assertEquals(2, listener.value.get()); + } + + static final class AsyncContinuationListener { + + Thread threadA; + Thread threadB; + Thread threadC; + + final AtomicInteger value = new AtomicInteger(); + + @Subscribe(order = PostOrder.EARLY) + EventTask continuation(TestEvent event) { + threadA = Thread.currentThread(); + return EventTask.asyncWithContinuation(continuation -> { + value.incrementAndGet(); + threadB = Thread.currentThread(); + new Thread(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + value.incrementAndGet(); + continuation.resume(); + }).start(); + }); + } + + @Subscribe(order = PostOrder.LATE) + void afterContinuation(TestEvent event) { + threadC = Thread.currentThread(); + } + } +} diff --git a/proxy/src/test/java/com/velocitypowered/proxy/plugin/MockEventManager.java b/proxy/src/test/java/com/velocitypowered/proxy/event/MockEventManager.java similarity index 67% rename from proxy/src/test/java/com/velocitypowered/proxy/plugin/MockEventManager.java rename to proxy/src/test/java/com/velocitypowered/proxy/event/MockEventManager.java index 20be3a079..6b937cda5 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/plugin/MockEventManager.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/event/MockEventManager.java @@ -1,4 +1,6 @@ -package com.velocitypowered.proxy.plugin; +package com.velocitypowered.proxy.event; + +import com.velocitypowered.proxy.plugin.MockPluginManager; /** * A mock {@link VelocityEventManager}. Must be shutdown after use!