13
0
geforkt von Mirrors/Velocity

Rewrote scheduler to use ScheduledExecutorService for delay/repeat tasks

Dieser Commit ist enthalten in:
Andrew Steinborn 2018-08-26 17:56:43 -04:00
Ursprung ffd6217170
Commit ef556afbf3
4 geänderte Dateien mit 91 neuen und 61 gelöschten Zeilen

Datei anzeigen

@ -25,7 +25,6 @@ import com.velocitypowered.proxy.messages.VelocityChannelRegistrar;
import com.velocitypowered.proxy.plugin.VelocityEventManager; import com.velocitypowered.proxy.plugin.VelocityEventManager;
import com.velocitypowered.proxy.protocol.util.FaviconSerializer; import com.velocitypowered.proxy.protocol.util.FaviconSerializer;
import com.velocitypowered.proxy.plugin.VelocityPluginManager; import com.velocitypowered.proxy.plugin.VelocityPluginManager;
import com.velocitypowered.proxy.scheduler.Sleeper;
import com.velocitypowered.proxy.scheduler.VelocityScheduler; import com.velocitypowered.proxy.scheduler.VelocityScheduler;
import com.velocitypowered.proxy.util.AddressUtil; import com.velocitypowered.proxy.util.AddressUtil;
import com.velocitypowered.proxy.util.EncryptionUtils; import com.velocitypowered.proxy.util.EncryptionUtils;
@ -137,7 +136,7 @@ public class VelocityServer implements ProxyServer {
ipAttemptLimiter = new Ratelimiter(configuration.getLoginRatelimit()); ipAttemptLimiter = new Ratelimiter(configuration.getLoginRatelimit());
httpClient = new NettyHttpClient(this); httpClient = new NettyHttpClient(this);
eventManager = new VelocityEventManager(pluginManager); eventManager = new VelocityEventManager(pluginManager);
scheduler = new VelocityScheduler(pluginManager, Sleeper.SYSTEM); scheduler = new VelocityScheduler(pluginManager);
channelRegistrar = new VelocityChannelRegistrar(); channelRegistrar = new VelocityChannelRegistrar();
loadPlugins(); loadPlugins();

Datei anzeigen

@ -1,7 +0,0 @@
package com.velocitypowered.proxy.scheduler;
public interface Sleeper {
void sleep(long ms) throws InterruptedException;
Sleeper SYSTEM = Thread::sleep;
}

Datei anzeigen

@ -14,22 +14,22 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.IdentityHashMap; import java.util.IdentityHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.*;
import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.TimeUnit;
public class VelocityScheduler implements Scheduler { public class VelocityScheduler implements Scheduler {
private final PluginManager pluginManager; private final PluginManager pluginManager;
private final ExecutorService taskService; private final ExecutorService taskService;
private final Sleeper sleeper; private final ScheduledExecutorService timerExecutionService;
private final Multimap<Object, ScheduledTask> tasksByPlugin = Multimaps.synchronizedListMultimap( private final Multimap<Object, ScheduledTask> tasksByPlugin = Multimaps.synchronizedListMultimap(
Multimaps.newListMultimap(new IdentityHashMap<>(), ArrayList::new)); Multimaps.newListMultimap(new IdentityHashMap<>(), ArrayList::new));
public VelocityScheduler(PluginManager pluginManager, Sleeper sleeper) { public VelocityScheduler(PluginManager pluginManager) {
this.pluginManager = pluginManager; this.pluginManager = pluginManager;
this.sleeper = sleeper;
this.taskService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true) this.taskService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Velocity Task Scheduler - #%d").build()); .setNameFormat("Velocity Task Scheduler - #%d").build());
this.timerExecutionService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Velocity Task Scheduler Timer").build());
} }
@Override @Override
@ -44,6 +44,7 @@ public class VelocityScheduler implements Scheduler {
for (ScheduledTask task : ImmutableList.copyOf(tasksByPlugin.values())) { for (ScheduledTask task : ImmutableList.copyOf(tasksByPlugin.values())) {
task.cancel(); task.cancel();
} }
timerExecutionService.shutdown();
taskService.shutdown(); taskService.shutdown();
return taskService.awaitTermination(10, TimeUnit.SECONDS); return taskService.awaitTermination(10, TimeUnit.SECONDS);
} }
@ -85,27 +86,30 @@ public class VelocityScheduler implements Scheduler {
@Override @Override
public ScheduledTask schedule() { public ScheduledTask schedule() {
VelocityTask task = new VelocityTask(plugin, runnable, delay, repeat); if (delay == 0 && repeat == 0) {
taskService.execute(task); // A special purpose, simplified implementation
tasksByPlugin.put(plugin, task); VelocityImmediatelyScheduledTask task = new VelocityImmediatelyScheduledTask(plugin, runnable);
return task; tasksByPlugin.put(plugin, task);
taskService.execute(task);
return task;
} else {
VelocityTask task = new VelocityTask(plugin, runnable, delay, repeat);
tasksByPlugin.put(plugin, task);
return task;
}
} }
} }
private class VelocityTask implements Runnable, ScheduledTask { private class VelocityImmediatelyScheduledTask implements ScheduledTask, Runnable {
private final Object plugin; private final Object plugin;
private final Runnable runnable; private final Runnable runnable;
private final long delay; private final AtomicReference<TaskStatus> status;
private final long repeat;
private volatile TaskStatus status;
private Thread taskThread; private Thread taskThread;
private VelocityTask(Object plugin, Runnable runnable, long delay, long repeat) { private VelocityImmediatelyScheduledTask(Object plugin, Runnable runnable) {
this.plugin = plugin; this.plugin = plugin;
this.runnable = runnable; this.runnable = runnable;
this.delay = delay; this.status = new AtomicReference<>(TaskStatus.SCHEDULED);
this.repeat = repeat;
this.status = TaskStatus.SCHEDULED;
} }
@Override @Override
@ -115,13 +119,12 @@ public class VelocityScheduler implements Scheduler {
@Override @Override
public TaskStatus status() { public TaskStatus status() {
return status; return status.get();
} }
@Override @Override
public void cancel() { public void cancel() {
if (status == TaskStatus.SCHEDULED) { if (status.compareAndSet(TaskStatus.SCHEDULED, TaskStatus.CANCELLED)) {
status = TaskStatus.CANCELLED;
if (taskThread != null) { if (taskThread != null) {
taskThread.interrupt(); taskThread.interrupt();
} }
@ -131,39 +134,74 @@ public class VelocityScheduler implements Scheduler {
@Override @Override
public void run() { public void run() {
taskThread = Thread.currentThread(); taskThread = Thread.currentThread();
if (delay > 0) { try {
try { runnable.run();
sleeper.sleep(delay); } catch (Exception e) {
} catch (InterruptedException e) { Log.logger.error("Exception in task {} by plugin {}", runnable, plugin);
if (status == TaskStatus.CANCELLED) { }
onFinish(); status.compareAndSet(TaskStatus.SCHEDULED, TaskStatus.FINISHED);
return; taskThread = null;
} tasksByPlugin.remove(plugin, this);
} }
}
private class VelocityTask implements Runnable, ScheduledTask {
private final Object plugin;
private final Runnable runnable;
private ScheduledFuture<?> future;
private VelocityTask(Object plugin, Runnable runnable, long delay, long repeat) {
this.plugin = plugin;
this.runnable = runnable;
if (repeat == 0) {
this.future = timerExecutionService.schedule(this, delay, TimeUnit.MILLISECONDS);
} else {
this.future = timerExecutionService.scheduleAtFixedRate(this, delay, repeat, TimeUnit.MILLISECONDS);
}
}
@Override
public Object plugin() {
return plugin;
}
@Override
public TaskStatus status() {
if (future == null) {
return TaskStatus.SCHEDULED;
} }
while (status != TaskStatus.CANCELLED) { if (future.isCancelled()) {
try { return TaskStatus.CANCELLED;
runnable.run(); }
} catch (Exception e) {
if (future.isDone()) {
return TaskStatus.FINISHED;
}
return TaskStatus.SCHEDULED;
}
@Override
public void cancel() {
if (future != null) {
future.cancel(true);
onFinish();
}
}
@Override
public void run() {
try {
runnable.run();
} catch (Exception e) {
// Since we can't catch InterruptedException separately...
if (e instanceof InterruptedException) {
onFinish();
} else {
Log.logger.error("Exception in task {} by plugin {}", runnable, plugin); Log.logger.error("Exception in task {} by plugin {}", runnable, plugin);
} }
if (repeat > 0) {
try {
sleeper.sleep(repeat);
} catch (InterruptedException e) {
if (status == TaskStatus.CANCELLED) {
break;
}
}
} else {
status = TaskStatus.FINISHED;
break;
}
} }
onFinish();
} }
private void onFinish() { private void onFinish() {

Datei anzeigen

@ -16,7 +16,7 @@ class VelocitySchedulerTest {
@Test @Test
void buildTask() throws Exception { void buildTask() throws Exception {
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), Sleeper.SYSTEM); VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager());
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown).schedule(); ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown).schedule();
latch.await(); latch.await();
@ -25,7 +25,7 @@ class VelocitySchedulerTest {
@Test @Test
void cancelWorks() throws Exception { void cancelWorks() throws Exception {
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), Sleeper.SYSTEM); VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager());
AtomicInteger i = new AtomicInteger(3); AtomicInteger i = new AtomicInteger(3);
ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, i::decrementAndGet) ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, i::decrementAndGet)
.delay(100, TimeUnit.SECONDS) .delay(100, TimeUnit.SECONDS)
@ -38,7 +38,7 @@ class VelocitySchedulerTest {
@Test @Test
void repeatTaskWorks() throws Exception { void repeatTaskWorks() throws Exception {
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), Sleeper.SYSTEM); VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager());
CountDownLatch latch = new CountDownLatch(3); CountDownLatch latch = new CountDownLatch(3);
ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown) ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown)
.delay(100, TimeUnit.MILLISECONDS) .delay(100, TimeUnit.MILLISECONDS)