From 0b200472f187a7bb296f194f341f9037e111037f Mon Sep 17 00:00:00 2001 From: "Kristian S. Stangeland" Date: Wed, 21 Nov 2012 01:39:43 +0100 Subject: [PATCH] Process asynchronous packets on an async thread. Bukkit complains if we try to send an async packet on the main thread, so we will have to add a new background thread that can transmit packets processed by light-weight packet listeners. In addition, fixed a bug causing the "uncancel" method in PacketInjector from not working properly. That bug is as persistent as a zombie. --- .../protocol/async/AsyncFilterManager.java | 1 + .../protocol/async/PacketSendingQueue.java | 159 ++++++++++++------ .../protocol/async/PlayerSendingHandler.java | 42 ++++- .../protocol/injector/PacketInjector.java | 4 +- .../protocol/injector/ReadPacketModifier.java | 9 + 5 files changed, 155 insertions(+), 60 deletions(-) diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java index 5fe61857..92de92a1 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java @@ -90,6 +90,7 @@ public class AsyncFilterManager implements AsynchronousManager { this.playerSendingHandler = new PlayerSendingHandler(reporter, serverTimeoutListeners, clientTimeoutListeners); this.serverProcessingQueue = new PacketProcessingQueue(playerSendingHandler); this.clientProcessingQueue = new PacketProcessingQueue(playerSendingHandler); + this.playerSendingHandler.initializeScheduler(); this.scheduler = scheduler; this.manager = manager; diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/async/PacketSendingQueue.java b/ProtocolLib/src/main/java/com/comphenix/protocol/async/PacketSendingQueue.java index d55aea30..bc1bcaf5 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/async/PacketSendingQueue.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/async/PacketSendingQueue.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.PriorityBlockingQueue; import org.bukkit.entity.Player; @@ -39,8 +40,24 @@ abstract class PacketSendingQueue { private PriorityBlockingQueue sendingQueue; + // Asynchronous packet sending + private Executor asynchronousSender; + // Whether or not packet transmission can only occur on the main thread private final boolean synchronizeMain; + + // Whether or not we've run the cleanup procedure + private boolean cleanedUp = false; + + /** + * Create a packet sending queue. + * @param synchronizeMain - whether or not to synchronize with the main thread. + */ + public PacketSendingQueue(boolean synchronizeMain, Executor asynchronousSender) { + this.sendingQueue = new PriorityBlockingQueue(INITIAL_CAPACITY); + this.synchronizeMain = synchronizeMain; + this.asynchronousSender = asynchronousSender; + } /** * Number of packet events in the queue. @@ -50,15 +67,6 @@ abstract class PacketSendingQueue { return sendingQueue.size(); } - /** - * Create a packet sending queue. - * @param synchronizeMain - whether or not to synchronize with the main thread. - */ - public PacketSendingQueue(boolean synchronizeMain) { - this.sendingQueue = new PriorityBlockingQueue(INITIAL_CAPACITY); - this.synchronizeMain = synchronizeMain; - } - /** * Enqueue a packet for sending. * @param packet - packet to queue. @@ -119,55 +127,99 @@ abstract class PacketSendingQueue { * @param onMainThread - whether or not this is occuring on the main thread. */ public void trySendPackets(boolean onMainThread) { - + // Whether or not to continue sending packets + boolean sending = true; + // Transmit as many packets as we can - while (true) { - PacketEventHolder holder = sendingQueue.peek(); - + while (sending) { + PacketEventHolder holder = sendingQueue.poll(); + if (holder != null) { - PacketEvent current = holder.getEvent(); - AsyncMarker marker = current.getAsyncMarker(); - boolean hasExpired = marker.hasExpired(); - - // Abort if we're not on the main thread - if (synchronizeMain) { - try { - boolean wantAsync = marker.isMinecraftAsync(current); - boolean wantSync = !wantAsync; - - // Quit if we haven't fulfilled our promise - if ((onMainThread && wantAsync) || (!onMainThread && wantSync)) - return; - - } catch (FieldAccessException e) { - e.printStackTrace(); - return; - } + sending = processPacketHolder(onMainThread, holder); + + if (!sending) { + // Add it back again + sendingQueue.add(holder); } - if (marker.isProcessed() || hasExpired) { - if (hasExpired) { - // Notify timeout listeners - onPacketTimeout(current); - - // Recompute - marker = current.getAsyncMarker(); - hasExpired = marker.hasExpired(); - } - if (marker.isProcessed() && !current.isCancelled() && !hasExpired) { - // Silently skip players that have logged out - if (isOnline(current.getPlayer())) { - sendPacket(current); - } + } else { + // No more packets to send + sending = false; + } + } + } + + /** + * Invoked when a packet might be ready for transmission. + * @param onMainThread - TRUE if we're on the main thread, FALSE otherwise. + * @param holder - packet container. + * @return TRUE to continue sending packets, FALSE otherwise. + */ + private boolean processPacketHolder(boolean onMainThread, final PacketEventHolder holder) { + PacketEvent current = holder.getEvent(); + AsyncMarker marker = current.getAsyncMarker(); + boolean hasExpired = marker.hasExpired(); + + // Guard in cause the queue is closed + if (cleanedUp) { + return true; + } + + // End condition? + if (marker.isProcessed() || hasExpired) { + if (hasExpired) { + // Notify timeout listeners + onPacketTimeout(current); + + // Recompute + marker = current.getAsyncMarker(); + hasExpired = marker.hasExpired(); + } + + // Abort if we're not on the main thread + if (synchronizeMain && !hasExpired) { + try { + boolean wantAsync = marker.isMinecraftAsync(current); + boolean wantSync = !wantAsync; + + // Wait for the next main thread heartbeat if we haven't fulfilled our promise + if (!onMainThread && wantSync) { + return false; } - sendingQueue.poll(); - continue; + // Let's give it what it wants, then + if (onMainThread && wantAsync) { + asynchronousSender.execute(new Runnable() { + @Override + public void run() { + // We know this isn't on the main thread + processPacketHolder(false, holder); + } + }); + + // The executor will take it from here + return true; + } + + } catch (FieldAccessException e) { + e.printStackTrace(); + // Skip this packet + return true; } } - // Only repeat when packets are removed - break; + if (marker.isProcessed() && !current.isCancelled() && !hasExpired) { + // Silently skip players that have logged out + if (isOnline(current.getPlayer())) { + sendPacket(current); + } + } + + return true; + + } else { + // Add it back and stop sending + return false; } } @@ -234,7 +286,12 @@ abstract class PacketSendingQueue { * Automatically transmits every delayed packet. */ public void cleanupAll() { - // Note that the cleanup itself will always occur on the main thread - forceSend(); + if (!cleanedUp) { + // Note that the cleanup itself will always occur on the main thread + forceSend(); + + // And we're done + cleanedUp = true; + } } } diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/async/PlayerSendingHandler.java b/ProtocolLib/src/main/java/com/comphenix/protocol/async/PlayerSendingHandler.java index cb234d88..59bf3a2e 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/async/PlayerSendingHandler.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/async/PlayerSendingHandler.java @@ -3,12 +3,16 @@ package com.comphenix.protocol.async; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import org.bukkit.entity.Player; import com.comphenix.protocol.error.ErrorReporter; import com.comphenix.protocol.events.PacketEvent; import com.comphenix.protocol.injector.SortedPacketListenerList; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Contains every sending queue for every player. @@ -24,6 +28,9 @@ class PlayerSendingHandler { private SortedPacketListenerList serverTimeoutListeners; private SortedPacketListenerList clientTimeoutListeners; + // Asynchronous packet sending + private Executor asynchronousSender; + // Whether or not we're currently cleaning up private volatile boolean cleaningUp; @@ -38,7 +45,7 @@ class PlayerSendingHandler { public QueueContainer() { // Server packets are synchronized already - serverQueue = new PacketSendingQueue(false) { + serverQueue = new PacketSendingQueue(false, asynchronousSender) { @Override protected void onPacketTimeout(PacketEvent event) { if (!cleaningUp) { @@ -48,7 +55,7 @@ class PlayerSendingHandler { }; // Client packets must be synchronized - clientQueue = new PacketSendingQueue(true) { + clientQueue = new PacketSendingQueue(true, asynchronousSender) { @Override protected void onPacketTimeout(PacketEvent event) { if (!cleaningUp) { @@ -67,6 +74,12 @@ class PlayerSendingHandler { } } + /** + * Initialize a packet sending handler. + * @param reporter - error reporter. + * @param serverTimeoutListeners - set of server timeout listeners. + * @param clientTimeoutListeners - set of client timeout listeners. + */ public PlayerSendingHandler(ErrorReporter reporter, SortedPacketListenerList serverTimeoutListeners, SortedPacketListenerList clientTimeoutListeners) { @@ -75,7 +88,20 @@ class PlayerSendingHandler { this.clientTimeoutListeners = clientTimeoutListeners; // Initialize storage of queues - playerSendingQueues = new ConcurrentHashMap(); + this.playerSendingQueues = new ConcurrentHashMap(); + } + + /** + * Start the asynchronous packet sender. + */ + public synchronized void initializeScheduler() { + if (asynchronousSender == null) { + ThreadFactory factory = new ThreadFactoryBuilder(). + setDaemon(true). + setNameFormat("ProtocolLib-AsyncSender %s"). + build(); + asynchronousSender = Executors.newSingleThreadExecutor(factory); + } } /** @@ -202,10 +228,12 @@ class PlayerSendingHandler { * Send all pending packets and clean up queues. */ public void cleanupAll() { - cleaningUp = true; - - sendAllPackets(); - playerSendingQueues.clear(); + if (!cleaningUp) { + cleaningUp = true; + + sendAllPackets(); + playerSendingQueues.clear(); + } } /** diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketInjector.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketInjector.java index 951e4d79..7630c360 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketInjector.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketInjector.java @@ -83,8 +83,8 @@ class PacketInjector { public void undoCancel(Integer id, Packet packet) { ReadPacketModifier modifier = readModifier.get(id); - // Cancelled packets are represented with NULL - if (modifier != null && modifier.getOverride(packet) == null) { + // See if this packet has been cancelled before + if (modifier != null && modifier.hasCancelled(packet)) { modifier.removeOverride(packet); } } diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/ReadPacketModifier.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/ReadPacketModifier.java index 6e1967e1..ee32ddc2 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/ReadPacketModifier.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/ReadPacketModifier.java @@ -74,6 +74,15 @@ class ReadPacketModifier implements MethodInterceptor { return override.get(packet); } + /** + * Determine if the given packet has been cancelled before. + * @param packet - the packet to check. + * @return TRUE if it has been cancelled, FALSE otherwise. + */ + public boolean hasCancelled(Packet packet) { + return getOverride(packet) == CANCEL_MARKER; + } + @Override public Object intercept(Object thisObj, Method method, Object[] args, MethodProxy proxy) throws Throwable {