Mirror von
https://github.com/PaperMC/Velocity.git
synchronisiert 2024-11-17 05:20:14 +01:00
Reintroduce sync event execution to the Velocity event system
This required a not-insubstantial number of bug fixes, since the sync support had bit-rotted somewhat. This PR also corrects a number of bugs. Finally. the per-plugin executor services are now used to execute all async event tasks.
Dieser Commit ist enthalten in:
Ursprung
3fcfb71b71
Commit
d1030c3096
@ -43,6 +43,6 @@ public @interface Subscribe {
|
|||||||
*
|
*
|
||||||
* @return Requires async
|
* @return Requires async
|
||||||
*/
|
*/
|
||||||
boolean async() default true;
|
boolean async() default false;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -45,7 +45,7 @@ class SetManifestImplVersionPlugin : Plugin<Project> {
|
|||||||
velocityHumanVersion = "${project.version} (git-$currentShortRevision)"
|
velocityHumanVersion = "${project.version} (git-$currentShortRevision)"
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
velocityHumanVersion = archiveVersion.get()
|
velocityHumanVersion = project.version.toString()
|
||||||
}
|
}
|
||||||
attributes["Implementation-Version"] = velocityHumanVersion
|
attributes["Implementation-Version"] = velocityHumanVersion
|
||||||
}
|
}
|
||||||
|
@ -521,7 +521,6 @@ public class VelocityServer implements ProxyServer, ForwardingAudience {
|
|||||||
|
|
||||||
eventManager.fire(new ProxyShutdownEvent()).join();
|
eventManager.fire(new ProxyShutdownEvent()).join();
|
||||||
|
|
||||||
timedOut = !eventManager.shutdown() || timedOut;
|
|
||||||
timedOut = !scheduler.shutdown() || timedOut;
|
timedOut = !scheduler.shutdown() || timedOut;
|
||||||
|
|
||||||
if (timedOut) {
|
if (timedOut) {
|
||||||
|
@ -46,6 +46,8 @@ import java.util.Locale;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.ForkJoinPool;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@ -68,6 +70,7 @@ public class VelocityCommandManager implements CommandManager {
|
|||||||
private final SuggestionsProvider<CommandSource> suggestionsProvider;
|
private final SuggestionsProvider<CommandSource> suggestionsProvider;
|
||||||
private final CommandGraphInjector<CommandSource> injector;
|
private final CommandGraphInjector<CommandSource> injector;
|
||||||
private final Map<String, CommandMeta> commandMetas;
|
private final Map<String, CommandMeta> commandMetas;
|
||||||
|
private final ExecutorService asyncExecutor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a command manager.
|
* Constructs a command manager.
|
||||||
@ -86,6 +89,7 @@ public class VelocityCommandManager implements CommandManager {
|
|||||||
this.suggestionsProvider = new SuggestionsProvider<>(this.dispatcher, this.lock.readLock());
|
this.suggestionsProvider = new SuggestionsProvider<>(this.dispatcher, this.lock.readLock());
|
||||||
this.injector = new CommandGraphInjector<>(this.dispatcher, this.lock.readLock());
|
this.injector = new CommandGraphInjector<>(this.dispatcher, this.lock.readLock());
|
||||||
this.commandMetas = new ConcurrentHashMap<>();
|
this.commandMetas = new ConcurrentHashMap<>();
|
||||||
|
this.asyncExecutor = ForkJoinPool.commonPool(); // TODO: remove entirely
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setAnnounceProxyCommands(boolean announceProxyCommands) {
|
public void setAnnounceProxyCommands(boolean announceProxyCommands) {
|
||||||
@ -251,7 +255,7 @@ public class VelocityCommandManager implements CommandManager {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return executeImmediately0(source, commandResult.getCommand().orElse(event.getCommand()));
|
return executeImmediately0(source, commandResult.getCommand().orElse(event.getCommand()));
|
||||||
}, eventManager.getAsyncExecutor());
|
}, asyncExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -260,8 +264,7 @@ public class VelocityCommandManager implements CommandManager {
|
|||||||
Preconditions.checkNotNull(source, "source");
|
Preconditions.checkNotNull(source, "source");
|
||||||
Preconditions.checkNotNull(cmdLine, "cmdLine");
|
Preconditions.checkNotNull(cmdLine, "cmdLine");
|
||||||
|
|
||||||
return CompletableFuture.supplyAsync(
|
return CompletableFuture.supplyAsync(() -> executeImmediately0(source, cmdLine), asyncExecutor);
|
||||||
() -> executeImmediately0(source, cmdLine), eventManager.getAsyncExecutor());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -29,7 +29,6 @@ import com.velocitypowered.api.util.Favicon;
|
|||||||
import com.velocitypowered.proxy.util.AddressUtil;
|
import com.velocitypowered.proxy.util.AddressUtil;
|
||||||
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
|
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
@ -37,18 +37,14 @@ class EventTypeTracker {
|
|||||||
return ImmutableSet.copyOf(friends.get(eventType));
|
return ImmutableSet.copyOf(friends.get(eventType));
|
||||||
}
|
}
|
||||||
|
|
||||||
final Collection<Class<?>> types = getEventTypes(eventType);
|
final Collection<Class<?>> types = getFriendTypes(eventType);
|
||||||
for (Class<?> type : types) {
|
for (Class<?> type : types) {
|
||||||
if (type == eventType) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
friends.put(type, eventType);
|
friends.put(type, eventType);
|
||||||
}
|
}
|
||||||
return types;
|
return types;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Collection<Class<?>> getEventTypes(final Class<?> eventType) {
|
private static Collection<Class<?>> getFriendTypes(final Class<?> eventType) {
|
||||||
return TypeToken.of(eventType).getTypes().rawTypes().stream()
|
return TypeToken.of(eventType).getTypes().rawTypes().stream()
|
||||||
.filter(type -> type != Object.class)
|
.filter(type -> type != Object.class)
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
@ -25,7 +25,6 @@ import com.google.common.base.VerifyException;
|
|||||||
import com.google.common.collect.ArrayListMultimap;
|
import com.google.common.collect.ArrayListMultimap;
|
||||||
import com.google.common.collect.ListMultimap;
|
import com.google.common.collect.ListMultimap;
|
||||||
import com.google.common.reflect.TypeToken;
|
import com.google.common.reflect.TypeToken;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
||||||
import com.velocitypowered.api.event.Continuation;
|
import com.velocitypowered.api.event.Continuation;
|
||||||
import com.velocitypowered.api.event.EventHandler;
|
import com.velocitypowered.api.event.EventHandler;
|
||||||
import com.velocitypowered.api.event.EventManager;
|
import com.velocitypowered.api.event.EventManager;
|
||||||
@ -54,9 +53,6 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
@ -88,7 +84,6 @@ public class VelocityEventManager implements EventManager {
|
|||||||
private static final Comparator<HandlerRegistration> handlerComparator =
|
private static final Comparator<HandlerRegistration> handlerComparator =
|
||||||
Comparator.comparingInt(o -> o.order);
|
Comparator.comparingInt(o -> o.order);
|
||||||
|
|
||||||
private final ExecutorService asyncExecutor;
|
|
||||||
private final PluginManager pluginManager;
|
private final PluginManager pluginManager;
|
||||||
|
|
||||||
private final ListMultimap<Class<?>, HandlerRegistration> handlersByType =
|
private final ListMultimap<Class<?>, HandlerRegistration> handlersByType =
|
||||||
@ -111,9 +106,6 @@ public class VelocityEventManager implements EventManager {
|
|||||||
*/
|
*/
|
||||||
public VelocityEventManager(final PluginManager pluginManager) {
|
public VelocityEventManager(final PluginManager pluginManager) {
|
||||||
this.pluginManager = pluginManager;
|
this.pluginManager = pluginManager;
|
||||||
this.asyncExecutor = Executors
|
|
||||||
.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder()
|
|
||||||
.setNameFormat("Velocity Async Event Executor - #%d").setDaemon(true).build());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -139,6 +131,7 @@ public class VelocityEventManager implements EventManager {
|
|||||||
final short order;
|
final short order;
|
||||||
final Class<?> eventType;
|
final Class<?> eventType;
|
||||||
final EventHandler<Object> handler;
|
final EventHandler<Object> handler;
|
||||||
|
final AsyncType asyncType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The instance of the {@link EventHandler} or the listener instance that was registered.
|
* The instance of the {@link EventHandler} or the listener instance that was registered.
|
||||||
@ -146,31 +139,40 @@ public class VelocityEventManager implements EventManager {
|
|||||||
final Object instance;
|
final Object instance;
|
||||||
|
|
||||||
public HandlerRegistration(final PluginContainer plugin, final short order,
|
public HandlerRegistration(final PluginContainer plugin, final short order,
|
||||||
final Class<?> eventType, final Object instance, final EventHandler<Object> handler) {
|
final Class<?> eventType, final Object instance, final EventHandler<Object> handler,
|
||||||
|
final AsyncType asyncType) {
|
||||||
this.plugin = plugin;
|
this.plugin = plugin;
|
||||||
this.order = order;
|
this.order = order;
|
||||||
this.eventType = eventType;
|
this.eventType = eventType;
|
||||||
this.instance = instance;
|
this.instance = instance;
|
||||||
this.handler = handler;
|
this.handler = handler;
|
||||||
|
this.asyncType = asyncType;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enum AsyncType {
|
enum AsyncType {
|
||||||
/**
|
|
||||||
* The complete event will be handled on an async thread.
|
|
||||||
*/
|
|
||||||
ALWAYS,
|
|
||||||
/**
|
/**
|
||||||
* The event will never run async, everything is handled on the netty thread.
|
* The event will never run async, everything is handled on the netty thread.
|
||||||
*/
|
*/
|
||||||
NEVER
|
NEVER,
|
||||||
|
/**
|
||||||
|
* The event will initially start on the thread calling the {@code fire} method, and possibly
|
||||||
|
* switch over to an async thread.
|
||||||
|
*/
|
||||||
|
SOMETIMES,
|
||||||
|
/**
|
||||||
|
* The complete event will be handled on an async thread.
|
||||||
|
*/
|
||||||
|
ALWAYS
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class HandlersCache {
|
static final class HandlersCache {
|
||||||
|
|
||||||
|
final AsyncType asyncType;
|
||||||
final HandlerRegistration[] handlers;
|
final HandlerRegistration[] handlers;
|
||||||
|
|
||||||
HandlersCache(final HandlerRegistration[] handlers) {
|
HandlersCache(AsyncType asyncType, final HandlerRegistration[] handlers) {
|
||||||
|
this.asyncType = asyncType;
|
||||||
this.handlers = handlers;
|
this.handlers = handlers;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -193,7 +195,15 @@ public class VelocityEventManager implements EventManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
baked.sort(handlerComparator);
|
baked.sort(handlerComparator);
|
||||||
return new HandlersCache(baked.toArray(new HandlerRegistration[0]));
|
|
||||||
|
AsyncType asyncType = AsyncType.NEVER;
|
||||||
|
for (HandlerRegistration registration : baked) {
|
||||||
|
if (registration.asyncType.compareTo(asyncType) > 0) {
|
||||||
|
asyncType = registration.asyncType;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new HandlersCache(asyncType, baked.toArray(new HandlerRegistration[0]));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -229,15 +239,17 @@ public class VelocityEventManager implements EventManager {
|
|||||||
static final class MethodHandlerInfo {
|
static final class MethodHandlerInfo {
|
||||||
|
|
||||||
final Method method;
|
final Method method;
|
||||||
|
final AsyncType asyncType;
|
||||||
final @Nullable Class<?> eventType;
|
final @Nullable Class<?> eventType;
|
||||||
final short order;
|
final short order;
|
||||||
final @Nullable String errors;
|
final @Nullable String errors;
|
||||||
final @Nullable Class<?> continuationType;
|
final @Nullable Class<?> continuationType;
|
||||||
|
|
||||||
private MethodHandlerInfo(final Method method, final @Nullable Class<?> eventType,
|
private MethodHandlerInfo(final Method method, final AsyncType asyncType,
|
||||||
final short order, final @Nullable String errors,
|
final @Nullable Class<?> eventType, final short order, final @Nullable String errors,
|
||||||
final @Nullable Class<?> continuationType) {
|
final @Nullable Class<?> continuationType) {
|
||||||
this.method = method;
|
this.method = method;
|
||||||
|
this.asyncType = asyncType;
|
||||||
this.eventType = eventType;
|
this.eventType = eventType;
|
||||||
this.order = order;
|
this.order = order;
|
||||||
this.errors = errors;
|
this.errors = errors;
|
||||||
@ -301,18 +313,26 @@ public class VelocityEventManager implements EventManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
AsyncType asyncType = AsyncType.NEVER;
|
||||||
if (handlerAdapter == null) {
|
if (handlerAdapter == null) {
|
||||||
final Class<?> returnType = method.getReturnType();
|
final Class<?> returnType = method.getReturnType();
|
||||||
if (returnType != void.class && continuationType == Continuation.class) {
|
if (returnType != void.class && continuationType == Continuation.class) {
|
||||||
errors.add("method return type must be void if a continuation parameter is provided");
|
errors.add("method return type must be void if a continuation parameter is provided");
|
||||||
} else if (returnType != void.class && returnType != EventTask.class) {
|
} else if (returnType != void.class && returnType != EventTask.class) {
|
||||||
errors.add("method return type must be void, AsyncTask, "
|
errors.add("method return type must be void, AsyncTask, "
|
||||||
+ "AsyncTask.Basic or AsyncTask.WithContinuation");
|
+ "EventTask.Basic or EventTask.WithContinuation");
|
||||||
|
} else if (returnType == EventTask.class) {
|
||||||
|
asyncType = AsyncType.SOMETIMES;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
asyncType = AsyncType.SOMETIMES;
|
||||||
|
}
|
||||||
|
if (subscribe.async()) {
|
||||||
|
asyncType = AsyncType.ALWAYS;
|
||||||
}
|
}
|
||||||
final short order = (short) subscribe.order().ordinal();
|
final short order = (short) subscribe.order().ordinal();
|
||||||
final String errorsJoined = errors.isEmpty() ? null : String.join(",", errors);
|
final String errorsJoined = errors.isEmpty() ? null : String.join(",", errors);
|
||||||
collected.put(key, new MethodHandlerInfo(method, eventType, order, errorsJoined,
|
collected.put(key, new MethodHandlerInfo(method, asyncType, eventType, order, errorsJoined,
|
||||||
continuationType));
|
continuationType));
|
||||||
}
|
}
|
||||||
final Class<?> superclass = targetClass.getSuperclass();
|
final Class<?> superclass = targetClass.getSuperclass();
|
||||||
@ -356,7 +376,8 @@ public class VelocityEventManager implements EventManager {
|
|||||||
requireNonNull(handler, "handler");
|
requireNonNull(handler, "handler");
|
||||||
|
|
||||||
final HandlerRegistration registration = new HandlerRegistration(pluginContainer,
|
final HandlerRegistration registration = new HandlerRegistration(pluginContainer,
|
||||||
(short) order.ordinal(), eventClass, handler, (EventHandler<Object>) handler);
|
(short) order.ordinal(), eventClass, handler, (EventHandler<Object>) handler,
|
||||||
|
AsyncType.SOMETIMES);
|
||||||
register(Collections.singletonList(registration));
|
register(Collections.singletonList(registration));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -386,7 +407,7 @@ public class VelocityEventManager implements EventManager {
|
|||||||
|
|
||||||
final EventHandler<Object> handler = untargetedHandler.buildHandler(listener);
|
final EventHandler<Object> handler = untargetedHandler.buildHandler(listener);
|
||||||
registrations.add(new HandlerRegistration(pluginContainer, info.order,
|
registrations.add(new HandlerRegistration(pluginContainer, info.order,
|
||||||
info.eventType, listener, handler));
|
info.eventType, listener, handler, info.asyncType));
|
||||||
}
|
}
|
||||||
|
|
||||||
register(registrations);
|
register(registrations);
|
||||||
@ -473,10 +494,13 @@ public class VelocityEventManager implements EventManager {
|
|||||||
|
|
||||||
private <E> void fire(final @Nullable CompletableFuture<E> future,
|
private <E> void fire(final @Nullable CompletableFuture<E> future,
|
||||||
final E event, final HandlersCache handlersCache) {
|
final E event, final HandlersCache handlersCache) {
|
||||||
// In Velocity 1.1.0, all events were fired asynchronously. As Velocity 3.0.0 is intended to be
|
final HandlerRegistration registration = handlersCache.handlers[0];
|
||||||
// largely (albeit not 100%) compatible with 1.1.x, we also fire events async. This behavior
|
if (registration.asyncType == AsyncType.ALWAYS) {
|
||||||
// will go away in Velocity Polymer.
|
registration.plugin.getExecutorService().execute(
|
||||||
asyncExecutor.execute(() -> fire(future, event, 0, true, handlersCache.handlers));
|
() -> fire(future, event, 0, true, handlersCache.handlers));
|
||||||
|
} else {
|
||||||
|
fire(future, event, 0, false, handlersCache.handlers);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final int TASK_STATE_DEFAULT = 0;
|
private static final int TASK_STATE_DEFAULT = 0;
|
||||||
@ -505,6 +529,7 @@ public class VelocityEventManager implements EventManager {
|
|||||||
private final @Nullable CompletableFuture<E> future;
|
private final @Nullable CompletableFuture<E> future;
|
||||||
private final boolean currentlyAsync;
|
private final boolean currentlyAsync;
|
||||||
private final E event;
|
private final E event;
|
||||||
|
private final Thread firedOnThread;
|
||||||
|
|
||||||
// This field is modified via a VarHandle, so this field is used and cannot be final.
|
// This field is modified via a VarHandle, so this field is used and cannot be final.
|
||||||
@SuppressWarnings({"UnusedVariable", "FieldMayBeFinal", "FieldCanBeLocal"})
|
@SuppressWarnings({"UnusedVariable", "FieldMayBeFinal", "FieldCanBeLocal"})
|
||||||
@ -527,6 +552,7 @@ public class VelocityEventManager implements EventManager {
|
|||||||
this.event = event;
|
this.event = event;
|
||||||
this.index = index;
|
this.index = index;
|
||||||
this.currentlyAsync = currentlyAsync;
|
this.currentlyAsync = currentlyAsync;
|
||||||
|
this.firedOnThread = Thread.currentThread();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -537,8 +563,8 @@ public class VelocityEventManager implements EventManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Executes the task and returns whether the next one should be executed immediately after this
|
* Executes the task and returns whether the next handler should be executed immediately
|
||||||
* one without scheduling.
|
* after this one, without additional scheduling.
|
||||||
*/
|
*/
|
||||||
boolean execute() {
|
boolean execute() {
|
||||||
state = TASK_STATE_EXECUTING;
|
state = TASK_STATE_EXECUTING;
|
||||||
@ -580,7 +606,18 @@ public class VelocityEventManager implements EventManager {
|
|||||||
}
|
}
|
||||||
if (!CONTINUATION_TASK_STATE.compareAndSet(
|
if (!CONTINUATION_TASK_STATE.compareAndSet(
|
||||||
this, TASK_STATE_EXECUTING, TASK_STATE_CONTINUE_IMMEDIATELY)) {
|
this, TASK_STATE_EXECUTING, TASK_STATE_CONTINUE_IMMEDIATELY)) {
|
||||||
asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations));
|
// We established earlier that registrations[index + 1] is a valid index.
|
||||||
|
// If we are remaining in the same thread for the next handler, fire
|
||||||
|
// the next event immediately, else fire it within the executor service
|
||||||
|
// of the plugin with the next handler.
|
||||||
|
final HandlerRegistration next = registrations[index + 1];
|
||||||
|
final Thread currentThread = Thread.currentThread();
|
||||||
|
if (currentThread == firedOnThread && next.asyncType != AsyncType.ALWAYS) {
|
||||||
|
fire(future, event, index + 1, currentlyAsync, registrations);
|
||||||
|
} else {
|
||||||
|
next.plugin.getExecutorService().execute(() ->
|
||||||
|
fire(future, event, index + 1, true, registrations));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -606,7 +643,7 @@ public class VelocityEventManager implements EventManager {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
asyncExecutor.execute(continuationTask);
|
registration.plugin.getExecutorService().execute(continuationTask);
|
||||||
}
|
}
|
||||||
// fire will continue in another thread once the async task is
|
// fire will continue in another thread once the async task is
|
||||||
// executed and the continuation is resumed
|
// executed and the continuation is resumed
|
||||||
@ -625,13 +662,4 @@ public class VelocityEventManager implements EventManager {
|
|||||||
logger.error("Couldn't pass {} to {}", registration.eventType.getSimpleName(),
|
logger.error("Couldn't pass {} to {}", registration.eventType.getSimpleName(),
|
||||||
registration.plugin.getDescription().getId(), t);
|
registration.plugin.getDescription().getId(), t);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean shutdown() throws InterruptedException {
|
|
||||||
asyncExecutor.shutdown();
|
|
||||||
return asyncExecutor.awaitTermination(10, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
public ExecutorService getAsyncExecutor() {
|
|
||||||
return asyncExecutor;
|
|
||||||
}
|
|
||||||
}
|
}
|
@ -31,7 +31,6 @@ import com.velocitypowered.proxy.event.MockEventManager;
|
|||||||
import com.velocitypowered.proxy.event.VelocityEventManager;
|
import com.velocitypowered.proxy.event.VelocityEventManager;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import org.junit.jupiter.api.AfterAll;
|
|
||||||
import org.junit.jupiter.api.BeforeAll;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
|
||||||
@ -47,16 +46,6 @@ abstract class CommandTestSuite {
|
|||||||
eventManager = new MockEventManager();
|
eventManager = new MockEventManager();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterAll
|
|
||||||
static void afterAll() {
|
|
||||||
try {
|
|
||||||
eventManager.shutdown();
|
|
||||||
eventManager = null;
|
|
||||||
} catch (final InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setUp() {
|
void setUp() {
|
||||||
this.manager = new VelocityCommandManager(eventManager);
|
this.manager = new VelocityCommandManager(eventManager);
|
||||||
|
@ -41,20 +41,24 @@ import org.junit.jupiter.api.TestInstance;
|
|||||||
public class EventTest {
|
public class EventTest {
|
||||||
|
|
||||||
public static final String CONTINUATION_TEST_THREAD_NAME = "Continuation test thread";
|
public static final String CONTINUATION_TEST_THREAD_NAME = "Continuation test thread";
|
||||||
private final VelocityEventManager eventManager =
|
private final FakePluginManager pluginManager = new FakePluginManager();
|
||||||
new VelocityEventManager(new FakePluginManager());
|
private final VelocityEventManager eventManager = new VelocityEventManager(pluginManager);
|
||||||
|
|
||||||
@AfterAll
|
@AfterAll
|
||||||
void shutdown() throws Exception {
|
void shutdown() throws Exception {
|
||||||
eventManager.shutdown();
|
pluginManager.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class TestEvent {
|
static final class TestEvent {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void assertSyncThread(final Thread thread) {
|
||||||
|
assertEquals(Thread.currentThread(), thread);
|
||||||
|
}
|
||||||
|
|
||||||
static void assertAsyncThread(final Thread thread) {
|
static void assertAsyncThread(final Thread thread) {
|
||||||
assertTrue(thread.getName().contains("Velocity Async Event Executor"));
|
assertTrue(thread.getName().contains("Test Async Thread"));
|
||||||
}
|
}
|
||||||
|
|
||||||
static void assertContinuationThread(final Thread thread) {
|
static void assertContinuationThread(final Thread thread) {
|
||||||
@ -90,6 +94,7 @@ public class EventTest {
|
|||||||
eventManager.fire(new TestEvent()).get();
|
eventManager.fire(new TestEvent()).get();
|
||||||
} finally {
|
} finally {
|
||||||
eventManager.unregisterListeners(FakePluginManager.PLUGIN_A);
|
eventManager.unregisterListeners(FakePluginManager.PLUGIN_A);
|
||||||
|
eventManager.unregisterListeners(FakePluginManager.PLUGIN_B);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that the order is A < B < C.
|
// Check that the order is A < B < C.
|
||||||
@ -119,6 +124,7 @@ public class EventTest {
|
|||||||
eventManager.fire(new TestEvent()).get();
|
eventManager.fire(new TestEvent()).get();
|
||||||
} finally {
|
} finally {
|
||||||
eventManager.unregisterListeners(FakePluginManager.PLUGIN_A);
|
eventManager.unregisterListeners(FakePluginManager.PLUGIN_A);
|
||||||
|
eventManager.unregisterListeners(FakePluginManager.PLUGIN_B);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that the order is A < B < C.
|
// Check that the order is A < B < C.
|
||||||
@ -126,6 +132,26 @@ public class EventTest {
|
|||||||
assertTrue(listener2Invoked.get() < listener3Invoked.get(), "Listener C invoked before B!");
|
assertTrue(listener2Invoked.get() < listener3Invoked.get(), "Listener C invoked before B!");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testAlwaysSync() throws Exception {
|
||||||
|
final AlwaysSyncListener listener = new AlwaysSyncListener();
|
||||||
|
handleMethodListener(listener);
|
||||||
|
assertSyncThread(listener.thread);
|
||||||
|
assertEquals(1, listener.result);
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class AlwaysSyncListener {
|
||||||
|
|
||||||
|
@MonotonicNonNull Thread thread;
|
||||||
|
int result;
|
||||||
|
|
||||||
|
@Subscribe
|
||||||
|
void sync(TestEvent event) {
|
||||||
|
result++;
|
||||||
|
thread = Thread.currentThread();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testAlwaysAsync() throws Exception {
|
void testAlwaysAsync() throws Exception {
|
||||||
final AlwaysAsyncListener listener = new AlwaysAsyncListener();
|
final AlwaysAsyncListener listener = new AlwaysAsyncListener();
|
||||||
@ -143,7 +169,7 @@ public class EventTest {
|
|||||||
@MonotonicNonNull Thread threadC;
|
@MonotonicNonNull Thread threadC;
|
||||||
int result;
|
int result;
|
||||||
|
|
||||||
@Subscribe
|
@Subscribe(async = true, order = PostOrder.EARLY)
|
||||||
void firstAsync(TestEvent event) {
|
void firstAsync(TestEvent event) {
|
||||||
result++;
|
result++;
|
||||||
threadA = Thread.currentThread();
|
threadA = Thread.currentThread();
|
||||||
@ -155,50 +181,93 @@ public class EventTest {
|
|||||||
return EventTask.async(() -> result++);
|
return EventTask.async(() -> result++);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Subscribe
|
@Subscribe(order = PostOrder.LATE)
|
||||||
void thirdAsync(TestEvent event) {
|
void thirdAsync(TestEvent event) {
|
||||||
result++;
|
result++;
|
||||||
threadC = Thread.currentThread();
|
threadC = Thread.currentThread();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testSometimesAsync() throws Exception {
|
||||||
|
final SometimesAsyncListener listener = new SometimesAsyncListener();
|
||||||
|
handleMethodListener(listener);
|
||||||
|
assertSyncThread(listener.threadA);
|
||||||
|
assertSyncThread(listener.threadB);
|
||||||
|
assertAsyncThread(listener.threadC);
|
||||||
|
assertAsyncThread(listener.threadD);
|
||||||
|
assertEquals(3, listener.result);
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class SometimesAsyncListener {
|
||||||
|
|
||||||
|
@MonotonicNonNull Thread threadA;
|
||||||
|
@MonotonicNonNull Thread threadB;
|
||||||
|
@MonotonicNonNull Thread threadC;
|
||||||
|
@MonotonicNonNull Thread threadD;
|
||||||
|
int result;
|
||||||
|
|
||||||
|
@Subscribe(order = PostOrder.EARLY)
|
||||||
|
void notAsync(TestEvent event) {
|
||||||
|
result++;
|
||||||
|
threadA = Thread.currentThread();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Subscribe
|
||||||
|
EventTask notAsyncUntilTask(TestEvent event) {
|
||||||
|
threadB = Thread.currentThread();
|
||||||
|
return EventTask.async(() -> {
|
||||||
|
threadC = Thread.currentThread();
|
||||||
|
result++;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Subscribe(order = PostOrder.LATE)
|
||||||
|
void stillAsyncAfterTask(TestEvent event) {
|
||||||
|
threadD = Thread.currentThread();
|
||||||
|
result++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testContinuation() throws Exception {
|
void testContinuation() throws Exception {
|
||||||
final ContinuationListener listener = new ContinuationListener();
|
final ContinuationListener listener = new ContinuationListener();
|
||||||
handleMethodListener(listener);
|
handleMethodListener(listener);
|
||||||
assertAsyncThread(listener.thread1);
|
assertSyncThread(listener.threadA);
|
||||||
assertAsyncThread(listener.thread2);
|
assertSyncThread(listener.threadB);
|
||||||
assertContinuationThread(listener.thread2Custom);
|
assertAsyncThread(listener.threadC);
|
||||||
assertAsyncThread(listener.thread3);
|
|
||||||
assertEquals(2, listener.value.get());
|
assertEquals(2, listener.value.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class ContinuationListener {
|
static final class ContinuationListener {
|
||||||
|
|
||||||
@MonotonicNonNull Thread thread1;
|
@MonotonicNonNull Thread threadA;
|
||||||
@MonotonicNonNull Thread thread2;
|
@MonotonicNonNull Thread threadB;
|
||||||
@MonotonicNonNull Thread thread2Custom;
|
@MonotonicNonNull Thread threadC;
|
||||||
@MonotonicNonNull Thread thread3;
|
|
||||||
|
|
||||||
final AtomicInteger value = new AtomicInteger();
|
final AtomicInteger value = new AtomicInteger();
|
||||||
|
|
||||||
@Subscribe(order = PostOrder.EARLY)
|
@Subscribe(order = PostOrder.EARLY)
|
||||||
EventTask continuation(TestEvent event) {
|
EventTask continuation(TestEvent event) {
|
||||||
thread1 = Thread.currentThread();
|
threadA = Thread.currentThread();
|
||||||
return EventTask.withContinuation(continuation -> {
|
return EventTask.withContinuation(continuation -> {
|
||||||
value.incrementAndGet();
|
value.incrementAndGet();
|
||||||
thread2 = Thread.currentThread();
|
threadB = Thread.currentThread();
|
||||||
new Thread(() -> {
|
new Thread(() -> {
|
||||||
thread2Custom = Thread.currentThread();
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
value.incrementAndGet();
|
value.incrementAndGet();
|
||||||
continuation.resume();
|
continuation.resume();
|
||||||
}, CONTINUATION_TEST_THREAD_NAME).start();
|
}).start();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Subscribe(order = PostOrder.LATE)
|
@Subscribe(order = PostOrder.LATE)
|
||||||
void afterContinuation(TestEvent event) {
|
void afterContinuation(TestEvent event) {
|
||||||
thread3 = Thread.currentThread();
|
threadC = Thread.currentThread();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -207,9 +276,9 @@ public class EventTest {
|
|||||||
final ResumeContinuationImmediatelyListener listener =
|
final ResumeContinuationImmediatelyListener listener =
|
||||||
new ResumeContinuationImmediatelyListener();
|
new ResumeContinuationImmediatelyListener();
|
||||||
handleMethodListener(listener);
|
handleMethodListener(listener);
|
||||||
assertAsyncThread(listener.threadA);
|
assertSyncThread(listener.threadA);
|
||||||
assertAsyncThread(listener.threadB);
|
assertSyncThread(listener.threadB);
|
||||||
assertAsyncThread(listener.threadC);
|
assertSyncThread(listener.threadC);
|
||||||
assertEquals(2, listener.result);
|
assertEquals(2, listener.result);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -241,42 +310,44 @@ public class EventTest {
|
|||||||
void testContinuationParameter() throws Exception {
|
void testContinuationParameter() throws Exception {
|
||||||
final ContinuationParameterListener listener = new ContinuationParameterListener();
|
final ContinuationParameterListener listener = new ContinuationParameterListener();
|
||||||
handleMethodListener(listener);
|
handleMethodListener(listener);
|
||||||
assertAsyncThread(listener.thread1);
|
assertSyncThread(listener.threadA);
|
||||||
assertAsyncThread(listener.thread2);
|
assertSyncThread(listener.threadB);
|
||||||
assertContinuationThread(listener.thread2Custom);
|
assertAsyncThread(listener.threadC);
|
||||||
assertAsyncThread(listener.thread3);
|
|
||||||
assertEquals(3, listener.result.get());
|
assertEquals(3, listener.result.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class ContinuationParameterListener {
|
static final class ContinuationParameterListener {
|
||||||
|
|
||||||
@MonotonicNonNull Thread thread1;
|
@MonotonicNonNull Thread threadA;
|
||||||
@MonotonicNonNull Thread thread2;
|
@MonotonicNonNull Thread threadB;
|
||||||
@MonotonicNonNull Thread thread2Custom;
|
@MonotonicNonNull Thread threadC;
|
||||||
@MonotonicNonNull Thread thread3;
|
|
||||||
|
|
||||||
final AtomicInteger result = new AtomicInteger();
|
final AtomicInteger result = new AtomicInteger();
|
||||||
|
|
||||||
@Subscribe
|
@Subscribe
|
||||||
void resume(TestEvent event, Continuation continuation) {
|
void resume(TestEvent event, Continuation continuation) {
|
||||||
thread1 = Thread.currentThread();
|
threadA = Thread.currentThread();
|
||||||
result.incrementAndGet();
|
result.incrementAndGet();
|
||||||
continuation.resume();
|
continuation.resume();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Subscribe(order = PostOrder.LATE)
|
@Subscribe(order = PostOrder.LATE)
|
||||||
void resumeFromCustomThread(TestEvent event, Continuation continuation) {
|
void resumeFromCustomThread(TestEvent event, Continuation continuation) {
|
||||||
thread2 = Thread.currentThread();
|
threadB = Thread.currentThread();
|
||||||
new Thread(() -> {
|
new Thread(() -> {
|
||||||
thread2Custom = Thread.currentThread();
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
result.incrementAndGet();
|
result.incrementAndGet();
|
||||||
continuation.resume();
|
continuation.resume();
|
||||||
}, CONTINUATION_TEST_THREAD_NAME).start();
|
}).start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Subscribe(order = PostOrder.LAST)
|
@Subscribe(order = PostOrder.LAST)
|
||||||
void afterCustomThread(TestEvent event, Continuation continuation) {
|
void afterCustomThread(TestEvent event, Continuation continuation) {
|
||||||
thread3 = Thread.currentThread();
|
threadC = Thread.currentThread();
|
||||||
result.incrementAndGet();
|
result.incrementAndGet();
|
||||||
continuation.resume();
|
continuation.resume();
|
||||||
}
|
}
|
||||||
@ -328,8 +399,7 @@ public class EventTest {
|
|||||||
+ "the second is the fancy continuation");
|
+ "the second is the fancy continuation");
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
new TypeToken<TriConsumer<Object, Object, FancyContinuation>>() {
|
new TypeToken<TriConsumer<Object, Object, FancyContinuation>>() {},
|
||||||
},
|
|
||||||
invokeFunction -> (instance, event) ->
|
invokeFunction -> (instance, event) ->
|
||||||
EventTask.withContinuation(continuation ->
|
EventTask.withContinuation(continuation ->
|
||||||
invokeFunction.accept(instance, event, new FancyContinuationImpl(continuation))
|
invokeFunction.accept(instance, event, new FancyContinuationImpl(continuation))
|
||||||
|
@ -76,12 +76,12 @@ class VelocitySchedulerTest {
|
|||||||
|
|
||||||
scheduler.buildTask(FakePluginManager.PLUGIN_A, task -> {
|
scheduler.buildTask(FakePluginManager.PLUGIN_A, task -> {
|
||||||
runningLatch.countDown();
|
runningLatch.countDown();
|
||||||
task.cancel();
|
|
||||||
try {
|
try {
|
||||||
endingLatch.await();
|
endingLatch.await();
|
||||||
} catch (InterruptedException ignored) {
|
} catch (InterruptedException ignored) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
|
task.cancel();
|
||||||
}).delay(50, TimeUnit.MILLISECONDS)
|
}).delay(50, TimeUnit.MILLISECONDS)
|
||||||
.repeat(Duration.ofMillis(5))
|
.repeat(Duration.ofMillis(5))
|
||||||
.schedule();
|
.schedule();
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
package com.velocitypowered.proxy.testutil;
|
package com.velocitypowered.proxy.testutil;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import com.velocitypowered.api.plugin.PluginContainer;
|
import com.velocitypowered.api.plugin.PluginContainer;
|
||||||
import com.velocitypowered.api.plugin.PluginDescription;
|
import com.velocitypowered.api.plugin.PluginDescription;
|
||||||
import com.velocitypowered.api.plugin.PluginManager;
|
import com.velocitypowered.api.plugin.PluginManager;
|
||||||
@ -25,7 +26,7 @@ import java.nio.file.Path;
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.ForkJoinPool;
|
import java.util.concurrent.Executors;
|
||||||
import org.checkerframework.checker.nullness.qual.NonNull;
|
import org.checkerframework.checker.nullness.qual.NonNull;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -36,15 +37,19 @@ public class FakePluginManager implements PluginManager {
|
|||||||
public static final Object PLUGIN_A = new Object();
|
public static final Object PLUGIN_A = new Object();
|
||||||
public static final Object PLUGIN_B = new Object();
|
public static final Object PLUGIN_B = new Object();
|
||||||
|
|
||||||
private static final PluginContainer PC_A = new FakePluginContainer("a", PLUGIN_A);
|
private final PluginContainer containerA = new FakePluginContainer("a", PLUGIN_A);
|
||||||
private static final PluginContainer PC_B = new FakePluginContainer("b", PLUGIN_B);
|
private final PluginContainer containerB = new FakePluginContainer("b", PLUGIN_B);
|
||||||
|
|
||||||
|
private ExecutorService service = Executors.newCachedThreadPool(
|
||||||
|
new ThreadFactoryBuilder().setNameFormat("Test Async Thread").setDaemon(true).build()
|
||||||
|
);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NonNull Optional<PluginContainer> fromInstance(@NonNull Object instance) {
|
public @NonNull Optional<PluginContainer> fromInstance(@NonNull Object instance) {
|
||||||
if (instance == PLUGIN_A) {
|
if (instance == PLUGIN_A) {
|
||||||
return Optional.of(PC_A);
|
return Optional.of(containerA);
|
||||||
} else if (instance == PLUGIN_B) {
|
} else if (instance == PLUGIN_B) {
|
||||||
return Optional.of(PC_B);
|
return Optional.of(containerB);
|
||||||
} else {
|
} else {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
@ -54,9 +59,9 @@ public class FakePluginManager implements PluginManager {
|
|||||||
public @NonNull Optional<PluginContainer> getPlugin(@NonNull String id) {
|
public @NonNull Optional<PluginContainer> getPlugin(@NonNull String id) {
|
||||||
switch (id) {
|
switch (id) {
|
||||||
case "a":
|
case "a":
|
||||||
return Optional.of(PC_A);
|
return Optional.of(containerA);
|
||||||
case "b":
|
case "b":
|
||||||
return Optional.of(PC_B);
|
return Optional.of(containerB);
|
||||||
default:
|
default:
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
@ -64,7 +69,7 @@ public class FakePluginManager implements PluginManager {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NonNull Collection<PluginContainer> getPlugins() {
|
public @NonNull Collection<PluginContainer> getPlugins() {
|
||||||
return ImmutableList.of(PC_A, PC_B);
|
return ImmutableList.of(containerA, containerB);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -77,16 +82,18 @@ public class FakePluginManager implements PluginManager {
|
|||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class FakePluginContainer implements PluginContainer {
|
public void shutdown() {
|
||||||
|
this.service.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
private class FakePluginContainer implements PluginContainer {
|
||||||
|
|
||||||
private final String id;
|
private final String id;
|
||||||
private final Object instance;
|
private final Object instance;
|
||||||
private final ExecutorService service;
|
|
||||||
|
|
||||||
private FakePluginContainer(String id, Object instance) {
|
private FakePluginContainer(String id, Object instance) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.instance = instance;
|
this.instance = instance;
|
||||||
this.service = ForkJoinPool.commonPool();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Laden…
In neuem Issue referenzieren
Einen Benutzer sperren