From ef556afbf33365101583eca9decf9ad55da3f1b6 Mon Sep 17 00:00:00 2001 From: Andrew Steinborn Date: Sun, 26 Aug 2018 17:56:43 -0400 Subject: [PATCH] Rewrote scheduler to use ScheduledExecutorService for delay/repeat tasks --- .../velocitypowered/proxy/VelocityServer.java | 3 +- .../proxy/scheduler/Sleeper.java | 7 - .../proxy/scheduler/VelocityScheduler.java | 136 +++++++++++------- .../scheduler/VelocitySchedulerTest.java | 6 +- 4 files changed, 91 insertions(+), 61 deletions(-) delete mode 100644 proxy/src/main/java/com/velocitypowered/proxy/scheduler/Sleeper.java diff --git a/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java b/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java index 92db57593..2faa1dea9 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java @@ -25,7 +25,6 @@ import com.velocitypowered.proxy.messages.VelocityChannelRegistrar; import com.velocitypowered.proxy.plugin.VelocityEventManager; import com.velocitypowered.proxy.protocol.util.FaviconSerializer; import com.velocitypowered.proxy.plugin.VelocityPluginManager; -import com.velocitypowered.proxy.scheduler.Sleeper; import com.velocitypowered.proxy.scheduler.VelocityScheduler; import com.velocitypowered.proxy.util.AddressUtil; import com.velocitypowered.proxy.util.EncryptionUtils; @@ -137,7 +136,7 @@ public class VelocityServer implements ProxyServer { ipAttemptLimiter = new Ratelimiter(configuration.getLoginRatelimit()); httpClient = new NettyHttpClient(this); eventManager = new VelocityEventManager(pluginManager); - scheduler = new VelocityScheduler(pluginManager, Sleeper.SYSTEM); + scheduler = new VelocityScheduler(pluginManager); channelRegistrar = new VelocityChannelRegistrar(); loadPlugins(); diff --git a/proxy/src/main/java/com/velocitypowered/proxy/scheduler/Sleeper.java b/proxy/src/main/java/com/velocitypowered/proxy/scheduler/Sleeper.java deleted file mode 100644 index b090ad110..000000000 --- a/proxy/src/main/java/com/velocitypowered/proxy/scheduler/Sleeper.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.velocitypowered.proxy.scheduler; - -public interface Sleeper { - void sleep(long ms) throws InterruptedException; - - Sleeper SYSTEM = Thread::sleep; -} diff --git a/proxy/src/main/java/com/velocitypowered/proxy/scheduler/VelocityScheduler.java b/proxy/src/main/java/com/velocitypowered/proxy/scheduler/VelocityScheduler.java index fdd577f34..7193e46ee 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/scheduler/VelocityScheduler.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/scheduler/VelocityScheduler.java @@ -14,22 +14,22 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.IdentityHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; public class VelocityScheduler implements Scheduler { private final PluginManager pluginManager; private final ExecutorService taskService; - private final Sleeper sleeper; + private final ScheduledExecutorService timerExecutionService; private final Multimap tasksByPlugin = Multimaps.synchronizedListMultimap( Multimaps.newListMultimap(new IdentityHashMap<>(), ArrayList::new)); - public VelocityScheduler(PluginManager pluginManager, Sleeper sleeper) { + public VelocityScheduler(PluginManager pluginManager) { this.pluginManager = pluginManager; - this.sleeper = sleeper; this.taskService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("Velocity Task Scheduler - #%d").build()); + this.timerExecutionService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Velocity Task Scheduler Timer").build()); } @Override @@ -44,6 +44,7 @@ public class VelocityScheduler implements Scheduler { for (ScheduledTask task : ImmutableList.copyOf(tasksByPlugin.values())) { task.cancel(); } + timerExecutionService.shutdown(); taskService.shutdown(); return taskService.awaitTermination(10, TimeUnit.SECONDS); } @@ -85,27 +86,30 @@ public class VelocityScheduler implements Scheduler { @Override public ScheduledTask schedule() { - VelocityTask task = new VelocityTask(plugin, runnable, delay, repeat); - taskService.execute(task); - tasksByPlugin.put(plugin, task); - return task; + if (delay == 0 && repeat == 0) { + // A special purpose, simplified implementation + VelocityImmediatelyScheduledTask task = new VelocityImmediatelyScheduledTask(plugin, runnable); + tasksByPlugin.put(plugin, task); + taskService.execute(task); + return task; + } else { + VelocityTask task = new VelocityTask(plugin, runnable, delay, repeat); + tasksByPlugin.put(plugin, task); + return task; + } } } - private class VelocityTask implements Runnable, ScheduledTask { + private class VelocityImmediatelyScheduledTask implements ScheduledTask, Runnable { private final Object plugin; private final Runnable runnable; - private final long delay; - private final long repeat; - private volatile TaskStatus status; + private final AtomicReference status; private Thread taskThread; - private VelocityTask(Object plugin, Runnable runnable, long delay, long repeat) { + private VelocityImmediatelyScheduledTask(Object plugin, Runnable runnable) { this.plugin = plugin; this.runnable = runnable; - this.delay = delay; - this.repeat = repeat; - this.status = TaskStatus.SCHEDULED; + this.status = new AtomicReference<>(TaskStatus.SCHEDULED); } @Override @@ -115,13 +119,12 @@ public class VelocityScheduler implements Scheduler { @Override public TaskStatus status() { - return status; + return status.get(); } @Override public void cancel() { - if (status == TaskStatus.SCHEDULED) { - status = TaskStatus.CANCELLED; + if (status.compareAndSet(TaskStatus.SCHEDULED, TaskStatus.CANCELLED)) { if (taskThread != null) { taskThread.interrupt(); } @@ -131,39 +134,74 @@ public class VelocityScheduler implements Scheduler { @Override public void run() { taskThread = Thread.currentThread(); - if (delay > 0) { - try { - sleeper.sleep(delay); - } catch (InterruptedException e) { - if (status == TaskStatus.CANCELLED) { - onFinish(); - return; - } - } + try { + runnable.run(); + } catch (Exception e) { + Log.logger.error("Exception in task {} by plugin {}", runnable, plugin); + } + status.compareAndSet(TaskStatus.SCHEDULED, TaskStatus.FINISHED); + taskThread = null; + tasksByPlugin.remove(plugin, this); + } + } + + private class VelocityTask implements Runnable, ScheduledTask { + private final Object plugin; + private final Runnable runnable; + private ScheduledFuture future; + + private VelocityTask(Object plugin, Runnable runnable, long delay, long repeat) { + this.plugin = plugin; + this.runnable = runnable; + if (repeat == 0) { + this.future = timerExecutionService.schedule(this, delay, TimeUnit.MILLISECONDS); + } else { + this.future = timerExecutionService.scheduleAtFixedRate(this, delay, repeat, TimeUnit.MILLISECONDS); + } + } + + @Override + public Object plugin() { + return plugin; + } + + @Override + public TaskStatus status() { + if (future == null) { + return TaskStatus.SCHEDULED; } - while (status != TaskStatus.CANCELLED) { - try { - runnable.run(); - } catch (Exception e) { + if (future.isCancelled()) { + return TaskStatus.CANCELLED; + } + + if (future.isDone()) { + return TaskStatus.FINISHED; + } + + return TaskStatus.SCHEDULED; + } + + @Override + public void cancel() { + if (future != null) { + future.cancel(true); + onFinish(); + } + } + + @Override + public void run() { + try { + runnable.run(); + } catch (Exception e) { + // Since we can't catch InterruptedException separately... + if (e instanceof InterruptedException) { + onFinish(); + } else { Log.logger.error("Exception in task {} by plugin {}", runnable, plugin); } - - if (repeat > 0) { - try { - sleeper.sleep(repeat); - } catch (InterruptedException e) { - if (status == TaskStatus.CANCELLED) { - break; - } - } - } else { - status = TaskStatus.FINISHED; - break; - } } - - onFinish(); } private void onFinish() { diff --git a/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java b/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java index d4d7a951a..d6d6896ee 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java @@ -16,7 +16,7 @@ class VelocitySchedulerTest { @Test void buildTask() throws Exception { - VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), Sleeper.SYSTEM); + VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager()); CountDownLatch latch = new CountDownLatch(1); ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown).schedule(); latch.await(); @@ -25,7 +25,7 @@ class VelocitySchedulerTest { @Test void cancelWorks() throws Exception { - VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), Sleeper.SYSTEM); + VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager()); AtomicInteger i = new AtomicInteger(3); ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, i::decrementAndGet) .delay(100, TimeUnit.SECONDS) @@ -38,7 +38,7 @@ class VelocitySchedulerTest { @Test void repeatTaskWorks() throws Exception { - VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), Sleeper.SYSTEM); + VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager()); CountDownLatch latch = new CountDownLatch(3); ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown) .delay(100, TimeUnit.MILLISECONDS)