From a29c753e394441ef2e926ef7902b581df84ac4eb Mon Sep 17 00:00:00 2001 From: Andrew Steinborn Date: Sun, 14 May 2023 02:51:25 -0400 Subject: [PATCH] Give each plugin its own executor service (#1010) This is part of preparatory work for Velocity 5.0.0's revamped event system, but this change is safe to bring into the 3.x.x series. This affects the scheduler for now, but command execution will also be moved into the per-plugin thread pool, along with invocations of `EventTask.async()`. --- .../api/plugin/PluginContainer.java | 9 ++ .../loader/VelocityPluginContainer.java | 24 ++++ .../loader/java/VelocityPluginModule.java | 3 + .../proxy/scheduler/VelocityScheduler.java | 112 ++++++++++++------ .../scheduler/VelocitySchedulerTest.java | 22 ++-- .../proxy/testutil/FakePluginManager.java | 9 ++ 6 files changed, 135 insertions(+), 44 deletions(-) diff --git a/api/src/main/java/com/velocitypowered/api/plugin/PluginContainer.java b/api/src/main/java/com/velocitypowered/api/plugin/PluginContainer.java index 6521b652a..2420f5548 100644 --- a/api/src/main/java/com/velocitypowered/api/plugin/PluginContainer.java +++ b/api/src/main/java/com/velocitypowered/api/plugin/PluginContainer.java @@ -8,6 +8,7 @@ package com.velocitypowered.api.plugin; import java.util.Optional; +import java.util.concurrent.ExecutorService; /** * A wrapper around a plugin loaded by the proxy. @@ -29,4 +30,12 @@ public interface PluginContainer { default Optional getInstance() { return Optional.empty(); } + + /** + * Returns an executor service for this plugin. The executor will use a cached + * thread pool. + * + * @return an {@link ExecutorService} associated with this plugin + */ + ExecutorService getExecutorService(); } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/plugin/loader/VelocityPluginContainer.java b/proxy/src/main/java/com/velocitypowered/proxy/plugin/loader/VelocityPluginContainer.java index 6774597fe..fb9a4ea3f 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/plugin/loader/VelocityPluginContainer.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/plugin/loader/VelocityPluginContainer.java @@ -17,9 +17,12 @@ package com.velocitypowered.proxy.plugin.loader; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.velocitypowered.api.plugin.PluginContainer; import com.velocitypowered.api.plugin.PluginDescription; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Implements {@link PluginContainer}. @@ -28,6 +31,7 @@ public class VelocityPluginContainer implements PluginContainer { private final PluginDescription description; private Object instance; + private volatile ExecutorService service; public VelocityPluginContainer(PluginDescription description) { this.description = description; @@ -46,4 +50,24 @@ public class VelocityPluginContainer implements PluginContainer { public void setInstance(Object instance) { this.instance = instance; } + + @Override + public ExecutorService getExecutorService() { + if (this.service == null) { + synchronized (this) { + if (this.service == null) { + String name = this.description.getName().orElse(this.description.getId()); + this.service = Executors.unconfigurableExecutorService( + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat(name + " - Task Executor #%d") + .build() + ) + ); + } + } + } + + return this.service; + } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/plugin/loader/java/VelocityPluginModule.java b/proxy/src/main/java/com/velocitypowered/proxy/plugin/loader/java/VelocityPluginModule.java index c702b4944..8e59816af 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/plugin/loader/java/VelocityPluginModule.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/plugin/loader/java/VelocityPluginModule.java @@ -25,6 +25,7 @@ import com.velocitypowered.api.plugin.PluginDescription; import com.velocitypowered.api.plugin.annotation.DataDirectory; import com.velocitypowered.api.proxy.ProxyServer; import java.nio.file.Path; +import java.util.concurrent.ExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,5 +53,7 @@ class VelocityPluginModule implements Module { .toInstance(basePluginPath.resolve(description.getId())); binder.bind(PluginDescription.class).toInstance(description); binder.bind(PluginContainer.class).toInstance(pluginContainer); + + binder.bind(ExecutorService.class).toProvider(pluginContainer::getExecutorService); } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/scheduler/VelocityScheduler.java b/proxy/src/main/java/com/velocitypowered/proxy/scheduler/VelocityScheduler.java index 4d27a2ecf..96d0b94d6 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/scheduler/VelocityScheduler.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/scheduler/VelocityScheduler.java @@ -24,39 +24,39 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.velocitypowered.api.plugin.PluginContainer; import com.velocitypowered.api.plugin.PluginManager; import com.velocitypowered.api.scheduler.ScheduledTask; import com.velocitypowered.api.scheduler.Scheduler; import com.velocitypowered.api.scheduler.TaskStatus; +import com.velocitypowered.proxy.plugin.loader.VelocityPluginContainer; import java.util.Collection; import java.util.HashSet; import java.util.IdentityHashMap; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; +import org.jetbrains.annotations.VisibleForTesting; /** * The Velocity "scheduler", which is actually a thin wrapper around * {@link ScheduledExecutorService} and a dynamically-sized {@link ExecutorService}. - * Many plugins are accustomed to the Bukkit Scheduler model although it is not relevant + * Many plugins are accustomed to the Bukkit Scheduler model, although it is not relevant * in a proxy context. */ public class VelocityScheduler implements Scheduler { - private static final int MAX_SCHEDULER_POOLED_THREAD_CAP = 200; - private final PluginManager pluginManager; - private final ExecutorService taskService; private final ScheduledExecutorService timerExecutionService; private final Multimap tasksByPlugin = Multimaps.synchronizedMultimap( Multimaps.newSetMultimap(new IdentityHashMap<>(), HashSet::new)); @@ -68,10 +68,6 @@ public class VelocityScheduler implements Scheduler { */ public VelocityScheduler(PluginManager pluginManager) { this.pluginManager = pluginManager; - this.taskService = new ThreadPoolExecutor(1, MAX_SCHEDULER_POOLED_THREAD_CAP, - 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Velocity Task Scheduler - #%d").build()); this.timerExecutionService = Executors .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("Velocity Task Scheduler Timer").build()); @@ -81,16 +77,18 @@ public class VelocityScheduler implements Scheduler { public TaskBuilder buildTask(Object plugin, Runnable runnable) { checkNotNull(plugin, "plugin"); checkNotNull(runnable, "runnable"); - checkArgument(pluginManager.fromInstance(plugin).isPresent(), "plugin is not registered"); - return new TaskBuilderImpl(plugin, runnable, null); + final Optional container = pluginManager.fromInstance(plugin); + checkArgument(container.isPresent(), "plugin is not registered"); + return new TaskBuilderImpl(container.get(), runnable); } @Override public TaskBuilder buildTask(Object plugin, Consumer consumer) { checkNotNull(plugin, "plugin"); checkNotNull(consumer, "consumer"); - checkArgument(pluginManager.fromInstance(plugin).isPresent(), "plugin is not registered"); - return new TaskBuilderImpl(plugin, null, consumer); + final Optional container = pluginManager.fromInstance(plugin); + checkArgument(container.isPresent(), "plugin is not registered"); + return new TaskBuilderImpl(container.get(), consumer); } @Override @@ -118,22 +116,55 @@ public class VelocityScheduler implements Scheduler { task.cancel(); } timerExecutionService.shutdown(); - taskService.shutdown(); - return taskService.awaitTermination(10, TimeUnit.SECONDS); + for (final PluginContainer container : this.pluginManager.getPlugins()) { + if (container instanceof VelocityPluginContainer) { + (container).getExecutorService().shutdown(); + } + } + + boolean allShutdown = true; + for (final PluginContainer container : this.pluginManager.getPlugins()) { + if (!(container instanceof VelocityPluginContainer)) { + continue; + } + final String id = container.getDescription().getId(); + final ExecutorService service = (container).getExecutorService(); + + try { + if (!service.awaitTermination(10, TimeUnit.SECONDS)) { + service.shutdownNow(); + Log.logger.warn("Executor for plugin {} did not shut down within 10 seconds. " + + "Continuing with shutdown...", id); + allShutdown = false; + } + + } catch (final InterruptedException e) { + Log.logger.warn("Executor for plugin {} did not shut down within 10 seconds. " + + "Continuing with shutdown...", id); + } + } + + return allShutdown; } private class TaskBuilderImpl implements TaskBuilder { - private final Object plugin; + private final PluginContainer container; private final Runnable runnable; private final Consumer consumer; private long delay; // ms private long repeat; // ms - private TaskBuilderImpl(Object plugin, Runnable runnable, Consumer consumer) { - this.plugin = plugin; - this.runnable = runnable; + private TaskBuilderImpl(PluginContainer container, Consumer consumer) { + this.container = container; this.consumer = consumer; + this.runnable = null; + } + + private TaskBuilderImpl(PluginContainer container, Runnable runnable) { + this.container = container; + this.consumer = null; + this.runnable = runnable; } @Override @@ -162,16 +193,17 @@ public class VelocityScheduler implements Scheduler { @Override public ScheduledTask schedule() { - VelocityTask task = new VelocityTask(plugin, runnable, consumer, delay, repeat); - tasksByPlugin.put(plugin, task); + VelocityTask task = new VelocityTask(container, runnable, consumer, delay, repeat); + tasksByPlugin.put(container.getInstance().get(), task); task.schedule(); return task; } } - private class VelocityTask implements Runnable, ScheduledTask { + @VisibleForTesting + class VelocityTask implements Runnable, ScheduledTask { - private final Object plugin; + private final PluginContainer container; private final Runnable runnable; private final Consumer consumer; private final long delay; @@ -179,9 +211,9 @@ public class VelocityScheduler implements Scheduler { private @Nullable ScheduledFuture future; private volatile @Nullable Thread currentTaskThread; - private VelocityTask(Object plugin, Runnable runnable, Consumer consumer, - long delay, long repeat) { - this.plugin = plugin; + private VelocityTask(PluginContainer container, Runnable runnable, + Consumer consumer, long delay, long repeat) { + this.container = container; this.runnable = runnable; this.consumer = consumer; this.delay = delay; @@ -199,7 +231,8 @@ public class VelocityScheduler implements Scheduler { @Override public Object plugin() { - return plugin; + //noinspection OptionalGetWithoutIsPresent + return container.getInstance().get(); } @Override @@ -235,7 +268,7 @@ public class VelocityScheduler implements Scheduler { @Override public void run() { - taskService.execute(() -> { + container.getExecutorService().execute(() -> { currentTaskThread = Thread.currentThread(); try { if (runnable != null) { @@ -248,11 +281,10 @@ public class VelocityScheduler implements Scheduler { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } else { - String friendlyPluginName = pluginManager.fromInstance(plugin) - .map(container -> container.getDescription().getName() - .orElse(container.getDescription().getId())) - .orElse("UNKNOWN"); - Log.logger.error("Exception in task {} by plugin {}", runnable, friendlyPluginName, + String friendlyPluginName = container.getDescription().getName() + .orElse(container.getDescription().getId()); + Object unit = consumer == null ? runnable : consumer; + Log.logger.error("Exception in task {} by plugin {}", unit, friendlyPluginName, e); } } finally { @@ -265,7 +297,17 @@ public class VelocityScheduler implements Scheduler { } private void onFinish() { - tasksByPlugin.remove(plugin, this); + tasksByPlugin.remove(plugin(), this); + } + + public void awaitCompletion() { + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } } } diff --git a/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java b/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java index df9ac1d31..70cd20b10 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java @@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import com.velocitypowered.api.scheduler.ScheduledTask; import com.velocitypowered.api.scheduler.TaskStatus; +import com.velocitypowered.proxy.scheduler.VelocityScheduler.VelocityTask; import com.velocitypowered.proxy.testutil.FakePluginManager; import java.time.Duration; import java.util.concurrent.CountDownLatch; @@ -39,6 +40,7 @@ class VelocitySchedulerTest { ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown) .schedule(); latch.await(); + ((VelocityTask) task).awaitCompletion(); assertEquals(TaskStatus.FINISHED, task.status()); } @@ -50,7 +52,6 @@ class VelocitySchedulerTest { .delay(100, TimeUnit.SECONDS) .schedule(); task.cancel(); - Thread.sleep(200); assertEquals(3, i.get()); assertEquals(TaskStatus.CANCELLED, task.status()); } @@ -70,23 +71,26 @@ class VelocitySchedulerTest { @Test void obtainTasksFromPlugin() throws Exception { VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager()); - AtomicInteger i = new AtomicInteger(0); - CountDownLatch latch = new CountDownLatch(1); + CountDownLatch runningLatch = new CountDownLatch(1); + CountDownLatch endingLatch = new CountDownLatch(1); scheduler.buildTask(FakePluginManager.PLUGIN_A, task -> { - if (i.getAndIncrement() >= 1) { - task.cancel(); - latch.countDown(); + runningLatch.countDown(); + task.cancel(); + try { + endingLatch.await(); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); } }).delay(50, TimeUnit.MILLISECONDS) .repeat(Duration.ofMillis(5)) .schedule(); + runningLatch.await(); + assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 1); - latch.await(); - - assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 0); + endingLatch.countDown(); } @Test diff --git a/proxy/src/test/java/com/velocitypowered/proxy/testutil/FakePluginManager.java b/proxy/src/test/java/com/velocitypowered/proxy/testutil/FakePluginManager.java index cb15f98fd..04ba3867d 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/testutil/FakePluginManager.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/testutil/FakePluginManager.java @@ -24,6 +24,8 @@ import com.velocitypowered.api.plugin.PluginManager; import java.nio.file.Path; import java.util.Collection; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import org.checkerframework.checker.nullness.qual.NonNull; /** @@ -79,10 +81,12 @@ public class FakePluginManager implements PluginManager { private final String id; private final Object instance; + private final ExecutorService service; private FakePluginContainer(String id, Object instance) { this.id = id; this.instance = instance; + this.service = ForkJoinPool.commonPool(); } @Override @@ -94,5 +98,10 @@ public class FakePluginManager implements PluginManager { public Optional getInstance() { return Optional.of(instance); } + + @Override + public ExecutorService getExecutorService() { + return service; + } } }