Mirror von
https://github.com/PaperMC/Velocity.git
synchronisiert 2024-11-16 21:10:30 +01:00
Backport Velocity Polymer's async event API, with changes.
This commit backports the event manager from Velocity Polymer, with some changes for Velocity 1.1.x API compatibility: - All event handlers run asynchronously. (While EventTask.async() exists, it is not useful in 3.0.0, but is provided as a migration aid for Polymer.) - Event ordering is currently limited to the 5 levels available in Velocity 1.x.x.
Dieser Commit ist enthalten in:
Ursprung
3f50964f36
Commit
821ca02ee7
@ -0,0 +1,21 @@
|
||||
/*
|
||||
* Copyright (C) 2018 Velocity Contributors
|
||||
*
|
||||
* The Velocity API is licensed under the terms of the MIT License. For more details,
|
||||
* reference the LICENSE file in the api top-level directory.
|
||||
*/
|
||||
|
||||
package com.velocitypowered.api.event;
|
||||
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
@FunctionalInterface
|
||||
public interface AsyncEventExecutor<E> extends EventHandler<E> {
|
||||
|
||||
default void execute(E event) {
|
||||
throw new UnsupportedOperationException(
|
||||
"This event handler can only be invoked asynchronously.");
|
||||
}
|
||||
|
||||
@Nullable EventTask executeAsync(E event);
|
||||
}
|
25
api/src/main/java/com/velocitypowered/api/event/Continuation.java
Normale Datei
25
api/src/main/java/com/velocitypowered/api/event/Continuation.java
Normale Datei
@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Copyright (C) 2018 Velocity Contributors
|
||||
*
|
||||
* The Velocity API is licensed under the terms of the MIT License. For more details,
|
||||
* reference the LICENSE file in the api top-level directory.
|
||||
*/
|
||||
|
||||
package com.velocitypowered.api.event;
|
||||
|
||||
/**
|
||||
* Represents a continuation of a paused event handler. Any of the resume methods
|
||||
* may only be called once otherwise an {@link IllegalStateException} is expected.
|
||||
*/
|
||||
public interface Continuation {
|
||||
|
||||
/**
|
||||
* Resumes the continuation.
|
||||
*/
|
||||
void resume();
|
||||
|
||||
/**
|
||||
* Resumes the continuation after the executed task failed.
|
||||
*/
|
||||
void resumeWithException(Throwable exception);
|
||||
}
|
@ -7,12 +7,21 @@
|
||||
|
||||
package com.velocitypowered.api.event;
|
||||
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
/**
|
||||
* Represents an interface to perform direct dispatch of an event. This makes integration easier to
|
||||
* achieve with platforms such as RxJava.
|
||||
* achieve with platforms such as RxJava. While this interface can be used to implement an
|
||||
* asynchronous event handler, {@link AsyncEventExecutor} provides a more idiomatic means of doing
|
||||
* so.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface EventHandler<E> {
|
||||
|
||||
void execute(E event);
|
||||
|
||||
default @Nullable EventTask executeAsync(E event) {
|
||||
execute(event);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
121
api/src/main/java/com/velocitypowered/api/event/EventTask.java
Normale Datei
121
api/src/main/java/com/velocitypowered/api/event/EventTask.java
Normale Datei
@ -0,0 +1,121 @@
|
||||
/*
|
||||
* Copyright (C) 2018 Velocity Contributors
|
||||
*
|
||||
* The Velocity API is licensed under the terms of the MIT License. For more details,
|
||||
* reference the LICENSE file in the api top-level directory.
|
||||
*/
|
||||
|
||||
package com.velocitypowered.api.event;
|
||||
|
||||
import static java.util.Objects.requireNonNull;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* Represents a task that can be returned by a {@link EventHandler} which allows event handling to
|
||||
* be suspended and resumed at a later time, and executing event handlers completely or partially
|
||||
* asynchronously.
|
||||
*
|
||||
* <p><strong>Compatibility notice:</strong> While in Velocity 3.0.0, all event handlers still
|
||||
* execute asynchronously (to preserve backwards compatibility), this will not be the case in future
|
||||
* versions of Velocity. Please prepare your code by using continuations or returning an instance
|
||||
* returned by {@link #async(Runnable)}.</p>
|
||||
*/
|
||||
public interface EventTask {
|
||||
|
||||
/**
|
||||
* Whether this {@link EventTask} is required to be called asynchronously.
|
||||
*
|
||||
* <p>If this method returns {@code true}, the event task is guaranteed to be executed
|
||||
* asynchronously from the current thread. Otherwise, the event task may be executed on the
|
||||
* current thread or asynchronously.</p>
|
||||
*
|
||||
* @return Requires async
|
||||
*/
|
||||
boolean requiresAsync();
|
||||
|
||||
/**
|
||||
* Runs this event task with the given {@link Continuation}. The continuation must be notified
|
||||
* when the task is completed, either with {@link Continuation#resume()} if the task was
|
||||
* successful or {@link Continuation#resumeWithException(Throwable)} if an exception occurred.
|
||||
*
|
||||
* <p>The {@link Continuation} may only be resumed once, or an
|
||||
* {@link IllegalStateException} will be thrown.</p>
|
||||
*
|
||||
* <p>The {@link Continuation} doesn't need to be notified during the execution of this method,
|
||||
* this can happen at a later point in time and from another thread.</p>
|
||||
*
|
||||
* @param continuation The continuation
|
||||
*/
|
||||
void execute(Continuation continuation);
|
||||
|
||||
/**
|
||||
* Creates a basic async {@link EventTask} from the given {@link Runnable}. The task is guaranteed
|
||||
* to be executed asynchronously ({@link #requiresAsync()} always returns {@code true}).
|
||||
*
|
||||
* @param task The task
|
||||
* @return The async event task
|
||||
*/
|
||||
static EventTask async(final Runnable task) {
|
||||
requireNonNull(task, "task");
|
||||
return new EventTask() {
|
||||
|
||||
@Override
|
||||
public void execute(Continuation continuation) {
|
||||
task.run();
|
||||
continuation.resume();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean requiresAsync() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an continuation based {@link EventTask} from the given {@link Consumer}. The task isn't
|
||||
* guaranteed to be executed asynchronously ({@link #requiresAsync()} always returns
|
||||
* {@code false}).
|
||||
*
|
||||
* @param task The task to execute
|
||||
* @return The event task
|
||||
*/
|
||||
static EventTask withContinuation(final Consumer<Continuation> task) {
|
||||
requireNonNull(task, "task");
|
||||
return new EventTask() {
|
||||
|
||||
@Override
|
||||
public void execute(final Continuation continuation) {
|
||||
task.accept(continuation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean requiresAsync() {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an continuation based {@link EventTask} for the given {@link CompletableFuture}. The
|
||||
* continuation will be notified once the given future is completed.
|
||||
*
|
||||
* @param future The task to wait for
|
||||
* @return The event task
|
||||
*/
|
||||
// The Error Prone annotation here is spurious. The Future is handled via the CompletableFuture
|
||||
// API, which does NOT use the traditional blocking model.
|
||||
@SuppressWarnings("FutureReturnValueIgnored")
|
||||
static EventTask resumeWhenComplete(final CompletableFuture<?> future) {
|
||||
requireNonNull(future, "future");
|
||||
return withContinuation(continuation -> future.whenComplete((result, cause) -> {
|
||||
if (cause != null) {
|
||||
continuation.resumeWithException(cause);
|
||||
} else {
|
||||
continuation.resume();
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
@ -57,4 +57,16 @@ public interface PluginManager {
|
||||
* @throws UnsupportedOperationException if the operation is not applicable to this plugin
|
||||
*/
|
||||
void addToClasspath(Object plugin, Path path);
|
||||
|
||||
/**
|
||||
* Ensures a plugin container exists for the given {@code plugin}.
|
||||
*
|
||||
* @param plugin the instance to look up the container for
|
||||
* @return container for the plugin
|
||||
*/
|
||||
default PluginContainer ensurePluginContainer(Object plugin) {
|
||||
return this.fromInstance(plugin)
|
||||
.orElseThrow(() -> new IllegalArgumentException(plugin.getClass().getCanonicalName()
|
||||
+ " does not have a container."));
|
||||
}
|
||||
}
|
||||
|
@ -78,6 +78,9 @@ dependencies {
|
||||
implementation 'com.electronwill.night-config:toml:3.6.3'
|
||||
|
||||
implementation 'org.bstats:bstats-base:2.2.0'
|
||||
implementation 'org.lanternpowered:lmbda:2.0.0'
|
||||
|
||||
implementation 'com.github.ben-manes.caffeine:caffeine:2.8.8'
|
||||
|
||||
compileOnly 'com.github.spotbugs:spotbugs-annotations:4.1.2'
|
||||
|
||||
|
@ -25,6 +25,7 @@ import com.google.gson.GsonBuilder;
|
||||
import com.velocitypowered.api.event.EventManager;
|
||||
import com.velocitypowered.api.event.proxy.ProxyInitializeEvent;
|
||||
import com.velocitypowered.api.event.proxy.ProxyReloadEvent;
|
||||
import com.velocitypowered.api.event.proxy.ProxyShutdownEvent;
|
||||
import com.velocitypowered.api.network.ProtocolVersion;
|
||||
import com.velocitypowered.api.plugin.PluginContainer;
|
||||
import com.velocitypowered.api.plugin.PluginManager;
|
||||
@ -43,8 +44,8 @@ import com.velocitypowered.proxy.command.builtin.VelocityCommand;
|
||||
import com.velocitypowered.proxy.config.VelocityConfiguration;
|
||||
import com.velocitypowered.proxy.connection.client.ConnectedPlayer;
|
||||
import com.velocitypowered.proxy.console.VelocityConsole;
|
||||
import com.velocitypowered.proxy.event.VelocityEventManager;
|
||||
import com.velocitypowered.proxy.network.ConnectionManager;
|
||||
import com.velocitypowered.proxy.plugin.VelocityEventManager;
|
||||
import com.velocitypowered.proxy.plugin.VelocityPluginManager;
|
||||
import com.velocitypowered.proxy.protocol.ProtocolUtils;
|
||||
import com.velocitypowered.proxy.protocol.util.FaviconSerializer;
|
||||
@ -277,7 +278,7 @@ public class VelocityServer implements ProxyServer, ForwardingAudience {
|
||||
Optional<?> instance = plugin.getInstance();
|
||||
if (instance.isPresent()) {
|
||||
try {
|
||||
eventManager.register(instance.get(), instance.get());
|
||||
eventManager.registerInternally(plugin, instance.get());
|
||||
} catch (Exception e) {
|
||||
logger.error("Unable to register plugin listener for {}",
|
||||
plugin.getDescription().getName().orElse(plugin.getDescription().getId()), e);
|
||||
@ -433,7 +434,7 @@ public class VelocityServer implements ProxyServer, ForwardingAudience {
|
||||
logger.error("Exception while tearing down player connections", e);
|
||||
}
|
||||
|
||||
eventManager.fireShutdownEvent();
|
||||
eventManager.fire(new ProxyShutdownEvent()).join();
|
||||
|
||||
timedOut = !eventManager.shutdown() || timedOut;
|
||||
timedOut = !scheduler.shutdown() || timedOut;
|
||||
|
@ -34,7 +34,7 @@ import com.velocitypowered.api.command.RawCommand;
|
||||
import com.velocitypowered.api.command.SimpleCommand;
|
||||
import com.velocitypowered.api.event.command.CommandExecuteEvent;
|
||||
import com.velocitypowered.api.event.command.CommandExecuteEvent.CommandResult;
|
||||
import com.velocitypowered.proxy.plugin.VelocityEventManager;
|
||||
import com.velocitypowered.proxy.event.VelocityEventManager;
|
||||
import com.velocitypowered.proxy.util.BrigadierUtils;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
@ -164,7 +164,7 @@ public class VelocityCommandManager implements CommandManager {
|
||||
return false;
|
||||
}
|
||||
return executeImmediately0(source, commandResult.getCommand().orElse(event.getCommand()));
|
||||
}, eventManager.getService());
|
||||
}, eventManager.getAsyncExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -174,7 +174,7 @@ public class VelocityCommandManager implements CommandManager {
|
||||
Preconditions.checkNotNull(cmdLine, "cmdLine");
|
||||
|
||||
return CompletableFuture.supplyAsync(
|
||||
() -> executeImmediately0(source, cmdLine), eventManager.getService());
|
||||
() -> executeImmediately0(source, cmdLine), eventManager.getAsyncExecutor());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -0,0 +1,82 @@
|
||||
/*
|
||||
* Copyright (C) 2018 Velocity Contributors
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package com.velocitypowered.proxy.event;
|
||||
|
||||
import com.google.common.reflect.TypeToken;
|
||||
import com.velocitypowered.api.event.EventHandler;
|
||||
import com.velocitypowered.api.event.EventTask;
|
||||
import java.lang.invoke.MethodHandle;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.List;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
import org.lanternpowered.lmbda.LambdaFactory;
|
||||
import org.lanternpowered.lmbda.LambdaType;
|
||||
|
||||
final class CustomHandlerAdapter<F> {
|
||||
|
||||
final String name;
|
||||
private final Function<F, BiFunction<Object, Object, EventTask>> handlerBuilder;
|
||||
final Predicate<Method> filter;
|
||||
final BiConsumer<Method, List<String>> validator;
|
||||
private final LambdaType<F> functionType;
|
||||
private final MethodHandles.Lookup methodHandlesLookup;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
CustomHandlerAdapter(
|
||||
final String name,
|
||||
final Predicate<Method> filter,
|
||||
final BiConsumer<Method, List<String>> validator,
|
||||
final TypeToken<F> invokeFunctionType,
|
||||
final Function<F, BiFunction<Object, Object, EventTask>> handlerBuilder,
|
||||
final MethodHandles.Lookup methodHandlesLookup) {
|
||||
this.name = name;
|
||||
this.filter = filter;
|
||||
this.validator = validator;
|
||||
this.functionType = (LambdaType<F>) LambdaType.of(invokeFunctionType.getRawType());
|
||||
this.handlerBuilder = handlerBuilder;
|
||||
this.methodHandlesLookup = methodHandlesLookup;
|
||||
}
|
||||
|
||||
UntargetedEventHandler buildUntargetedHandler(final Method method)
|
||||
throws IllegalAccessException {
|
||||
final MethodHandle methodHandle = methodHandlesLookup.unreflect(method);
|
||||
final MethodHandles.Lookup defineLookup = MethodHandles.privateLookupIn(
|
||||
method.getDeclaringClass(), methodHandlesLookup);
|
||||
final LambdaType<F> lambdaType = functionType.defineClassesWith(defineLookup);
|
||||
final F invokeFunction = LambdaFactory.create(lambdaType, methodHandle);
|
||||
final BiFunction<Object, Object, EventTask> handlerFunction =
|
||||
handlerBuilder.apply(invokeFunction);
|
||||
return targetInstance -> new EventHandler() {
|
||||
|
||||
@Override
|
||||
public void execute(Object event) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable EventTask executeAsync(Object event) {
|
||||
return handlerFunction.apply(targetInstance, event);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Copyright (C) 2018 Velocity Contributors
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package com.velocitypowered.proxy.event;
|
||||
|
||||
import com.velocitypowered.api.event.AsyncEventExecutor;
|
||||
import com.velocitypowered.api.event.Continuation;
|
||||
import com.velocitypowered.api.event.EventHandler;
|
||||
import com.velocitypowered.api.event.EventTask;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
public interface UntargetedEventHandler {
|
||||
|
||||
EventHandler<Object> buildHandler(Object targetInstance);
|
||||
|
||||
interface EventTaskHandler extends UntargetedEventHandler {
|
||||
|
||||
@Nullable EventTask execute(Object targetInstance, Object event);
|
||||
|
||||
@Override
|
||||
default EventHandler<Object> buildHandler(final Object targetInstance) {
|
||||
return (AsyncEventExecutor<Object>) event -> execute(targetInstance, event);
|
||||
}
|
||||
}
|
||||
|
||||
interface VoidHandler extends UntargetedEventHandler {
|
||||
|
||||
void execute(Object targetInstance, Object event);
|
||||
|
||||
@Override
|
||||
default EventHandler<Object> buildHandler(final Object targetInstance) {
|
||||
return (AsyncEventExecutor<Object>) event -> {
|
||||
execute(targetInstance, event);
|
||||
return null;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
interface WithContinuationHandler extends UntargetedEventHandler {
|
||||
|
||||
void execute(Object targetInstance, Object event, Continuation continuation);
|
||||
|
||||
@Override
|
||||
default EventHandler<Object> buildHandler(final Object targetInstance) {
|
||||
return (AsyncEventExecutor<Object>) event -> EventTask.withContinuation(continuation ->
|
||||
execute(targetInstance, event, continuation));
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,627 @@
|
||||
/*
|
||||
* Copyright (C) 2018 Velocity Contributors
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package com.velocitypowered.proxy.event;
|
||||
|
||||
import static java.util.Objects.requireNonNull;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache;
|
||||
import com.google.common.base.VerifyException;
|
||||
import com.google.common.collect.HashMultimap;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.reflect.TypeToken;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.velocitypowered.api.event.AsyncEventExecutor;
|
||||
import com.velocitypowered.api.event.Continuation;
|
||||
import com.velocitypowered.api.event.EventHandler;
|
||||
import com.velocitypowered.api.event.EventManager;
|
||||
import com.velocitypowered.api.event.EventTask;
|
||||
import com.velocitypowered.api.event.PostOrder;
|
||||
import com.velocitypowered.api.event.Subscribe;
|
||||
import com.velocitypowered.api.plugin.PluginContainer;
|
||||
import com.velocitypowered.api.plugin.PluginManager;
|
||||
import com.velocitypowered.proxy.event.UntargetedEventHandler.EventTaskHandler;
|
||||
import com.velocitypowered.proxy.event.UntargetedEventHandler.VoidHandler;
|
||||
import com.velocitypowered.proxy.event.UntargetedEventHandler.WithContinuationHandler;
|
||||
import java.lang.invoke.MethodHandle;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.lang.invoke.VarHandle;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
import org.lanternpowered.lmbda.LambdaFactory;
|
||||
import org.lanternpowered.lmbda.LambdaType;
|
||||
|
||||
public class VelocityEventManager implements EventManager {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(VelocityEventManager.class);
|
||||
|
||||
private static final MethodHandles.Lookup methodHandlesLookup = MethodHandles.lookup();
|
||||
private static final LambdaType<EventTaskHandler> untargetedEventTaskHandlerType =
|
||||
LambdaType.of(EventTaskHandler.class);
|
||||
private static final LambdaType<VoidHandler> untargetedVoidHandlerType =
|
||||
LambdaType.of(VoidHandler.class);
|
||||
private static final LambdaType<WithContinuationHandler> untargetedWithContinuationHandlerType =
|
||||
LambdaType.of(WithContinuationHandler.class);
|
||||
|
||||
private static final Comparator<HandlerRegistration> handlerComparator =
|
||||
Comparator.comparingInt(o -> o.order);
|
||||
|
||||
private final ExecutorService asyncExecutor;
|
||||
private final PluginManager pluginManager;
|
||||
|
||||
private final Multimap<Class<?>, HandlerRegistration> handlersByType = HashMultimap.create();
|
||||
private final LoadingCache<Class<?>, HandlersCache> handlersCache =
|
||||
Caffeine.newBuilder().build(this::bakeHandlers);
|
||||
|
||||
private final LoadingCache<Method, UntargetedEventHandler> untargetedMethodHandlers =
|
||||
Caffeine.newBuilder().weakValues().build(this::buildUntargetedMethodHandler);
|
||||
|
||||
private final ReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
|
||||
private final List<CustomHandlerAdapter<?>> handlerAdapters = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* Initializes the Velocity event manager.
|
||||
*
|
||||
* @param pluginManager a reference to the Velocity plugin manager
|
||||
*/
|
||||
public VelocityEventManager(final PluginManager pluginManager) {
|
||||
this.pluginManager = pluginManager;
|
||||
this.asyncExecutor = Executors
|
||||
.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder()
|
||||
.setNameFormat("Velocity Async Event Executor - #%d").setDaemon(true).build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a new continuation adapter function.
|
||||
*/
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public <F> void registerHandlerAdapter(
|
||||
final String name,
|
||||
final Predicate<Method> filter,
|
||||
final BiConsumer<Method, List<String>> validator,
|
||||
final TypeToken<F> invokeFunctionType,
|
||||
final Function<F, BiFunction<Object, Object, EventTask>> handlerBuilder) {
|
||||
handlerAdapters.add(new CustomHandlerAdapter(name, filter, validator,
|
||||
invokeFunctionType, handlerBuilder, methodHandlesLookup));
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents the registration of a single {@link EventHandler}.
|
||||
*/
|
||||
static final class HandlerRegistration {
|
||||
|
||||
final PluginContainer plugin;
|
||||
final short order;
|
||||
final Class<?> eventType;
|
||||
final EventHandler<Object> handler;
|
||||
|
||||
/**
|
||||
* The instance of the {@link EventHandler} or the listener instance that was registered.
|
||||
*/
|
||||
final Object instance;
|
||||
|
||||
public HandlerRegistration(final PluginContainer plugin, final short order,
|
||||
final Class<?> eventType, final Object instance, final EventHandler<Object> handler) {
|
||||
this.plugin = plugin;
|
||||
this.order = order;
|
||||
this.eventType = eventType;
|
||||
this.instance = instance;
|
||||
this.handler = handler;
|
||||
}
|
||||
}
|
||||
|
||||
enum AsyncType {
|
||||
/**
|
||||
* The complete event will be handled on an async thread.
|
||||
*/
|
||||
ALWAYS,
|
||||
/**
|
||||
* The event will never run async, everything is handled on
|
||||
* the netty thread.
|
||||
*/
|
||||
NEVER
|
||||
}
|
||||
|
||||
static final class HandlersCache {
|
||||
|
||||
final HandlerRegistration[] handlers;
|
||||
|
||||
HandlersCache(final HandlerRegistration[] handlers) {
|
||||
this.handlers = handlers;
|
||||
}
|
||||
}
|
||||
|
||||
private static List<Class<?>> getEventTypes(final Class<?> eventType) {
|
||||
return TypeToken.of(eventType).getTypes().rawTypes().stream()
|
||||
.filter(type -> type != Object.class)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private @Nullable HandlersCache bakeHandlers(final Class<?> eventType) {
|
||||
final List<HandlerRegistration> baked = new ArrayList<>();
|
||||
final List<Class<?>> types = getEventTypes(eventType);
|
||||
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
for (final Class<?> type : types) {
|
||||
baked.addAll(handlersByType.get(type));
|
||||
}
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
|
||||
if (baked.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
baked.sort(handlerComparator);
|
||||
return new HandlersCache(baked.toArray(new HandlerRegistration[0]));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an {@link UntargetedEventHandler} for the given {@link Method}. This essentially
|
||||
* implements the {@link UntargetedEventHandler} (or the no async task variant) to invoke the
|
||||
* target method. The implemented class is defined in the same package as the declaring class.
|
||||
* The {@link UntargetedEventHandler} interface must be public so the implementation can access
|
||||
* it.
|
||||
*
|
||||
* @param method The method to generate an untargeted handler for
|
||||
* @return The untargeted handler
|
||||
*/
|
||||
private UntargetedEventHandler buildUntargetedMethodHandler(final Method method)
|
||||
throws IllegalAccessException {
|
||||
for (final CustomHandlerAdapter<?> handlerAdapter : handlerAdapters) {
|
||||
if (handlerAdapter.filter.test(method)) {
|
||||
return handlerAdapter.buildUntargetedHandler(method);
|
||||
}
|
||||
}
|
||||
final MethodHandles.Lookup lookup = MethodHandles.privateLookupIn(
|
||||
method.getDeclaringClass(), methodHandlesLookup);
|
||||
final MethodHandle methodHandle = lookup.unreflect(method);
|
||||
final LambdaType<? extends UntargetedEventHandler> type;
|
||||
if (EventTask.class.isAssignableFrom(method.getReturnType())) {
|
||||
type = untargetedEventTaskHandlerType;
|
||||
} else if (method.getParameterCount() == 2) {
|
||||
type = untargetedWithContinuationHandlerType;
|
||||
} else {
|
||||
type = untargetedVoidHandlerType;
|
||||
}
|
||||
return LambdaFactory.create(type.defineClassesWith(lookup), methodHandle);
|
||||
}
|
||||
|
||||
static final class MethodHandlerInfo {
|
||||
|
||||
final Method method;
|
||||
final @Nullable Class<?> eventType;
|
||||
final short order;
|
||||
final @Nullable String errors;
|
||||
final @Nullable Class<?> continuationType;
|
||||
|
||||
private MethodHandlerInfo(final Method method, final @Nullable Class<?> eventType,
|
||||
final short order, final @Nullable String errors,
|
||||
final @Nullable Class<?> continuationType) {
|
||||
this.method = method;
|
||||
this.eventType = eventType;
|
||||
this.order = order;
|
||||
this.errors = errors;
|
||||
this.continuationType = continuationType;
|
||||
}
|
||||
}
|
||||
|
||||
private void collectMethods(final Class<?> targetClass,
|
||||
final Map<String, MethodHandlerInfo> collected) {
|
||||
for (final Method method : targetClass.getDeclaredMethods()) {
|
||||
final Subscribe subscribe = method.getAnnotation(Subscribe.class);
|
||||
if (subscribe == null) {
|
||||
continue;
|
||||
}
|
||||
String key = method.getName()
|
||||
+ "("
|
||||
+ Arrays.stream(method.getParameterTypes())
|
||||
.map(Class::getName)
|
||||
.collect(Collectors.joining(","))
|
||||
+ ")";
|
||||
if (Modifier.isPrivate(method.getModifiers())) {
|
||||
key = targetClass.getName() + "$" + key;
|
||||
}
|
||||
if (collected.containsKey(key)) {
|
||||
continue;
|
||||
}
|
||||
final Set<String> errors = new HashSet<>();
|
||||
if (Modifier.isStatic(method.getModifiers())) {
|
||||
errors.add("method must not be static");
|
||||
}
|
||||
if (Modifier.isAbstract(method.getModifiers())) {
|
||||
errors.add("method must not be abstract");
|
||||
}
|
||||
Class<?> eventType = null;
|
||||
Class<?> continuationType = null;
|
||||
CustomHandlerAdapter<?> handlerAdapter = null;
|
||||
final int paramCount = method.getParameterCount();
|
||||
if (paramCount == 0) {
|
||||
errors.add("method must have at least one parameter which is the event");
|
||||
} else {
|
||||
final Class<?>[] parameterTypes = method.getParameterTypes();
|
||||
eventType = parameterTypes[0];
|
||||
for (final CustomHandlerAdapter<?> handlerAdapterCandidate : handlerAdapters) {
|
||||
if (handlerAdapterCandidate.filter.test(method)) {
|
||||
handlerAdapter = handlerAdapterCandidate;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (handlerAdapter != null) {
|
||||
final List<String> adapterErrors = new ArrayList<>();
|
||||
handlerAdapter.validator.accept(method, adapterErrors);
|
||||
if (!adapterErrors.isEmpty()) {
|
||||
errors.add(String.format("%s adapter errors: [%s]",
|
||||
handlerAdapter.name, String.join(", ", adapterErrors)));
|
||||
}
|
||||
} else if (paramCount == 2) {
|
||||
continuationType = parameterTypes[1];
|
||||
if (continuationType != Continuation.class) {
|
||||
errors.add(String.format("method is allowed to have a continuation as second parameter,"
|
||||
+ " but %s is invalid", continuationType.getName()));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (handlerAdapter == null) {
|
||||
final Class<?> returnType = method.getReturnType();
|
||||
if (returnType != void.class && continuationType == Continuation.class) {
|
||||
errors.add("method return type must be void if a continuation parameter is provided");
|
||||
} else if (returnType != void.class && returnType != EventTask.class) {
|
||||
errors.add("method return type must be void, AsyncTask, "
|
||||
+ "AsyncTask.Basic or AsyncTask.WithContinuation");
|
||||
}
|
||||
}
|
||||
final short order = (short) subscribe.order().ordinal();
|
||||
final String errorsJoined = errors.isEmpty() ? null : String.join(",", errors);
|
||||
collected.put(key, new MethodHandlerInfo(method, eventType, order, errorsJoined,
|
||||
continuationType));
|
||||
}
|
||||
final Class<?> superclass = targetClass.getSuperclass();
|
||||
if (superclass != Object.class) {
|
||||
collectMethods(superclass, collected);
|
||||
}
|
||||
}
|
||||
|
||||
private void register(final List<HandlerRegistration> registrations) {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
for (final HandlerRegistration registration : registrations) {
|
||||
handlersByType.put(registration.eventType, registration);
|
||||
}
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
// Invalidate all the affected event subtypes
|
||||
handlersCache.invalidateAll(registrations.stream()
|
||||
.flatMap(registration -> getEventTypes(registration.eventType).stream())
|
||||
.distinct()
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(final Object plugin, final Object listener) {
|
||||
requireNonNull(listener, "listener");
|
||||
final PluginContainer pluginContainer = pluginManager.ensurePluginContainer(plugin);
|
||||
if (plugin == listener) {
|
||||
throw new IllegalArgumentException("The plugin main instance is automatically registered.");
|
||||
}
|
||||
registerInternally(pluginContainer, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public <E> void register(final Object plugin, final Class<E> eventClass,
|
||||
final PostOrder order, final EventHandler<E> handler) {
|
||||
final PluginContainer pluginContainer = pluginManager.ensurePluginContainer(plugin);
|
||||
requireNonNull(eventClass, "eventClass");
|
||||
requireNonNull(handler, "handler");
|
||||
|
||||
final HandlerRegistration registration = new HandlerRegistration(pluginContainer,
|
||||
(short) order.ordinal(), eventClass, handler, (EventHandler<Object>) handler);
|
||||
register(Collections.singletonList(registration));
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers the listener for a given plugin.
|
||||
*
|
||||
* @param pluginContainer registering plugin
|
||||
* @param listener listener to register
|
||||
*/
|
||||
public void registerInternally(final PluginContainer pluginContainer, final Object listener) {
|
||||
final Class<?> targetClass = listener.getClass();
|
||||
final Map<String, MethodHandlerInfo> collected = new HashMap<>();
|
||||
collectMethods(targetClass, collected);
|
||||
|
||||
final List<HandlerRegistration> registrations = new ArrayList<>();
|
||||
for (final MethodHandlerInfo info : collected.values()) {
|
||||
if (info.errors != null) {
|
||||
logger.info("Invalid listener method {} in {}: {}",
|
||||
info.method.getName(), info.method.getDeclaringClass().getName(), info.errors);
|
||||
continue;
|
||||
}
|
||||
final UntargetedEventHandler untargetedHandler = untargetedMethodHandlers.get(info.method);
|
||||
assert untargetedHandler != null;
|
||||
if (info.eventType == null) {
|
||||
throw new VerifyException("Event type is not present and there are no errors");
|
||||
}
|
||||
|
||||
final EventHandler<Object> handler = untargetedHandler.buildHandler(listener);
|
||||
registrations.add(new HandlerRegistration(pluginContainer, info.order,
|
||||
info.eventType, listener, handler));
|
||||
}
|
||||
|
||||
register(registrations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterListeners(final Object plugin) {
|
||||
final PluginContainer pluginContainer = pluginManager.ensurePluginContainer(plugin);
|
||||
unregisterIf(registration -> registration.plugin == pluginContainer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterListener(final Object plugin, final Object handler) {
|
||||
final PluginContainer pluginContainer = pluginManager.ensurePluginContainer(plugin);
|
||||
requireNonNull(handler, "handler");
|
||||
unregisterIf(registration ->
|
||||
registration.plugin == pluginContainer && registration.handler == handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <E> void unregister(final Object plugin, final EventHandler<E> handler) {
|
||||
unregisterListener(plugin, handler);
|
||||
}
|
||||
|
||||
private void unregisterIf(final Predicate<HandlerRegistration> predicate) {
|
||||
final List<HandlerRegistration> removed = new ArrayList<>();
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
final Iterator<HandlerRegistration> it = handlersByType.values().iterator();
|
||||
while (it.hasNext()) {
|
||||
final HandlerRegistration registration = it.next();
|
||||
if (predicate.test(registration)) {
|
||||
it.remove();
|
||||
removed.add(registration);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
|
||||
// Invalidate all the affected event subtypes
|
||||
handlersCache.invalidateAll(removed.stream()
|
||||
.flatMap(registration -> getEventTypes(registration.eventType).stream())
|
||||
.distinct()
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fireAndForget(final Object event) {
|
||||
requireNonNull(event, "event");
|
||||
final HandlersCache handlersCache = this.handlersCache.get(event.getClass());
|
||||
if (handlersCache == null) {
|
||||
// Optimization: nobody's listening.
|
||||
return;
|
||||
}
|
||||
fire(null, event, handlersCache);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <E> CompletableFuture<E> fire(final E event) {
|
||||
requireNonNull(event, "event");
|
||||
final HandlersCache handlersCache = this.handlersCache.get(event.getClass());
|
||||
if (handlersCache == null) {
|
||||
// Optimization: nobody's listening.
|
||||
return CompletableFuture.completedFuture(event);
|
||||
}
|
||||
final CompletableFuture<E> future = new CompletableFuture<>();
|
||||
fire(future, event, handlersCache);
|
||||
return future;
|
||||
}
|
||||
|
||||
private <E> void fire(final @Nullable CompletableFuture<E> future,
|
||||
final E event, final HandlersCache handlersCache) {
|
||||
// In Velocity 1.1.0, all events were fired asynchronously. As Velocity 3.0.0 is intended to be
|
||||
// largely (albeit not 100%) compatible with 1.1.x, we also fire events async. This behavior
|
||||
// will go away in Velocity Polymer.
|
||||
asyncExecutor.execute(() -> fire(future, event, 0, true, handlersCache.handlers));
|
||||
}
|
||||
|
||||
private static final int TASK_STATE_DEFAULT = 0;
|
||||
private static final int TASK_STATE_EXECUTING = 1;
|
||||
private static final int TASK_STATE_CONTINUE_IMMEDIATELY = 2;
|
||||
|
||||
private static final VarHandle CONTINUATION_TASK_RESUMED;
|
||||
private static final VarHandle CONTINUATION_TASK_STATE;
|
||||
|
||||
static {
|
||||
try {
|
||||
CONTINUATION_TASK_RESUMED = MethodHandles.lookup()
|
||||
.findVarHandle(ContinuationTask.class, "resumed", boolean.class);
|
||||
CONTINUATION_TASK_STATE = MethodHandles.lookup()
|
||||
.findVarHandle(ContinuationTask.class, "state", int.class);
|
||||
} catch (final ReflectiveOperationException e) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
final class ContinuationTask<E> implements Continuation, Runnable {
|
||||
|
||||
private final EventTask task;
|
||||
private final int index;
|
||||
private final HandlerRegistration[] registrations;
|
||||
private final @Nullable CompletableFuture<E> future;
|
||||
private final boolean currentlyAsync;
|
||||
private final E event;
|
||||
|
||||
// This field is modified via a VarHandle, so this field is used and cannot be final.
|
||||
@SuppressWarnings({"UnusedVariable", "FieldMayBeFinal", "FieldCanBeLocal"})
|
||||
private volatile int state = TASK_STATE_DEFAULT;
|
||||
|
||||
// This field is modified via a VarHandle, so this field is used and cannot be final.
|
||||
@SuppressWarnings({"UnusedVariable", "FieldMayBeFinal"})
|
||||
private volatile boolean resumed = false;
|
||||
|
||||
private ContinuationTask(
|
||||
final EventTask task,
|
||||
final HandlerRegistration[] registrations,
|
||||
final @Nullable CompletableFuture<E> future,
|
||||
final E event,
|
||||
final int index,
|
||||
final boolean currentlyAsync) {
|
||||
this.task = task;
|
||||
this.registrations = registrations;
|
||||
this.future = future;
|
||||
this.event = event;
|
||||
this.index = index;
|
||||
this.currentlyAsync = currentlyAsync;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (execute()) {
|
||||
fire(future, event, index + 1, currentlyAsync, registrations);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the task and returns whether the next one should be executed
|
||||
* immediately after this one without scheduling.
|
||||
*/
|
||||
boolean execute() {
|
||||
state = TASK_STATE_EXECUTING;
|
||||
try {
|
||||
task.execute(this);
|
||||
} catch (final Throwable t) {
|
||||
// validateOnlyOnce false here so don't get an exception if the
|
||||
// continuation was resumed before
|
||||
resume(t, false);
|
||||
}
|
||||
return !CONTINUATION_TASK_STATE.compareAndSet(
|
||||
this, TASK_STATE_EXECUTING, TASK_STATE_DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resume() {
|
||||
resume(null, true);
|
||||
}
|
||||
|
||||
void resume(final @Nullable Throwable exception, final boolean validateOnlyOnce) {
|
||||
final boolean changed = CONTINUATION_TASK_RESUMED.compareAndSet(this, false, true);
|
||||
// Only allow the continuation to be resumed once
|
||||
if (!changed && validateOnlyOnce) {
|
||||
throw new IllegalStateException("The continuation can only be resumed once.");
|
||||
}
|
||||
final HandlerRegistration registration = registrations[index];
|
||||
if (exception != null) {
|
||||
logHandlerException(registration, exception);
|
||||
}
|
||||
if (!changed) {
|
||||
return;
|
||||
}
|
||||
if (index + 1 == registrations.length) {
|
||||
// Optimization: don't schedule a task just to complete the future
|
||||
if (future != null) {
|
||||
future.complete(event);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (!CONTINUATION_TASK_STATE.compareAndSet(
|
||||
this, TASK_STATE_EXECUTING, TASK_STATE_CONTINUE_IMMEDIATELY)) {
|
||||
asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeWithException(final Throwable exception) {
|
||||
resume(requireNonNull(exception, "exception"), true);
|
||||
}
|
||||
}
|
||||
|
||||
private <E> void fire(final @Nullable CompletableFuture<E> future, final E event,
|
||||
final int offset, final boolean currentlyAsync, final HandlerRegistration[] registrations) {
|
||||
for (int i = offset; i < registrations.length; i++) {
|
||||
final HandlerRegistration registration = registrations[i];
|
||||
try {
|
||||
final EventTask eventTask = registration.handler.executeAsync(event);
|
||||
if (eventTask == null) {
|
||||
continue;
|
||||
}
|
||||
final ContinuationTask<E> continuationTask = new ContinuationTask<>(eventTask,
|
||||
registrations, future, event, i, currentlyAsync);
|
||||
if (currentlyAsync || !eventTask.requiresAsync()) {
|
||||
if (continuationTask.execute()) {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
asyncExecutor.execute(continuationTask);
|
||||
}
|
||||
// fire will continue in another thread once the async task is
|
||||
// executed and the continuation is resumed
|
||||
return;
|
||||
} catch (final Throwable t) {
|
||||
logHandlerException(registration, t);
|
||||
}
|
||||
}
|
||||
if (future != null) {
|
||||
future.complete(event);
|
||||
}
|
||||
}
|
||||
|
||||
private static void logHandlerException(
|
||||
final HandlerRegistration registration, final Throwable t) {
|
||||
logger.error("Couldn't pass {} to {}", registration.eventType.getSimpleName(),
|
||||
registration.plugin.getDescription().getId(), t);
|
||||
}
|
||||
|
||||
public boolean shutdown() throws InterruptedException {
|
||||
asyncExecutor.shutdown();
|
||||
return asyncExecutor.awaitTermination(10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public ExecutorService getAsyncExecutor() {
|
||||
return asyncExecutor;
|
||||
}
|
||||
}
|
@ -1,248 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2018 Velocity Contributors
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package com.velocitypowered.proxy.plugin;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ListMultimap;
|
||||
import com.google.common.collect.Multimaps;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.velocitypowered.api.event.EventHandler;
|
||||
import com.velocitypowered.api.event.EventManager;
|
||||
import com.velocitypowered.api.event.PostOrder;
|
||||
import com.velocitypowered.api.event.Subscribe;
|
||||
import com.velocitypowered.api.event.proxy.ProxyShutdownEvent;
|
||||
import com.velocitypowered.api.plugin.PluginManager;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.URL;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import net.kyori.event.EventSubscriber;
|
||||
import net.kyori.event.PostResult;
|
||||
import net.kyori.event.SimpleEventBus;
|
||||
import net.kyori.event.method.MethodScanner;
|
||||
import net.kyori.event.method.MethodSubscriptionAdapter;
|
||||
import net.kyori.event.method.SimpleMethodSubscriptionAdapter;
|
||||
import net.kyori.event.method.asm.ASMEventExecutorFactory;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.checkerframework.checker.nullness.qual.NonNull;
|
||||
|
||||
public class VelocityEventManager implements EventManager {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(VelocityEventManager.class);
|
||||
|
||||
private final ListMultimap<Object, Object> registeredListenersByPlugin = Multimaps
|
||||
.synchronizedListMultimap(Multimaps.newListMultimap(new IdentityHashMap<>(), ArrayList::new));
|
||||
private final ListMultimap<Object, EventHandler<?>> registeredHandlersByPlugin = Multimaps
|
||||
.synchronizedListMultimap(Multimaps.newListMultimap(new IdentityHashMap<>(), ArrayList::new));
|
||||
private final SimpleEventBus<Object> bus;
|
||||
private final MethodSubscriptionAdapter<Object> methodAdapter;
|
||||
private final ExecutorService service;
|
||||
private final PluginManager pluginManager;
|
||||
|
||||
/**
|
||||
* Initializes the Velocity event manager.
|
||||
*
|
||||
* @param pluginManager a reference to the Velocity plugin manager
|
||||
*/
|
||||
public VelocityEventManager(PluginManager pluginManager) {
|
||||
// Expose the event executors to the plugins - required in order for the generated ASM classes
|
||||
// to work.
|
||||
PluginClassLoader cl = AccessController.doPrivileged(
|
||||
(PrivilegedAction<PluginClassLoader>) () -> new PluginClassLoader(new URL[0]));
|
||||
cl.addToClassloaders();
|
||||
|
||||
// Initialize the event bus.
|
||||
this.bus = new SimpleEventBus<Object>(Object.class) {
|
||||
@Override
|
||||
protected boolean shouldPost(@NonNull Object event, @NonNull EventSubscriber<?> subscriber) {
|
||||
// Velocity doesn't use Cancellable or generic events, so we can skip those checks.
|
||||
return true;
|
||||
}
|
||||
};
|
||||
this.methodAdapter = new SimpleMethodSubscriptionAdapter<>(bus,
|
||||
new ASMEventExecutorFactory<>(cl),
|
||||
new VelocityMethodScanner());
|
||||
this.pluginManager = pluginManager;
|
||||
this.service = Executors
|
||||
.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder()
|
||||
.setNameFormat("Velocity Event Executor - #%d").setDaemon(true).build());
|
||||
}
|
||||
|
||||
private void ensurePlugin(Object plugin) {
|
||||
Preconditions.checkNotNull(plugin, "plugin");
|
||||
Preconditions.checkArgument(pluginManager.fromInstance(plugin).isPresent(),
|
||||
"Specified plugin is not loaded");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(Object plugin, Object listener) {
|
||||
ensurePlugin(plugin);
|
||||
Preconditions.checkNotNull(listener, "listener");
|
||||
if (plugin == listener && registeredListenersByPlugin.containsEntry(plugin, plugin)) {
|
||||
throw new IllegalArgumentException("The plugin main instance is automatically registered.");
|
||||
}
|
||||
|
||||
registeredListenersByPlugin.put(plugin, listener);
|
||||
methodAdapter.register(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("type.argument.type.incompatible")
|
||||
public <E> void register(Object plugin, Class<E> eventClass, PostOrder postOrder,
|
||||
EventHandler<E> handler) {
|
||||
ensurePlugin(plugin);
|
||||
Preconditions.checkNotNull(eventClass, "eventClass");
|
||||
Preconditions.checkNotNull(postOrder, "postOrder");
|
||||
Preconditions.checkNotNull(handler, "listener");
|
||||
|
||||
registeredHandlersByPlugin.put(plugin, handler);
|
||||
bus.register(eventClass, new KyoriToVelocityHandler<>(handler, postOrder));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <E> CompletableFuture<E> fire(E event) {
|
||||
if (event == null) {
|
||||
throw new NullPointerException("event");
|
||||
}
|
||||
if (!bus.hasSubscribers(event.getClass())) {
|
||||
// Optimization: nobody's listening.
|
||||
return CompletableFuture.completedFuture(event);
|
||||
}
|
||||
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
fireEvent(event);
|
||||
return event;
|
||||
}, service);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fireAndForget(Object event) {
|
||||
if (event == null) {
|
||||
throw new NullPointerException("event");
|
||||
}
|
||||
if (!bus.hasSubscribers(event.getClass())) {
|
||||
// Optimization: nobody's listening.
|
||||
return;
|
||||
}
|
||||
service.execute(() -> fireEvent(event));
|
||||
}
|
||||
|
||||
private void fireEvent(Object event) {
|
||||
PostResult result = bus.post(event);
|
||||
if (!result.exceptions().isEmpty()) {
|
||||
logger.error("Some errors occurred whilst posting event {}.", event);
|
||||
int i = 0;
|
||||
for (Throwable exception : result.exceptions().values()) {
|
||||
logger.error("#{}: \n", ++i, exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void unregisterHandler(EventHandler<?> handler) {
|
||||
bus.unregister(s -> s instanceof KyoriToVelocityHandler
|
||||
&& ((KyoriToVelocityHandler<?>) s).handler == handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterListeners(Object plugin) {
|
||||
ensurePlugin(plugin);
|
||||
Collection<Object> listeners = registeredListenersByPlugin.removeAll(plugin);
|
||||
listeners.forEach(methodAdapter::unregister);
|
||||
Collection<EventHandler<?>> handlers = registeredHandlersByPlugin.removeAll(plugin);
|
||||
handlers.forEach(this::unregisterHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterListener(Object plugin, Object listener) {
|
||||
ensurePlugin(plugin);
|
||||
Preconditions.checkNotNull(listener, "listener");
|
||||
if (registeredListenersByPlugin.remove(plugin, listener)) {
|
||||
methodAdapter.unregister(listener);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <E> void unregister(Object plugin, EventHandler<E> handler) {
|
||||
ensurePlugin(plugin);
|
||||
Preconditions.checkNotNull(handler, "listener");
|
||||
if (registeredHandlersByPlugin.remove(plugin, handler)) {
|
||||
unregisterHandler(handler);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean shutdown() throws InterruptedException {
|
||||
service.shutdown();
|
||||
return service.awaitTermination(10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public void fireShutdownEvent() {
|
||||
// We shut down the proxy already, so the fact this executes in the main thread is irrelevant.
|
||||
fireEvent(new ProxyShutdownEvent());
|
||||
}
|
||||
|
||||
public ExecutorService getService() {
|
||||
return service;
|
||||
}
|
||||
|
||||
private static class VelocityMethodScanner implements MethodScanner<Object> {
|
||||
|
||||
@Override
|
||||
public boolean shouldRegister(@NonNull Object listener, @NonNull Method method) {
|
||||
return method.isAnnotationPresent(Subscribe.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int postOrder(@NonNull Object listener, @NonNull Method method) {
|
||||
return method.getAnnotation(Subscribe.class).order().ordinal();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean consumeCancelledEvents(@NonNull Object listener, @NonNull Method method) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private static class KyoriToVelocityHandler<E> implements EventSubscriber<E> {
|
||||
|
||||
private final EventHandler<E> handler;
|
||||
private final int postOrder;
|
||||
|
||||
private KyoriToVelocityHandler(EventHandler<E> handler, PostOrder postOrder) {
|
||||
this.handler = handler;
|
||||
this.postOrder = postOrder.ordinal();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void invoke(@NonNull E event) {
|
||||
handler.execute(event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int postOrder() {
|
||||
return postOrder;
|
||||
}
|
||||
}
|
||||
}
|
@ -20,7 +20,6 @@ package com.velocitypowered.proxy.command;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
@ -38,8 +37,8 @@ import com.velocitypowered.api.command.CommandMeta;
|
||||
import com.velocitypowered.api.command.CommandSource;
|
||||
import com.velocitypowered.api.command.RawCommand;
|
||||
import com.velocitypowered.api.command.SimpleCommand;
|
||||
import com.velocitypowered.proxy.event.VelocityEventManager;
|
||||
import com.velocitypowered.proxy.plugin.MockEventManager;
|
||||
import com.velocitypowered.proxy.plugin.VelocityEventManager;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
288
proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java
Normale Datei
288
proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java
Normale Datei
@ -0,0 +1,288 @@
|
||||
/*
|
||||
* Copyright (C) 2018 Velocity Contributors
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package com.velocitypowered.proxy.event;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import com.google.common.reflect.TypeToken;
|
||||
import com.velocitypowered.api.event.Continuation;
|
||||
import com.velocitypowered.api.event.EventTask;
|
||||
import com.velocitypowered.api.event.PostOrder;
|
||||
import com.velocitypowered.api.event.Subscribe;
|
||||
import com.velocitypowered.proxy.testutil.FakePluginManager;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
|
||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||
public class EventTest {
|
||||
|
||||
private final VelocityEventManager eventManager =
|
||||
new VelocityEventManager(new FakePluginManager());
|
||||
|
||||
@AfterAll
|
||||
void shutdown() throws Exception {
|
||||
eventManager.shutdown();
|
||||
}
|
||||
|
||||
static final class TestEvent {
|
||||
}
|
||||
|
||||
static void assertAsyncThread(final Thread thread) {
|
||||
assertTrue(thread.getName().contains("Velocity Async Event Executor"));
|
||||
}
|
||||
|
||||
private void handleMethodListener(final Object listener) throws Exception {
|
||||
eventManager.register(FakePluginManager.PLUGIN_A, listener);
|
||||
try {
|
||||
eventManager.fire(new TestEvent()).get();
|
||||
} finally {
|
||||
eventManager.unregisterListeners(FakePluginManager.PLUGIN_A);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testAlwaysAsync() throws Exception {
|
||||
final AlwaysAsyncListener listener = new AlwaysAsyncListener();
|
||||
handleMethodListener(listener);
|
||||
assertAsyncThread(listener.threadA);
|
||||
assertAsyncThread(listener.threadB);
|
||||
assertAsyncThread(listener.threadC);
|
||||
assertEquals(3, listener.result);
|
||||
}
|
||||
|
||||
static final class AlwaysAsyncListener {
|
||||
|
||||
@MonotonicNonNull Thread threadA;
|
||||
@MonotonicNonNull Thread threadB;
|
||||
@MonotonicNonNull Thread threadC;
|
||||
int result;
|
||||
|
||||
@Subscribe
|
||||
void firstAsync(TestEvent event) {
|
||||
result++;
|
||||
threadA = Thread.currentThread();
|
||||
}
|
||||
|
||||
@Subscribe
|
||||
EventTask secondAsync(TestEvent event) {
|
||||
threadB = Thread.currentThread();
|
||||
return EventTask.async(() -> result++);
|
||||
}
|
||||
|
||||
@Subscribe
|
||||
void thirdAsync(TestEvent event) {
|
||||
result++;
|
||||
threadC = Thread.currentThread();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testContinuation() throws Exception {
|
||||
final ContinuationListener listener = new ContinuationListener();
|
||||
handleMethodListener(listener);
|
||||
assertAsyncThread(listener.threadA);
|
||||
assertAsyncThread(listener.threadB);
|
||||
assertAsyncThread(listener.threadC);
|
||||
assertEquals(2, listener.value.get());
|
||||
}
|
||||
|
||||
static final class ContinuationListener {
|
||||
|
||||
@MonotonicNonNull Thread threadA;
|
||||
@MonotonicNonNull Thread threadB;
|
||||
@MonotonicNonNull Thread threadC;
|
||||
|
||||
final AtomicInteger value = new AtomicInteger();
|
||||
|
||||
@Subscribe(order = PostOrder.EARLY)
|
||||
EventTask continuation(TestEvent event) {
|
||||
threadA = Thread.currentThread();
|
||||
return EventTask.withContinuation(continuation -> {
|
||||
value.incrementAndGet();
|
||||
threadB = Thread.currentThread();
|
||||
new Thread(() -> {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
value.incrementAndGet();
|
||||
continuation.resume();
|
||||
}).start();
|
||||
});
|
||||
}
|
||||
|
||||
@Subscribe(order = PostOrder.LATE)
|
||||
void afterContinuation(TestEvent event) {
|
||||
threadC = Thread.currentThread();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testResumeContinuationImmediately() throws Exception {
|
||||
final ResumeContinuationImmediatelyListener listener =
|
||||
new ResumeContinuationImmediatelyListener();
|
||||
handleMethodListener(listener);
|
||||
assertAsyncThread(listener.threadA);
|
||||
assertAsyncThread(listener.threadB);
|
||||
assertAsyncThread(listener.threadC);
|
||||
assertEquals(2, listener.result);
|
||||
}
|
||||
|
||||
static final class ResumeContinuationImmediatelyListener {
|
||||
|
||||
@MonotonicNonNull Thread threadA;
|
||||
@MonotonicNonNull Thread threadB;
|
||||
@MonotonicNonNull Thread threadC;
|
||||
int result;
|
||||
|
||||
@Subscribe(order = PostOrder.EARLY)
|
||||
EventTask continuation(TestEvent event) {
|
||||
threadA = Thread.currentThread();
|
||||
return EventTask.withContinuation(continuation -> {
|
||||
threadB = Thread.currentThread();
|
||||
result++;
|
||||
continuation.resume();
|
||||
});
|
||||
}
|
||||
|
||||
@Subscribe(order = PostOrder.LATE)
|
||||
void afterContinuation(TestEvent event) {
|
||||
threadC = Thread.currentThread();
|
||||
result++;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testContinuationParameter() throws Exception {
|
||||
final ContinuationParameterListener listener = new ContinuationParameterListener();
|
||||
handleMethodListener(listener);
|
||||
assertAsyncThread(listener.threadA);
|
||||
assertAsyncThread(listener.threadB);
|
||||
assertAsyncThread(listener.threadC);
|
||||
assertEquals(3, listener.result.get());
|
||||
}
|
||||
|
||||
static final class ContinuationParameterListener {
|
||||
|
||||
@MonotonicNonNull Thread threadA;
|
||||
@MonotonicNonNull Thread threadB;
|
||||
@MonotonicNonNull Thread threadC;
|
||||
|
||||
final AtomicInteger result = new AtomicInteger();
|
||||
|
||||
@Subscribe
|
||||
void resume(TestEvent event, Continuation continuation) {
|
||||
threadA = Thread.currentThread();
|
||||
result.incrementAndGet();
|
||||
continuation.resume();
|
||||
}
|
||||
|
||||
@Subscribe(order = PostOrder.LATE)
|
||||
void resumeFromCustomThread(TestEvent event, Continuation continuation) {
|
||||
threadB = Thread.currentThread();
|
||||
new Thread(() -> {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
result.incrementAndGet();
|
||||
continuation.resume();
|
||||
}).start();
|
||||
}
|
||||
|
||||
@Subscribe(order = PostOrder.LAST)
|
||||
void afterCustomThread(TestEvent event, Continuation continuation) {
|
||||
threadC = Thread.currentThread();
|
||||
result.incrementAndGet();
|
||||
continuation.resume();
|
||||
}
|
||||
}
|
||||
|
||||
interface FancyContinuation {
|
||||
|
||||
void resume();
|
||||
|
||||
void resumeWithError(Exception exception);
|
||||
}
|
||||
|
||||
private static final class FancyContinuationImpl implements FancyContinuation {
|
||||
|
||||
private final Continuation continuation;
|
||||
|
||||
private FancyContinuationImpl(final Continuation continuation) {
|
||||
this.continuation = continuation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resume() {
|
||||
continuation.resume();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeWithError(final Exception exception) {
|
||||
continuation.resumeWithException(exception);
|
||||
}
|
||||
}
|
||||
|
||||
interface TriConsumer<A, B, C> {
|
||||
|
||||
void accept(A a, B b, C c);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFancyContinuationParameter() throws Exception {
|
||||
eventManager.registerHandlerAdapter(
|
||||
"fancy",
|
||||
method -> method.getParameterCount() > 1
|
||||
&& method.getParameterTypes()[1] == FancyContinuation.class,
|
||||
(method, errors) -> {
|
||||
if (method.getReturnType() != void.class) {
|
||||
errors.add("method return type must be void");
|
||||
}
|
||||
if (method.getParameterCount() != 2) {
|
||||
errors.add("method must have exactly two parameters, the first is the event and "
|
||||
+ "the second is the fancy continuation");
|
||||
}
|
||||
},
|
||||
new TypeToken<TriConsumer<Object, Object, FancyContinuation>>() {},
|
||||
invokeFunction -> (instance, event) ->
|
||||
EventTask.withContinuation(continuation ->
|
||||
invokeFunction.accept(instance, event, new FancyContinuationImpl(continuation))
|
||||
));
|
||||
final FancyContinuationListener listener = new FancyContinuationListener();
|
||||
handleMethodListener(listener);
|
||||
assertEquals(1, listener.result);
|
||||
}
|
||||
|
||||
static final class FancyContinuationListener {
|
||||
|
||||
int result;
|
||||
|
||||
@Subscribe
|
||||
void continuation(TestEvent event, FancyContinuation continuation) {
|
||||
result++;
|
||||
continuation.resume();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
/*
|
||||
* Copyright (C) 2018 Velocity Contributors
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package com.velocitypowered.proxy.event;
|
||||
|
||||
import com.velocitypowered.proxy.plugin.MockPluginManager;
|
||||
|
||||
/**
|
||||
* A mock {@link VelocityEventManager}. Must be shutdown after use!
|
||||
*/
|
||||
public class MockEventManager extends VelocityEventManager {
|
||||
|
||||
public MockEventManager() {
|
||||
super(MockPluginManager.INSTANCE);
|
||||
}
|
||||
}
|
@ -17,6 +17,8 @@
|
||||
|
||||
package com.velocitypowered.proxy.plugin;
|
||||
|
||||
import com.velocitypowered.proxy.event.VelocityEventManager;
|
||||
|
||||
/**
|
||||
* A mock {@link VelocityEventManager}. Must be shutdown after use!
|
||||
*/
|
||||
|
Laden…
In neuem Issue referenzieren
Einen Benutzer sperren