diff --git a/ProtocolLib/src/com/comphenix/protocol/ProtocolLibrary.java b/ProtocolLib/src/com/comphenix/protocol/ProtocolLibrary.java index f41491ca..d09d901b 100644 --- a/ProtocolLib/src/com/comphenix/protocol/ProtocolLibrary.java +++ b/ProtocolLib/src/com/comphenix/protocol/ProtocolLibrary.java @@ -43,6 +43,13 @@ public class ProtocolLibrary extends JavaPlugin { // Structure compiler private BackgroundCompiler backgroundCompiler; + // Used to (mostly) clean up packets that have expired + private int asyncPacketTask = -1; + + // Number of ticks between each cleanup. We don't need to do this often, + // as it's only indeeded to detected timeouts. + private static final int ASYNC_PACKET_DELAY = 10; + @Override public void onLoad() { logger = getLoggerSafely(); @@ -69,6 +76,9 @@ public class ProtocolLibrary extends JavaPlugin { // Inject our hook into already existing players protocolManager.initializePlayers(server.getOnlinePlayers()); + // Timeout + createAsyncTask(server); + // Try to enable statistics try { statistisc = new Statistics(this); @@ -79,6 +89,26 @@ public class ProtocolLibrary extends JavaPlugin { } } + private void createAsyncTask(Server server) { + try { + if (asyncPacketTask < 0) + throw new IllegalStateException("Async task has already been created"); + + // Attempt to create task + asyncPacketTask = server.getScheduler().scheduleSyncRepeatingTask(this, new Runnable() { + @Override + public void run() { + protocolManager.getAsyncFilterManager().sendProcessedPackets(); + } + }, ASYNC_PACKET_DELAY, ASYNC_PACKET_DELAY); + + } catch (Throwable e) { + if (asyncPacketTask == -1) { + logger.log(Level.SEVERE, "Unable to create packet timeout task.", e); + } + } + } + private void checkForIncompatibility(PluginManager manager) { // Plugin authors: Notify me to remove these String[] incompatiblePlugins = {}; @@ -100,6 +130,12 @@ public class ProtocolLibrary extends JavaPlugin { BackgroundCompiler.setInstance(null); } + // Clean up + if (asyncPacketTask >= 0) { + getServer().getScheduler().cancelTask(asyncPacketTask); + asyncPacketTask = -1; + } + protocolManager.close(); protocolManager = null; statistisc = null; diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java index 6ad4f219..cefdd30d 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java @@ -185,4 +185,12 @@ public class AsyncFilterManager { private PacketProcessingQueue getProcessingQueue(PacketEvent packet) { return packet.isServerPacket() ? serverProcessingQueue : clientProcessingQueue; } + + /** + * Send any due packets, or clean up packets that have expired. + */ + public void sendProcessedPackets() { + clientQueue.trySendPackets(); + serverQueue.trySendPackets(); + } } diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java index 72c04470..90461670 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java @@ -26,6 +26,9 @@ public class AsyncListenerHandler { // Cancel the async handler private volatile boolean cancelled; + // If we've started the listener loop before + private volatile boolean started; + // The packet listener private PacketListener listener; @@ -105,6 +108,13 @@ public class AsyncListenerHandler { // Danger, danger! if (Thread.currentThread().getId() == mainThread.getId()) throw new IllegalStateException("Do not call this method from the main thread."); + if (started) + throw new IllegalStateException("A listener cannot be run by multiple threads. Create a new listener instead."); + if (cancelled) + throw new IllegalStateException("Listener has been cancelled. Create a new listener instead."); + + // Proceed + started = true; try { mainLoop: @@ -158,6 +168,7 @@ public class AsyncListenerHandler { if (!cancelled) { filterManager.unregisterAsyncHandlerInternal(this); cancelled = true; + started = false; } }