diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java index b6a75576..10a46f90 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java @@ -6,6 +6,7 @@ import java.util.logging.Logger; import org.bukkit.plugin.Plugin; import com.comphenix.protocol.PacketStream; +import com.comphenix.protocol.events.ListeningWhitelist; import com.comphenix.protocol.events.PacketEvent; import com.comphenix.protocol.events.PacketListener; @@ -16,8 +17,11 @@ import com.comphenix.protocol.events.PacketListener; */ public class AsyncFilterManager { - private PacketProcessingQueue processingQueue; - private PacketSendingQueue sendingQueue; + private PacketProcessingQueue serverProcessingQueue; + private PacketSendingQueue serverQueue; + + private PacketProcessingQueue clientProcessingQueue; + private PacketSendingQueue clientQueue; private PacketStream packetStream; private Logger logger; @@ -29,8 +33,10 @@ public class AsyncFilterManager { private AtomicInteger currentSendingIndex = new AtomicInteger(); public AsyncFilterManager(Logger logger, PacketStream packetStream) { - this.sendingQueue = new PacketSendingQueue(); - this.processingQueue = new PacketProcessingQueue(sendingQueue); + this.serverQueue = new PacketSendingQueue(); + this.clientQueue = new PacketSendingQueue(); + this.serverProcessingQueue = new PacketProcessingQueue(serverQueue); + this.clientProcessingQueue = new PacketProcessingQueue(clientQueue); this.packetStream = packetStream; this.logger = logger; @@ -40,10 +46,23 @@ public class AsyncFilterManager { public ListenerToken registerAsyncHandler(Plugin plugin, PacketListener listener) { ListenerToken token = new ListenerToken(plugin, mainThread, this, listener); - processingQueue.addListener(token, listener.getSendingWhitelist()); + // Add listener to either or both processing queue + if (hasValidWhitelist(listener.getSendingWhitelist())) + serverProcessingQueue.addListener(token, listener.getSendingWhitelist()); + if (hasValidWhitelist(listener.getReceivingWhitelist())) + clientProcessingQueue.addListener(token, listener.getReceivingWhitelist()); + return token; } + private boolean hasValidWhitelist(ListeningWhitelist whitelist) { + return whitelist != null && whitelist.getWhitelist().size() > 0; + } + + /** + * Unregisters and closes the given asynchronous handler. + * @param listenerToken - asynchronous handler. + */ public void unregisterAsyncHandler(ListenerToken listenerToken) { if (listenerToken == null) throw new IllegalArgumentException("listenerToken cannot be NULL"); @@ -53,8 +72,14 @@ public class AsyncFilterManager { // Called by ListenerToken void unregisterAsyncHandlerInternal(ListenerToken listenerToken) { - // Just remove it from the queue - processingQueue.removeListener(listenerToken, listenerToken.getAsyncListener().getSendingWhitelist()); + + PacketListener listener = listenerToken.getAsyncListener(); + + // Just remove it from the queue(s) + if (hasValidWhitelist(listener.getSendingWhitelist())) + serverProcessingQueue.removeListener(listenerToken, listener.getSendingWhitelist()); + if (hasValidWhitelist(listener.getReceivingWhitelist())) + clientProcessingQueue.removeListener(listenerToken, listener.getReceivingWhitelist()); } /** @@ -66,8 +91,25 @@ public class AsyncFilterManager { PacketEvent newEvent = PacketEvent.fromSynchronous(syncPacket, asyncMarker); // Start the process - sendingQueue.enqueue(newEvent); - processingQueue.enqueuePacket(newEvent); + getSendingQueue(syncPacket).enqueue(newEvent); + getProcessingQueue(syncPacket).enqueue(newEvent); + } + + /** + * Determine if a given synchronous packet has asynchronous listeners. + * @param packet - packet to test. + * @return TRUE if it does, FALSE otherwise. + */ + public boolean hasAsynchronousListeners(PacketEvent packet) { + return getProcessingQueue(packet).getListener(packet.getPacketID()).size() > 0; + } + + /** + * Construct a asynchronous marker with all the default values. + * @return Asynchronous marker. + */ + public AsyncMarker createAsyncMarker() { + return createAsyncMarker(AsyncMarker.DEFAULT_SENDING_DELTA, AsyncMarker.DEFAULT_TIMEOUT_DELTA); } /** @@ -86,27 +128,61 @@ public class AsyncFilterManager { return new AsyncMarker(packetStream, sendingIndex, sendingDelta, System.currentTimeMillis(), timeoutDelta); } + /** + * Retrieve the default packet stream. + * @return Default packet stream. + */ public PacketStream getPacketStream() { return packetStream; } + /** + * Retrieve the default error logger. + * @return Default logger. + */ public Logger getLogger() { return logger; } - PacketProcessingQueue getProcessingQueue() { - return processingQueue; - } - - PacketSendingQueue getSendingQueue() { - return sendingQueue; - } - /** * Remove listeners, close threads and transmit every delayed packet. */ public void cleanupAll() { - processingQueue.cleanupAll(); - sendingQueue.cleanupAll(); + serverProcessingQueue.cleanupAll(); + serverQueue.cleanupAll(); + } + + /** + * Signal that a packet is ready to be transmitted. + * @param packet - packet to signal. + */ + public void signalPacketUpdate(PacketEvent packet) { + getSendingQueue(packet).signalPacketUpdate(packet); + } + + /** + * Retrieve the sending queue this packet belongs to. + * @param packet - the packet. + * @return The server or client sending queue the packet belongs to. + */ + private PacketSendingQueue getSendingQueue(PacketEvent packet) { + return packet.isServerPacket() ? serverQueue : clientQueue; + } + + /** + * Signal that a packet has finished processing. + * @param packet - packet to signal. + */ + public void signalProcessingDone(PacketEvent packet) { + getProcessingQueue(packet).signalProcessingDone(); + } + + /** + * Retrieve the processing queue this packet belongs to. + * @param packet - the packet. + * @return The server or client sending processing the packet belongs to. + */ + private PacketProcessingQueue getProcessingQueue(PacketEvent packet) { + return packet.isServerPacket() ? serverProcessingQueue : clientProcessingQueue; } } diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java index dbf5940b..378e9c18 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java @@ -27,6 +27,11 @@ public class AsyncMarker implements Serializable, Comparable { */ public static final int DEFAULT_TIMEOUT_DELTA = 60000; + /** + * Default number of packets to skip. + */ + public static final int DEFAULT_SENDING_DELTA = 0; + /** * The packet stream responsible for transmitting the packet when it's done processing. */ @@ -48,8 +53,11 @@ public class AsyncMarker implements Serializable, Comparable { // Whether or not the packet has been processed by the listeners private volatile boolean processed; - // Whethre or not the packet has been sent + // Whether or not the packet has been sent private volatile boolean transmitted; + + // Whether or not the asynchronous processing itself should be cancelled + private volatile boolean asyncCancelled; /** * Create a container for asyncronous packets. @@ -86,6 +94,14 @@ public class AsyncMarker implements Serializable, Comparable { return timeout; } + /** + * Set the time the packet will be forcefully rejected. + * @param timeout - time to reject the packet, in milliseconds since 01.01.1970 00:00. + */ + public void setTimeout(long timeout) { + this.timeout = timeout; + } + /** * Retrieve the order the packet was originally transmitted. * @return The original packet index. @@ -154,6 +170,39 @@ public class AsyncMarker implements Serializable, Comparable { return transmitted; } + /** + * Determine if this packet has expired. + * @return TRUE if it has, FALSE otherwise. + */ + public boolean hasExpired() { + return hasExpired(System.currentTimeMillis()); + } + + /** + * Determine if this packet has expired given this time. + * @param currentTime - the current time in milliseconds since 01.01.1970 00:00. + * @return TRUE if it has, FALSE otherwise. + */ + public boolean hasExpired(long currentTime) { + return timeout < currentTime; + } + + /** + * Determine if the asynchronous handling should be cancelled. + * @return TRUE if it should, FALSE otherwise. + */ + public boolean isAsyncCancelled() { + return asyncCancelled; + } + + /** + * Set whether or not the asynchronous handling should be cancelled. + * @param asyncCancelled - TRUE to cancel it, FALSE otherwise. + */ + public void setAsyncCancelled(boolean asyncCancelled) { + this.asyncCancelled = asyncCancelled; + } + /** * Retrieve iterator for the next listener in line. * @return Next async packet listener iterator. diff --git a/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java b/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java index 472762af..7ada78ae 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java @@ -126,8 +126,8 @@ class ListenerToken { } // There are no more listeners - queue the packet for transmission - filterManager.getSendingQueue().signalPacketUpdate(packet); - filterManager.getProcessingQueue().signalProcessingDone(); + filterManager.signalPacketUpdate(packet); + filterManager.signalProcessingDone(packet); } } catch (InterruptedException e) { diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java index f1d7c431..f703889e 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java @@ -55,7 +55,7 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap * If the packet is synchronous, this marker will be used to schedule an asynchronous event. In this * asynchronous event, the marker is used to correctly pass the packet around to the different threads. - * @return The current asynchronous marker. + *

+ * Note that if there are no asynchronous events that can receive this packet, the marker is NULL. + * @return The current asynchronous marker, or NULL. */ public AsyncMarker getAsyncMarker() { return asyncMarker; } - + /** * Set the asynchronous marker. *

* If the marker is non-null at the end of an synchronous event processing, the packet will be scheduled - * to be processed asynchronously with the given settings. - * @param asyncMarker - the new asynchronous marker. + * to be processed asynchronously with the given settings. + *

+ * Note that if there are no asynchronous events that can receive this packet, the marker should be NULL. + * @param asyncMarker - the new asynchronous marker, or NULL. */ public void setAsyncMarker(AsyncMarker asyncMarker) { this.asyncMarker = asyncMarker; @@ -182,7 +186,7 @@ public class PacketEvent extends EventObject implements Cancellable { public boolean isAsynchronous() { return asynchronous; } - + private void writeObject(ObjectOutputStream output) throws IOException { // Default serialization output.defaultWriteObject(); diff --git a/ProtocolLib/src/com/comphenix/protocol/injector/PacketFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/injector/PacketFilterManager.java index 10c256d9..75016015 100644 --- a/ProtocolLib/src/com/comphenix/protocol/injector/PacketFilterManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/injector/PacketFilterManager.java @@ -47,6 +47,7 @@ import org.bukkit.plugin.PluginManager; import com.comphenix.protocol.ProtocolManager; import com.comphenix.protocol.async.AsyncFilterManager; +import com.comphenix.protocol.async.AsyncMarker; import com.comphenix.protocol.events.*; import com.comphenix.protocol.reflect.FieldAccessException; import com.comphenix.protocol.reflect.FuzzyReflection; @@ -284,7 +285,7 @@ public final class PacketFilterManager implements ProtocolManager { * @param event - the packet event to invoke. */ public void invokePacketRecieving(PacketEvent event) { - recievedListeners.invokePacketRecieving(logger, event); + handlePacket(recievedListeners, event); } /** @@ -292,7 +293,36 @@ public final class PacketFilterManager implements ProtocolManager { * @param event - the packet event to invoke. */ public void invokePacketSending(PacketEvent event) { - sendingListeners.invokePacketSending(logger, event); + handlePacket(sendingListeners, event); + } + + /** + * Handle a packet sending or receiving event. + *

+ * Note that we also handle asynchronous events. + * @param packetListeners - packet listeners that will receive this event. + * @param event - the evnet to broadcast. + */ + private void handlePacket(SortedPacketListenerList packetListeners, PacketEvent event) { + + // By default, asynchronous packets are queued for processing + if (asyncFilterManager.hasAsynchronousListeners(event)) { + event.setAsyncMarker(asyncFilterManager.createAsyncMarker()); + } + + // Process synchronous events + packetListeners.invokePacketRecieving(logger, event); + + // To cancel the asynchronous processing, use the async marker + if (!event.isCancelled() && !hasAsyncCancelled(event.getAsyncMarker())) { + asyncFilterManager.enqueueSyncPacket(event, event.getAsyncMarker()); + event.setCancelled(true); + } + } + + // NULL marker mean we're dealing with no asynchronous listeners + private boolean hasAsyncCancelled(AsyncMarker marker) { + return marker == null || marker.isAsyncCancelled(); } /**