diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java index cefdd30d..a5a7c21a 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java @@ -1,5 +1,6 @@ package com.comphenix.protocol.async; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; @@ -95,6 +96,24 @@ public class AsyncFilterManager { getProcessingQueue(syncPacket).enqueue(newEvent); } + /** + * Retrieves a immutable set containing the ID of the sent server packets that will be + * observed by the asynchronous listeners. + * @return Every filtered server packet. + */ + public Set getSendingFilters() { + return serverProcessingQueue.keySet(); + } + + /** + * Retrieves a immutable set containing the ID of the recieved client packets that will be + * observed by the asynchronous listeners. + * @return Every filtered client packet. + */ + public Set getReceivingFilters() { + return clientProcessingQueue.keySet(); + } + /** * Determine if a given synchronous packet has asynchronous listeners. * @param packet - packet to test. diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java index 97452f7b..5028738f 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java @@ -1,9 +1,11 @@ package com.comphenix.protocol.async; import java.io.IOException; +import java.util.Comparator; import java.util.concurrent.PriorityBlockingQueue; import com.comphenix.protocol.events.PacketEvent; +import com.google.common.collect.ComparisonChain; /** * Represents packets ready to be transmitted to a client. @@ -11,8 +13,22 @@ import com.comphenix.protocol.events.PacketEvent; */ class PacketSendingQueue { + private static final int INITIAL_CAPACITY = 64; + private PriorityBlockingQueue sendingQueue; + public PacketSendingQueue() { + sendingQueue = new PriorityBlockingQueue(INITIAL_CAPACITY, new Comparator() { + // Compare using the async marker + @Override + public int compare(PacketEvent o1, PacketEvent o2) { + return ComparisonChain.start(). + compare(o1.getAsyncMarker(), o2.getAsyncMarker()). + result(); + } + }); + } + /** * Enqueue a packet for sending. * @param packet diff --git a/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java b/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java index 5154f261..00741fe2 100644 --- a/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java +++ b/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java @@ -3,6 +3,7 @@ package com.comphenix.protocol.concurrency; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -109,10 +110,18 @@ public abstract class AbstractConcurrentListenerMultimap { * Retrieve every listener. * @return Every listener. */ - protected Iterable> values() { + public Iterable> values() { return Iterables.concat(listeners.values()); } + /** + * Retrieve every registered packet ID: + * @return Registered packet ID. + */ + public Set keySet() { + return listeners.keySet(); + } + /** * Remove all packet listeners. */ diff --git a/ProtocolLib/src/com/comphenix/protocol/events/PacketEvent.java b/ProtocolLib/src/com/comphenix/protocol/events/PacketEvent.java index aba9a2ff..799154f3 100644 --- a/ProtocolLib/src/com/comphenix/protocol/events/PacketEvent.java +++ b/ProtocolLib/src/com/comphenix/protocol/events/PacketEvent.java @@ -166,6 +166,21 @@ public class PacketEvent extends EventObject implements Cancellable { public AsyncMarker getAsyncMarker() { return asyncMarker; } + /** + * Set the asynchronous marker. + *

+ * If the marker is non-null at the end of an synchronous event processing, the packet will be scheduled + * to be processed asynchronously with the given settings. + *

+ * Note that if there are no asynchronous events that can receive this packet, the marker should be NULL. + * @param asyncMarker - the new asynchronous marker, or NULL. + * @throws IllegalStateException If the current event is asynchronous. + */ + public void setAsyncMarker(AsyncMarker asyncMarker) { + if (isAsynchronous()) + throw new IllegalStateException("The marker is immutable for asynchronous events"); + this.asyncMarker = asyncMarker; + } /** * Determine if the packet event has been executed asynchronously or not.