Mirror von
https://github.com/PaperMC/Velocity.git
synchronisiert 2024-11-17 05:20:14 +01:00
Merge pull request #384 from Cybermaxke/dev/events
Implement async event tasks.
Dieser Commit ist enthalten in:
Commit
1b551a8e1c
@ -9,8 +9,8 @@ apply from: '../gradle/publish.gradle'
|
|||||||
apply plugin: 'com.github.johnrengelman.shadow'
|
apply plugin: 'com.github.johnrengelman.shadow'
|
||||||
|
|
||||||
java {
|
java {
|
||||||
sourceCompatibility = JavaVersion.VERSION_1_8
|
sourceCompatibility = JavaVersion.VERSION_11
|
||||||
targetCompatibility = JavaVersion.VERSION_1_8
|
targetCompatibility = JavaVersion.VERSION_11
|
||||||
}
|
}
|
||||||
|
|
||||||
sourceSets {
|
sourceSets {
|
||||||
@ -19,11 +19,6 @@ sourceSets {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
java {
|
|
||||||
sourceCompatibility = JavaVersion.VERSION_1_8
|
|
||||||
targetCompatibility = JavaVersion.VERSION_1_8
|
|
||||||
}
|
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
api 'com.google.code.gson:gson:2.8.6'
|
api 'com.google.code.gson:gson:2.8.6'
|
||||||
api "com.google.guava:guava:${guavaVersion}"
|
api "com.google.guava:guava:${guavaVersion}"
|
||||||
|
18
api/src/main/java/com/velocitypowered/api/event/Continuation.java
Normale Datei
18
api/src/main/java/com/velocitypowered/api/event/Continuation.java
Normale Datei
@ -0,0 +1,18 @@
|
|||||||
|
package com.velocitypowered.api.event;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a continuation of a paused event handler. Any of the resume methods
|
||||||
|
* may only be called once otherwise an {@link IllegalStateException} is expected.
|
||||||
|
*/
|
||||||
|
public interface Continuation {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resumes the continuation.
|
||||||
|
*/
|
||||||
|
void resume();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resumes the continuation after the executed task failed.
|
||||||
|
*/
|
||||||
|
void resumeWithException(Throwable exception);
|
||||||
|
}
|
@ -7,5 +7,5 @@ package com.velocitypowered.api.event;
|
|||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
public interface EventHandler<E> {
|
public interface EventHandler<E> {
|
||||||
|
|
||||||
void execute(E event);
|
EventTask execute(E event);
|
||||||
}
|
}
|
||||||
|
181
api/src/main/java/com/velocitypowered/api/event/EventTask.java
Normale Datei
181
api/src/main/java/com/velocitypowered/api/event/EventTask.java
Normale Datei
@ -0,0 +1,181 @@
|
|||||||
|
package com.velocitypowered.api.event;
|
||||||
|
|
||||||
|
import static java.util.Objects.requireNonNull;
|
||||||
|
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a task that can be returned by a {@link EventHandler} which allows event handling to
|
||||||
|
* be suspended and resumed at a later time, and executing event handlers completely or partially
|
||||||
|
* asynchronously.
|
||||||
|
*
|
||||||
|
* <p>By default will all event handlers be executed on the thread the event was posted, using
|
||||||
|
* event tasks this behavior can be altered.</p>
|
||||||
|
*/
|
||||||
|
public abstract class EventTask {
|
||||||
|
|
||||||
|
EventTask() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether this {@link EventTask} is required to be called asynchronously.
|
||||||
|
*
|
||||||
|
* <p>If this method returns {@code true}, the event task is guaranteed to be executed
|
||||||
|
* asynchronously from the current thread. Otherwise, the event task may be executed on the
|
||||||
|
* current thread or asynchronously.</p>
|
||||||
|
*
|
||||||
|
* @return Requires async
|
||||||
|
*/
|
||||||
|
public abstract boolean requiresAsync();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a basic {@link EventTask}. The execution of the event handlers will resume after
|
||||||
|
* this basic task is executed using {@link #run()}.
|
||||||
|
*/
|
||||||
|
public abstract static class Basic extends EventTask {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs the task.
|
||||||
|
*/
|
||||||
|
public abstract void run();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents an {@link EventTask} which receives a {@link Continuation} through
|
||||||
|
* {@link #run(Continuation)}. The continuation must be notified when the task is
|
||||||
|
* completed, either with {@link Continuation#resume()} if the task was successful or
|
||||||
|
* {@link Continuation#resumeWithException(Throwable)} if an exception occurred.
|
||||||
|
*
|
||||||
|
* <p>The {@link Continuation} may only be resumed once, or an
|
||||||
|
* {@link IllegalStateException} is expected.</p>
|
||||||
|
*
|
||||||
|
* <p>The {@link Continuation} doesn't need to be notified during the execution of
|
||||||
|
* {@link #run(Continuation)}, this can happen at a later point in time and from another
|
||||||
|
* thread.</p>
|
||||||
|
*/
|
||||||
|
public abstract static class WithContinuation extends EventTask {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs this async task with the given continuation.
|
||||||
|
*
|
||||||
|
* @param continuation The continuation
|
||||||
|
*/
|
||||||
|
public abstract void run(Continuation continuation);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a basic {@link EventTask} from the given {@link Runnable}. The task isn't guaranteed
|
||||||
|
* to be executed asynchronously ({@link #requiresAsync()} always returns {@code false}).
|
||||||
|
*
|
||||||
|
* @param task The task
|
||||||
|
* @return The event task
|
||||||
|
*/
|
||||||
|
public static EventTask.Basic of(final Runnable task) {
|
||||||
|
requireNonNull(task, "task");
|
||||||
|
return new Basic() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
task.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean requiresAsync() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a basic async {@link EventTask} from the given {@link Runnable}. The task is guaranteed
|
||||||
|
* to be executed asynchronously ({@link #requiresAsync()} always returns {@code true}).
|
||||||
|
*
|
||||||
|
* @param task The task
|
||||||
|
* @return The async event task
|
||||||
|
*/
|
||||||
|
public static EventTask.Basic async(final Runnable task) {
|
||||||
|
requireNonNull(task, "task");
|
||||||
|
return new Basic() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
task.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean requiresAsync() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates an continuation based {@link EventTask} from the given {@link Consumer}. The task isn't
|
||||||
|
* guaranteed to be executed asynchronously ({@link #requiresAsync()} always returns
|
||||||
|
* {@code false}).
|
||||||
|
*
|
||||||
|
* @param task The task to execute
|
||||||
|
* @return The event task
|
||||||
|
*/
|
||||||
|
public static EventTask.WithContinuation withContinuation(
|
||||||
|
final Consumer<Continuation> task) {
|
||||||
|
requireNonNull(task, "task");
|
||||||
|
return new WithContinuation() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(final Continuation continuation) {
|
||||||
|
task.accept(continuation);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean requiresAsync() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates an async continuation based {@link EventTask} from the given {@link Consumer}. The task
|
||||||
|
* is guaranteed to be executed asynchronously ({@link #requiresAsync()} always returns
|
||||||
|
* {@code false}).
|
||||||
|
*
|
||||||
|
* @param task The task to execute
|
||||||
|
* @return The event task
|
||||||
|
*/
|
||||||
|
public static EventTask.WithContinuation asyncWithContinuation(
|
||||||
|
final Consumer<Continuation> task) {
|
||||||
|
requireNonNull(task, "task");
|
||||||
|
return new WithContinuation() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(final Continuation continuation) {
|
||||||
|
task.accept(continuation);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean requiresAsync() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates an continuation based {@link EventTask} for the given {@link CompletableFuture}. The
|
||||||
|
* continuation will be notified once the given future is completed.
|
||||||
|
*
|
||||||
|
* @param future The task to wait for
|
||||||
|
* @return The event task
|
||||||
|
*/
|
||||||
|
public static EventTask.WithContinuation resumeWhenComplete(
|
||||||
|
final CompletableFuture<?> future) {
|
||||||
|
requireNonNull(future, "future");
|
||||||
|
return withContinuation(continuation -> future.whenComplete((result, cause) -> {
|
||||||
|
if (cause != null) {
|
||||||
|
continuation.resumeWithException(cause);
|
||||||
|
} else {
|
||||||
|
continuation.resume();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
@ -13,10 +13,23 @@ import java.lang.annotation.Target;
|
|||||||
public @interface Subscribe {
|
public @interface Subscribe {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The order events will be posted to this listener.
|
* The order events will be posted to this handler.
|
||||||
*
|
*
|
||||||
* @return the order
|
* @return the order
|
||||||
*/
|
*/
|
||||||
short order() default PostOrder.NORMAL;
|
short order() default PostOrder.NORMAL;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether the handler is required to be called asynchronously.
|
||||||
|
*
|
||||||
|
* <p>If this method returns {@code true}, the method is guaranteed to be executed
|
||||||
|
* asynchronously from the current thread. Otherwise, the handler may be executed on the
|
||||||
|
* current thread or asynchronously.</p>
|
||||||
|
*
|
||||||
|
* <p>If any method handler targeting an event type is marked with {@code true}, then every
|
||||||
|
* handler targeting that event type will be executed asynchronously.</p>
|
||||||
|
*
|
||||||
|
* @return Requires async
|
||||||
|
*/
|
||||||
|
boolean async() default false;
|
||||||
}
|
}
|
||||||
|
@ -9,8 +9,8 @@ apply from: '../gradle/checkstyle.gradle'
|
|||||||
apply plugin: 'com.github.johnrengelman.shadow'
|
apply plugin: 'com.github.johnrengelman.shadow'
|
||||||
|
|
||||||
java {
|
java {
|
||||||
sourceCompatibility = JavaVersion.VERSION_1_8
|
sourceCompatibility = JavaVersion.VERSION_11
|
||||||
targetCompatibility = JavaVersion.VERSION_1_8
|
targetCompatibility = JavaVersion.VERSION_11
|
||||||
}
|
}
|
||||||
|
|
||||||
test {
|
test {
|
||||||
@ -69,7 +69,6 @@ dependencies {
|
|||||||
runtimeOnly 'com.lmax:disruptor:3.4.2' // Async loggers
|
runtimeOnly 'com.lmax:disruptor:3.4.2' // Async loggers
|
||||||
|
|
||||||
implementation 'it.unimi.dsi:fastutil:8.4.1'
|
implementation 'it.unimi.dsi:fastutil:8.4.1'
|
||||||
implementation 'net.kyori:event-method-asm:4.0.0-SNAPSHOT'
|
|
||||||
implementation 'net.kyori:adventure-nbt:4.0.0-SNAPSHOT'
|
implementation 'net.kyori:adventure-nbt:4.0.0-SNAPSHOT'
|
||||||
|
|
||||||
implementation 'org.asynchttpclient:async-http-client:2.12.1'
|
implementation 'org.asynchttpclient:async-http-client:2.12.1'
|
||||||
@ -78,6 +77,10 @@ dependencies {
|
|||||||
|
|
||||||
implementation 'com.electronwill.night-config:toml:3.6.3'
|
implementation 'com.electronwill.night-config:toml:3.6.3'
|
||||||
|
|
||||||
|
implementation 'org.lanternpowered:lmbda:2.0.0-SNAPSHOT'
|
||||||
|
|
||||||
|
implementation 'com.github.ben-manes.caffeine:caffeine:2.8.8'
|
||||||
|
|
||||||
compileOnly 'com.github.spotbugs:spotbugs-annotations:4.1.2'
|
compileOnly 'com.github.spotbugs:spotbugs-annotations:4.1.2'
|
||||||
|
|
||||||
testImplementation "org.junit.jupiter:junit-jupiter-api:${junitVersion}"
|
testImplementation "org.junit.jupiter:junit-jupiter-api:${junitVersion}"
|
||||||
@ -129,3 +132,7 @@ shadowJar {
|
|||||||
artifacts {
|
artifacts {
|
||||||
archives shadowJar
|
archives shadowJar
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test {
|
||||||
|
useJUnitPlatform()
|
||||||
|
}
|
||||||
|
@ -8,6 +8,7 @@ import com.google.gson.GsonBuilder;
|
|||||||
import com.velocitypowered.api.event.EventManager;
|
import com.velocitypowered.api.event.EventManager;
|
||||||
import com.velocitypowered.api.event.lifecycle.ProxyInitializeEvent;
|
import com.velocitypowered.api.event.lifecycle.ProxyInitializeEvent;
|
||||||
import com.velocitypowered.api.event.lifecycle.ProxyReloadEvent;
|
import com.velocitypowered.api.event.lifecycle.ProxyReloadEvent;
|
||||||
|
import com.velocitypowered.api.event.lifecycle.ProxyShutdownEvent;
|
||||||
import com.velocitypowered.api.network.ProtocolVersion;
|
import com.velocitypowered.api.network.ProtocolVersion;
|
||||||
import com.velocitypowered.api.plugin.PluginContainer;
|
import com.velocitypowered.api.plugin.PluginContainer;
|
||||||
import com.velocitypowered.api.plugin.PluginManager;
|
import com.velocitypowered.api.plugin.PluginManager;
|
||||||
@ -26,11 +27,11 @@ import com.velocitypowered.proxy.command.builtin.VelocityCommand;
|
|||||||
import com.velocitypowered.proxy.config.VelocityConfiguration;
|
import com.velocitypowered.proxy.config.VelocityConfiguration;
|
||||||
import com.velocitypowered.proxy.connection.client.ConnectedPlayer;
|
import com.velocitypowered.proxy.connection.client.ConnectedPlayer;
|
||||||
import com.velocitypowered.proxy.console.VelocityConsole;
|
import com.velocitypowered.proxy.console.VelocityConsole;
|
||||||
|
import com.velocitypowered.proxy.event.VelocityEventManager;
|
||||||
import com.velocitypowered.proxy.network.ConnectionManager;
|
import com.velocitypowered.proxy.network.ConnectionManager;
|
||||||
import com.velocitypowered.proxy.network.ProtocolUtils;
|
import com.velocitypowered.proxy.network.ProtocolUtils;
|
||||||
import com.velocitypowered.proxy.network.serialization.FaviconSerializer;
|
import com.velocitypowered.proxy.network.serialization.FaviconSerializer;
|
||||||
import com.velocitypowered.proxy.network.serialization.GameProfileSerializer;
|
import com.velocitypowered.proxy.network.serialization.GameProfileSerializer;
|
||||||
import com.velocitypowered.proxy.plugin.VelocityEventManager;
|
|
||||||
import com.velocitypowered.proxy.plugin.VelocityPluginManager;
|
import com.velocitypowered.proxy.plugin.VelocityPluginManager;
|
||||||
import com.velocitypowered.proxy.scheduler.VelocityScheduler;
|
import com.velocitypowered.proxy.scheduler.VelocityScheduler;
|
||||||
import com.velocitypowered.proxy.server.ServerMap;
|
import com.velocitypowered.proxy.server.ServerMap;
|
||||||
@ -422,7 +423,14 @@ public class VelocityServer implements ProxyServer, ForwardingAudience {
|
|||||||
logger.error("Exception while tearing down player connections", e);
|
logger.error("Exception while tearing down player connections", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
eventManager.fireShutdownEvent();
|
try {
|
||||||
|
eventManager.fire(new ProxyShutdownEvent()).get(10, TimeUnit.SECONDS);
|
||||||
|
} catch (TimeoutException e) {
|
||||||
|
timedOut = true;
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
timedOut = true;
|
||||||
|
logger.error("Exception while firing the shutdown event", e);
|
||||||
|
}
|
||||||
|
|
||||||
timedOut = !eventManager.shutdown() || timedOut;
|
timedOut = !eventManager.shutdown() || timedOut;
|
||||||
timedOut = !scheduler.shutdown() || timedOut;
|
timedOut = !scheduler.shutdown() || timedOut;
|
||||||
|
@ -17,7 +17,7 @@ import com.velocitypowered.api.command.RawCommand;
|
|||||||
import com.velocitypowered.api.command.SimpleCommand;
|
import com.velocitypowered.api.command.SimpleCommand;
|
||||||
import com.velocitypowered.api.event.command.CommandExecuteEvent;
|
import com.velocitypowered.api.event.command.CommandExecuteEvent;
|
||||||
import com.velocitypowered.api.event.command.CommandExecuteEvent.CommandResult;
|
import com.velocitypowered.api.event.command.CommandExecuteEvent.CommandResult;
|
||||||
import com.velocitypowered.proxy.plugin.VelocityEventManager;
|
import com.velocitypowered.proxy.event.VelocityEventManager;
|
||||||
import com.velocitypowered.proxy.util.BrigadierUtils;
|
import com.velocitypowered.proxy.util.BrigadierUtils;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -147,7 +147,7 @@ public class VelocityCommandManager implements CommandManager {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return executeImmediately0(source, commandResult.getCommand().orElse(event.getCommand()));
|
return executeImmediately0(source, commandResult.getCommand().orElse(event.getCommand()));
|
||||||
}, eventManager.getService());
|
}, eventManager.getAsyncExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -157,7 +157,7 @@ public class VelocityCommandManager implements CommandManager {
|
|||||||
Preconditions.checkNotNull(cmdLine, "cmdLine");
|
Preconditions.checkNotNull(cmdLine, "cmdLine");
|
||||||
|
|
||||||
return CompletableFuture.supplyAsync(
|
return CompletableFuture.supplyAsync(
|
||||||
() -> executeImmediately0(source, cmdLine), eventManager.getService());
|
() -> executeImmediately0(source, cmdLine), eventManager.getAsyncExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -0,0 +1,19 @@
|
|||||||
|
package com.velocitypowered.proxy.event;
|
||||||
|
|
||||||
|
import com.velocitypowered.api.event.EventTask;
|
||||||
|
|
||||||
|
public interface UntargetedEventHandler {
|
||||||
|
|
||||||
|
EventTask execute(Object targetInstance, Object event);
|
||||||
|
|
||||||
|
interface Void extends UntargetedEventHandler {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
default EventTask execute(final Object targetInstance, final Object event) {
|
||||||
|
executeVoid(targetInstance, event);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
void executeVoid(Object targetInstance, Object event);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,521 @@
|
|||||||
|
package com.velocitypowered.proxy.event;
|
||||||
|
|
||||||
|
import static java.util.Objects.requireNonNull;
|
||||||
|
|
||||||
|
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||||
|
import com.github.benmanes.caffeine.cache.LoadingCache;
|
||||||
|
import com.google.common.collect.HashMultimap;
|
||||||
|
import com.google.common.collect.Multimap;
|
||||||
|
import com.google.common.reflect.TypeToken;
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
import com.velocitypowered.api.event.Continuation;
|
||||||
|
import com.velocitypowered.api.event.EventHandler;
|
||||||
|
import com.velocitypowered.api.event.EventManager;
|
||||||
|
import com.velocitypowered.api.event.EventTask;
|
||||||
|
import com.velocitypowered.api.event.Subscribe;
|
||||||
|
import com.velocitypowered.api.plugin.PluginContainer;
|
||||||
|
import com.velocitypowered.api.plugin.PluginManager;
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.lang.reflect.Modifier;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||||
|
import org.lanternpowered.lmbda.LambdaFactory;
|
||||||
|
import org.lanternpowered.lmbda.LambdaType;
|
||||||
|
|
||||||
|
public class VelocityEventManager implements EventManager {
|
||||||
|
|
||||||
|
private static final Logger logger = LogManager.getLogger(VelocityEventManager.class);
|
||||||
|
|
||||||
|
private static final MethodHandles.Lookup methodHandlesLookup = MethodHandles.lookup();
|
||||||
|
private static final LambdaType<UntargetedEventHandler> untargetedHandlerType =
|
||||||
|
LambdaType.of(UntargetedEventHandler.class);
|
||||||
|
private static final LambdaType<UntargetedEventHandler.Void> untargetedVoidHandlerType =
|
||||||
|
LambdaType.of(UntargetedEventHandler.Void.class);
|
||||||
|
|
||||||
|
private static final Comparator<HandlerRegistration> handlerComparator =
|
||||||
|
Comparator.comparingInt(o -> o.order);
|
||||||
|
|
||||||
|
private final ExecutorService asyncExecutor;
|
||||||
|
private final PluginManager pluginManager;
|
||||||
|
|
||||||
|
private final Multimap<Class<?>, HandlerRegistration> handlersByType = HashMultimap.create();
|
||||||
|
private final LoadingCache<Class<?>, @Nullable HandlersCache> handlersCache =
|
||||||
|
Caffeine.newBuilder().build(this::bakeHandlers);
|
||||||
|
|
||||||
|
private final LoadingCache<Method, UntargetedEventHandler> untargetedMethodHandlers =
|
||||||
|
Caffeine.newBuilder().weakValues().build(this::buildUntargetedMethodHandler);
|
||||||
|
|
||||||
|
private final ReadWriteLock lock = new ReentrantReadWriteLock();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes the Velocity event manager.
|
||||||
|
*
|
||||||
|
* @param pluginManager a reference to the Velocity plugin manager
|
||||||
|
*/
|
||||||
|
public VelocityEventManager(final PluginManager pluginManager) {
|
||||||
|
this.pluginManager = pluginManager;
|
||||||
|
this.asyncExecutor = Executors
|
||||||
|
.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder()
|
||||||
|
.setNameFormat("Velocity Async Event Executor - #%d").setDaemon(true).build());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents the registration of a single {@link EventHandler}.
|
||||||
|
*/
|
||||||
|
static final class HandlerRegistration {
|
||||||
|
|
||||||
|
final PluginContainer plugin;
|
||||||
|
final short order;
|
||||||
|
final Class<?> eventType;
|
||||||
|
final EventHandler<Object> handler;
|
||||||
|
final AsyncType asyncType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The instance of the {@link EventHandler} or the listener
|
||||||
|
* instance that was registered.
|
||||||
|
*/
|
||||||
|
final Object instance;
|
||||||
|
|
||||||
|
public HandlerRegistration(final PluginContainer plugin, final short order,
|
||||||
|
final Class<?> eventType, final Object instance, final EventHandler<Object> handler,
|
||||||
|
final AsyncType asyncType) {
|
||||||
|
this.plugin = plugin;
|
||||||
|
this.order = order;
|
||||||
|
this.eventType = eventType;
|
||||||
|
this.instance = instance;
|
||||||
|
this.handler = handler;
|
||||||
|
this.asyncType = asyncType;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum AsyncType {
|
||||||
|
/**
|
||||||
|
* The complete event will be handled on an async thread.
|
||||||
|
*/
|
||||||
|
ALWAYS,
|
||||||
|
/**
|
||||||
|
* The event will initially start on the netty thread, and possibly
|
||||||
|
* switch over to an async thread.
|
||||||
|
*/
|
||||||
|
SOMETIMES,
|
||||||
|
/**
|
||||||
|
* The event will never run async, everything is handled on
|
||||||
|
* the netty thread.
|
||||||
|
*/
|
||||||
|
NEVER
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class HandlersCache {
|
||||||
|
|
||||||
|
final AsyncType asyncType;
|
||||||
|
final HandlerRegistration[] handlers;
|
||||||
|
|
||||||
|
HandlersCache(final HandlerRegistration[] handlers, final AsyncType asyncType) {
|
||||||
|
this.asyncType = asyncType;
|
||||||
|
this.handlers = handlers;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<Class<?>> getEventTypes(final Class<?> eventType) {
|
||||||
|
return TypeToken.of(eventType).getTypes().rawTypes().stream()
|
||||||
|
.filter(type -> type != Object.class)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private @Nullable HandlersCache bakeHandlers(final Class<?> eventType) {
|
||||||
|
final List<HandlerRegistration> baked = new ArrayList<>();
|
||||||
|
final List<Class<?>> types = getEventTypes(eventType);
|
||||||
|
|
||||||
|
lock.readLock().lock();
|
||||||
|
try {
|
||||||
|
for (final Class<?> type : types) {
|
||||||
|
baked.addAll(handlersByType.get(type));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.readLock().unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (baked.isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
baked.sort(handlerComparator);
|
||||||
|
|
||||||
|
final AsyncType asyncType;
|
||||||
|
if (baked.stream().anyMatch(reg -> reg.asyncType == AsyncType.ALWAYS)) {
|
||||||
|
asyncType = AsyncType.ALWAYS;
|
||||||
|
} else if (baked.stream().anyMatch(reg -> reg.asyncType == AsyncType.SOMETIMES)) {
|
||||||
|
asyncType = AsyncType.SOMETIMES;
|
||||||
|
} else {
|
||||||
|
asyncType = AsyncType.NEVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
return new HandlersCache(baked.toArray(new HandlerRegistration[0]), asyncType);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates an {@link UntargetedEventHandler} for the given {@link Method}. This essentially
|
||||||
|
* implements the {@link UntargetedEventHandler} (or the no async task variant) to invoke the
|
||||||
|
* target method. The implemented class is defined in the same package as the declaring class.
|
||||||
|
* The {@link UntargetedEventHandler} interface must be public so the implementation can access
|
||||||
|
* it.
|
||||||
|
*
|
||||||
|
* @param method The method to generate an untargeted handler for
|
||||||
|
* @return The untargeted handler
|
||||||
|
*/
|
||||||
|
private UntargetedEventHandler buildUntargetedMethodHandler(final Method method)
|
||||||
|
throws IllegalAccessException {
|
||||||
|
final MethodHandles.Lookup lookup = MethodHandles.privateLookupIn(
|
||||||
|
method.getDeclaringClass(), methodHandlesLookup);
|
||||||
|
final LambdaType<? extends UntargetedEventHandler> type;
|
||||||
|
if (EventTask.class.isAssignableFrom(method.getReturnType())) {
|
||||||
|
type = untargetedHandlerType;
|
||||||
|
} else {
|
||||||
|
type = untargetedVoidHandlerType;
|
||||||
|
}
|
||||||
|
return LambdaFactory.create(type.defineClassesWith(lookup), lookup.unreflect(method));
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class MethodHandlerInfo {
|
||||||
|
|
||||||
|
final Method method;
|
||||||
|
final AsyncType asyncType;
|
||||||
|
final @Nullable Class<?> eventType;
|
||||||
|
final short order;
|
||||||
|
final @Nullable String errors;
|
||||||
|
|
||||||
|
private MethodHandlerInfo(final Method method, final AsyncType asyncType,
|
||||||
|
final @Nullable Class<?> eventType, final short order, final @Nullable String errors) {
|
||||||
|
this.method = method;
|
||||||
|
this.asyncType = asyncType;
|
||||||
|
this.eventType = eventType;
|
||||||
|
this.order = order;
|
||||||
|
this.errors = errors;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void collectMethods(final Class<?> targetClass,
|
||||||
|
final Map<String, MethodHandlerInfo> collected) {
|
||||||
|
for (final Method method : targetClass.getDeclaredMethods()) {
|
||||||
|
final Subscribe subscribe = method.getAnnotation(Subscribe.class);
|
||||||
|
if (subscribe == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
String key = method.getName()
|
||||||
|
+ "("
|
||||||
|
+ Arrays.stream(method.getParameterTypes())
|
||||||
|
.map(Class::getName)
|
||||||
|
.collect(Collectors.joining(","))
|
||||||
|
+ ")";
|
||||||
|
if (Modifier.isPrivate(method.getModifiers())) {
|
||||||
|
key = targetClass.getName() + "$" + key;
|
||||||
|
}
|
||||||
|
if (collected.containsKey(key)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
final Set<String> errors = new HashSet<>();
|
||||||
|
if (Modifier.isStatic(method.getModifiers())) {
|
||||||
|
errors.add("method must not be static");
|
||||||
|
}
|
||||||
|
if (Modifier.isAbstract(method.getModifiers())) {
|
||||||
|
errors.add("method must not be abstract");
|
||||||
|
}
|
||||||
|
Class<?> eventType = null;
|
||||||
|
if (method.getParameterCount() != 1) {
|
||||||
|
errors.add("method must have a single parameter which is the event");
|
||||||
|
} else {
|
||||||
|
eventType = method.getParameterTypes()[0];
|
||||||
|
}
|
||||||
|
AsyncType asyncType = AsyncType.NEVER;
|
||||||
|
final Class<?> returnType = method.getReturnType();
|
||||||
|
if (returnType != void.class
|
||||||
|
&& returnType != EventTask.class
|
||||||
|
&& returnType != EventTask.Basic.class
|
||||||
|
&& returnType != EventTask.WithContinuation.class) {
|
||||||
|
errors.add("method return type must be void, AsyncTask, "
|
||||||
|
+ "AsyncTask.Basic or AsyncTask.WithContinuation");
|
||||||
|
} else if (returnType == EventTask.class
|
||||||
|
|| returnType == EventTask.Basic.class
|
||||||
|
|| returnType == EventTask.WithContinuation.class) {
|
||||||
|
asyncType = AsyncType.SOMETIMES;
|
||||||
|
}
|
||||||
|
if (subscribe.async()) {
|
||||||
|
asyncType = AsyncType.ALWAYS;
|
||||||
|
}
|
||||||
|
final short order = subscribe.order();
|
||||||
|
final String errorsJoined = errors.isEmpty() ? null : String.join(",", errors);
|
||||||
|
collected.put(key, new MethodHandlerInfo(method, asyncType, eventType, order, errorsJoined));
|
||||||
|
}
|
||||||
|
final Class<?> superclass = targetClass.getSuperclass();
|
||||||
|
if (superclass != Object.class) {
|
||||||
|
collectMethods(superclass, collected);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private PluginContainer ensurePlugin(final Object plugin) {
|
||||||
|
requireNonNull(plugin, "plugin");
|
||||||
|
return pluginManager.fromInstance(plugin)
|
||||||
|
.orElseThrow(() -> new IllegalArgumentException("Specified plugin is not loaded"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void register(final List<HandlerRegistration> registrations) {
|
||||||
|
lock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
for (final HandlerRegistration registration : registrations) {
|
||||||
|
handlersByType.put(registration.eventType, registration);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
// Invalidate all the affected event subtypes
|
||||||
|
handlersCache.invalidateAll(registrations.stream()
|
||||||
|
.flatMap(registration -> getEventTypes(registration.eventType).stream())
|
||||||
|
.distinct()
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void register(final Object plugin, final Object listener) {
|
||||||
|
requireNonNull(listener, "listener");
|
||||||
|
final PluginContainer pluginContainer = ensurePlugin(plugin);
|
||||||
|
if (plugin == listener) {
|
||||||
|
throw new IllegalArgumentException("The plugin main instance is automatically registered.");
|
||||||
|
}
|
||||||
|
|
||||||
|
final Class<?> targetClass = listener.getClass();
|
||||||
|
final Map<String, MethodHandlerInfo> collected = new HashMap<>();
|
||||||
|
collectMethods(targetClass, collected);
|
||||||
|
|
||||||
|
final List<HandlerRegistration> registrations = new ArrayList<>();
|
||||||
|
for (final MethodHandlerInfo info : collected.values()) {
|
||||||
|
if (info.errors != null) {
|
||||||
|
logger.info("Invalid listener method {} in {}: {}",
|
||||||
|
info.method.getName(), info.method.getDeclaringClass().getName(), info.errors);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
final UntargetedEventHandler untargetedHandler =
|
||||||
|
untargetedMethodHandlers.get(info.method);
|
||||||
|
assert untargetedHandler != null;
|
||||||
|
final EventHandler<Object> handler = event -> untargetedHandler.execute(listener, event);
|
||||||
|
registrations.add(new HandlerRegistration(pluginContainer, info.order,
|
||||||
|
info.eventType, listener, handler, info.asyncType));
|
||||||
|
}
|
||||||
|
|
||||||
|
register(registrations);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public <E> void register(final Object plugin, final Class<E> eventClass,
|
||||||
|
final short order, final EventHandler<E> handler) {
|
||||||
|
final PluginContainer pluginContainer = ensurePlugin(plugin);
|
||||||
|
requireNonNull(eventClass, "eventClass");
|
||||||
|
requireNonNull(handler, "handler");
|
||||||
|
|
||||||
|
final HandlerRegistration registration = new HandlerRegistration(pluginContainer, order,
|
||||||
|
eventClass, handler, (EventHandler<Object>) handler, AsyncType.SOMETIMES);
|
||||||
|
register(Collections.singletonList(registration));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void unregisterListeners(final Object plugin) {
|
||||||
|
final PluginContainer pluginContainer = ensurePlugin(plugin);
|
||||||
|
unregisterIf(registration -> registration.plugin == pluginContainer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void unregisterListener(final Object plugin, final Object handler) {
|
||||||
|
final PluginContainer pluginContainer = ensurePlugin(plugin);
|
||||||
|
requireNonNull(handler, "handler");
|
||||||
|
unregisterIf(registration ->
|
||||||
|
registration.plugin == pluginContainer && registration.handler == handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <E> void unregister(final Object plugin, final EventHandler<E> handler) {
|
||||||
|
unregisterListener(plugin, handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void unregisterIf(final Predicate<HandlerRegistration> predicate) {
|
||||||
|
final List<HandlerRegistration> removed = new ArrayList<>();
|
||||||
|
lock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
final Iterator<HandlerRegistration> it = handlersByType.values().iterator();
|
||||||
|
while (it.hasNext()) {
|
||||||
|
final HandlerRegistration registration = it.next();
|
||||||
|
if (predicate.test(registration)) {
|
||||||
|
it.remove();
|
||||||
|
removed.add(registration);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Invalidate all the affected event subtypes
|
||||||
|
handlersCache.invalidateAll(removed.stream()
|
||||||
|
.flatMap(registration -> getEventTypes(registration.eventType).stream())
|
||||||
|
.distinct()
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void fireAndForget(final Object event) {
|
||||||
|
requireNonNull(event, "event");
|
||||||
|
final HandlersCache handlersCache = this.handlersCache.get(event.getClass());
|
||||||
|
if (handlersCache == null) {
|
||||||
|
// Optimization: nobody's listening.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
fire(null, event, handlersCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <E> CompletableFuture<E> fire(final E event) {
|
||||||
|
requireNonNull(event, "event");
|
||||||
|
final HandlersCache handlersCache = this.handlersCache.get(event.getClass());
|
||||||
|
if (handlersCache == null) {
|
||||||
|
// Optimization: nobody's listening.
|
||||||
|
return CompletableFuture.completedFuture(event);
|
||||||
|
}
|
||||||
|
final CompletableFuture<E> future = new CompletableFuture<>();
|
||||||
|
fire(future, event, handlersCache);
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
private <E> void fire(final @Nullable CompletableFuture<E> future,
|
||||||
|
final E event, final HandlersCache handlersCache) {
|
||||||
|
if (handlersCache.asyncType == AsyncType.ALWAYS) {
|
||||||
|
// We already know that the event needs to be handled async, so
|
||||||
|
// execute it asynchronously from the start
|
||||||
|
asyncExecutor.execute(() -> fire(future, event, 0, true, handlersCache.handlers));
|
||||||
|
} else {
|
||||||
|
fire(future, event, 0, false, handlersCache.handlers);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private <E> void fire(final @Nullable CompletableFuture<E> future, final E event,
|
||||||
|
final int offset, final boolean currentlyAsync, final HandlerRegistration[] registrations) {
|
||||||
|
for (int i = offset; i < registrations.length; i++) {
|
||||||
|
final HandlerRegistration registration = registrations[i];
|
||||||
|
try {
|
||||||
|
final EventTask eventTask = registration.handler.execute(event);
|
||||||
|
if (eventTask == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (eventTask instanceof EventTask.WithContinuation) {
|
||||||
|
final EventTask.WithContinuation withContinuation =
|
||||||
|
(EventTask.WithContinuation) eventTask;
|
||||||
|
final int index = i;
|
||||||
|
final Continuation continuation = new Continuation() {
|
||||||
|
private final AtomicBoolean resumed = new AtomicBoolean();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void resume() {
|
||||||
|
resume(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
void resume(final @Nullable Throwable exception) {
|
||||||
|
// Only allow the continuation to be resumed once
|
||||||
|
if (!resumed.compareAndSet(false, true)) {
|
||||||
|
throw new IllegalStateException("The continuation can only be resumed once.");
|
||||||
|
}
|
||||||
|
if (exception != null) {
|
||||||
|
logHandlerException(registration, exception);
|
||||||
|
}
|
||||||
|
if (index + 1 == registrations.length) {
|
||||||
|
// Optimization: don't schedule a task just to complete the future
|
||||||
|
if (future != null) {
|
||||||
|
future.complete(event);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void resumeWithException(final Throwable exception) {
|
||||||
|
resume(requireNonNull(exception, "exception"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
final Runnable task = () -> {
|
||||||
|
try {
|
||||||
|
withContinuation.run(continuation);
|
||||||
|
} catch (final Throwable t) {
|
||||||
|
continuation.resumeWithException(t);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if (currentlyAsync || !eventTask.requiresAsync()) {
|
||||||
|
task.run();
|
||||||
|
} else {
|
||||||
|
asyncExecutor.execute(task);
|
||||||
|
}
|
||||||
|
// fire will continue in another thread once the async task is
|
||||||
|
// executed and the continuation is resumed
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
final EventTask.Basic basic = (EventTask.Basic) eventTask;
|
||||||
|
if (currentlyAsync || !basic.requiresAsync()) {
|
||||||
|
// We are already async or we don't need async, so we can just run the
|
||||||
|
// task and continue with the next handler
|
||||||
|
basic.run();
|
||||||
|
} else {
|
||||||
|
final int index = i;
|
||||||
|
// We are not yet in an async context, so the async task needs to be scheduled
|
||||||
|
// to the async executor, the event handling will continue on an async thread.
|
||||||
|
asyncExecutor.execute(() -> {
|
||||||
|
try {
|
||||||
|
basic.run();
|
||||||
|
} catch (final Throwable t) {
|
||||||
|
logHandlerException(registration, t);
|
||||||
|
}
|
||||||
|
fire(future, event, index + 1, true, registrations);
|
||||||
|
});
|
||||||
|
return; // fire will continue in another thread once the async task is completed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (final Throwable t) {
|
||||||
|
logHandlerException(registration, t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (future != null) {
|
||||||
|
future.complete(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void logHandlerException(
|
||||||
|
final HandlerRegistration registration, final Throwable t) {
|
||||||
|
logger.error("Couldn't pass {} to {}", registration.eventType.getSimpleName(),
|
||||||
|
registration.plugin.getDescription().getId(), t);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean shutdown() throws InterruptedException {
|
||||||
|
asyncExecutor.shutdown();
|
||||||
|
return asyncExecutor.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ExecutorService getAsyncExecutor() {
|
||||||
|
return asyncExecutor;
|
||||||
|
}
|
||||||
|
}
|
@ -1,229 +0,0 @@
|
|||||||
package com.velocitypowered.proxy.plugin;
|
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.common.collect.ListMultimap;
|
|
||||||
import com.google.common.collect.Multimaps;
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
||||||
import com.velocitypowered.api.event.EventHandler;
|
|
||||||
import com.velocitypowered.api.event.EventManager;
|
|
||||||
import com.velocitypowered.api.event.Subscribe;
|
|
||||||
import com.velocitypowered.api.event.lifecycle.ProxyShutdownEvent;
|
|
||||||
import com.velocitypowered.api.plugin.PluginManager;
|
|
||||||
import java.lang.reflect.Method;
|
|
||||||
import java.net.URL;
|
|
||||||
import java.security.AccessController;
|
|
||||||
import java.security.PrivilegedAction;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.IdentityHashMap;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import net.kyori.event.EventSubscriber;
|
|
||||||
import net.kyori.event.PostResult;
|
|
||||||
import net.kyori.event.SimpleEventBus;
|
|
||||||
import net.kyori.event.method.MethodScanner;
|
|
||||||
import net.kyori.event.method.MethodSubscriptionAdapter;
|
|
||||||
import net.kyori.event.method.SimpleMethodSubscriptionAdapter;
|
|
||||||
import net.kyori.event.method.asm.ASMEventExecutorFactory;
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
|
||||||
import org.apache.logging.log4j.Logger;
|
|
||||||
import org.checkerframework.checker.nullness.qual.NonNull;
|
|
||||||
|
|
||||||
public class VelocityEventManager implements EventManager {
|
|
||||||
|
|
||||||
private static final Logger logger = LogManager.getLogger(VelocityEventManager.class);
|
|
||||||
|
|
||||||
private final ListMultimap<Object, Object> registeredListenersByPlugin = Multimaps
|
|
||||||
.synchronizedListMultimap(Multimaps.newListMultimap(new IdentityHashMap<>(), ArrayList::new));
|
|
||||||
private final ListMultimap<Object, EventHandler<?>> registeredHandlersByPlugin = Multimaps
|
|
||||||
.synchronizedListMultimap(Multimaps.newListMultimap(new IdentityHashMap<>(), ArrayList::new));
|
|
||||||
private final SimpleEventBus<Object> bus;
|
|
||||||
private final MethodSubscriptionAdapter<Object> methodAdapter;
|
|
||||||
private final ExecutorService service;
|
|
||||||
private final PluginManager pluginManager;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initializes the Velocity event manager.
|
|
||||||
*
|
|
||||||
* @param pluginManager a reference to the Velocity plugin manager
|
|
||||||
*/
|
|
||||||
public VelocityEventManager(PluginManager pluginManager) {
|
|
||||||
// Expose the event executors to the plugins - required in order for the generated ASM classes
|
|
||||||
// to work.
|
|
||||||
PluginClassLoader cl = AccessController.doPrivileged(
|
|
||||||
(PrivilegedAction<PluginClassLoader>) () -> new PluginClassLoader(new URL[0]));
|
|
||||||
cl.addToClassloaders();
|
|
||||||
|
|
||||||
// Initialize the event bus.
|
|
||||||
this.bus = new SimpleEventBus<Object>(Object.class) {
|
|
||||||
@Override
|
|
||||||
protected boolean shouldPost(@NonNull Object event, @NonNull EventSubscriber<?> subscriber) {
|
|
||||||
// Velocity doesn't use Cancellable or generic events, so we can skip those checks.
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
this.methodAdapter = new SimpleMethodSubscriptionAdapter<>(bus,
|
|
||||||
new ASMEventExecutorFactory<>(cl),
|
|
||||||
new VelocityMethodScanner());
|
|
||||||
this.pluginManager = pluginManager;
|
|
||||||
this.service = Executors
|
|
||||||
.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder()
|
|
||||||
.setNameFormat("Velocity Event Executor - #%d").setDaemon(true).build());
|
|
||||||
}
|
|
||||||
|
|
||||||
private void ensurePlugin(Object plugin) {
|
|
||||||
Preconditions.checkNotNull(plugin, "plugin");
|
|
||||||
Preconditions.checkArgument(pluginManager.fromInstance(plugin).isPresent(),
|
|
||||||
"Specified plugin is not loaded");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void register(Object plugin, Object listener) {
|
|
||||||
ensurePlugin(plugin);
|
|
||||||
Preconditions.checkNotNull(listener, "listener");
|
|
||||||
if (plugin == listener && registeredListenersByPlugin.containsEntry(plugin, plugin)) {
|
|
||||||
throw new IllegalArgumentException("The plugin main instance is automatically registered.");
|
|
||||||
}
|
|
||||||
|
|
||||||
registeredListenersByPlugin.put(plugin, listener);
|
|
||||||
methodAdapter.register(listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@SuppressWarnings("type.argument.type.incompatible")
|
|
||||||
public <E> void register(Object plugin, Class<E> eventClass, short postOrder,
|
|
||||||
EventHandler<E> handler) {
|
|
||||||
ensurePlugin(plugin);
|
|
||||||
Preconditions.checkNotNull(eventClass, "eventClass");
|
|
||||||
Preconditions.checkNotNull(handler, "listener");
|
|
||||||
|
|
||||||
registeredHandlersByPlugin.put(plugin, handler);
|
|
||||||
bus.register(eventClass, new KyoriToVelocityHandler<>(handler, postOrder));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <E> CompletableFuture<E> fire(E event) {
|
|
||||||
if (event == null) {
|
|
||||||
throw new NullPointerException("event");
|
|
||||||
}
|
|
||||||
if (!bus.hasSubscribers(event.getClass())) {
|
|
||||||
// Optimization: nobody's listening.
|
|
||||||
return CompletableFuture.completedFuture(event);
|
|
||||||
}
|
|
||||||
|
|
||||||
return CompletableFuture.supplyAsync(() -> {
|
|
||||||
fireEvent(event);
|
|
||||||
return event;
|
|
||||||
}, service);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void fireAndForget(Object event) {
|
|
||||||
if (event == null) {
|
|
||||||
throw new NullPointerException("event");
|
|
||||||
}
|
|
||||||
if (!bus.hasSubscribers(event.getClass())) {
|
|
||||||
// Optimization: nobody's listening.
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
service.execute(() -> fireEvent(event));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void fireEvent(Object event) {
|
|
||||||
PostResult result = bus.post(event);
|
|
||||||
if (!result.exceptions().isEmpty()) {
|
|
||||||
logger.error("Some errors occurred whilst posting event {}.", event);
|
|
||||||
int i = 0;
|
|
||||||
for (Throwable exception : result.exceptions().values()) {
|
|
||||||
logger.error("#{}: \n", ++i, exception);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void unregisterHandler(EventHandler<?> handler) {
|
|
||||||
bus.unregister(s -> s instanceof KyoriToVelocityHandler
|
|
||||||
&& ((KyoriToVelocityHandler<?>) s).handler == handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void unregisterListeners(Object plugin) {
|
|
||||||
ensurePlugin(plugin);
|
|
||||||
Collection<Object> listeners = registeredListenersByPlugin.removeAll(plugin);
|
|
||||||
listeners.forEach(methodAdapter::unregister);
|
|
||||||
Collection<EventHandler<?>> handlers = registeredHandlersByPlugin.removeAll(plugin);
|
|
||||||
handlers.forEach(this::unregisterHandler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void unregisterListener(Object plugin, Object listener) {
|
|
||||||
ensurePlugin(plugin);
|
|
||||||
Preconditions.checkNotNull(listener, "listener");
|
|
||||||
if (registeredListenersByPlugin.remove(plugin, listener)) {
|
|
||||||
methodAdapter.unregister(listener);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <E> void unregister(Object plugin, EventHandler<E> handler) {
|
|
||||||
ensurePlugin(plugin);
|
|
||||||
Preconditions.checkNotNull(handler, "listener");
|
|
||||||
if (registeredHandlersByPlugin.remove(plugin, handler)) {
|
|
||||||
unregisterHandler(handler);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean shutdown() throws InterruptedException {
|
|
||||||
service.shutdown();
|
|
||||||
return service.awaitTermination(10, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void fireShutdownEvent() {
|
|
||||||
// We shut down the proxy already, so the fact this executes in the main thread is irrelevant.
|
|
||||||
fireEvent(new ProxyShutdownEvent());
|
|
||||||
}
|
|
||||||
|
|
||||||
public ExecutorService getService() {
|
|
||||||
return service;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class VelocityMethodScanner implements MethodScanner<Object> {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean shouldRegister(@NonNull Object listener, @NonNull Method method) {
|
|
||||||
return method.isAnnotationPresent(Subscribe.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int postOrder(@NonNull Object listener, @NonNull Method method) {
|
|
||||||
return method.getAnnotation(Subscribe.class).order();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean consumeCancelledEvents(@NonNull Object listener, @NonNull Method method) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class KyoriToVelocityHandler<E> implements EventSubscriber<E> {
|
|
||||||
|
|
||||||
private final EventHandler<E> handler;
|
|
||||||
private final short postOrder;
|
|
||||||
|
|
||||||
private KyoriToVelocityHandler(EventHandler<E> handler, short postOrder) {
|
|
||||||
this.handler = handler;
|
|
||||||
this.postOrder = postOrder;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void invoke(@NonNull E event) {
|
|
||||||
handler.execute(event);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int postOrder() {
|
|
||||||
return postOrder;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -21,8 +21,8 @@ import com.velocitypowered.api.command.CommandMeta;
|
|||||||
import com.velocitypowered.api.command.CommandSource;
|
import com.velocitypowered.api.command.CommandSource;
|
||||||
import com.velocitypowered.api.command.RawCommand;
|
import com.velocitypowered.api.command.RawCommand;
|
||||||
import com.velocitypowered.api.command.SimpleCommand;
|
import com.velocitypowered.api.command.SimpleCommand;
|
||||||
import com.velocitypowered.proxy.plugin.MockEventManager;
|
import com.velocitypowered.proxy.event.MockEventManager;
|
||||||
import com.velocitypowered.proxy.plugin.VelocityEventManager;
|
import com.velocitypowered.proxy.event.VelocityEventManager;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
226
proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java
Normale Datei
226
proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java
Normale Datei
@ -0,0 +1,226 @@
|
|||||||
|
package com.velocitypowered.proxy.event;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
import com.velocitypowered.api.event.EventTask;
|
||||||
|
import com.velocitypowered.api.event.PostOrder;
|
||||||
|
import com.velocitypowered.api.event.Subscribe;
|
||||||
|
import com.velocitypowered.proxy.testutil.FakePluginManager;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.TestInstance;
|
||||||
|
|
||||||
|
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||||
|
public class EventTest {
|
||||||
|
|
||||||
|
private final VelocityEventManager eventManager =
|
||||||
|
new VelocityEventManager(new FakePluginManager());
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
void shutdown() throws Exception {
|
||||||
|
eventManager.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class TestEvent {
|
||||||
|
}
|
||||||
|
|
||||||
|
static void assertAsyncThread(final Thread thread) {
|
||||||
|
assertTrue(thread.getName().contains("Velocity Async Event Executor"));
|
||||||
|
}
|
||||||
|
|
||||||
|
static void assertSyncThread(final Thread thread) {
|
||||||
|
assertEquals(Thread.currentThread(), thread);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleMethodListener(final Object listener) throws Exception {
|
||||||
|
eventManager.register(FakePluginManager.PLUGIN_A, listener);
|
||||||
|
try {
|
||||||
|
eventManager.fire(new TestEvent()).get();
|
||||||
|
} finally {
|
||||||
|
eventManager.unregisterListeners(FakePluginManager.PLUGIN_A);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testAlwaysSync() throws Exception {
|
||||||
|
final AlwaysSyncListener listener = new AlwaysSyncListener();
|
||||||
|
handleMethodListener(listener);
|
||||||
|
assertSyncThread(listener.thread);
|
||||||
|
assertEquals(1, listener.result);
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class AlwaysSyncListener {
|
||||||
|
|
||||||
|
Thread thread;
|
||||||
|
int result;
|
||||||
|
|
||||||
|
@Subscribe
|
||||||
|
void sync(TestEvent event) {
|
||||||
|
result++;
|
||||||
|
thread = Thread.currentThread();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testAlwaysAsync() throws Exception {
|
||||||
|
final AlwaysAsyncListener listener = new AlwaysAsyncListener();
|
||||||
|
handleMethodListener(listener);
|
||||||
|
assertAsyncThread(listener.threadA);
|
||||||
|
assertAsyncThread(listener.threadB);
|
||||||
|
assertAsyncThread(listener.threadC);
|
||||||
|
assertEquals(3, listener.result);
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class AlwaysAsyncListener {
|
||||||
|
|
||||||
|
Thread threadA;
|
||||||
|
Thread threadB;
|
||||||
|
Thread threadC;
|
||||||
|
int result;
|
||||||
|
|
||||||
|
@Subscribe(async = true)
|
||||||
|
void async0(TestEvent event) {
|
||||||
|
result++;
|
||||||
|
threadA = Thread.currentThread();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Subscribe
|
||||||
|
EventTask async1(TestEvent event) {
|
||||||
|
threadB = Thread.currentThread();
|
||||||
|
return EventTask.async(() -> result++);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Subscribe
|
||||||
|
void async2(TestEvent event) {
|
||||||
|
result++;
|
||||||
|
threadC = Thread.currentThread();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testSometimesAsync() throws Exception {
|
||||||
|
final SometimesAsyncListener listener = new SometimesAsyncListener();
|
||||||
|
handleMethodListener(listener);
|
||||||
|
assertSyncThread(listener.threadA);
|
||||||
|
assertSyncThread(listener.threadB);
|
||||||
|
assertAsyncThread(listener.threadC);
|
||||||
|
assertAsyncThread(listener.threadD);
|
||||||
|
assertEquals(3, listener.result);
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class SometimesAsyncListener {
|
||||||
|
|
||||||
|
Thread threadA;
|
||||||
|
Thread threadB;
|
||||||
|
Thread threadC;
|
||||||
|
Thread threadD;
|
||||||
|
int result;
|
||||||
|
|
||||||
|
@Subscribe(order = PostOrder.EARLY)
|
||||||
|
void notAsync(TestEvent event) {
|
||||||
|
result++;
|
||||||
|
threadA = Thread.currentThread();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Subscribe
|
||||||
|
EventTask notAsyncUntilTask(TestEvent event) {
|
||||||
|
threadB = Thread.currentThread();
|
||||||
|
return EventTask.async(() -> {
|
||||||
|
threadC = Thread.currentThread();
|
||||||
|
result++;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Subscribe(order = PostOrder.LATE)
|
||||||
|
void stillAsyncAfterTask(TestEvent event) {
|
||||||
|
threadD = Thread.currentThread();
|
||||||
|
result++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testContinuation() throws Exception {
|
||||||
|
final ContinuationListener listener = new ContinuationListener();
|
||||||
|
handleMethodListener(listener);
|
||||||
|
assertSyncThread(listener.threadA);
|
||||||
|
assertSyncThread(listener.threadB);
|
||||||
|
assertAsyncThread(listener.threadC);
|
||||||
|
assertEquals(2, listener.value.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class ContinuationListener {
|
||||||
|
|
||||||
|
Thread threadA;
|
||||||
|
Thread threadB;
|
||||||
|
Thread threadC;
|
||||||
|
|
||||||
|
final AtomicInteger value = new AtomicInteger();
|
||||||
|
|
||||||
|
@Subscribe(order = PostOrder.EARLY)
|
||||||
|
EventTask continuation(TestEvent event) {
|
||||||
|
threadA = Thread.currentThread();
|
||||||
|
return EventTask.withContinuation(continuation -> {
|
||||||
|
value.incrementAndGet();
|
||||||
|
threadB = Thread.currentThread();
|
||||||
|
new Thread(() -> {
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
value.incrementAndGet();
|
||||||
|
continuation.resume();
|
||||||
|
}).start();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Subscribe(order = PostOrder.LATE)
|
||||||
|
void afterContinuation(TestEvent event) {
|
||||||
|
threadC = Thread.currentThread();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testAsyncContinuation() throws Exception {
|
||||||
|
final AsyncContinuationListener listener = new AsyncContinuationListener();
|
||||||
|
handleMethodListener(listener);
|
||||||
|
assertSyncThread(listener.threadA);
|
||||||
|
assertAsyncThread(listener.threadB);
|
||||||
|
assertAsyncThread(listener.threadC);
|
||||||
|
assertEquals(2, listener.value.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class AsyncContinuationListener {
|
||||||
|
|
||||||
|
Thread threadA;
|
||||||
|
Thread threadB;
|
||||||
|
Thread threadC;
|
||||||
|
|
||||||
|
final AtomicInteger value = new AtomicInteger();
|
||||||
|
|
||||||
|
@Subscribe(order = PostOrder.EARLY)
|
||||||
|
EventTask continuation(TestEvent event) {
|
||||||
|
threadA = Thread.currentThread();
|
||||||
|
return EventTask.asyncWithContinuation(continuation -> {
|
||||||
|
value.incrementAndGet();
|
||||||
|
threadB = Thread.currentThread();
|
||||||
|
new Thread(() -> {
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
value.incrementAndGet();
|
||||||
|
continuation.resume();
|
||||||
|
}).start();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Subscribe(order = PostOrder.LATE)
|
||||||
|
void afterContinuation(TestEvent event) {
|
||||||
|
threadC = Thread.currentThread();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,6 @@
|
|||||||
package com.velocitypowered.proxy.plugin;
|
package com.velocitypowered.proxy.event;
|
||||||
|
|
||||||
|
import com.velocitypowered.proxy.plugin.MockPluginManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A mock {@link VelocityEventManager}. Must be shutdown after use!
|
* A mock {@link VelocityEventManager}. Must be shutdown after use!
|
Laden…
In neuem Issue referenzieren
Einen Benutzer sperren