diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/AsynchronousManager.java b/ProtocolLib/src/main/java/com/comphenix/protocol/AsynchronousManager.java index a449b1ca..4d1d8f35 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/AsynchronousManager.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/AsynchronousManager.java @@ -99,4 +99,22 @@ public interface AsynchronousManager { * @param packet - packet to signal. */ public abstract void signalPacketTransmission(PacketEvent packet); + + /** + * Register a synchronous listener that handles packets when they time out. + * @param listener - synchronous listener that will handle timed out packets. + */ + public abstract void registerTimeoutHandler(PacketListener listener); + + /** + * Unregisters a given timeout listener. + * @param listener - the timeout listener to unregister. + */ + public abstract void unregisterTimeoutHandler(PacketListener listener); + + /** + * Get a immutable list of every registered timeout handler. + * @return List of every registered timeout handler. + */ + public abstract Set getTimeoutHandlers(); } \ No newline at end of file diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java index 7b5744e7..7dd7b504 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java @@ -20,6 +20,7 @@ package com.comphenix.protocol.async; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.bukkit.plugin.Plugin; @@ -34,7 +35,10 @@ import com.comphenix.protocol.events.PacketEvent; import com.comphenix.protocol.events.PacketListener; import com.comphenix.protocol.injector.PacketFilterManager; import com.comphenix.protocol.injector.PrioritizedListener; +import com.comphenix.protocol.injector.SortedPacketListenerList; import com.google.common.base.Objects; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; /** * Represents a filter manager for asynchronous packets. @@ -43,9 +47,14 @@ import com.google.common.base.Objects; */ public class AsyncFilterManager implements AsynchronousManager { + private SortedPacketListenerList serverTimeoutListeners; + private SortedPacketListenerList clientTimeoutListeners; + private Set timeoutListeners; + private PacketProcessingQueue serverProcessingQueue; private PacketSendingQueue serverQueue; + private PacketProcessingQueue clientProcessingQueue; private PacketSendingQueue clientQueue; @@ -68,11 +77,30 @@ public class AsyncFilterManager implements AsynchronousManager { public AsyncFilterManager(ErrorReporter reporter, BukkitScheduler scheduler, ProtocolManager manager) { + // Initialize timeout listeners + serverTimeoutListeners = new SortedPacketListenerList(); + clientTimeoutListeners = new SortedPacketListenerList(); + timeoutListeners = Sets.newSetFromMap(new ConcurrentHashMap()); + // Server packets are synchronized already - this.serverQueue = new PacketSendingQueue(false); + this.serverQueue = new PacketSendingQueue(false) { + @Override + protected void onPacketTimeout(PacketEvent event) { + if (!cleaningUp) { + serverTimeoutListeners.invokePacketSending(AsyncFilterManager.this.reporter, event); + } + } + }; // Client packets must be synchronized - this.clientQueue = new PacketSendingQueue(true); + this.clientQueue = new PacketSendingQueue(true) { + @Override + protected void onPacketTimeout(PacketEvent event) { + if (!cleaningUp) { + clientTimeoutListeners.invokePacketSending(AsyncFilterManager.this.reporter, event); + } + } + }; this.serverProcessingQueue = new PacketProcessingQueue(serverQueue); this.clientProcessingQueue = new PacketProcessingQueue(clientQueue); @@ -89,6 +117,27 @@ public class AsyncFilterManager implements AsynchronousManager { return registerAsyncHandler(listener, true); } + @Override + public void registerTimeoutHandler(PacketListener listener) { + if (listener == null) + throw new IllegalArgumentException("listener cannot be NULL."); + if (!timeoutListeners.add(listener)) + return; + + ListeningWhitelist sending = listener.getSendingWhitelist(); + ListeningWhitelist receiving = listener.getReceivingWhitelist(); + + if (!ListeningWhitelist.isEmpty(sending)) + serverTimeoutListeners.addListener(listener, sending); + if (!ListeningWhitelist.isEmpty(receiving)) + serverTimeoutListeners.addListener(listener, receiving); + } + + @Override + public Set getTimeoutHandlers() { + return ImmutableSet.copyOf(timeoutListeners); + } + /** * Registers an asynchronous packet handler. *

@@ -131,6 +180,21 @@ public class AsyncFilterManager implements AsynchronousManager { return whitelist != null && whitelist.getWhitelist().size() > 0; } + @Override + public void unregisterTimeoutHandler(PacketListener listener) { + if (listener == null) + throw new IllegalArgumentException("listener cannot be NULL."); + + ListeningWhitelist sending = listener.getSendingWhitelist(); + ListeningWhitelist receiving = listener.getReceivingWhitelist(); + + // Do it in the opposite order + if (serverTimeoutListeners.removeListener(listener, sending).size() > 0 || + clientTimeoutListeners.removeListener(listener, receiving).size() > 0) { + timeoutListeners.remove(listener); + } + } + @Override public void unregisterAsyncHandler(AsyncListenerHandler handler) { if (handler == null) @@ -276,6 +340,10 @@ public class AsyncFilterManager implements AsynchronousManager { cleaningUp = true; serverProcessingQueue.cleanupAll(); serverQueue.cleanupAll(); + + timeoutListeners.clear(); + serverTimeoutListeners = null; + clientTimeoutListeners = null; } @Override @@ -333,7 +401,6 @@ public class AsyncFilterManager implements AsynchronousManager { * Send any due packets, or clean up packets that have expired. */ public void sendProcessedPackets(int tickCounter, boolean onMainThread) { - // The server queue is unlikely to need checking that often if (tickCounter % 10 == 0) { serverQueue.trySendPackets(onMainThread); diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncMarker.java b/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncMarker.java index 989545fc..febbbbcf 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncMarker.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncMarker.java @@ -418,7 +418,7 @@ public class AsyncMarker implements Serializable, Comparable { // We're in 1.2.5 alwaysSync = true; } else { - System.err.println("Cannot determine asynchronous state of packets!"); + System.err.println("[ProtocolLib] Cannot determine asynchronous state of packets!"); alwaysSync = true; } } diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/async/PacketSendingQueue.java b/ProtocolLib/src/main/java/com/comphenix/protocol/async/PacketSendingQueue.java index e9816259..698c2def 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/async/PacketSendingQueue.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/async/PacketSendingQueue.java @@ -27,13 +27,14 @@ import org.bukkit.entity.Player; import com.comphenix.protocol.events.PacketEvent; import com.comphenix.protocol.injector.PlayerLoggedOutException; +import com.comphenix.protocol.injector.SortedPacketListenerList; import com.comphenix.protocol.reflect.FieldAccessException; /** * Represents packets ready to be transmitted to a client. * @author Kristian */ -class PacketSendingQueue { +abstract class PacketSendingQueue { public static final int INITIAL_CAPACITY = 64; @@ -77,7 +78,7 @@ class PacketSendingQueue { AsyncMarker marker = packetUpdated.getAsyncMarker(); // Should we reorder the event? - if (marker.getQueuedSendingIndex() != marker.getNewSendingIndex()) { + if (marker.getQueuedSendingIndex() != marker.getNewSendingIndex() && !marker.hasExpired()) { PacketEvent copy = PacketEvent.fromSynchronous(packetUpdated, marker); // "Cancel" the original event @@ -127,6 +128,7 @@ class PacketSendingQueue { if (holder != null) { PacketEvent current = holder.getEvent(); AsyncMarker marker = current.getAsyncMarker(); + boolean hasExpired = marker.hasExpired(); // Abort if we're not on the main thread if (synchronizeMain) { @@ -144,8 +146,16 @@ class PacketSendingQueue { } } - if (marker.isProcessed() || marker.hasExpired()) { - if (marker.isProcessed() && !current.isCancelled()) { + if (marker.isProcessed() || hasExpired) { + if (hasExpired) { + // Notify timeout listeners + onPacketTimeout(current); + + // Recompute + marker = current.getAsyncMarker(); + hasExpired = marker.hasExpired(); + } + if (marker.isProcessed() && !current.isCancelled() && !hasExpired) { // Silently skip players that have logged out if (isOnline(current.getPlayer())) { sendPacket(current); @@ -162,6 +172,12 @@ class PacketSendingQueue { } } + /** + * Invoked when a packet has timed out. + * @param event - the timed out packet. + */ + protected abstract void onPacketTimeout(PacketEvent event); + private boolean isOnline(Player player) { return player != null && player.isOnline(); } diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketFilterManager.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketFilterManager.java index e84998eb..29ee6f9d 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketFilterManager.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketFilterManager.java @@ -108,8 +108,8 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok private PlayerInjectionHandler playerInjection; // The two listener containers - private SortedPacketListenerList recievedListeners = new SortedPacketListenerList(); - private SortedPacketListenerList sendingListeners = new SortedPacketListenerList(); + private SortedPacketListenerList recievedListeners; + private SortedPacketListenerList sendingListeners; // Whether or not this class has been closed private volatile boolean hasClosed; @@ -150,6 +150,10 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok // Just boilerplate final DelayedSingleTask finalUnhookTask = unhookTask; + // Listener containers + this.recievedListeners = new SortedPacketListenerList(); + this.sendingListeners = new SortedPacketListenerList(); + // References this.unhookTask = unhookTask; this.server = server; @@ -366,12 +370,16 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok @Override public void invokePacketRecieving(PacketEvent event) { - handlePacket(recievedListeners, event, false); + if (!hasClosed) { + handlePacket(recievedListeners, event, false); + } } @Override public void invokePacketSending(PacketEvent event) { - handlePacket(sendingListeners, event, true); + if (!hasClosed) { + handlePacket(sendingListeners, event, true); + } } /** @@ -812,6 +820,8 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok // Remove listeners packetListeners.clear(); + recievedListeners = null; + sendingListeners = null; // Clean up async handlers. We have to do this last. asyncFilterManager.cleanupAll(); diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/SortedPacketListenerList.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/SortedPacketListenerList.java index 5a1ef288..d3184b10 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/SortedPacketListenerList.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/SortedPacketListenerList.java @@ -29,7 +29,7 @@ import com.comphenix.protocol.events.PacketListener; * * @author Kristian */ -class SortedPacketListenerList extends AbstractConcurrentListenerMultimap { +public final class SortedPacketListenerList extends AbstractConcurrentListenerMultimap { /** * Invokes the given packet event for every registered listener.