3
0
Mirror von https://github.com/PaperMC/Velocity.git synchronisiert 2024-11-17 05:20:14 +01:00

Merge branch 'dev/3.0.0' into dev/5.0.0

Dieser Commit ist enthalten in:
Andrew Steinborn 2023-05-14 02:53:47 -04:00
Commit 64693cc97d
6 geänderte Dateien mit 135 neuen und 44 gelöschten Zeilen

Datei anzeigen

@ -8,6 +8,7 @@
package com.velocitypowered.api.plugin;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
/**
* A wrapper around a plugin loaded by the proxy.
@ -29,4 +30,12 @@ public interface PluginContainer {
default Optional<?> getInstance() {
return Optional.empty();
}
/**
* Returns an executor service for this plugin. The executor will use a cached
* thread pool.
*
* @return an {@link ExecutorService} associated with this plugin
*/
ExecutorService getExecutorService();
}

Datei anzeigen

@ -17,9 +17,12 @@
package com.velocitypowered.proxy.plugin.loader;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.velocitypowered.api.plugin.PluginContainer;
import com.velocitypowered.api.plugin.PluginDescription;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Implements {@link PluginContainer}.
@ -28,6 +31,7 @@ public class VelocityPluginContainer implements PluginContainer {
private final PluginDescription description;
private Object instance;
private volatile ExecutorService service;
public VelocityPluginContainer(PluginDescription description) {
this.description = description;
@ -46,4 +50,24 @@ public class VelocityPluginContainer implements PluginContainer {
public void setInstance(Object instance) {
this.instance = instance;
}
@Override
public ExecutorService getExecutorService() {
if (this.service == null) {
synchronized (this) {
if (this.service == null) {
String name = this.description.getName().orElse(this.description.getId());
this.service = Executors.unconfigurableExecutorService(
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat(name + " - Task Executor #%d")
.build()
)
);
}
}
}
return this.service;
}
}

Datei anzeigen

@ -25,6 +25,7 @@ import com.velocitypowered.api.plugin.PluginDescription;
import com.velocitypowered.api.plugin.annotation.DataDirectory;
import com.velocitypowered.api.proxy.ProxyServer;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -52,5 +53,7 @@ class VelocityPluginModule implements Module {
.toInstance(basePluginPath.resolve(description.getId()));
binder.bind(PluginDescription.class).toInstance(description);
binder.bind(PluginContainer.class).toInstance(pluginContainer);
binder.bind(ExecutorService.class).toProvider(pluginContainer::getExecutorService);
}
}

Datei anzeigen

@ -24,39 +24,39 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.velocitypowered.api.plugin.PluginContainer;
import com.velocitypowered.api.plugin.PluginManager;
import com.velocitypowered.api.scheduler.ScheduledTask;
import com.velocitypowered.api.scheduler.Scheduler;
import com.velocitypowered.api.scheduler.TaskStatus;
import com.velocitypowered.proxy.plugin.loader.VelocityPluginContainer;
import java.util.Collection;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.jetbrains.annotations.VisibleForTesting;
/**
* The Velocity "scheduler", which is actually a thin wrapper around
* {@link ScheduledExecutorService} and a dynamically-sized {@link ExecutorService}.
* Many plugins are accustomed to the Bukkit Scheduler model although it is not relevant
* Many plugins are accustomed to the Bukkit Scheduler model, although it is not relevant
* in a proxy context.
*/
public class VelocityScheduler implements Scheduler {
private static final int MAX_SCHEDULER_POOLED_THREAD_CAP = 200;
private final PluginManager pluginManager;
private final ExecutorService taskService;
private final ScheduledExecutorService timerExecutionService;
private final Multimap<Object, ScheduledTask> tasksByPlugin = Multimaps.synchronizedMultimap(
Multimaps.newSetMultimap(new IdentityHashMap<>(), HashSet::new));
@ -68,10 +68,6 @@ public class VelocityScheduler implements Scheduler {
*/
public VelocityScheduler(PluginManager pluginManager) {
this.pluginManager = pluginManager;
this.taskService = new ThreadPoolExecutor(1, MAX_SCHEDULER_POOLED_THREAD_CAP,
60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Velocity Task Scheduler - #%d").build());
this.timerExecutionService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Velocity Task Scheduler Timer").build());
@ -81,16 +77,18 @@ public class VelocityScheduler implements Scheduler {
public TaskBuilder buildTask(Object plugin, Runnable runnable) {
checkNotNull(plugin, "plugin");
checkNotNull(runnable, "runnable");
checkArgument(pluginManager.fromInstance(plugin).isPresent(), "plugin is not registered");
return new TaskBuilderImpl(plugin, runnable, null);
final Optional<PluginContainer> container = pluginManager.fromInstance(plugin);
checkArgument(container.isPresent(), "plugin is not registered");
return new TaskBuilderImpl(container.get(), runnable);
}
@Override
public TaskBuilder buildTask(Object plugin, Consumer<ScheduledTask> consumer) {
checkNotNull(plugin, "plugin");
checkNotNull(consumer, "consumer");
checkArgument(pluginManager.fromInstance(plugin).isPresent(), "plugin is not registered");
return new TaskBuilderImpl(plugin, null, consumer);
final Optional<PluginContainer> container = pluginManager.fromInstance(plugin);
checkArgument(container.isPresent(), "plugin is not registered");
return new TaskBuilderImpl(container.get(), consumer);
}
@Override
@ -118,22 +116,55 @@ public class VelocityScheduler implements Scheduler {
task.cancel();
}
timerExecutionService.shutdown();
taskService.shutdown();
return taskService.awaitTermination(10, TimeUnit.SECONDS);
for (final PluginContainer container : this.pluginManager.getPlugins()) {
if (container instanceof VelocityPluginContainer) {
(container).getExecutorService().shutdown();
}
}
boolean allShutdown = true;
for (final PluginContainer container : this.pluginManager.getPlugins()) {
if (!(container instanceof VelocityPluginContainer)) {
continue;
}
final String id = container.getDescription().getId();
final ExecutorService service = (container).getExecutorService();
try {
if (!service.awaitTermination(10, TimeUnit.SECONDS)) {
service.shutdownNow();
Log.logger.warn("Executor for plugin {} did not shut down within 10 seconds. "
+ "Continuing with shutdown...", id);
allShutdown = false;
}
} catch (final InterruptedException e) {
Log.logger.warn("Executor for plugin {} did not shut down within 10 seconds. "
+ "Continuing with shutdown...", id);
}
}
return allShutdown;
}
private class TaskBuilderImpl implements TaskBuilder {
private final Object plugin;
private final PluginContainer container;
private final Runnable runnable;
private final Consumer<ScheduledTask> consumer;
private long delay; // ms
private long repeat; // ms
private TaskBuilderImpl(Object plugin, Runnable runnable, Consumer<ScheduledTask> consumer) {
this.plugin = plugin;
this.runnable = runnable;
private TaskBuilderImpl(PluginContainer container, Consumer<ScheduledTask> consumer) {
this.container = container;
this.consumer = consumer;
this.runnable = null;
}
private TaskBuilderImpl(PluginContainer container, Runnable runnable) {
this.container = container;
this.consumer = null;
this.runnable = runnable;
}
@Override
@ -162,16 +193,17 @@ public class VelocityScheduler implements Scheduler {
@Override
public ScheduledTask schedule() {
VelocityTask task = new VelocityTask(plugin, runnable, consumer, delay, repeat);
tasksByPlugin.put(plugin, task);
VelocityTask task = new VelocityTask(container, runnable, consumer, delay, repeat);
tasksByPlugin.put(container.getInstance().get(), task);
task.schedule();
return task;
}
}
private class VelocityTask implements Runnable, ScheduledTask {
@VisibleForTesting
class VelocityTask implements Runnable, ScheduledTask {
private final Object plugin;
private final PluginContainer container;
private final Runnable runnable;
private final Consumer<ScheduledTask> consumer;
private final long delay;
@ -179,9 +211,9 @@ public class VelocityScheduler implements Scheduler {
private @Nullable ScheduledFuture<?> future;
private volatile @Nullable Thread currentTaskThread;
private VelocityTask(Object plugin, Runnable runnable, Consumer<ScheduledTask> consumer,
long delay, long repeat) {
this.plugin = plugin;
private VelocityTask(PluginContainer container, Runnable runnable,
Consumer<ScheduledTask> consumer, long delay, long repeat) {
this.container = container;
this.runnable = runnable;
this.consumer = consumer;
this.delay = delay;
@ -199,7 +231,8 @@ public class VelocityScheduler implements Scheduler {
@Override
public Object plugin() {
return plugin;
//noinspection OptionalGetWithoutIsPresent
return container.getInstance().get();
}
@Override
@ -235,7 +268,7 @@ public class VelocityScheduler implements Scheduler {
@Override
public void run() {
taskService.execute(() -> {
container.getExecutorService().execute(() -> {
currentTaskThread = Thread.currentThread();
try {
if (runnable != null) {
@ -248,11 +281,10 @@ public class VelocityScheduler implements Scheduler {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
} else {
String friendlyPluginName = pluginManager.fromInstance(plugin)
.map(container -> container.getDescription().getName()
.orElse(container.getDescription().getId()))
.orElse("UNKNOWN");
Log.logger.error("Exception in task {} by plugin {}", runnable, friendlyPluginName,
String friendlyPluginName = container.getDescription().getName()
.orElse(container.getDescription().getId());
Object unit = consumer == null ? runnable : consumer;
Log.logger.error("Exception in task {} by plugin {}", unit, friendlyPluginName,
e);
}
} finally {
@ -265,7 +297,17 @@ public class VelocityScheduler implements Scheduler {
}
private void onFinish() {
tasksByPlugin.remove(plugin, this);
tasksByPlugin.remove(plugin(), this);
}
public void awaitCompletion() {
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}

Datei anzeigen

@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import com.velocitypowered.api.scheduler.ScheduledTask;
import com.velocitypowered.api.scheduler.TaskStatus;
import com.velocitypowered.proxy.scheduler.VelocityScheduler.VelocityTask;
import com.velocitypowered.proxy.testutil.FakePluginManager;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
@ -39,6 +40,7 @@ class VelocitySchedulerTest {
ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown)
.schedule();
latch.await();
((VelocityTask) task).awaitCompletion();
assertEquals(TaskStatus.FINISHED, task.status());
}
@ -50,7 +52,6 @@ class VelocitySchedulerTest {
.delay(100, TimeUnit.SECONDS)
.schedule();
task.cancel();
Thread.sleep(200);
assertEquals(3, i.get());
assertEquals(TaskStatus.CANCELLED, task.status());
}
@ -70,23 +71,26 @@ class VelocitySchedulerTest {
@Test
void obtainTasksFromPlugin() throws Exception {
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager());
AtomicInteger i = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch runningLatch = new CountDownLatch(1);
CountDownLatch endingLatch = new CountDownLatch(1);
scheduler.buildTask(FakePluginManager.PLUGIN_A, task -> {
if (i.getAndIncrement() >= 1) {
task.cancel();
latch.countDown();
runningLatch.countDown();
task.cancel();
try {
endingLatch.await();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}).delay(50, TimeUnit.MILLISECONDS)
.repeat(Duration.ofMillis(5))
.schedule();
runningLatch.await();
assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 1);
latch.await();
assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 0);
endingLatch.countDown();
}
@Test

Datei anzeigen

@ -24,6 +24,8 @@ import com.velocitypowered.api.plugin.PluginManager;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import org.checkerframework.checker.nullness.qual.NonNull;
/**
@ -79,10 +81,12 @@ public class FakePluginManager implements PluginManager {
private final String id;
private final Object instance;
private final ExecutorService service;
private FakePluginContainer(String id, Object instance) {
this.id = id;
this.instance = instance;
this.service = ForkJoinPool.commonPool();
}
@Override
@ -94,5 +98,10 @@ public class FakePluginManager implements PluginManager {
public Optional<?> getInstance() {
return Optional.of(instance);
}
@Override
public ExecutorService getExecutorService() {
return service;
}
}
}