diff --git a/api/src/main/java/com/velocitypowered/api/plugin/PluginContainer.java b/api/src/main/java/com/velocitypowered/api/plugin/PluginContainer.java index 6521b652a..2420f5548 100644 --- a/api/src/main/java/com/velocitypowered/api/plugin/PluginContainer.java +++ b/api/src/main/java/com/velocitypowered/api/plugin/PluginContainer.java @@ -8,6 +8,7 @@ package com.velocitypowered.api.plugin; import java.util.Optional; +import java.util.concurrent.ExecutorService; /** * A wrapper around a plugin loaded by the proxy. @@ -29,4 +30,12 @@ public interface PluginContainer { default Optional getInstance() { return Optional.empty(); } + + /** + * Returns an executor service for this plugin. The executor will use a cached + * thread pool. + * + * @return an {@link ExecutorService} associated with this plugin + */ + ExecutorService getExecutorService(); } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/plugin/loader/VelocityPluginContainer.java b/proxy/src/main/java/com/velocitypowered/proxy/plugin/loader/VelocityPluginContainer.java index 6774597fe..fb9a4ea3f 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/plugin/loader/VelocityPluginContainer.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/plugin/loader/VelocityPluginContainer.java @@ -17,9 +17,12 @@ package com.velocitypowered.proxy.plugin.loader; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.velocitypowered.api.plugin.PluginContainer; import com.velocitypowered.api.plugin.PluginDescription; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Implements {@link PluginContainer}. @@ -28,6 +31,7 @@ public class VelocityPluginContainer implements PluginContainer { private final PluginDescription description; private Object instance; + private volatile ExecutorService service; public VelocityPluginContainer(PluginDescription description) { this.description = description; @@ -46,4 +50,24 @@ public class VelocityPluginContainer implements PluginContainer { public void setInstance(Object instance) { this.instance = instance; } + + @Override + public ExecutorService getExecutorService() { + if (this.service == null) { + synchronized (this) { + if (this.service == null) { + String name = this.description.getName().orElse(this.description.getId()); + this.service = Executors.unconfigurableExecutorService( + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat(name + " - Task Executor #%d") + .build() + ) + ); + } + } + } + + return this.service; + } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/plugin/loader/java/VelocityPluginModule.java b/proxy/src/main/java/com/velocitypowered/proxy/plugin/loader/java/VelocityPluginModule.java index c702b4944..8e59816af 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/plugin/loader/java/VelocityPluginModule.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/plugin/loader/java/VelocityPluginModule.java @@ -25,6 +25,7 @@ import com.velocitypowered.api.plugin.PluginDescription; import com.velocitypowered.api.plugin.annotation.DataDirectory; import com.velocitypowered.api.proxy.ProxyServer; import java.nio.file.Path; +import java.util.concurrent.ExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,5 +53,7 @@ class VelocityPluginModule implements Module { .toInstance(basePluginPath.resolve(description.getId())); binder.bind(PluginDescription.class).toInstance(description); binder.bind(PluginContainer.class).toInstance(pluginContainer); + + binder.bind(ExecutorService.class).toProvider(pluginContainer::getExecutorService); } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/scheduler/VelocityScheduler.java b/proxy/src/main/java/com/velocitypowered/proxy/scheduler/VelocityScheduler.java index 4d27a2ecf..96d0b94d6 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/scheduler/VelocityScheduler.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/scheduler/VelocityScheduler.java @@ -24,39 +24,39 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.velocitypowered.api.plugin.PluginContainer; import com.velocitypowered.api.plugin.PluginManager; import com.velocitypowered.api.scheduler.ScheduledTask; import com.velocitypowered.api.scheduler.Scheduler; import com.velocitypowered.api.scheduler.TaskStatus; +import com.velocitypowered.proxy.plugin.loader.VelocityPluginContainer; import java.util.Collection; import java.util.HashSet; import java.util.IdentityHashMap; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; +import org.jetbrains.annotations.VisibleForTesting; /** * The Velocity "scheduler", which is actually a thin wrapper around * {@link ScheduledExecutorService} and a dynamically-sized {@link ExecutorService}. - * Many plugins are accustomed to the Bukkit Scheduler model although it is not relevant + * Many plugins are accustomed to the Bukkit Scheduler model, although it is not relevant * in a proxy context. */ public class VelocityScheduler implements Scheduler { - private static final int MAX_SCHEDULER_POOLED_THREAD_CAP = 200; - private final PluginManager pluginManager; - private final ExecutorService taskService; private final ScheduledExecutorService timerExecutionService; private final Multimap tasksByPlugin = Multimaps.synchronizedMultimap( Multimaps.newSetMultimap(new IdentityHashMap<>(), HashSet::new)); @@ -68,10 +68,6 @@ public class VelocityScheduler implements Scheduler { */ public VelocityScheduler(PluginManager pluginManager) { this.pluginManager = pluginManager; - this.taskService = new ThreadPoolExecutor(1, MAX_SCHEDULER_POOLED_THREAD_CAP, - 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Velocity Task Scheduler - #%d").build()); this.timerExecutionService = Executors .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("Velocity Task Scheduler Timer").build()); @@ -81,16 +77,18 @@ public class VelocityScheduler implements Scheduler { public TaskBuilder buildTask(Object plugin, Runnable runnable) { checkNotNull(plugin, "plugin"); checkNotNull(runnable, "runnable"); - checkArgument(pluginManager.fromInstance(plugin).isPresent(), "plugin is not registered"); - return new TaskBuilderImpl(plugin, runnable, null); + final Optional container = pluginManager.fromInstance(plugin); + checkArgument(container.isPresent(), "plugin is not registered"); + return new TaskBuilderImpl(container.get(), runnable); } @Override public TaskBuilder buildTask(Object plugin, Consumer consumer) { checkNotNull(plugin, "plugin"); checkNotNull(consumer, "consumer"); - checkArgument(pluginManager.fromInstance(plugin).isPresent(), "plugin is not registered"); - return new TaskBuilderImpl(plugin, null, consumer); + final Optional container = pluginManager.fromInstance(plugin); + checkArgument(container.isPresent(), "plugin is not registered"); + return new TaskBuilderImpl(container.get(), consumer); } @Override @@ -118,22 +116,55 @@ public class VelocityScheduler implements Scheduler { task.cancel(); } timerExecutionService.shutdown(); - taskService.shutdown(); - return taskService.awaitTermination(10, TimeUnit.SECONDS); + for (final PluginContainer container : this.pluginManager.getPlugins()) { + if (container instanceof VelocityPluginContainer) { + (container).getExecutorService().shutdown(); + } + } + + boolean allShutdown = true; + for (final PluginContainer container : this.pluginManager.getPlugins()) { + if (!(container instanceof VelocityPluginContainer)) { + continue; + } + final String id = container.getDescription().getId(); + final ExecutorService service = (container).getExecutorService(); + + try { + if (!service.awaitTermination(10, TimeUnit.SECONDS)) { + service.shutdownNow(); + Log.logger.warn("Executor for plugin {} did not shut down within 10 seconds. " + + "Continuing with shutdown...", id); + allShutdown = false; + } + + } catch (final InterruptedException e) { + Log.logger.warn("Executor for plugin {} did not shut down within 10 seconds. " + + "Continuing with shutdown...", id); + } + } + + return allShutdown; } private class TaskBuilderImpl implements TaskBuilder { - private final Object plugin; + private final PluginContainer container; private final Runnable runnable; private final Consumer consumer; private long delay; // ms private long repeat; // ms - private TaskBuilderImpl(Object plugin, Runnable runnable, Consumer consumer) { - this.plugin = plugin; - this.runnable = runnable; + private TaskBuilderImpl(PluginContainer container, Consumer consumer) { + this.container = container; this.consumer = consumer; + this.runnable = null; + } + + private TaskBuilderImpl(PluginContainer container, Runnable runnable) { + this.container = container; + this.consumer = null; + this.runnable = runnable; } @Override @@ -162,16 +193,17 @@ public class VelocityScheduler implements Scheduler { @Override public ScheduledTask schedule() { - VelocityTask task = new VelocityTask(plugin, runnable, consumer, delay, repeat); - tasksByPlugin.put(plugin, task); + VelocityTask task = new VelocityTask(container, runnable, consumer, delay, repeat); + tasksByPlugin.put(container.getInstance().get(), task); task.schedule(); return task; } } - private class VelocityTask implements Runnable, ScheduledTask { + @VisibleForTesting + class VelocityTask implements Runnable, ScheduledTask { - private final Object plugin; + private final PluginContainer container; private final Runnable runnable; private final Consumer consumer; private final long delay; @@ -179,9 +211,9 @@ public class VelocityScheduler implements Scheduler { private @Nullable ScheduledFuture future; private volatile @Nullable Thread currentTaskThread; - private VelocityTask(Object plugin, Runnable runnable, Consumer consumer, - long delay, long repeat) { - this.plugin = plugin; + private VelocityTask(PluginContainer container, Runnable runnable, + Consumer consumer, long delay, long repeat) { + this.container = container; this.runnable = runnable; this.consumer = consumer; this.delay = delay; @@ -199,7 +231,8 @@ public class VelocityScheduler implements Scheduler { @Override public Object plugin() { - return plugin; + //noinspection OptionalGetWithoutIsPresent + return container.getInstance().get(); } @Override @@ -235,7 +268,7 @@ public class VelocityScheduler implements Scheduler { @Override public void run() { - taskService.execute(() -> { + container.getExecutorService().execute(() -> { currentTaskThread = Thread.currentThread(); try { if (runnable != null) { @@ -248,11 +281,10 @@ public class VelocityScheduler implements Scheduler { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } else { - String friendlyPluginName = pluginManager.fromInstance(plugin) - .map(container -> container.getDescription().getName() - .orElse(container.getDescription().getId())) - .orElse("UNKNOWN"); - Log.logger.error("Exception in task {} by plugin {}", runnable, friendlyPluginName, + String friendlyPluginName = container.getDescription().getName() + .orElse(container.getDescription().getId()); + Object unit = consumer == null ? runnable : consumer; + Log.logger.error("Exception in task {} by plugin {}", unit, friendlyPluginName, e); } } finally { @@ -265,7 +297,17 @@ public class VelocityScheduler implements Scheduler { } private void onFinish() { - tasksByPlugin.remove(plugin, this); + tasksByPlugin.remove(plugin(), this); + } + + public void awaitCompletion() { + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } } } diff --git a/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java b/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java index df9ac1d31..70cd20b10 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java @@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import com.velocitypowered.api.scheduler.ScheduledTask; import com.velocitypowered.api.scheduler.TaskStatus; +import com.velocitypowered.proxy.scheduler.VelocityScheduler.VelocityTask; import com.velocitypowered.proxy.testutil.FakePluginManager; import java.time.Duration; import java.util.concurrent.CountDownLatch; @@ -39,6 +40,7 @@ class VelocitySchedulerTest { ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown) .schedule(); latch.await(); + ((VelocityTask) task).awaitCompletion(); assertEquals(TaskStatus.FINISHED, task.status()); } @@ -50,7 +52,6 @@ class VelocitySchedulerTest { .delay(100, TimeUnit.SECONDS) .schedule(); task.cancel(); - Thread.sleep(200); assertEquals(3, i.get()); assertEquals(TaskStatus.CANCELLED, task.status()); } @@ -70,23 +71,26 @@ class VelocitySchedulerTest { @Test void obtainTasksFromPlugin() throws Exception { VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager()); - AtomicInteger i = new AtomicInteger(0); - CountDownLatch latch = new CountDownLatch(1); + CountDownLatch runningLatch = new CountDownLatch(1); + CountDownLatch endingLatch = new CountDownLatch(1); scheduler.buildTask(FakePluginManager.PLUGIN_A, task -> { - if (i.getAndIncrement() >= 1) { - task.cancel(); - latch.countDown(); + runningLatch.countDown(); + task.cancel(); + try { + endingLatch.await(); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); } }).delay(50, TimeUnit.MILLISECONDS) .repeat(Duration.ofMillis(5)) .schedule(); + runningLatch.await(); + assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 1); - latch.await(); - - assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 0); + endingLatch.countDown(); } @Test diff --git a/proxy/src/test/java/com/velocitypowered/proxy/testutil/FakePluginManager.java b/proxy/src/test/java/com/velocitypowered/proxy/testutil/FakePluginManager.java index cb15f98fd..04ba3867d 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/testutil/FakePluginManager.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/testutil/FakePluginManager.java @@ -24,6 +24,8 @@ import com.velocitypowered.api.plugin.PluginManager; import java.nio.file.Path; import java.util.Collection; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import org.checkerframework.checker.nullness.qual.NonNull; /** @@ -79,10 +81,12 @@ public class FakePluginManager implements PluginManager { private final String id; private final Object instance; + private final ExecutorService service; private FakePluginContainer(String id, Object instance) { this.id = id; this.instance = instance; + this.service = ForkJoinPool.commonPool(); } @Override @@ -94,5 +98,10 @@ public class FakePluginManager implements PluginManager { public Optional getInstance() { return Optional.of(instance); } + + @Override + public ExecutorService getExecutorService() { + return service; + } } }