From 8a26d047b227d548a2c8380eeea25064c3d8b12a Mon Sep 17 00:00:00 2001 From: "Kristian S. Stangeland" Date: Sat, 29 Sep 2012 23:48:09 +0200 Subject: [PATCH] Client packets are processed on the server, so they must be synchronized with the main thread. --- .../comphenix/protocol/ProtocolLibrary.java | 13 +++---- .../protocol/async/AsyncFilterManager.java | 32 ++++++++++++----- .../protocol/async/PacketSendingQueue.java | 36 ++++++++++++++----- 3 files changed, 58 insertions(+), 23 deletions(-) diff --git a/ProtocolLib/src/com/comphenix/protocol/ProtocolLibrary.java b/ProtocolLib/src/com/comphenix/protocol/ProtocolLibrary.java index f989b1ad..56668637 100644 --- a/ProtocolLib/src/com/comphenix/protocol/ProtocolLibrary.java +++ b/ProtocolLib/src/com/comphenix/protocol/ProtocolLibrary.java @@ -44,12 +44,11 @@ public class ProtocolLibrary extends JavaPlugin { // Structure compiler private BackgroundCompiler backgroundCompiler; - // Used to (mostly) clean up packets that have expired + // Used to clean up server packets that have expired. + // But mostly required to simulate recieving client packets. private int asyncPacketTask = -1; - - // Number of ticks between each cleanup. We don't need to do this often, - // as it's only indeeded to detected timeouts. - private static final int ASYNC_PACKET_DELAY = 10; + private int tickCounter = 0; + private static final int ASYNC_PACKET_DELAY = 1; @Override public void onLoad() { @@ -100,7 +99,9 @@ public class ProtocolLibrary extends JavaPlugin { @Override public void run() { AsyncFilterManager manager = (AsyncFilterManager) protocolManager.getAsynchronousManager(); - manager.sendProcessedPackets(); + + // We KNOW we're on the main thread at the moment + manager.sendProcessedPackets(tickCounter++, true); } }, ASYNC_PACKET_DELAY, ASYNC_PACKET_DELAY); diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java index d42c5d43..f95a9257 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java @@ -42,8 +42,10 @@ public class AsyncFilterManager implements AsynchronousManager { private volatile boolean cleaningUp; public AsyncFilterManager(Logger logger, PacketStream packetStream) { - this.serverQueue = new PacketSendingQueue(); - this.clientQueue = new PacketSendingQueue(); + this.serverQueue = new PacketSendingQueue(false); + // Client packets must be synchronized + this.clientQueue = new PacketSendingQueue(true); + this.serverProcessingQueue = new PacketProcessingQueue(serverQueue); this.clientProcessingQueue = new PacketProcessingQueue(clientQueue); this.packetStream = packetStream; @@ -88,6 +90,7 @@ public class AsyncFilterManager implements AsynchronousManager { void unregisterAsyncHandlerInternal(AsyncListenerHandler handler) { PacketListener listener = handler.getAsyncListener(); + boolean synchronusOK = onMainThread(); // Just remove it from the queue(s) if (hasValidWhitelist(listener.getSendingWhitelist())) { @@ -95,16 +98,24 @@ public class AsyncFilterManager implements AsynchronousManager { // We're already taking care of this, so don't do anything if (!cleaningUp) - serverQueue.signalPacketUpdate(removed); + serverQueue.signalPacketUpdate(removed, synchronusOK); } if (hasValidWhitelist(listener.getReceivingWhitelist())) { List removed = clientProcessingQueue.removeListener(handler, listener.getReceivingWhitelist()); if (!cleaningUp) - clientQueue.signalPacketUpdate(removed); + clientQueue.signalPacketUpdate(removed, synchronusOK); } } + /** + * Determine if we're running on the main thread. + * @return TRUE if we are, FALSE otherwise. + */ + private boolean onMainThread() { + return Thread.currentThread().getId() == mainThread.getId(); + } + @Override public void unregisterAsyncHandlers(Plugin plugin) { unregisterAsyncHandlers(serverProcessingQueue, plugin); @@ -196,7 +207,7 @@ public class AsyncFilterManager implements AsynchronousManager { * @param packet - packet to signal. */ public void signalPacketUpdate(PacketEvent packet) { - getSendingQueue(packet).signalPacketUpdate(packet); + getSendingQueue(packet).signalPacketUpdate(packet, onMainThread()); } /** @@ -228,8 +239,13 @@ public class AsyncFilterManager implements AsynchronousManager { /** * Send any due packets, or clean up packets that have expired. */ - public void sendProcessedPackets() { - clientQueue.trySendPackets(); - serverQueue.trySendPackets(); + public void sendProcessedPackets(int tickCounter, boolean onMainThread) { + + // The server queue is unlikely to need checking that often + if (tickCounter % 10 == 0) { + serverQueue.trySendPackets(onMainThread); + } + + clientQueue.trySendPackets(onMainThread); } } diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java index 911fc342..5c8ddfb9 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java @@ -20,8 +20,12 @@ class PacketSendingQueue { private PriorityBlockingQueue sendingQueue; - public PacketSendingQueue() { - sendingQueue = new PriorityBlockingQueue(INITIAL_CAPACITY, new Comparator() { + // Whether or not packet transmission can only occur on the main thread + private final boolean synchronizeMain; + + public PacketSendingQueue(boolean synchronizeMain) { + this.synchronizeMain = synchronizeMain; + this.sendingQueue = new PriorityBlockingQueue(INITIAL_CAPACITY, new Comparator() { // Compare using the async marker @Override public int compare(PacketEvent o1, PacketEvent o2) { @@ -43,13 +47,13 @@ class PacketSendingQueue { /** * Invoked when one of the packets have finished processing. */ - public synchronized void signalPacketUpdate(PacketEvent packetUpdated) { + public synchronized void signalPacketUpdate(PacketEvent packetUpdated, boolean onMainThread) { // Mark this packet as finished packetUpdated.getAsyncMarker().setProcessed(true); - trySendPackets(); + trySendPackets(onMainThread); } - public synchronized void signalPacketUpdate(List packetsRemoved) { + public synchronized void signalPacketUpdate(List packetsRemoved, boolean onMainThread) { Set lookup = new HashSet(packetsRemoved); @@ -60,14 +64,19 @@ class PacketSendingQueue { } } - // This is likely to have changed the situation a bit - trySendPackets(); + // This is likely to have changed the situation a bit + trySendPackets(onMainThread); } /** * Attempt to send any remaining packets. */ - public synchronized void trySendPackets() { + public void trySendPackets(boolean onMainThread) { + + // Abort if we're not on the main thread + if (synchronizeMain && !onMainThread) + return; + // Transmit as many packets as we can while (true) { PacketEvent current = sendingQueue.peek(); @@ -92,7 +101,7 @@ class PacketSendingQueue { /** * Send every packet, regardless of the processing state. */ - public synchronized void forceSend() { + private void forceSend() { while (true) { PacketEvent current = sendingQueue.poll(); @@ -104,6 +113,14 @@ class PacketSendingQueue { } } + /** + * Whether or not the packet transmission must synchronize with the main thread. + * @return TRUE if it must, FALSE otherwise. + */ + public boolean isSynchronizeMain() { + return synchronizeMain; + } + /** * Transmit a packet, if it hasn't already. * @param event - the packet to transmit. @@ -128,6 +145,7 @@ class PacketSendingQueue { * Automatically transmits every delayed packet. */ public void cleanupAll() { + // Note that the cleanup itself will always occur on the main thread forceSend(); } }