diff --git a/api/src/main/java/com/velocitypowered/api/event/AsyncEventExecutor.java b/api/src/main/java/com/velocitypowered/api/event/AsyncEventExecutor.java new file mode 100644 index 000000000..cd4e72141 --- /dev/null +++ b/api/src/main/java/com/velocitypowered/api/event/AsyncEventExecutor.java @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2018 Velocity Contributors + * + * The Velocity API is licensed under the terms of the MIT License. For more details, + * reference the LICENSE file in the api top-level directory. + */ + +package com.velocitypowered.api.event; + +import org.checkerframework.checker.nullness.qual.Nullable; + +@FunctionalInterface +public interface AsyncEventExecutor extends EventHandler { + + default void execute(E event) { + throw new UnsupportedOperationException( + "This event handler can only be invoked asynchronously."); + } + + @Nullable EventTask executeAsync(E event); +} diff --git a/api/src/main/java/com/velocitypowered/api/event/Continuation.java b/api/src/main/java/com/velocitypowered/api/event/Continuation.java new file mode 100644 index 000000000..d8f2fa974 --- /dev/null +++ b/api/src/main/java/com/velocitypowered/api/event/Continuation.java @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2018 Velocity Contributors + * + * The Velocity API is licensed under the terms of the MIT License. For more details, + * reference the LICENSE file in the api top-level directory. + */ + +package com.velocitypowered.api.event; + +/** + * Represents a continuation of a paused event handler. Any of the resume methods + * may only be called once otherwise an {@link IllegalStateException} is expected. + */ +public interface Continuation { + + /** + * Resumes the continuation. + */ + void resume(); + + /** + * Resumes the continuation after the executed task failed. + */ + void resumeWithException(Throwable exception); +} \ No newline at end of file diff --git a/api/src/main/java/com/velocitypowered/api/event/EventHandler.java b/api/src/main/java/com/velocitypowered/api/event/EventHandler.java index 2db7c1fc2..c18ea098d 100644 --- a/api/src/main/java/com/velocitypowered/api/event/EventHandler.java +++ b/api/src/main/java/com/velocitypowered/api/event/EventHandler.java @@ -7,12 +7,21 @@ package com.velocitypowered.api.event; +import org.checkerframework.checker.nullness.qual.Nullable; + /** * Represents an interface to perform direct dispatch of an event. This makes integration easier to - * achieve with platforms such as RxJava. + * achieve with platforms such as RxJava. While this interface can be used to implement an + * asynchronous event handler, {@link AsyncEventExecutor} provides a more idiomatic means of doing + * so. */ @FunctionalInterface public interface EventHandler { void execute(E event); + + default @Nullable EventTask executeAsync(E event) { + execute(event); + return null; + } } diff --git a/api/src/main/java/com/velocitypowered/api/event/EventTask.java b/api/src/main/java/com/velocitypowered/api/event/EventTask.java new file mode 100644 index 000000000..37b3b22b9 --- /dev/null +++ b/api/src/main/java/com/velocitypowered/api/event/EventTask.java @@ -0,0 +1,121 @@ +/* + * Copyright (C) 2018 Velocity Contributors + * + * The Velocity API is licensed under the terms of the MIT License. For more details, + * reference the LICENSE file in the api top-level directory. + */ + +package com.velocitypowered.api.event; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +/** + * Represents a task that can be returned by a {@link EventHandler} which allows event handling to + * be suspended and resumed at a later time, and executing event handlers completely or partially + * asynchronously. + * + *

Compatibility notice: While in Velocity 3.0.0, all event handlers still + * execute asynchronously (to preserve backwards compatibility), this will not be the case in future + * versions of Velocity. Please prepare your code by using continuations or returning an instance + * returned by {@link #async(Runnable)}.

+ */ +public interface EventTask { + + /** + * Whether this {@link EventTask} is required to be called asynchronously. + * + *

If this method returns {@code true}, the event task is guaranteed to be executed + * asynchronously from the current thread. Otherwise, the event task may be executed on the + * current thread or asynchronously.

+ * + * @return Requires async + */ + boolean requiresAsync(); + + /** + * Runs this event task with the given {@link Continuation}. The continuation must be notified + * when the task is completed, either with {@link Continuation#resume()} if the task was + * successful or {@link Continuation#resumeWithException(Throwable)} if an exception occurred. + * + *

The {@link Continuation} may only be resumed once, or an + * {@link IllegalStateException} will be thrown.

+ * + *

The {@link Continuation} doesn't need to be notified during the execution of this method, + * this can happen at a later point in time and from another thread.

+ * + * @param continuation The continuation + */ + void execute(Continuation continuation); + + /** + * Creates a basic async {@link EventTask} from the given {@link Runnable}. The task is guaranteed + * to be executed asynchronously ({@link #requiresAsync()} always returns {@code true}). + * + * @param task The task + * @return The async event task + */ + static EventTask async(final Runnable task) { + requireNonNull(task, "task"); + return new EventTask() { + + @Override + public void execute(Continuation continuation) { + task.run(); + continuation.resume(); + } + + @Override + public boolean requiresAsync() { + return true; + } + }; + } + + /** + * Creates an continuation based {@link EventTask} from the given {@link Consumer}. The task isn't + * guaranteed to be executed asynchronously ({@link #requiresAsync()} always returns + * {@code false}). + * + * @param task The task to execute + * @return The event task + */ + static EventTask withContinuation(final Consumer task) { + requireNonNull(task, "task"); + return new EventTask() { + + @Override + public void execute(final Continuation continuation) { + task.accept(continuation); + } + + @Override + public boolean requiresAsync() { + return false; + } + }; + } + + /** + * Creates an continuation based {@link EventTask} for the given {@link CompletableFuture}. The + * continuation will be notified once the given future is completed. + * + * @param future The task to wait for + * @return The event task + */ + // The Error Prone annotation here is spurious. The Future is handled via the CompletableFuture + // API, which does NOT use the traditional blocking model. + @SuppressWarnings("FutureReturnValueIgnored") + static EventTask resumeWhenComplete(final CompletableFuture future) { + requireNonNull(future, "future"); + return withContinuation(continuation -> future.whenComplete((result, cause) -> { + if (cause != null) { + continuation.resumeWithException(cause); + } else { + continuation.resume(); + } + })); + } +} \ No newline at end of file diff --git a/api/src/main/java/com/velocitypowered/api/plugin/PluginManager.java b/api/src/main/java/com/velocitypowered/api/plugin/PluginManager.java index 7af16566d..a88fd9b34 100644 --- a/api/src/main/java/com/velocitypowered/api/plugin/PluginManager.java +++ b/api/src/main/java/com/velocitypowered/api/plugin/PluginManager.java @@ -57,4 +57,16 @@ public interface PluginManager { * @throws UnsupportedOperationException if the operation is not applicable to this plugin */ void addToClasspath(Object plugin, Path path); + + /** + * Ensures a plugin container exists for the given {@code plugin}. + * + * @param plugin the instance to look up the container for + * @return container for the plugin + */ + default PluginContainer ensurePluginContainer(Object plugin) { + return this.fromInstance(plugin) + .orElseThrow(() -> new IllegalArgumentException(plugin.getClass().getCanonicalName() + + " does not have a container.")); + } } diff --git a/proxy/build.gradle b/proxy/build.gradle index 49f9bc19f..60a9f156c 100644 --- a/proxy/build.gradle +++ b/proxy/build.gradle @@ -78,6 +78,9 @@ dependencies { implementation 'com.electronwill.night-config:toml:3.6.3' implementation 'org.bstats:bstats-base:2.2.0' + implementation 'org.lanternpowered:lmbda:2.0.0' + + implementation 'com.github.ben-manes.caffeine:caffeine:2.8.8' compileOnly 'com.github.spotbugs:spotbugs-annotations:4.1.2' diff --git a/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java b/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java index b445932ac..ed3bbf801 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java @@ -25,6 +25,7 @@ import com.google.gson.GsonBuilder; import com.velocitypowered.api.event.EventManager; import com.velocitypowered.api.event.proxy.ProxyInitializeEvent; import com.velocitypowered.api.event.proxy.ProxyReloadEvent; +import com.velocitypowered.api.event.proxy.ProxyShutdownEvent; import com.velocitypowered.api.network.ProtocolVersion; import com.velocitypowered.api.plugin.PluginContainer; import com.velocitypowered.api.plugin.PluginManager; @@ -43,8 +44,8 @@ import com.velocitypowered.proxy.command.builtin.VelocityCommand; import com.velocitypowered.proxy.config.VelocityConfiguration; import com.velocitypowered.proxy.connection.client.ConnectedPlayer; import com.velocitypowered.proxy.console.VelocityConsole; +import com.velocitypowered.proxy.event.VelocityEventManager; import com.velocitypowered.proxy.network.ConnectionManager; -import com.velocitypowered.proxy.plugin.VelocityEventManager; import com.velocitypowered.proxy.plugin.VelocityPluginManager; import com.velocitypowered.proxy.protocol.ProtocolUtils; import com.velocitypowered.proxy.protocol.util.FaviconSerializer; @@ -277,7 +278,7 @@ public class VelocityServer implements ProxyServer, ForwardingAudience { Optional instance = plugin.getInstance(); if (instance.isPresent()) { try { - eventManager.register(instance.get(), instance.get()); + eventManager.registerInternally(plugin, instance.get()); } catch (Exception e) { logger.error("Unable to register plugin listener for {}", plugin.getDescription().getName().orElse(plugin.getDescription().getId()), e); @@ -433,7 +434,7 @@ public class VelocityServer implements ProxyServer, ForwardingAudience { logger.error("Exception while tearing down player connections", e); } - eventManager.fireShutdownEvent(); + eventManager.fire(new ProxyShutdownEvent()).join(); timedOut = !eventManager.shutdown() || timedOut; timedOut = !scheduler.shutdown() || timedOut; diff --git a/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java b/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java index 573a1b596..48295a2bb 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java @@ -34,7 +34,7 @@ import com.velocitypowered.api.command.RawCommand; import com.velocitypowered.api.command.SimpleCommand; import com.velocitypowered.api.event.command.CommandExecuteEvent; import com.velocitypowered.api.event.command.CommandExecuteEvent.CommandResult; -import com.velocitypowered.proxy.plugin.VelocityEventManager; +import com.velocitypowered.proxy.event.VelocityEventManager; import com.velocitypowered.proxy.util.BrigadierUtils; import java.util.Iterator; import java.util.List; @@ -164,7 +164,7 @@ public class VelocityCommandManager implements CommandManager { return false; } return executeImmediately0(source, commandResult.getCommand().orElse(event.getCommand())); - }, eventManager.getService()); + }, eventManager.getAsyncExecutor()); } @Override @@ -174,7 +174,7 @@ public class VelocityCommandManager implements CommandManager { Preconditions.checkNotNull(cmdLine, "cmdLine"); return CompletableFuture.supplyAsync( - () -> executeImmediately0(source, cmdLine), eventManager.getService()); + () -> executeImmediately0(source, cmdLine), eventManager.getAsyncExecutor()); } /** diff --git a/proxy/src/main/java/com/velocitypowered/proxy/event/CustomHandlerAdapter.java b/proxy/src/main/java/com/velocitypowered/proxy/event/CustomHandlerAdapter.java new file mode 100644 index 000000000..7215b8308 --- /dev/null +++ b/proxy/src/main/java/com/velocitypowered/proxy/event/CustomHandlerAdapter.java @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2018 Velocity Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.velocitypowered.proxy.event; + +import com.google.common.reflect.TypeToken; +import com.velocitypowered.api.event.EventHandler; +import com.velocitypowered.api.event.EventTask; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Method; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Predicate; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.lanternpowered.lmbda.LambdaFactory; +import org.lanternpowered.lmbda.LambdaType; + +final class CustomHandlerAdapter { + + final String name; + private final Function> handlerBuilder; + final Predicate filter; + final BiConsumer> validator; + private final LambdaType functionType; + private final MethodHandles.Lookup methodHandlesLookup; + + @SuppressWarnings("unchecked") + CustomHandlerAdapter( + final String name, + final Predicate filter, + final BiConsumer> validator, + final TypeToken invokeFunctionType, + final Function> handlerBuilder, + final MethodHandles.Lookup methodHandlesLookup) { + this.name = name; + this.filter = filter; + this.validator = validator; + this.functionType = (LambdaType) LambdaType.of(invokeFunctionType.getRawType()); + this.handlerBuilder = handlerBuilder; + this.methodHandlesLookup = methodHandlesLookup; + } + + UntargetedEventHandler buildUntargetedHandler(final Method method) + throws IllegalAccessException { + final MethodHandle methodHandle = methodHandlesLookup.unreflect(method); + final MethodHandles.Lookup defineLookup = MethodHandles.privateLookupIn( + method.getDeclaringClass(), methodHandlesLookup); + final LambdaType lambdaType = functionType.defineClassesWith(defineLookup); + final F invokeFunction = LambdaFactory.create(lambdaType, methodHandle); + final BiFunction handlerFunction = + handlerBuilder.apply(invokeFunction); + return targetInstance -> new EventHandler() { + + @Override + public void execute(Object event) { + throw new UnsupportedOperationException(); + } + + @Override + public @Nullable EventTask executeAsync(Object event) { + return handlerFunction.apply(targetInstance, event); + } + }; + } +} \ No newline at end of file diff --git a/proxy/src/main/java/com/velocitypowered/proxy/event/UntargetedEventHandler.java b/proxy/src/main/java/com/velocitypowered/proxy/event/UntargetedEventHandler.java new file mode 100644 index 000000000..6bf098214 --- /dev/null +++ b/proxy/src/main/java/com/velocitypowered/proxy/event/UntargetedEventHandler.java @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2018 Velocity Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.velocitypowered.proxy.event; + +import com.velocitypowered.api.event.AsyncEventExecutor; +import com.velocitypowered.api.event.Continuation; +import com.velocitypowered.api.event.EventHandler; +import com.velocitypowered.api.event.EventTask; +import org.checkerframework.checker.nullness.qual.Nullable; + +public interface UntargetedEventHandler { + + EventHandler buildHandler(Object targetInstance); + + interface EventTaskHandler extends UntargetedEventHandler { + + @Nullable EventTask execute(Object targetInstance, Object event); + + @Override + default EventHandler buildHandler(final Object targetInstance) { + return (AsyncEventExecutor) event -> execute(targetInstance, event); + } + } + + interface VoidHandler extends UntargetedEventHandler { + + void execute(Object targetInstance, Object event); + + @Override + default EventHandler buildHandler(final Object targetInstance) { + return (AsyncEventExecutor) event -> { + execute(targetInstance, event); + return null; + }; + } + } + + interface WithContinuationHandler extends UntargetedEventHandler { + + void execute(Object targetInstance, Object event, Continuation continuation); + + @Override + default EventHandler buildHandler(final Object targetInstance) { + return (AsyncEventExecutor) event -> EventTask.withContinuation(continuation -> + execute(targetInstance, event, continuation)); + } + } +} \ No newline at end of file diff --git a/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java b/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java new file mode 100644 index 000000000..a1d817d95 --- /dev/null +++ b/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java @@ -0,0 +1,627 @@ +/* + * Copyright (C) 2018 Velocity Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.velocitypowered.proxy.event; + +import static java.util.Objects.requireNonNull; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.base.VerifyException; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.reflect.TypeToken; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.velocitypowered.api.event.AsyncEventExecutor; +import com.velocitypowered.api.event.Continuation; +import com.velocitypowered.api.event.EventHandler; +import com.velocitypowered.api.event.EventManager; +import com.velocitypowered.api.event.EventTask; +import com.velocitypowered.api.event.PostOrder; +import com.velocitypowered.api.event.Subscribe; +import com.velocitypowered.api.plugin.PluginContainer; +import com.velocitypowered.api.plugin.PluginManager; +import com.velocitypowered.proxy.event.UntargetedEventHandler.EventTaskHandler; +import com.velocitypowered.proxy.event.UntargetedEventHandler.VoidHandler; +import com.velocitypowered.proxy.event.UntargetedEventHandler.WithContinuationHandler; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.lanternpowered.lmbda.LambdaFactory; +import org.lanternpowered.lmbda.LambdaType; + +public class VelocityEventManager implements EventManager { + + private static final Logger logger = LogManager.getLogger(VelocityEventManager.class); + + private static final MethodHandles.Lookup methodHandlesLookup = MethodHandles.lookup(); + private static final LambdaType untargetedEventTaskHandlerType = + LambdaType.of(EventTaskHandler.class); + private static final LambdaType untargetedVoidHandlerType = + LambdaType.of(VoidHandler.class); + private static final LambdaType untargetedWithContinuationHandlerType = + LambdaType.of(WithContinuationHandler.class); + + private static final Comparator handlerComparator = + Comparator.comparingInt(o -> o.order); + + private final ExecutorService asyncExecutor; + private final PluginManager pluginManager; + + private final Multimap, HandlerRegistration> handlersByType = HashMultimap.create(); + private final LoadingCache, HandlersCache> handlersCache = + Caffeine.newBuilder().build(this::bakeHandlers); + + private final LoadingCache untargetedMethodHandlers = + Caffeine.newBuilder().weakValues().build(this::buildUntargetedMethodHandler); + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + private final List> handlerAdapters = new ArrayList<>(); + + /** + * Initializes the Velocity event manager. + * + * @param pluginManager a reference to the Velocity plugin manager + */ + public VelocityEventManager(final PluginManager pluginManager) { + this.pluginManager = pluginManager; + this.asyncExecutor = Executors + .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder() + .setNameFormat("Velocity Async Event Executor - #%d").setDaemon(true).build()); + } + + /** + * Registers a new continuation adapter function. + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public void registerHandlerAdapter( + final String name, + final Predicate filter, + final BiConsumer> validator, + final TypeToken invokeFunctionType, + final Function> handlerBuilder) { + handlerAdapters.add(new CustomHandlerAdapter(name, filter, validator, + invokeFunctionType, handlerBuilder, methodHandlesLookup)); + } + + /** + * Represents the registration of a single {@link EventHandler}. + */ + static final class HandlerRegistration { + + final PluginContainer plugin; + final short order; + final Class eventType; + final EventHandler handler; + + /** + * The instance of the {@link EventHandler} or the listener instance that was registered. + */ + final Object instance; + + public HandlerRegistration(final PluginContainer plugin, final short order, + final Class eventType, final Object instance, final EventHandler handler) { + this.plugin = plugin; + this.order = order; + this.eventType = eventType; + this.instance = instance; + this.handler = handler; + } + } + + enum AsyncType { + /** + * The complete event will be handled on an async thread. + */ + ALWAYS, + /** + * The event will never run async, everything is handled on + * the netty thread. + */ + NEVER + } + + static final class HandlersCache { + + final HandlerRegistration[] handlers; + + HandlersCache(final HandlerRegistration[] handlers) { + this.handlers = handlers; + } + } + + private static List> getEventTypes(final Class eventType) { + return TypeToken.of(eventType).getTypes().rawTypes().stream() + .filter(type -> type != Object.class) + .collect(Collectors.toList()); + } + + private @Nullable HandlersCache bakeHandlers(final Class eventType) { + final List baked = new ArrayList<>(); + final List> types = getEventTypes(eventType); + + lock.readLock().lock(); + try { + for (final Class type : types) { + baked.addAll(handlersByType.get(type)); + } + } finally { + lock.readLock().unlock(); + } + + if (baked.isEmpty()) { + return null; + } + + baked.sort(handlerComparator); + return new HandlersCache(baked.toArray(new HandlerRegistration[0])); + } + + /** + * Creates an {@link UntargetedEventHandler} for the given {@link Method}. This essentially + * implements the {@link UntargetedEventHandler} (or the no async task variant) to invoke the + * target method. The implemented class is defined in the same package as the declaring class. + * The {@link UntargetedEventHandler} interface must be public so the implementation can access + * it. + * + * @param method The method to generate an untargeted handler for + * @return The untargeted handler + */ + private UntargetedEventHandler buildUntargetedMethodHandler(final Method method) + throws IllegalAccessException { + for (final CustomHandlerAdapter handlerAdapter : handlerAdapters) { + if (handlerAdapter.filter.test(method)) { + return handlerAdapter.buildUntargetedHandler(method); + } + } + final MethodHandles.Lookup lookup = MethodHandles.privateLookupIn( + method.getDeclaringClass(), methodHandlesLookup); + final MethodHandle methodHandle = lookup.unreflect(method); + final LambdaType type; + if (EventTask.class.isAssignableFrom(method.getReturnType())) { + type = untargetedEventTaskHandlerType; + } else if (method.getParameterCount() == 2) { + type = untargetedWithContinuationHandlerType; + } else { + type = untargetedVoidHandlerType; + } + return LambdaFactory.create(type.defineClassesWith(lookup), methodHandle); + } + + static final class MethodHandlerInfo { + + final Method method; + final @Nullable Class eventType; + final short order; + final @Nullable String errors; + final @Nullable Class continuationType; + + private MethodHandlerInfo(final Method method, final @Nullable Class eventType, + final short order, final @Nullable String errors, + final @Nullable Class continuationType) { + this.method = method; + this.eventType = eventType; + this.order = order; + this.errors = errors; + this.continuationType = continuationType; + } + } + + private void collectMethods(final Class targetClass, + final Map collected) { + for (final Method method : targetClass.getDeclaredMethods()) { + final Subscribe subscribe = method.getAnnotation(Subscribe.class); + if (subscribe == null) { + continue; + } + String key = method.getName() + + "(" + + Arrays.stream(method.getParameterTypes()) + .map(Class::getName) + .collect(Collectors.joining(",")) + + ")"; + if (Modifier.isPrivate(method.getModifiers())) { + key = targetClass.getName() + "$" + key; + } + if (collected.containsKey(key)) { + continue; + } + final Set errors = new HashSet<>(); + if (Modifier.isStatic(method.getModifiers())) { + errors.add("method must not be static"); + } + if (Modifier.isAbstract(method.getModifiers())) { + errors.add("method must not be abstract"); + } + Class eventType = null; + Class continuationType = null; + CustomHandlerAdapter handlerAdapter = null; + final int paramCount = method.getParameterCount(); + if (paramCount == 0) { + errors.add("method must have at least one parameter which is the event"); + } else { + final Class[] parameterTypes = method.getParameterTypes(); + eventType = parameterTypes[0]; + for (final CustomHandlerAdapter handlerAdapterCandidate : handlerAdapters) { + if (handlerAdapterCandidate.filter.test(method)) { + handlerAdapter = handlerAdapterCandidate; + break; + } + } + if (handlerAdapter != null) { + final List adapterErrors = new ArrayList<>(); + handlerAdapter.validator.accept(method, adapterErrors); + if (!adapterErrors.isEmpty()) { + errors.add(String.format("%s adapter errors: [%s]", + handlerAdapter.name, String.join(", ", adapterErrors))); + } + } else if (paramCount == 2) { + continuationType = parameterTypes[1]; + if (continuationType != Continuation.class) { + errors.add(String.format("method is allowed to have a continuation as second parameter," + + " but %s is invalid", continuationType.getName())); + } + } + } + if (handlerAdapter == null) { + final Class returnType = method.getReturnType(); + if (returnType != void.class && continuationType == Continuation.class) { + errors.add("method return type must be void if a continuation parameter is provided"); + } else if (returnType != void.class && returnType != EventTask.class) { + errors.add("method return type must be void, AsyncTask, " + + "AsyncTask.Basic or AsyncTask.WithContinuation"); + } + } + final short order = (short) subscribe.order().ordinal(); + final String errorsJoined = errors.isEmpty() ? null : String.join(",", errors); + collected.put(key, new MethodHandlerInfo(method, eventType, order, errorsJoined, + continuationType)); + } + final Class superclass = targetClass.getSuperclass(); + if (superclass != Object.class) { + collectMethods(superclass, collected); + } + } + + private void register(final List registrations) { + lock.writeLock().lock(); + try { + for (final HandlerRegistration registration : registrations) { + handlersByType.put(registration.eventType, registration); + } + } finally { + lock.writeLock().unlock(); + } + // Invalidate all the affected event subtypes + handlersCache.invalidateAll(registrations.stream() + .flatMap(registration -> getEventTypes(registration.eventType).stream()) + .distinct() + .collect(Collectors.toList())); + } + + @Override + public void register(final Object plugin, final Object listener) { + requireNonNull(listener, "listener"); + final PluginContainer pluginContainer = pluginManager.ensurePluginContainer(plugin); + if (plugin == listener) { + throw new IllegalArgumentException("The plugin main instance is automatically registered."); + } + registerInternally(pluginContainer, listener); + } + + @Override + @SuppressWarnings("unchecked") + public void register(final Object plugin, final Class eventClass, + final PostOrder order, final EventHandler handler) { + final PluginContainer pluginContainer = pluginManager.ensurePluginContainer(plugin); + requireNonNull(eventClass, "eventClass"); + requireNonNull(handler, "handler"); + + final HandlerRegistration registration = new HandlerRegistration(pluginContainer, + (short) order.ordinal(), eventClass, handler, (EventHandler) handler); + register(Collections.singletonList(registration)); + } + + /** + * Registers the listener for a given plugin. + * + * @param pluginContainer registering plugin + * @param listener listener to register + */ + public void registerInternally(final PluginContainer pluginContainer, final Object listener) { + final Class targetClass = listener.getClass(); + final Map collected = new HashMap<>(); + collectMethods(targetClass, collected); + + final List registrations = new ArrayList<>(); + for (final MethodHandlerInfo info : collected.values()) { + if (info.errors != null) { + logger.info("Invalid listener method {} in {}: {}", + info.method.getName(), info.method.getDeclaringClass().getName(), info.errors); + continue; + } + final UntargetedEventHandler untargetedHandler = untargetedMethodHandlers.get(info.method); + assert untargetedHandler != null; + if (info.eventType == null) { + throw new VerifyException("Event type is not present and there are no errors"); + } + + final EventHandler handler = untargetedHandler.buildHandler(listener); + registrations.add(new HandlerRegistration(pluginContainer, info.order, + info.eventType, listener, handler)); + } + + register(registrations); + } + + @Override + public void unregisterListeners(final Object plugin) { + final PluginContainer pluginContainer = pluginManager.ensurePluginContainer(plugin); + unregisterIf(registration -> registration.plugin == pluginContainer); + } + + @Override + public void unregisterListener(final Object plugin, final Object handler) { + final PluginContainer pluginContainer = pluginManager.ensurePluginContainer(plugin); + requireNonNull(handler, "handler"); + unregisterIf(registration -> + registration.plugin == pluginContainer && registration.handler == handler); + } + + @Override + public void unregister(final Object plugin, final EventHandler handler) { + unregisterListener(plugin, handler); + } + + private void unregisterIf(final Predicate predicate) { + final List removed = new ArrayList<>(); + lock.writeLock().lock(); + try { + final Iterator it = handlersByType.values().iterator(); + while (it.hasNext()) { + final HandlerRegistration registration = it.next(); + if (predicate.test(registration)) { + it.remove(); + removed.add(registration); + } + } + } finally { + lock.writeLock().unlock(); + } + + // Invalidate all the affected event subtypes + handlersCache.invalidateAll(removed.stream() + .flatMap(registration -> getEventTypes(registration.eventType).stream()) + .distinct() + .collect(Collectors.toList())); + } + + @Override + public void fireAndForget(final Object event) { + requireNonNull(event, "event"); + final HandlersCache handlersCache = this.handlersCache.get(event.getClass()); + if (handlersCache == null) { + // Optimization: nobody's listening. + return; + } + fire(null, event, handlersCache); + } + + @Override + public CompletableFuture fire(final E event) { + requireNonNull(event, "event"); + final HandlersCache handlersCache = this.handlersCache.get(event.getClass()); + if (handlersCache == null) { + // Optimization: nobody's listening. + return CompletableFuture.completedFuture(event); + } + final CompletableFuture future = new CompletableFuture<>(); + fire(future, event, handlersCache); + return future; + } + + private void fire(final @Nullable CompletableFuture future, + final E event, final HandlersCache handlersCache) { + // In Velocity 1.1.0, all events were fired asynchronously. As Velocity 3.0.0 is intended to be + // largely (albeit not 100%) compatible with 1.1.x, we also fire events async. This behavior + // will go away in Velocity Polymer. + asyncExecutor.execute(() -> fire(future, event, 0, true, handlersCache.handlers)); + } + + private static final int TASK_STATE_DEFAULT = 0; + private static final int TASK_STATE_EXECUTING = 1; + private static final int TASK_STATE_CONTINUE_IMMEDIATELY = 2; + + private static final VarHandle CONTINUATION_TASK_RESUMED; + private static final VarHandle CONTINUATION_TASK_STATE; + + static { + try { + CONTINUATION_TASK_RESUMED = MethodHandles.lookup() + .findVarHandle(ContinuationTask.class, "resumed", boolean.class); + CONTINUATION_TASK_STATE = MethodHandles.lookup() + .findVarHandle(ContinuationTask.class, "state", int.class); + } catch (final ReflectiveOperationException e) { + throw new IllegalStateException(); + } + } + + final class ContinuationTask implements Continuation, Runnable { + + private final EventTask task; + private final int index; + private final HandlerRegistration[] registrations; + private final @Nullable CompletableFuture future; + private final boolean currentlyAsync; + private final E event; + + // This field is modified via a VarHandle, so this field is used and cannot be final. + @SuppressWarnings({"UnusedVariable", "FieldMayBeFinal", "FieldCanBeLocal"}) + private volatile int state = TASK_STATE_DEFAULT; + + // This field is modified via a VarHandle, so this field is used and cannot be final. + @SuppressWarnings({"UnusedVariable", "FieldMayBeFinal"}) + private volatile boolean resumed = false; + + private ContinuationTask( + final EventTask task, + final HandlerRegistration[] registrations, + final @Nullable CompletableFuture future, + final E event, + final int index, + final boolean currentlyAsync) { + this.task = task; + this.registrations = registrations; + this.future = future; + this.event = event; + this.index = index; + this.currentlyAsync = currentlyAsync; + } + + @Override + public void run() { + if (execute()) { + fire(future, event, index + 1, currentlyAsync, registrations); + } + } + + /** + * Executes the task and returns whether the next one should be executed + * immediately after this one without scheduling. + */ + boolean execute() { + state = TASK_STATE_EXECUTING; + try { + task.execute(this); + } catch (final Throwable t) { + // validateOnlyOnce false here so don't get an exception if the + // continuation was resumed before + resume(t, false); + } + return !CONTINUATION_TASK_STATE.compareAndSet( + this, TASK_STATE_EXECUTING, TASK_STATE_DEFAULT); + } + + @Override + public void resume() { + resume(null, true); + } + + void resume(final @Nullable Throwable exception, final boolean validateOnlyOnce) { + final boolean changed = CONTINUATION_TASK_RESUMED.compareAndSet(this, false, true); + // Only allow the continuation to be resumed once + if (!changed && validateOnlyOnce) { + throw new IllegalStateException("The continuation can only be resumed once."); + } + final HandlerRegistration registration = registrations[index]; + if (exception != null) { + logHandlerException(registration, exception); + } + if (!changed) { + return; + } + if (index + 1 == registrations.length) { + // Optimization: don't schedule a task just to complete the future + if (future != null) { + future.complete(event); + } + return; + } + if (!CONTINUATION_TASK_STATE.compareAndSet( + this, TASK_STATE_EXECUTING, TASK_STATE_CONTINUE_IMMEDIATELY)) { + asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations)); + } + } + + @Override + public void resumeWithException(final Throwable exception) { + resume(requireNonNull(exception, "exception"), true); + } + } + + private void fire(final @Nullable CompletableFuture future, final E event, + final int offset, final boolean currentlyAsync, final HandlerRegistration[] registrations) { + for (int i = offset; i < registrations.length; i++) { + final HandlerRegistration registration = registrations[i]; + try { + final EventTask eventTask = registration.handler.executeAsync(event); + if (eventTask == null) { + continue; + } + final ContinuationTask continuationTask = new ContinuationTask<>(eventTask, + registrations, future, event, i, currentlyAsync); + if (currentlyAsync || !eventTask.requiresAsync()) { + if (continuationTask.execute()) { + continue; + } + } else { + asyncExecutor.execute(continuationTask); + } + // fire will continue in another thread once the async task is + // executed and the continuation is resumed + return; + } catch (final Throwable t) { + logHandlerException(registration, t); + } + } + if (future != null) { + future.complete(event); + } + } + + private static void logHandlerException( + final HandlerRegistration registration, final Throwable t) { + logger.error("Couldn't pass {} to {}", registration.eventType.getSimpleName(), + registration.plugin.getDescription().getId(), t); + } + + public boolean shutdown() throws InterruptedException { + asyncExecutor.shutdown(); + return asyncExecutor.awaitTermination(10, TimeUnit.SECONDS); + } + + public ExecutorService getAsyncExecutor() { + return asyncExecutor; + } +} \ No newline at end of file diff --git a/proxy/src/main/java/com/velocitypowered/proxy/plugin/VelocityEventManager.java b/proxy/src/main/java/com/velocitypowered/proxy/plugin/VelocityEventManager.java deleted file mode 100644 index 70736c307..000000000 --- a/proxy/src/main/java/com/velocitypowered/proxy/plugin/VelocityEventManager.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Copyright (C) 2018 Velocity Contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package com.velocitypowered.proxy.plugin; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Multimaps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.velocitypowered.api.event.EventHandler; -import com.velocitypowered.api.event.EventManager; -import com.velocitypowered.api.event.PostOrder; -import com.velocitypowered.api.event.Subscribe; -import com.velocitypowered.api.event.proxy.ProxyShutdownEvent; -import com.velocitypowered.api.plugin.PluginManager; -import java.lang.reflect.Method; -import java.net.URL; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.ArrayList; -import java.util.Collection; -import java.util.IdentityHashMap; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import net.kyori.event.EventSubscriber; -import net.kyori.event.PostResult; -import net.kyori.event.SimpleEventBus; -import net.kyori.event.method.MethodScanner; -import net.kyori.event.method.MethodSubscriptionAdapter; -import net.kyori.event.method.SimpleMethodSubscriptionAdapter; -import net.kyori.event.method.asm.ASMEventExecutorFactory; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.checkerframework.checker.nullness.qual.NonNull; - -public class VelocityEventManager implements EventManager { - - private static final Logger logger = LogManager.getLogger(VelocityEventManager.class); - - private final ListMultimap registeredListenersByPlugin = Multimaps - .synchronizedListMultimap(Multimaps.newListMultimap(new IdentityHashMap<>(), ArrayList::new)); - private final ListMultimap> registeredHandlersByPlugin = Multimaps - .synchronizedListMultimap(Multimaps.newListMultimap(new IdentityHashMap<>(), ArrayList::new)); - private final SimpleEventBus bus; - private final MethodSubscriptionAdapter methodAdapter; - private final ExecutorService service; - private final PluginManager pluginManager; - - /** - * Initializes the Velocity event manager. - * - * @param pluginManager a reference to the Velocity plugin manager - */ - public VelocityEventManager(PluginManager pluginManager) { - // Expose the event executors to the plugins - required in order for the generated ASM classes - // to work. - PluginClassLoader cl = AccessController.doPrivileged( - (PrivilegedAction) () -> new PluginClassLoader(new URL[0])); - cl.addToClassloaders(); - - // Initialize the event bus. - this.bus = new SimpleEventBus(Object.class) { - @Override - protected boolean shouldPost(@NonNull Object event, @NonNull EventSubscriber subscriber) { - // Velocity doesn't use Cancellable or generic events, so we can skip those checks. - return true; - } - }; - this.methodAdapter = new SimpleMethodSubscriptionAdapter<>(bus, - new ASMEventExecutorFactory<>(cl), - new VelocityMethodScanner()); - this.pluginManager = pluginManager; - this.service = Executors - .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder() - .setNameFormat("Velocity Event Executor - #%d").setDaemon(true).build()); - } - - private void ensurePlugin(Object plugin) { - Preconditions.checkNotNull(plugin, "plugin"); - Preconditions.checkArgument(pluginManager.fromInstance(plugin).isPresent(), - "Specified plugin is not loaded"); - } - - @Override - public void register(Object plugin, Object listener) { - ensurePlugin(plugin); - Preconditions.checkNotNull(listener, "listener"); - if (plugin == listener && registeredListenersByPlugin.containsEntry(plugin, plugin)) { - throw new IllegalArgumentException("The plugin main instance is automatically registered."); - } - - registeredListenersByPlugin.put(plugin, listener); - methodAdapter.register(listener); - } - - @Override - @SuppressWarnings("type.argument.type.incompatible") - public void register(Object plugin, Class eventClass, PostOrder postOrder, - EventHandler handler) { - ensurePlugin(plugin); - Preconditions.checkNotNull(eventClass, "eventClass"); - Preconditions.checkNotNull(postOrder, "postOrder"); - Preconditions.checkNotNull(handler, "listener"); - - registeredHandlersByPlugin.put(plugin, handler); - bus.register(eventClass, new KyoriToVelocityHandler<>(handler, postOrder)); - } - - @Override - public CompletableFuture fire(E event) { - if (event == null) { - throw new NullPointerException("event"); - } - if (!bus.hasSubscribers(event.getClass())) { - // Optimization: nobody's listening. - return CompletableFuture.completedFuture(event); - } - - return CompletableFuture.supplyAsync(() -> { - fireEvent(event); - return event; - }, service); - } - - @Override - public void fireAndForget(Object event) { - if (event == null) { - throw new NullPointerException("event"); - } - if (!bus.hasSubscribers(event.getClass())) { - // Optimization: nobody's listening. - return; - } - service.execute(() -> fireEvent(event)); - } - - private void fireEvent(Object event) { - PostResult result = bus.post(event); - if (!result.exceptions().isEmpty()) { - logger.error("Some errors occurred whilst posting event {}.", event); - int i = 0; - for (Throwable exception : result.exceptions().values()) { - logger.error("#{}: \n", ++i, exception); - } - } - } - - private void unregisterHandler(EventHandler handler) { - bus.unregister(s -> s instanceof KyoriToVelocityHandler - && ((KyoriToVelocityHandler) s).handler == handler); - } - - @Override - public void unregisterListeners(Object plugin) { - ensurePlugin(plugin); - Collection listeners = registeredListenersByPlugin.removeAll(plugin); - listeners.forEach(methodAdapter::unregister); - Collection> handlers = registeredHandlersByPlugin.removeAll(plugin); - handlers.forEach(this::unregisterHandler); - } - - @Override - public void unregisterListener(Object plugin, Object listener) { - ensurePlugin(plugin); - Preconditions.checkNotNull(listener, "listener"); - if (registeredListenersByPlugin.remove(plugin, listener)) { - methodAdapter.unregister(listener); - } - } - - @Override - public void unregister(Object plugin, EventHandler handler) { - ensurePlugin(plugin); - Preconditions.checkNotNull(handler, "listener"); - if (registeredHandlersByPlugin.remove(plugin, handler)) { - unregisterHandler(handler); - } - } - - public boolean shutdown() throws InterruptedException { - service.shutdown(); - return service.awaitTermination(10, TimeUnit.SECONDS); - } - - public void fireShutdownEvent() { - // We shut down the proxy already, so the fact this executes in the main thread is irrelevant. - fireEvent(new ProxyShutdownEvent()); - } - - public ExecutorService getService() { - return service; - } - - private static class VelocityMethodScanner implements MethodScanner { - - @Override - public boolean shouldRegister(@NonNull Object listener, @NonNull Method method) { - return method.isAnnotationPresent(Subscribe.class); - } - - @Override - public int postOrder(@NonNull Object listener, @NonNull Method method) { - return method.getAnnotation(Subscribe.class).order().ordinal(); - } - - @Override - public boolean consumeCancelledEvents(@NonNull Object listener, @NonNull Method method) { - return true; - } - } - - private static class KyoriToVelocityHandler implements EventSubscriber { - - private final EventHandler handler; - private final int postOrder; - - private KyoriToVelocityHandler(EventHandler handler, PostOrder postOrder) { - this.handler = handler; - this.postOrder = postOrder.ordinal(); - } - - @Override - public void invoke(@NonNull E event) { - handler.execute(event); - } - - @Override - public int postOrder() { - return postOrder; - } - } -} diff --git a/proxy/src/test/java/com/velocitypowered/proxy/command/CommandManagerTests.java b/proxy/src/test/java/com/velocitypowered/proxy/command/CommandManagerTests.java index 34612ebbc..467882469 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/command/CommandManagerTests.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/command/CommandManagerTests.java @@ -20,7 +20,6 @@ package com.velocitypowered.proxy.command; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -38,8 +37,8 @@ import com.velocitypowered.api.command.CommandMeta; import com.velocitypowered.api.command.CommandSource; import com.velocitypowered.api.command.RawCommand; import com.velocitypowered.api.command.SimpleCommand; +import com.velocitypowered.proxy.event.VelocityEventManager; import com.velocitypowered.proxy.plugin.MockEventManager; -import com.velocitypowered.proxy.plugin.VelocityEventManager; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java b/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java new file mode 100644 index 000000000..6a7f96de5 --- /dev/null +++ b/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java @@ -0,0 +1,288 @@ +/* + * Copyright (C) 2018 Velocity Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.velocitypowered.proxy.event; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.google.common.reflect.TypeToken; +import com.velocitypowered.api.event.Continuation; +import com.velocitypowered.api.event.EventTask; +import com.velocitypowered.api.event.PostOrder; +import com.velocitypowered.api.event.Subscribe; +import com.velocitypowered.proxy.testutil.FakePluginManager; +import java.util.concurrent.atomic.AtomicInteger; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class EventTest { + + private final VelocityEventManager eventManager = + new VelocityEventManager(new FakePluginManager()); + + @AfterAll + void shutdown() throws Exception { + eventManager.shutdown(); + } + + static final class TestEvent { + } + + static void assertAsyncThread(final Thread thread) { + assertTrue(thread.getName().contains("Velocity Async Event Executor")); + } + + private void handleMethodListener(final Object listener) throws Exception { + eventManager.register(FakePluginManager.PLUGIN_A, listener); + try { + eventManager.fire(new TestEvent()).get(); + } finally { + eventManager.unregisterListeners(FakePluginManager.PLUGIN_A); + } + } + + @Test + void testAlwaysAsync() throws Exception { + final AlwaysAsyncListener listener = new AlwaysAsyncListener(); + handleMethodListener(listener); + assertAsyncThread(listener.threadA); + assertAsyncThread(listener.threadB); + assertAsyncThread(listener.threadC); + assertEquals(3, listener.result); + } + + static final class AlwaysAsyncListener { + + @MonotonicNonNull Thread threadA; + @MonotonicNonNull Thread threadB; + @MonotonicNonNull Thread threadC; + int result; + + @Subscribe + void firstAsync(TestEvent event) { + result++; + threadA = Thread.currentThread(); + } + + @Subscribe + EventTask secondAsync(TestEvent event) { + threadB = Thread.currentThread(); + return EventTask.async(() -> result++); + } + + @Subscribe + void thirdAsync(TestEvent event) { + result++; + threadC = Thread.currentThread(); + } + } + + @Test + void testContinuation() throws Exception { + final ContinuationListener listener = new ContinuationListener(); + handleMethodListener(listener); + assertAsyncThread(listener.threadA); + assertAsyncThread(listener.threadB); + assertAsyncThread(listener.threadC); + assertEquals(2, listener.value.get()); + } + + static final class ContinuationListener { + + @MonotonicNonNull Thread threadA; + @MonotonicNonNull Thread threadB; + @MonotonicNonNull Thread threadC; + + final AtomicInteger value = new AtomicInteger(); + + @Subscribe(order = PostOrder.EARLY) + EventTask continuation(TestEvent event) { + threadA = Thread.currentThread(); + return EventTask.withContinuation(continuation -> { + value.incrementAndGet(); + threadB = Thread.currentThread(); + new Thread(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + value.incrementAndGet(); + continuation.resume(); + }).start(); + }); + } + + @Subscribe(order = PostOrder.LATE) + void afterContinuation(TestEvent event) { + threadC = Thread.currentThread(); + } + } + + @Test + void testResumeContinuationImmediately() throws Exception { + final ResumeContinuationImmediatelyListener listener = + new ResumeContinuationImmediatelyListener(); + handleMethodListener(listener); + assertAsyncThread(listener.threadA); + assertAsyncThread(listener.threadB); + assertAsyncThread(listener.threadC); + assertEquals(2, listener.result); + } + + static final class ResumeContinuationImmediatelyListener { + + @MonotonicNonNull Thread threadA; + @MonotonicNonNull Thread threadB; + @MonotonicNonNull Thread threadC; + int result; + + @Subscribe(order = PostOrder.EARLY) + EventTask continuation(TestEvent event) { + threadA = Thread.currentThread(); + return EventTask.withContinuation(continuation -> { + threadB = Thread.currentThread(); + result++; + continuation.resume(); + }); + } + + @Subscribe(order = PostOrder.LATE) + void afterContinuation(TestEvent event) { + threadC = Thread.currentThread(); + result++; + } + } + + @Test + void testContinuationParameter() throws Exception { + final ContinuationParameterListener listener = new ContinuationParameterListener(); + handleMethodListener(listener); + assertAsyncThread(listener.threadA); + assertAsyncThread(listener.threadB); + assertAsyncThread(listener.threadC); + assertEquals(3, listener.result.get()); + } + + static final class ContinuationParameterListener { + + @MonotonicNonNull Thread threadA; + @MonotonicNonNull Thread threadB; + @MonotonicNonNull Thread threadC; + + final AtomicInteger result = new AtomicInteger(); + + @Subscribe + void resume(TestEvent event, Continuation continuation) { + threadA = Thread.currentThread(); + result.incrementAndGet(); + continuation.resume(); + } + + @Subscribe(order = PostOrder.LATE) + void resumeFromCustomThread(TestEvent event, Continuation continuation) { + threadB = Thread.currentThread(); + new Thread(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + result.incrementAndGet(); + continuation.resume(); + }).start(); + } + + @Subscribe(order = PostOrder.LAST) + void afterCustomThread(TestEvent event, Continuation continuation) { + threadC = Thread.currentThread(); + result.incrementAndGet(); + continuation.resume(); + } + } + + interface FancyContinuation { + + void resume(); + + void resumeWithError(Exception exception); + } + + private static final class FancyContinuationImpl implements FancyContinuation { + + private final Continuation continuation; + + private FancyContinuationImpl(final Continuation continuation) { + this.continuation = continuation; + } + + @Override + public void resume() { + continuation.resume(); + } + + @Override + public void resumeWithError(final Exception exception) { + continuation.resumeWithException(exception); + } + } + + interface TriConsumer { + + void accept(A a, B b, C c); + } + + @Test + void testFancyContinuationParameter() throws Exception { + eventManager.registerHandlerAdapter( + "fancy", + method -> method.getParameterCount() > 1 + && method.getParameterTypes()[1] == FancyContinuation.class, + (method, errors) -> { + if (method.getReturnType() != void.class) { + errors.add("method return type must be void"); + } + if (method.getParameterCount() != 2) { + errors.add("method must have exactly two parameters, the first is the event and " + + "the second is the fancy continuation"); + } + }, + new TypeToken>() {}, + invokeFunction -> (instance, event) -> + EventTask.withContinuation(continuation -> + invokeFunction.accept(instance, event, new FancyContinuationImpl(continuation)) + )); + final FancyContinuationListener listener = new FancyContinuationListener(); + handleMethodListener(listener); + assertEquals(1, listener.result); + } + + static final class FancyContinuationListener { + + int result; + + @Subscribe + void continuation(TestEvent event, FancyContinuation continuation) { + result++; + continuation.resume(); + } + } +} \ No newline at end of file diff --git a/proxy/src/test/java/com/velocitypowered/proxy/event/MockEventManager.java b/proxy/src/test/java/com/velocitypowered/proxy/event/MockEventManager.java new file mode 100644 index 000000000..8616e7252 --- /dev/null +++ b/proxy/src/test/java/com/velocitypowered/proxy/event/MockEventManager.java @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2018 Velocity Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.velocitypowered.proxy.event; + +import com.velocitypowered.proxy.plugin.MockPluginManager; + +/** + * A mock {@link VelocityEventManager}. Must be shutdown after use! + */ +public class MockEventManager extends VelocityEventManager { + + public MockEventManager() { + super(MockPluginManager.INSTANCE); + } +} \ No newline at end of file diff --git a/proxy/src/test/java/com/velocitypowered/proxy/plugin/MockEventManager.java b/proxy/src/test/java/com/velocitypowered/proxy/plugin/MockEventManager.java index e882e7d20..b895e5a59 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/plugin/MockEventManager.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/plugin/MockEventManager.java @@ -17,6 +17,8 @@ package com.velocitypowered.proxy.plugin; +import com.velocitypowered.proxy.event.VelocityEventManager; + /** * A mock {@link VelocityEventManager}. Must be shutdown after use! */