13
0
geforkt von Mirrors/Velocity

Give each plugin its own executor service (#1010)

This is part of preparatory work for Velocity 5.0.0's revamped event system, but this change is safe to bring into the 3.x.x series. This affects the scheduler for now, but command execution will also be moved into the per-plugin thread pool, along with invocations of `EventTask.async()`.
Dieser Commit ist enthalten in:
Andrew Steinborn 2023-05-14 02:51:25 -04:00 committet von GitHub
Ursprung 7f776abf55
Commit a29c753e39
Es konnte kein GPG-Schlüssel zu dieser Signatur gefunden werden
GPG-Schlüssel-ID: 4AEE18F83AFDEB23
6 geänderte Dateien mit 135 neuen und 44 gelöschten Zeilen

Datei anzeigen

@ -8,6 +8,7 @@
package com.velocitypowered.api.plugin;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
/**
* A wrapper around a plugin loaded by the proxy.
@ -29,4 +30,12 @@ public interface PluginContainer {
default Optional<?> getInstance() {
return Optional.empty();
}
/**
* Returns an executor service for this plugin. The executor will use a cached
* thread pool.
*
* @return an {@link ExecutorService} associated with this plugin
*/
ExecutorService getExecutorService();
}

Datei anzeigen

@ -17,9 +17,12 @@
package com.velocitypowered.proxy.plugin.loader;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.velocitypowered.api.plugin.PluginContainer;
import com.velocitypowered.api.plugin.PluginDescription;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Implements {@link PluginContainer}.
@ -28,6 +31,7 @@ public class VelocityPluginContainer implements PluginContainer {
private final PluginDescription description;
private Object instance;
private volatile ExecutorService service;
public VelocityPluginContainer(PluginDescription description) {
this.description = description;
@ -46,4 +50,24 @@ public class VelocityPluginContainer implements PluginContainer {
public void setInstance(Object instance) {
this.instance = instance;
}
@Override
public ExecutorService getExecutorService() {
if (this.service == null) {
synchronized (this) {
if (this.service == null) {
String name = this.description.getName().orElse(this.description.getId());
this.service = Executors.unconfigurableExecutorService(
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat(name + " - Task Executor #%d")
.build()
)
);
}
}
}
return this.service;
}
}

Datei anzeigen

@ -25,6 +25,7 @@ import com.velocitypowered.api.plugin.PluginDescription;
import com.velocitypowered.api.plugin.annotation.DataDirectory;
import com.velocitypowered.api.proxy.ProxyServer;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -52,5 +53,7 @@ class VelocityPluginModule implements Module {
.toInstance(basePluginPath.resolve(description.getId()));
binder.bind(PluginDescription.class).toInstance(description);
binder.bind(PluginContainer.class).toInstance(pluginContainer);
binder.bind(ExecutorService.class).toProvider(pluginContainer::getExecutorService);
}
}

Datei anzeigen

@ -24,39 +24,39 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.velocitypowered.api.plugin.PluginContainer;
import com.velocitypowered.api.plugin.PluginManager;
import com.velocitypowered.api.scheduler.ScheduledTask;
import com.velocitypowered.api.scheduler.Scheduler;
import com.velocitypowered.api.scheduler.TaskStatus;
import com.velocitypowered.proxy.plugin.loader.VelocityPluginContainer;
import java.util.Collection;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.jetbrains.annotations.VisibleForTesting;
/**
* The Velocity "scheduler", which is actually a thin wrapper around
* {@link ScheduledExecutorService} and a dynamically-sized {@link ExecutorService}.
* Many plugins are accustomed to the Bukkit Scheduler model although it is not relevant
* Many plugins are accustomed to the Bukkit Scheduler model, although it is not relevant
* in a proxy context.
*/
public class VelocityScheduler implements Scheduler {
private static final int MAX_SCHEDULER_POOLED_THREAD_CAP = 200;
private final PluginManager pluginManager;
private final ExecutorService taskService;
private final ScheduledExecutorService timerExecutionService;
private final Multimap<Object, ScheduledTask> tasksByPlugin = Multimaps.synchronizedMultimap(
Multimaps.newSetMultimap(new IdentityHashMap<>(), HashSet::new));
@ -68,10 +68,6 @@ public class VelocityScheduler implements Scheduler {
*/
public VelocityScheduler(PluginManager pluginManager) {
this.pluginManager = pluginManager;
this.taskService = new ThreadPoolExecutor(1, MAX_SCHEDULER_POOLED_THREAD_CAP,
60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Velocity Task Scheduler - #%d").build());
this.timerExecutionService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Velocity Task Scheduler Timer").build());
@ -81,16 +77,18 @@ public class VelocityScheduler implements Scheduler {
public TaskBuilder buildTask(Object plugin, Runnable runnable) {
checkNotNull(plugin, "plugin");
checkNotNull(runnable, "runnable");
checkArgument(pluginManager.fromInstance(plugin).isPresent(), "plugin is not registered");
return new TaskBuilderImpl(plugin, runnable, null);
final Optional<PluginContainer> container = pluginManager.fromInstance(plugin);
checkArgument(container.isPresent(), "plugin is not registered");
return new TaskBuilderImpl(container.get(), runnable);
}
@Override
public TaskBuilder buildTask(Object plugin, Consumer<ScheduledTask> consumer) {
checkNotNull(plugin, "plugin");
checkNotNull(consumer, "consumer");
checkArgument(pluginManager.fromInstance(plugin).isPresent(), "plugin is not registered");
return new TaskBuilderImpl(plugin, null, consumer);
final Optional<PluginContainer> container = pluginManager.fromInstance(plugin);
checkArgument(container.isPresent(), "plugin is not registered");
return new TaskBuilderImpl(container.get(), consumer);
}
@Override
@ -118,22 +116,55 @@ public class VelocityScheduler implements Scheduler {
task.cancel();
}
timerExecutionService.shutdown();
taskService.shutdown();
return taskService.awaitTermination(10, TimeUnit.SECONDS);
for (final PluginContainer container : this.pluginManager.getPlugins()) {
if (container instanceof VelocityPluginContainer) {
(container).getExecutorService().shutdown();
}
}
boolean allShutdown = true;
for (final PluginContainer container : this.pluginManager.getPlugins()) {
if (!(container instanceof VelocityPluginContainer)) {
continue;
}
final String id = container.getDescription().getId();
final ExecutorService service = (container).getExecutorService();
try {
if (!service.awaitTermination(10, TimeUnit.SECONDS)) {
service.shutdownNow();
Log.logger.warn("Executor for plugin {} did not shut down within 10 seconds. "
+ "Continuing with shutdown...", id);
allShutdown = false;
}
} catch (final InterruptedException e) {
Log.logger.warn("Executor for plugin {} did not shut down within 10 seconds. "
+ "Continuing with shutdown...", id);
}
}
return allShutdown;
}
private class TaskBuilderImpl implements TaskBuilder {
private final Object plugin;
private final PluginContainer container;
private final Runnable runnable;
private final Consumer<ScheduledTask> consumer;
private long delay; // ms
private long repeat; // ms
private TaskBuilderImpl(Object plugin, Runnable runnable, Consumer<ScheduledTask> consumer) {
this.plugin = plugin;
this.runnable = runnable;
private TaskBuilderImpl(PluginContainer container, Consumer<ScheduledTask> consumer) {
this.container = container;
this.consumer = consumer;
this.runnable = null;
}
private TaskBuilderImpl(PluginContainer container, Runnable runnable) {
this.container = container;
this.consumer = null;
this.runnable = runnable;
}
@Override
@ -162,16 +193,17 @@ public class VelocityScheduler implements Scheduler {
@Override
public ScheduledTask schedule() {
VelocityTask task = new VelocityTask(plugin, runnable, consumer, delay, repeat);
tasksByPlugin.put(plugin, task);
VelocityTask task = new VelocityTask(container, runnable, consumer, delay, repeat);
tasksByPlugin.put(container.getInstance().get(), task);
task.schedule();
return task;
}
}
private class VelocityTask implements Runnable, ScheduledTask {
@VisibleForTesting
class VelocityTask implements Runnable, ScheduledTask {
private final Object plugin;
private final PluginContainer container;
private final Runnable runnable;
private final Consumer<ScheduledTask> consumer;
private final long delay;
@ -179,9 +211,9 @@ public class VelocityScheduler implements Scheduler {
private @Nullable ScheduledFuture<?> future;
private volatile @Nullable Thread currentTaskThread;
private VelocityTask(Object plugin, Runnable runnable, Consumer<ScheduledTask> consumer,
long delay, long repeat) {
this.plugin = plugin;
private VelocityTask(PluginContainer container, Runnable runnable,
Consumer<ScheduledTask> consumer, long delay, long repeat) {
this.container = container;
this.runnable = runnable;
this.consumer = consumer;
this.delay = delay;
@ -199,7 +231,8 @@ public class VelocityScheduler implements Scheduler {
@Override
public Object plugin() {
return plugin;
//noinspection OptionalGetWithoutIsPresent
return container.getInstance().get();
}
@Override
@ -235,7 +268,7 @@ public class VelocityScheduler implements Scheduler {
@Override
public void run() {
taskService.execute(() -> {
container.getExecutorService().execute(() -> {
currentTaskThread = Thread.currentThread();
try {
if (runnable != null) {
@ -248,11 +281,10 @@ public class VelocityScheduler implements Scheduler {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
} else {
String friendlyPluginName = pluginManager.fromInstance(plugin)
.map(container -> container.getDescription().getName()
.orElse(container.getDescription().getId()))
.orElse("UNKNOWN");
Log.logger.error("Exception in task {} by plugin {}", runnable, friendlyPluginName,
String friendlyPluginName = container.getDescription().getName()
.orElse(container.getDescription().getId());
Object unit = consumer == null ? runnable : consumer;
Log.logger.error("Exception in task {} by plugin {}", unit, friendlyPluginName,
e);
}
} finally {
@ -265,7 +297,17 @@ public class VelocityScheduler implements Scheduler {
}
private void onFinish() {
tasksByPlugin.remove(plugin, this);
tasksByPlugin.remove(plugin(), this);
}
public void awaitCompletion() {
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}

Datei anzeigen

@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import com.velocitypowered.api.scheduler.ScheduledTask;
import com.velocitypowered.api.scheduler.TaskStatus;
import com.velocitypowered.proxy.scheduler.VelocityScheduler.VelocityTask;
import com.velocitypowered.proxy.testutil.FakePluginManager;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
@ -39,6 +40,7 @@ class VelocitySchedulerTest {
ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown)
.schedule();
latch.await();
((VelocityTask) task).awaitCompletion();
assertEquals(TaskStatus.FINISHED, task.status());
}
@ -50,7 +52,6 @@ class VelocitySchedulerTest {
.delay(100, TimeUnit.SECONDS)
.schedule();
task.cancel();
Thread.sleep(200);
assertEquals(3, i.get());
assertEquals(TaskStatus.CANCELLED, task.status());
}
@ -70,23 +71,26 @@ class VelocitySchedulerTest {
@Test
void obtainTasksFromPlugin() throws Exception {
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager());
AtomicInteger i = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch runningLatch = new CountDownLatch(1);
CountDownLatch endingLatch = new CountDownLatch(1);
scheduler.buildTask(FakePluginManager.PLUGIN_A, task -> {
if (i.getAndIncrement() >= 1) {
task.cancel();
latch.countDown();
runningLatch.countDown();
task.cancel();
try {
endingLatch.await();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}).delay(50, TimeUnit.MILLISECONDS)
.repeat(Duration.ofMillis(5))
.schedule();
runningLatch.await();
assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 1);
latch.await();
assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 0);
endingLatch.countDown();
}
@Test

Datei anzeigen

@ -24,6 +24,8 @@ import com.velocitypowered.api.plugin.PluginManager;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import org.checkerframework.checker.nullness.qual.NonNull;
/**
@ -79,10 +81,12 @@ public class FakePluginManager implements PluginManager {
private final String id;
private final Object instance;
private final ExecutorService service;
private FakePluginContainer(String id, Object instance) {
this.id = id;
this.instance = instance;
this.service = ForkJoinPool.commonPool();
}
@Override
@ -94,5 +98,10 @@ public class FakePluginManager implements PluginManager {
public Optional<?> getInstance() {
return Optional.of(instance);
}
@Override
public ExecutorService getExecutorService() {
return service;
}
}
}