+ * To start listening asynchronously, pass the getListenerLoop() runnable to a different thread.
+ * @param plugin - the plugin that is registering the handler.
+ * @param listener - the packet listener that will recieve these asynchronous events.
+ * @return An asynchrouns handler.
+ */
+ public abstract AsyncListenerHandler registerAsyncHandler(PacketListener listener);
+
+ /**
+ * Unregisters and closes the given asynchronous handler.
+ * @param handler - asynchronous handler.
+ */
+ public abstract void unregisterAsyncHandler(AsyncListenerHandler handler);
+
+ /**
+ * Unregisters every asynchronous handler associated with this plugin.
+ * @param plugin - the original plugin.
+ */
+ public void unregisterAsyncHandlers(Plugin plugin);
+
+ /**
+ * Retrieves a immutable set containing the ID of the sent server packets that will be
+ * observed by the asynchronous listeners.
+ * @return Every filtered server packet.
+ */
+ public abstract Set getSendingFilters();
+
+ /**
+ * Retrieves a immutable set containing the ID of the recieved client packets that will be
+ * observed by the asynchronous listeners.
+ * @return Every filtered client packet.
+ */
+ public abstract Set getReceivingFilters();
+
+ /**
+ * Determine if a given synchronous packet has asynchronous listeners.
+ * @param packet - packet to test.
+ * @return TRUE if it does, FALSE otherwise.
+ */
+ public abstract boolean hasAsynchronousListeners(PacketEvent packet);
+
+ /**
+ * Retrieve the default packet stream.
+ * @return Default packet stream.
+ */
+ public abstract PacketStream getPacketStream();
+
+ /**
+ * Retrieve the default error logger.
+ * @return Default logger.
+ */
+ public abstract Logger getLogger();
+
+ /**
+ * Remove listeners, close threads and transmit every delayed packet.
+ */
+ public abstract void cleanupAll();
+}
\ No newline at end of file
diff --git a/ProtocolLib/src/com/comphenix/protocol/PacketStream.java b/ProtocolLib/src/com/comphenix/protocol/PacketStream.java
new file mode 100644
index 00000000..464f6f5c
--- /dev/null
+++ b/ProtocolLib/src/com/comphenix/protocol/PacketStream.java
@@ -0,0 +1,55 @@
+package com.comphenix.protocol;
+
+import java.lang.reflect.InvocationTargetException;
+
+import org.bukkit.entity.Player;
+
+import com.comphenix.protocol.events.PacketContainer;
+
+/**
+ * Represents a object capable of sending or receiving packets.
+ *
+ * @author Kristian
+ */
+public interface PacketStream {
+
+ /**
+ * Send a packet to the given player.
+ * @param reciever - the reciever.
+ * @param packet - packet to send.
+ * @throws InvocationTargetException - if an error occured when sending the packet.
+ */
+ public void sendServerPacket(Player reciever, PacketContainer packet)
+ throws InvocationTargetException;
+
+ /**
+ * Send a packet to the given player.
+ * @param reciever - the reciever.
+ * @param packet - packet to send.
+ * @param filters - whether or not to invoke any packet filters.
+ * @throws InvocationTargetException - if an error occured when sending the packet.
+ */
+ public void sendServerPacket(Player reciever, PacketContainer packet, boolean filters)
+ throws InvocationTargetException;
+
+ /**
+ * Simulate recieving a certain packet from a given player.
+ * @param sender - the sender.
+ * @param packet - the packet that was sent.
+ * @throws InvocationTargetException If the reflection machinery failed.
+ * @throws IllegalAccessException If the underlying method caused an error.
+ */
+ public void recieveClientPacket(Player sender, PacketContainer packet)
+ throws IllegalAccessException, InvocationTargetException;
+
+ /**
+ * Simulate recieving a certain packet from a given player.
+ * @param sender - the sender.
+ * @param packet - the packet that was sent.
+ * @param filters - whether or not to invoke any packet filters.
+ * @throws InvocationTargetException If the reflection machinery failed.
+ * @throws IllegalAccessException If the underlying method caused an error.
+ */
+ public void recieveClientPacket(Player sender, PacketContainer packet, boolean filters)
+ throws IllegalAccessException, InvocationTargetException;
+}
diff --git a/ProtocolLib/src/com/comphenix/protocol/ProtocolLibrary.java b/ProtocolLib/src/com/comphenix/protocol/ProtocolLibrary.java
index 7a1ba112..1b8a7132 100644
--- a/ProtocolLib/src/com/comphenix/protocol/ProtocolLibrary.java
+++ b/ProtocolLib/src/com/comphenix/protocol/ProtocolLibrary.java
@@ -25,8 +25,10 @@ import org.bukkit.Server;
import org.bukkit.plugin.PluginManager;
import org.bukkit.plugin.java.JavaPlugin;
+import com.comphenix.protocol.async.AsyncFilterManager;
import com.comphenix.protocol.injector.PacketFilterManager;
import com.comphenix.protocol.metrics.Statistics;
+import com.comphenix.protocol.reflect.compiler.BackgroundCompiler;
public class ProtocolLibrary extends JavaPlugin {
@@ -39,10 +41,19 @@ public class ProtocolLibrary extends JavaPlugin {
// Metrics and statistisc
private Statistics statistisc;
+ // Structure compiler
+ private BackgroundCompiler backgroundCompiler;
+
+ // Used to clean up server packets that have expired.
+ // But mostly required to simulate recieving client packets.
+ private int asyncPacketTask = -1;
+ private int tickCounter = 0;
+ private static final int ASYNC_PACKET_DELAY = 1;
+
@Override
public void onLoad() {
logger = getLoggerSafely();
- protocolManager = new PacketFilterManager(getClassLoader(), logger);
+ protocolManager = new PacketFilterManager(getClassLoader(), getServer(), logger);
}
@Override
@@ -50,6 +61,12 @@ public class ProtocolLibrary extends JavaPlugin {
Server server = getServer();
PluginManager manager = server.getPluginManager();
+ // Initialize background compiler
+ if (backgroundCompiler == null) {
+ backgroundCompiler = new BackgroundCompiler(getClassLoader());
+ BackgroundCompiler.setInstance(backgroundCompiler);
+ }
+
// Notify server managers of incompatible plugins
checkForIncompatibility(manager);
@@ -59,6 +76,9 @@ public class ProtocolLibrary extends JavaPlugin {
// Inject our hook into already existing players
protocolManager.initializePlayers(server.getOnlinePlayers());
+ // Timeout
+ createAsyncTask(server);
+
// Try to enable statistics
try {
statistisc = new Statistics(this);
@@ -69,9 +89,32 @@ public class ProtocolLibrary extends JavaPlugin {
}
}
+ private void createAsyncTask(Server server) {
+ try {
+ if (asyncPacketTask >= 0)
+ throw new IllegalStateException("Async task has already been created");
+
+ // Attempt to create task
+ asyncPacketTask = server.getScheduler().scheduleSyncRepeatingTask(this, new Runnable() {
+ @Override
+ public void run() {
+ AsyncFilterManager manager = (AsyncFilterManager) protocolManager.getAsynchronousManager();
+
+ // We KNOW we're on the main thread at the moment
+ manager.sendProcessedPackets(tickCounter++, true);
+ }
+ }, ASYNC_PACKET_DELAY, ASYNC_PACKET_DELAY);
+
+ } catch (Throwable e) {
+ if (asyncPacketTask == -1) {
+ logger.log(Level.SEVERE, "Unable to create packet timeout task.", e);
+ }
+ }
+ }
+
private void checkForIncompatibility(PluginManager manager) {
// Plugin authors: Notify me to remove these
- String[] incompatiblePlugins = { "TagAPI" };
+ String[] incompatiblePlugins = {};
for (String plugin : incompatiblePlugins) {
if (manager.getPlugin(plugin) != null) {
@@ -83,6 +126,19 @@ public class ProtocolLibrary extends JavaPlugin {
@Override
public void onDisable() {
+ // Disable compiler
+ if (backgroundCompiler != null) {
+ backgroundCompiler.shutdownAll();
+ backgroundCompiler = null;
+ BackgroundCompiler.setInstance(null);
+ }
+
+ // Clean up
+ if (asyncPacketTask >= 0) {
+ getServer().getScheduler().cancelTask(asyncPacketTask);
+ asyncPacketTask = -1;
+ }
+
protocolManager.close();
protocolManager = null;
statistisc = null;
diff --git a/ProtocolLib/src/com/comphenix/protocol/ProtocolManager.java b/ProtocolLib/src/com/comphenix/protocol/ProtocolManager.java
index 87062685..eaaf3702 100644
--- a/ProtocolLib/src/com/comphenix/protocol/ProtocolManager.java
+++ b/ProtocolLib/src/com/comphenix/protocol/ProtocolManager.java
@@ -17,7 +17,6 @@
package com.comphenix.protocol;
-import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Set;
@@ -35,7 +34,7 @@ import com.google.common.collect.ImmutableSet;
* Represents an API for accessing the Minecraft protocol.
* @author Kristian
*/
-public interface ProtocolManager {
+public interface ProtocolManager extends PacketStream {
/**
* Retrieves a list of every registered packet listener.
@@ -66,46 +65,6 @@ public interface ProtocolManager {
* @param plugin - the plugin to unload.
*/
public void removePacketListeners(Plugin plugin);
-
- /**
- * Send a packet to the given player.
- * @param reciever - the reciever.
- * @param packet - packet to send.
- * @throws InvocationTargetException - if an error occured when sending the packet.
- */
- public void sendServerPacket(Player reciever, PacketContainer packet)
- throws InvocationTargetException;
-
- /**
- * Send a packet to the given player.
- * @param reciever - the reciever.
- * @param packet - packet to send.
- * @param filters - whether or not to invoke any packet filters.
- * @throws InvocationTargetException - if an error occured when sending the packet.
- */
- public void sendServerPacket(Player reciever, PacketContainer packet, boolean filters)
- throws InvocationTargetException;
-
- /**
- * Simulate recieving a certain packet from a given player.
- * @param sender - the sender.
- * @param packet - the packet that was sent.
- * @throws InvocationTargetException If the reflection machinery failed.
- * @throws IllegalAccessException If the underlying method caused an error.
- */
- public void recieveClientPacket(Player sender, PacketContainer packet)
- throws IllegalAccessException, InvocationTargetException;
-
- /**
- * Simulate recieving a certain packet from a given player.
- * @param sender - the sender.
- * @param packet - the packet that was sent.
- * @param filters - whether or not to invoke any packet filters.
- * @throws InvocationTargetException If the reflection machinery failed.
- * @throws IllegalAccessException If the underlying method caused an error.
- */
- public void recieveClientPacket(Player sender, PacketContainer packet, boolean filters)
- throws IllegalAccessException, InvocationTargetException;
/**
* Constructs a new encapsulated Minecraft packet with the given ID.
@@ -163,4 +122,10 @@ public interface ProtocolManager {
* @return TRUE if it has, FALSE otherwise.
*/
public boolean isClosed();
+
+ /**
+ * Retrieve the current asyncronous packet manager.
+ * @return Asyncronous packet manager.
+ */
+ public AsynchronousManager getAsynchronousManager();
}
\ No newline at end of file
diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java
new file mode 100644
index 00000000..0862bed4
--- /dev/null
+++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java
@@ -0,0 +1,301 @@
+package com.comphenix.protocol.async;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+
+import org.bukkit.plugin.Plugin;
+import org.bukkit.scheduler.BukkitScheduler;
+
+import com.comphenix.protocol.AsynchronousManager;
+import com.comphenix.protocol.PacketStream;
+import com.comphenix.protocol.ProtocolManager;
+import com.comphenix.protocol.events.ListeningWhitelist;
+import com.comphenix.protocol.events.PacketEvent;
+import com.comphenix.protocol.events.PacketListener;
+import com.comphenix.protocol.injector.PacketFilterManager;
+import com.comphenix.protocol.injector.PrioritizedListener;
+import com.google.common.base.Objects;
+
+/**
+ * Represents a filter manager for asynchronous packets.
+ *
+ * @author Kristian
+ */
+public class AsyncFilterManager implements AsynchronousManager {
+
+ private PacketProcessingQueue serverProcessingQueue;
+ private PacketSendingQueue serverQueue;
+
+ private PacketProcessingQueue clientProcessingQueue;
+ private PacketSendingQueue clientQueue;
+
+ private Logger logger;
+
+ // The likely main thread
+ private Thread mainThread;
+
+ // Default scheduler
+ private BukkitScheduler scheduler;
+
+ // Our protocol manager
+ private ProtocolManager manager;
+
+ // Current packet index
+ private AtomicInteger currentSendingIndex = new AtomicInteger();
+
+ // Whether or not we're currently cleaning up
+ private volatile boolean cleaningUp;
+
+ public AsyncFilterManager(Logger logger, BukkitScheduler scheduler, ProtocolManager manager) {
+
+ // Server packets are synchronized already
+ this.serverQueue = new PacketSendingQueue(false);
+ // Client packets must be synchronized
+ this.clientQueue = new PacketSendingQueue(true);
+
+ this.serverProcessingQueue = new PacketProcessingQueue(serverQueue);
+ this.clientProcessingQueue = new PacketProcessingQueue(clientQueue);
+
+ this.scheduler = scheduler;
+ this.manager = manager;
+
+ this.logger = logger;
+ this.mainThread = Thread.currentThread();
+ }
+
+ @Override
+ public AsyncListenerHandler registerAsyncHandler(PacketListener listener) {
+ AsyncListenerHandler handler = new AsyncListenerHandler(mainThread, this, listener);
+
+ ListeningWhitelist sendingWhitelist = listener.getSendingWhitelist();
+ ListeningWhitelist receivingWhitelist = listener.getReceivingWhitelist();
+
+ // We need a synchronized listener to get the ball rolling
+ boolean hasListener = true;
+
+ // Add listener to either or both processing queue
+ if (hasValidWhitelist(sendingWhitelist)) {
+ PacketFilterManager.verifyWhitelist(listener, sendingWhitelist);
+ serverProcessingQueue.addListener(handler, sendingWhitelist);
+ hasListener &= hasPacketListener(sendingWhitelist);
+ }
+
+ if (hasValidWhitelist(receivingWhitelist)) {
+ PacketFilterManager.verifyWhitelist(listener, receivingWhitelist);
+ clientProcessingQueue.addListener(handler, receivingWhitelist);
+ hasListener &= hasPacketListener(receivingWhitelist);
+ }
+
+ if (!hasListener) {
+ handler.setNullPacketListener(new NullPacketListener(listener));
+ manager.addPacketListener(handler.getNullPacketListener());
+ }
+
+ return handler;
+ }
+
+ /**
+ * Determine if the given packets are represented.
+ * @param whitelist - list of packets.
+ * @return TRUE if they are all registered, FALSE otherwise.
+ */
+ private boolean hasPacketListener(ListeningWhitelist whitelist) {
+ return manager.getSendingFilters().containsAll(whitelist.getWhitelist());
+ }
+
+ private boolean hasValidWhitelist(ListeningWhitelist whitelist) {
+ return whitelist != null && whitelist.getWhitelist().size() > 0;
+ }
+
+ @Override
+ public void unregisterAsyncHandler(AsyncListenerHandler handler) {
+ if (handler == null)
+ throw new IllegalArgumentException("listenerToken cannot be NULL");
+
+ handler.cancel();
+ }
+
+ // Called by AsyncListenerHandler
+ void unregisterAsyncHandlerInternal(AsyncListenerHandler handler) {
+
+ PacketListener listener = handler.getAsyncListener();
+ boolean synchronusOK = onMainThread();
+
+ // Unregister null packet listeners
+ if (handler.getNullPacketListener() != null) {
+ manager.removePacketListener(handler.getNullPacketListener());
+ }
+
+ // Just remove it from the queue(s)
+ if (hasValidWhitelist(listener.getSendingWhitelist())) {
+ List removed = serverProcessingQueue.removeListener(handler, listener.getSendingWhitelist());
+
+ // We're already taking care of this, so don't do anything
+ if (!cleaningUp)
+ serverQueue.signalPacketUpdate(removed, synchronusOK);
+ }
+
+ if (hasValidWhitelist(listener.getReceivingWhitelist())) {
+ List removed = clientProcessingQueue.removeListener(handler, listener.getReceivingWhitelist());
+
+ if (!cleaningUp)
+ clientQueue.signalPacketUpdate(removed, synchronusOK);
+ }
+ }
+
+ /**
+ * Determine if we're running on the main thread.
+ * @return TRUE if we are, FALSE otherwise.
+ */
+ private boolean onMainThread() {
+ return Thread.currentThread().getId() == mainThread.getId();
+ }
+
+ @Override
+ public void unregisterAsyncHandlers(Plugin plugin) {
+ unregisterAsyncHandlers(serverProcessingQueue, plugin);
+ unregisterAsyncHandlers(clientProcessingQueue, plugin);
+ }
+
+ private void unregisterAsyncHandlers(PacketProcessingQueue processingQueue, Plugin plugin) {
+
+ // Iterate through every packet listener
+ for (PrioritizedListener listener : processingQueue.values()) {
+ // Remove the listener
+ if (Objects.equal(listener.getListener().getPlugin(), plugin)) {
+ unregisterAsyncHandler(listener.getListener());
+ }
+ }
+ }
+
+ /**
+ * Enqueue a packet for asynchronous processing.
+ * @param syncPacket - synchronous packet event.
+ * @param asyncMarker - the asynchronous marker to use.
+ */
+ public void enqueueSyncPacket(PacketEvent syncPacket, AsyncMarker asyncMarker) {
+ PacketEvent newEvent = PacketEvent.fromSynchronous(syncPacket, asyncMarker);
+
+ // Start the process
+ getSendingQueue(syncPacket).enqueue(newEvent);
+
+ // We know this is occuring on the main thread, so pass TRUE
+ getProcessingQueue(syncPacket).enqueue(newEvent, true);
+ }
+
+ @Override
+ public Set getSendingFilters() {
+ return serverProcessingQueue.keySet();
+ }
+
+ @Override
+ public Set getReceivingFilters() {
+ return clientProcessingQueue.keySet();
+ }
+
+ /**
+ * Used to create a default asynchronous task.
+ * @param plugin - the calling plugin.
+ * @param runnable - the runnable.
+ */
+ public void scheduleAsyncTask(Plugin plugin, Runnable runnable) {
+ scheduler.scheduleAsyncDelayedTask(plugin, runnable);
+ }
+
+ @Override
+ public boolean hasAsynchronousListeners(PacketEvent packet) {
+ Collection> list = getProcessingQueue(packet).getListener(packet.getPacketID());
+ return list != null && list.size() > 0;
+ }
+
+ /**
+ * Construct a asynchronous marker with all the default values.
+ * @return Asynchronous marker.
+ */
+ public AsyncMarker createAsyncMarker() {
+ return createAsyncMarker(AsyncMarker.DEFAULT_SENDING_DELTA, AsyncMarker.DEFAULT_TIMEOUT_DELTA);
+ }
+
+ /**
+ * Construct an async marker with the given sending priority delta and timeout delta.
+ * @param sendingDelta - how many packets we're willing to wait.
+ * @param timeoutDelta - how long (in ms) until the packet expire.
+ * @return An async marker.
+ */
+ public AsyncMarker createAsyncMarker(long sendingDelta, long timeoutDelta) {
+ return createAsyncMarker(sendingDelta, timeoutDelta,
+ currentSendingIndex.incrementAndGet(), System.currentTimeMillis());
+ }
+
+ // Helper method
+ private AsyncMarker createAsyncMarker(long sendingDelta, long timeoutDelta, long sendingIndex, long currentTime) {
+ return new AsyncMarker(manager, sendingIndex, sendingDelta, System.currentTimeMillis(), timeoutDelta);
+ }
+
+ @Override
+ public PacketStream getPacketStream() {
+ return manager;
+ }
+
+ @Override
+ public Logger getLogger() {
+ return logger;
+ }
+
+ @Override
+ public void cleanupAll() {
+ cleaningUp = true;
+ serverProcessingQueue.cleanupAll();
+ serverQueue.cleanupAll();
+ }
+
+ /**
+ * Signal that a packet is ready to be transmitted.
+ * @param packet - packet to signal.
+ */
+ public void signalPacketUpdate(PacketEvent packet) {
+ getSendingQueue(packet).signalPacketUpdate(packet, onMainThread());
+ }
+
+ /**
+ * Retrieve the sending queue this packet belongs to.
+ * @param packet - the packet.
+ * @return The server or client sending queue the packet belongs to.
+ */
+ private PacketSendingQueue getSendingQueue(PacketEvent packet) {
+ return packet.isServerPacket() ? serverQueue : clientQueue;
+ }
+
+ /**
+ * Signal that a packet has finished processing.
+ * @param packet - packet to signal.
+ */
+ public void signalProcessingDone(PacketEvent packet) {
+ getProcessingQueue(packet).signalProcessingDone();
+ }
+
+ /**
+ * Retrieve the processing queue this packet belongs to.
+ * @param packet - the packet.
+ * @return The server or client sending processing the packet belongs to.
+ */
+ private PacketProcessingQueue getProcessingQueue(PacketEvent packet) {
+ return packet.isServerPacket() ? serverProcessingQueue : clientProcessingQueue;
+ }
+
+ /**
+ * Send any due packets, or clean up packets that have expired.
+ */
+ public void sendProcessedPackets(int tickCounter, boolean onMainThread) {
+
+ // The server queue is unlikely to need checking that often
+ if (tickCounter % 10 == 0) {
+ serverQueue.trySendPackets(onMainThread);
+ }
+
+ clientQueue.trySendPackets(onMainThread);
+ }
+}
diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java
new file mode 100644
index 00000000..e4cd15fc
--- /dev/null
+++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java
@@ -0,0 +1,210 @@
+package com.comphenix.protocol.async;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.logging.Level;
+
+import org.bukkit.plugin.Plugin;
+
+import com.comphenix.protocol.events.PacketAdapter;
+import com.comphenix.protocol.events.PacketEvent;
+import com.comphenix.protocol.events.PacketListener;
+
+/**
+ * Represents a handler for an asynchronous event.
+ *
+ * @author Kristian
+ */
+public class AsyncListenerHandler {
+
+ /**
+ * Signal an end to the packet processing.
+ */
+ private static final PacketEvent INTERUPT_PACKET = new PacketEvent(new Object());
+
+ // Default queue capacity
+ private static int DEFAULT_CAPACITY = 1024;
+
+ // Cancel the async handler
+ private volatile boolean cancelled;
+
+ // If we've started the listener loop before
+ private volatile boolean started;
+
+ // The packet listener
+ private PacketListener listener;
+
+ // The filter manager
+ private AsyncFilterManager filterManager;
+ private NullPacketListener nullPacketListener;
+
+ // List of queued packets
+ private ArrayBlockingQueue queuedPackets = new ArrayBlockingQueue(DEFAULT_CAPACITY);
+
+ // Minecraft main thread
+ private Thread mainThread;
+
+ public AsyncListenerHandler(Thread mainThread, AsyncFilterManager filterManager, PacketListener listener) {
+ if (filterManager == null)
+ throw new IllegalArgumentException("filterManager cannot be NULL");
+ if (listener == null)
+ throw new IllegalArgumentException("listener cannot be NULL");
+
+ this.mainThread = mainThread;
+ this.filterManager = filterManager;
+ this.listener = listener;
+ }
+
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ public PacketListener getAsyncListener() {
+ return listener;
+ }
+
+ /**
+ * Set the synchronized listener that has been automatically created.
+ * @param nullPacketListener - automatically created listener.
+ */
+ void setNullPacketListener(NullPacketListener nullPacketListener) {
+ this.nullPacketListener = nullPacketListener;
+ }
+
+ /**
+ * Retrieve the synchronized listener that was automatically created.
+ * @return Automatically created listener.
+ */
+ PacketListener getNullPacketListener() {
+ return nullPacketListener;
+ }
+
+ /**
+ * Cancel the handler.
+ */
+ public void cancel() {
+ // Remove the listener as quickly as possible
+ close();
+
+ // Poison Pill Shutdown
+ queuedPackets.clear();
+ queuedPackets.add(INTERUPT_PACKET);
+ }
+
+ /**
+ * Queue a packet for processing.
+ * @param packet - a packet for processing.
+ * @throws IllegalStateException If the underlying packet queue is full.
+ */
+ public void enqueuePacket(PacketEvent packet) {
+ if (packet == null)
+ throw new IllegalArgumentException("packet is NULL");
+
+ queuedPackets.add(packet);
+ }
+
+ /**
+ * Create a runnable that will initiate the listener loop.
+ *
+ * Warning: Never call the run() method in the main thread.
+ */
+ public Runnable getListenerLoop() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ listenerLoop();
+ }
+ };
+ }
+
+ // DO NOT call this method from the main thread
+ private void listenerLoop() {
+
+ // Danger, danger!
+ if (Thread.currentThread().getId() == mainThread.getId())
+ throw new IllegalStateException("Do not call this method from the main thread.");
+ if (started)
+ throw new IllegalStateException("A listener cannot be run by multiple threads. Create a new listener instead.");
+ if (cancelled)
+ throw new IllegalStateException("Listener has been cancelled. Create a new listener instead.");
+
+ // Proceed
+ started = true;
+
+ try {
+ mainLoop:
+ while (!cancelled) {
+ PacketEvent packet = queuedPackets.take();
+ AsyncMarker marker = packet.getAsyncMarker();
+
+ // Handle cancel requests
+ if (packet == null || marker == null || !packet.isAsynchronous()) {
+ break;
+ }
+
+ // Here's the core of the asynchronous processing
+ try {
+ if (packet.isServerPacket())
+ listener.onPacketSending(packet);
+ else
+ listener.onPacketReceiving(packet);
+
+ } catch (Throwable e) {
+ // Minecraft doesn't want your Exception.
+ filterManager.getLogger().log(Level.SEVERE,
+ "Unhandled exception occured in onAsyncPacket() for " + getPluginName(), e);
+ }
+
+ // Now, get the next non-cancelled listener
+ for (; marker.getListenerTraversal().hasNext(); ) {
+ AsyncListenerHandler handler = marker.getListenerTraversal().next().getListener();
+
+ if (!handler.isCancelled()) {
+ handler.enqueuePacket(packet);
+ continue mainLoop;
+ }
+ }
+
+ // There are no more listeners - queue the packet for transmission
+ filterManager.signalPacketUpdate(packet);
+ filterManager.signalProcessingDone(packet);
+ }
+
+ } catch (InterruptedException e) {
+ // We're done
+ }
+
+ // Clean up
+ close();
+ }
+
+ private void close() {
+ // Remove the listener itself
+ if (!cancelled) {
+ filterManager.unregisterAsyncHandlerInternal(this);
+ cancelled = true;
+ started = false;
+ }
+ }
+
+ private String getPluginName() {
+ return PacketAdapter.getPluginName(listener);
+ }
+
+ /**
+ * Retrieve the plugin associated with this async listener.
+ * @return The plugin.
+ */
+ public Plugin getPlugin() {
+ return listener != null ? listener.getPlugin() : null;
+ }
+
+ /**
+ * Start the asynchronous listener using the Bukkit scheduler.
+ */
+ public void start() {
+ if (listener.getPlugin() == null)
+ throw new IllegalArgumentException("Cannot start task without a valid plugin.");
+
+ filterManager.scheduleAsyncTask(listener.getPlugin(), getListenerLoop());
+ }
+}
diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java
new file mode 100644
index 00000000..8cecb1c8
--- /dev/null
+++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java
@@ -0,0 +1,305 @@
+package com.comphenix.protocol.async;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.List;
+
+import net.minecraft.server.Packet;
+
+import com.comphenix.protocol.PacketStream;
+import com.comphenix.protocol.events.PacketEvent;
+import com.comphenix.protocol.injector.PrioritizedListener;
+import com.comphenix.protocol.reflect.FieldAccessException;
+import com.comphenix.protocol.reflect.FuzzyReflection;
+import com.google.common.primitives.Longs;
+
+/**
+ * Contains information about the packet that is being processed by asynchronous listeners.
+ *
+ * Asynchronous listeners can use this to set packet timeout or transmission order.
+ *
+ * @author Kristian
+ */
+public class AsyncMarker implements Serializable, Comparable {
+
+ /**
+ * Generated by Eclipse.
+ */
+ private static final long serialVersionUID = -2621498096616187384L;
+
+ /**
+ * Default number of milliseconds until a packet will rejected.
+ */
+ public static final int DEFAULT_TIMEOUT_DELTA = 60000;
+
+ /**
+ * Default number of packets to skip.
+ */
+ public static final int DEFAULT_SENDING_DELTA = 0;
+
+ /**
+ * The packet stream responsible for transmitting the packet when it's done processing.
+ */
+ private transient PacketStream packetStream;
+
+ /**
+ * Current list of async packet listeners.
+ */
+ private transient Iterator> listenerTraversal;
+
+ // Timeout handling
+ private long initialTime;
+ private long timeout;
+
+ // Packet order
+ private long originalSendingIndex;
+ private long newSendingIndex;
+
+ // Whether or not the packet has been processed by the listeners
+ private volatile boolean processed;
+
+ // Whether or not the packet has been sent
+ private volatile boolean transmitted;
+
+ // Whether or not the asynchronous processing itself should be cancelled
+ private volatile boolean asyncCancelled;
+
+ // Determine if Minecraft processes this packet asynchronously
+ private static Method isMinecraftAsync;
+ private static boolean alwaysSync;
+
+ /**
+ * Create a container for asyncronous packets.
+ * @param initialTime - the current time in milliseconds since 01.01.1970 00:00.
+ */
+ AsyncMarker(PacketStream packetStream, long sendingIndex, long sendingDelta, long initialTime, long timeoutDelta) {
+ if (packetStream == null)
+ throw new IllegalArgumentException("packetStream cannot be NULL");
+
+ this.packetStream = packetStream;
+
+ // Timeout
+ this.initialTime = initialTime;
+ this.timeout = initialTime + timeoutDelta;
+
+ // Sending index
+ this.originalSendingIndex = sendingIndex;
+ this.newSendingIndex = sendingIndex;
+ }
+
+ /**
+ * Retrieve the time the packet was initially queued for asynchronous processing.
+ * @return The initial time in number of milliseconds since 01.01.1970 00:00.
+ */
+ public long getInitialTime() {
+ return initialTime;
+ }
+
+ /**
+ * Retrieve the time the packet will be forcefully rejected.
+ * @return The time to reject the packet, in milliseconds since 01.01.1970 00:00.
+ */
+ public long getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * Set the time the packet will be forcefully rejected.
+ * @param timeout - time to reject the packet, in milliseconds since 01.01.1970 00:00.
+ */
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ /**
+ * Retrieve the order the packet was originally transmitted.
+ * @return The original packet index.
+ */
+ public long getOriginalSendingIndex() {
+ return originalSendingIndex;
+ }
+
+ /**
+ * Retrieve the desired sending order after processing has completed.
+ *
+ * Higher sending order means lower priority.
+ * @return Desired sending order.
+ */
+ public long getNewSendingIndex() {
+ return newSendingIndex;
+ }
+
+ /**
+ * Sets the desired sending order after processing has completed.
+ *
+ * Higher sending order means lower priority.
+ * @param newSendingIndex - new packet send index.
+ */
+ public void setNewSendingIndex(long newSendingIndex) {
+ this.newSendingIndex = newSendingIndex;
+ }
+
+ /**
+ * Retrieve the packet stream responsible for transmitting this packet.
+ * @return The packet stream.
+ */
+ public PacketStream getPacketStream() {
+ return packetStream;
+ }
+
+ /**
+ * Sets the output packet stream responsible for transmitting this packet.
+ * @param packetStream - new output packet stream.
+ */
+ public void setPacketStream(PacketStream packetStream) {
+ this.packetStream = packetStream;
+ }
+
+ /**
+ * Retrieve whether or not this packet has been processed by the async listeners.
+ * @return TRUE if it has been processed, FALSE otherwise.
+ */
+ public boolean isProcessed() {
+ return processed;
+ }
+
+ /**
+ * Sets whether or not this packet has been processed by the async listeners.
+ * @param processed - TRUE if it has, FALSE otherwise.
+ */
+ void setProcessed(boolean processed) {
+ this.processed = processed;
+ }
+
+ /**
+ * Retrieve whether or not this packet has already been sent.
+ * @return TRUE if it has been sent before, FALSE otherwise.
+ */
+ public boolean isTransmitted() {
+ return transmitted;
+ }
+
+ /**
+ * Determine if this packet has expired.
+ * @return TRUE if it has, FALSE otherwise.
+ */
+ public boolean hasExpired() {
+ return hasExpired(System.currentTimeMillis());
+ }
+
+ /**
+ * Determine if this packet has expired given this time.
+ * @param currentTime - the current time in milliseconds since 01.01.1970 00:00.
+ * @return TRUE if it has, FALSE otherwise.
+ */
+ public boolean hasExpired(long currentTime) {
+ return timeout < currentTime;
+ }
+
+ /**
+ * Determine if the asynchronous handling should be cancelled.
+ * @return TRUE if it should, FALSE otherwise.
+ */
+ public boolean isAsyncCancelled() {
+ return asyncCancelled;
+ }
+
+ /**
+ * Set whether or not the asynchronous handling should be cancelled.
+ * @param asyncCancelled - TRUE to cancel it, FALSE otherwise.
+ */
+ public void setAsyncCancelled(boolean asyncCancelled) {
+ this.asyncCancelled = asyncCancelled;
+ }
+
+ /**
+ * Retrieve iterator for the next listener in line.
+ * @return Next async packet listener iterator.
+ */
+ public Iterator> getListenerTraversal() {
+ return listenerTraversal;
+ }
+
+ /**
+ * Set the iterator for the next listener.
+ * @param listenerTraversal - the new async packet listener iterator.
+ */
+ void setListenerTraversal(Iterator> listenerTraversal) {
+ this.listenerTraversal = listenerTraversal;
+ }
+
+ /**
+ * Transmit a given packet to the current packet stream.
+ * @param event - the packet to send.
+ * @throws IOException If the packet couldn't be sent.
+ */
+ public void sendPacket(PacketEvent event) throws IOException {
+ try {
+ if (event.isServerPacket()) {
+ packetStream.sendServerPacket(event.getPlayer(), event.getPacket(), false);
+ } else {
+ packetStream.recieveClientPacket(event.getPlayer(), event.getPacket(), false);
+ }
+ transmitted = true;
+
+ } catch (InvocationTargetException e) {
+ throw new IOException("Cannot send packet", e);
+ } catch (IllegalAccessException e) {
+ throw new IOException("Cannot send packet", e);
+ }
+ }
+
+ /**
+ * Determine if Minecraft allows asynchronous processing of this packet.
+ * @return TRUE if it does, FALSE otherwise.
+ */
+ public boolean isMinecraftAsync(PacketEvent event) throws FieldAccessException {
+
+ if (isMinecraftAsync == null && !alwaysSync) {
+ try {
+ isMinecraftAsync = FuzzyReflection.fromClass(Packet.class).getMethodByName("a_.*");
+ } catch (RuntimeException e) {
+ // This will occur in 1.2.5 (or possibly in later versions)
+ List methods = FuzzyReflection.fromClass(Packet.class).
+ getMethodListByParameters(boolean.class, new Class[] {});
+
+ // Try to look for boolean methods
+ if (methods.size() == 2) {
+ isMinecraftAsync = methods.get(1);
+ } else if (methods.size() == 1) {
+ // We're in 1.2.5
+ alwaysSync = true;
+ } else {
+ System.err.println("Cannot determine asynchronous state of packets!");
+ alwaysSync = true;
+ }
+ }
+ }
+
+ if (alwaysSync) {
+ return false;
+ } else {
+ try {
+ // Wrap exceptions
+ return (Boolean) isMinecraftAsync.invoke(event.getPacket().getHandle());
+ } catch (IllegalArgumentException e) {
+ throw new FieldAccessException("Illegal argument", e);
+ } catch (IllegalAccessException e) {
+ throw new FieldAccessException("Unable to reflect method call 'a_', or: isAsyncPacket.", e);
+ } catch (InvocationTargetException e) {
+ throw new FieldAccessException("Minecraft error", e);
+ }
+ }
+ }
+
+ @Override
+ public int compareTo(AsyncMarker o) {
+ if (o == null)
+ return 1;
+ else
+ return Longs.compare(getNewSendingIndex(), o.getNewSendingIndex());
+ }
+}
diff --git a/ProtocolLib/src/com/comphenix/protocol/async/NullPacketListener.java b/ProtocolLib/src/com/comphenix/protocol/async/NullPacketListener.java
new file mode 100644
index 00000000..d02559d3
--- /dev/null
+++ b/ProtocolLib/src/com/comphenix/protocol/async/NullPacketListener.java
@@ -0,0 +1,62 @@
+package com.comphenix.protocol.async;
+
+import org.bukkit.plugin.Plugin;
+
+import com.comphenix.protocol.events.ListenerPriority;
+import com.comphenix.protocol.events.ListeningWhitelist;
+import com.comphenix.protocol.events.PacketEvent;
+import com.comphenix.protocol.events.PacketListener;
+
+/**
+ * Represents a NO OPERATION listener.
+ *
+ * @author Kristian
+ */
+class NullPacketListener implements PacketListener {
+
+ private ListeningWhitelist sendingWhitelist;
+ private ListeningWhitelist receivingWhitelist;
+ private Plugin plugin;
+
+ /**
+ * Create a no-op listener with the same whitelist and plugin as the given listener.
+ * @param original - the packet listener to copy.
+ */
+ public NullPacketListener(PacketListener original) {
+ this.sendingWhitelist = cloneWhitelist(ListenerPriority.LOW, original.getSendingWhitelist());
+ this.receivingWhitelist = cloneWhitelist(ListenerPriority.LOW, original.getReceivingWhitelist());
+ this.plugin = original.getPlugin();
+ }
+
+ @Override
+ public void onPacketSending(PacketEvent event) {
+ // NULL
+ }
+
+ @Override
+ public void onPacketReceiving(PacketEvent event) {
+ // NULL
+ }
+
+ @Override
+ public ListeningWhitelist getSendingWhitelist() {
+ return sendingWhitelist;
+ }
+
+ @Override
+ public ListeningWhitelist getReceivingWhitelist() {
+ return receivingWhitelist;
+ }
+
+ private ListeningWhitelist cloneWhitelist(ListenerPriority priority, ListeningWhitelist whitelist) {
+ if (whitelist != null)
+ return new ListeningWhitelist(priority, whitelist.getWhitelist());
+ else
+ return null;
+ }
+
+ @Override
+ public Plugin getPlugin() {
+ return plugin;
+ }
+}
diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java
new file mode 100644
index 00000000..37349e83
--- /dev/null
+++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java
@@ -0,0 +1,133 @@
+package com.comphenix.protocol.async;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+import com.comphenix.protocol.concurrency.AbstractConcurrentListenerMultimap;
+import com.comphenix.protocol.events.PacketEvent;
+import com.comphenix.protocol.injector.PrioritizedListener;
+
+/**
+ * Handles the processing of every packet type.
+ *
+ * @author Kristian
+ */
+class PacketProcessingQueue extends AbstractConcurrentListenerMultimap {
+
+ /**
+ * Default maximum number of packets to process concurrently.
+ */
+ public static final int DEFAULT_MAXIMUM_CONCURRENCY = 32;
+
+ /**
+ * Default maximum number of packets to queue for processing.
+ */
+ public static final int DEFAULT_QUEUE_LIMIT = 1024 * 60;
+
+ /**
+ * Number of packets we're processing concurrently.
+ */
+ private final int maximumConcurrency;
+ private Semaphore concurrentProcessing;
+
+ // Queued packets for being processed
+ private ArrayBlockingQueue processingQueue;
+
+ // Packets for sending
+ private PacketSendingQueue sendingQueue;
+
+ public PacketProcessingQueue(PacketSendingQueue sendingQueue) {
+ this(sendingQueue, DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY);
+ }
+
+ public PacketProcessingQueue(PacketSendingQueue sendingQueue, int queueLimit, int maximumConcurrency) {
+ super();
+ this.processingQueue = new ArrayBlockingQueue(queueLimit);
+ this.maximumConcurrency = maximumConcurrency;
+ this.concurrentProcessing = new Semaphore(maximumConcurrency);
+ this.sendingQueue = sendingQueue;
+ }
+
+ /**
+ * Enqueue a packet for processing by the asynchronous listeners.
+ * @param packet - packet to process.
+ * @param onMainThread - whether or not this is occuring on the main thread.
+ * @return TRUE if we sucessfully queued the packet, FALSE if the queue ran out if space.
+ */
+ public boolean enqueue(PacketEvent packet, boolean onMainThread) {
+ try {
+ processingQueue.add(packet);
+
+ // Begin processing packets
+ signalBeginProcessing(onMainThread);
+ return true;
+ } catch (IllegalStateException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Called by the current method and each thread to signal that a packet might be ready for processing.
+ * @param onMainThread - whether or not this is occuring on the main thread.
+ */
+ public void signalBeginProcessing(boolean onMainThread) {
+ while (concurrentProcessing.tryAcquire()) {
+ PacketEvent packet = processingQueue.poll();
+
+ // Any packet queued?
+ if (packet != null) {
+ Collection> list = getListener(packet.getPacketID());
+ AsyncMarker marker = packet.getAsyncMarker();
+
+ // Yes, removing the marker will cause the chain to stop
+ if (list != null) {
+ Iterator> iterator = list.iterator();
+
+ if (iterator.hasNext()) {
+ marker.setListenerTraversal(iterator);
+ iterator.next().getListener().enqueuePacket(packet);
+ continue;
+ }
+ }
+
+ // The packet has no further listeners. Just send it.
+ sendingQueue.signalPacketUpdate(packet, onMainThread);
+ signalProcessingDone();
+
+ } else {
+ // No more queued packets.
+ signalProcessingDone();
+ return;
+ }
+ }
+ }
+
+ /**
+ * Called when a packet has been processed.
+ */
+ public void signalProcessingDone() {
+ concurrentProcessing.release();
+ }
+
+ /**
+ * Retrieve the maximum number of packets to process at any given time.
+ * @return Number of simultaneous packet to process.
+ */
+ public int getMaximumConcurrency() {
+ return maximumConcurrency;
+ }
+
+ public void cleanupAll() {
+ // Cancel all the threads and every listener
+ for (PrioritizedListener handler : values()) {
+ if (handler != null) {
+ handler.getListener().cancel();
+ }
+ }
+
+ // Remove the rest, just in case
+ clearListeners();
+ }
+}
diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java
new file mode 100644
index 00000000..500cf2c3
--- /dev/null
+++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java
@@ -0,0 +1,177 @@
+package com.comphenix.protocol.async;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import com.comphenix.protocol.events.PacketEvent;
+import com.comphenix.protocol.reflect.FieldAccessException;
+import com.google.common.collect.ComparisonChain;
+
+/**
+ * Represents packets ready to be transmitted to a client.
+ * @author Kristian
+ */
+class PacketSendingQueue {
+
+ private static final int INITIAL_CAPACITY = 64;
+
+ private PriorityBlockingQueue sendingQueue;
+
+ // Whether or not packet transmission can only occur on the main thread
+ private final boolean synchronizeMain;
+
+ /**
+ * Create a packet sending queue.
+ * @param synchronizeMain - whether or not to synchronize with the main thread.
+ */
+ public PacketSendingQueue(boolean synchronizeMain) {
+ this.synchronizeMain = synchronizeMain;
+ this.sendingQueue = new PriorityBlockingQueue(INITIAL_CAPACITY, new Comparator() {
+ // Compare using the async marker
+ @Override
+ public int compare(PacketEvent o1, PacketEvent o2) {
+ return ComparisonChain.start().
+ compare(o1.getAsyncMarker(), o2.getAsyncMarker()).
+ result();
+ }
+ });
+ }
+
+ /**
+ * Enqueue a packet for sending.
+ * @param packet
+ */
+ public void enqueue(PacketEvent packet) {
+ sendingQueue.add(packet);
+ }
+
+ /**
+ * Invoked when one of the packets have finished processing.
+ * @param packetUpdated - the packet that has now been updated.
+ * @param onMainThread - whether or not this is occuring on the main thread.
+ */
+ public synchronized void signalPacketUpdate(PacketEvent packetUpdated, boolean onMainThread) {
+ // Mark this packet as finished
+ packetUpdated.getAsyncMarker().setProcessed(true);
+ trySendPackets(onMainThread);
+ }
+
+ /***
+ * Invoked when a list of packet IDs are no longer associated with any listeners.
+ * @param packetsRemoved - packets that no longer have any listeners.
+ * @param onMainThread - whether or not this is occuring on the main thread.
+ */
+ public synchronized void signalPacketUpdate(List packetsRemoved, boolean onMainThread) {
+
+ Set lookup = new HashSet(packetsRemoved);
+
+ // Note that this is O(n), so it might be expensive
+ for (PacketEvent event : sendingQueue) {
+ if (lookup.contains(event.getPacketID())) {
+ event.getAsyncMarker().setProcessed(true);
+ }
+ }
+
+ // This is likely to have changed the situation a bit
+ trySendPackets(onMainThread);
+ }
+
+ /**
+ * Attempt to send any remaining packets.
+ * @param onMainThread - whether or not this is occuring on the main thread.
+ */
+ public void trySendPackets(boolean onMainThread) {
+
+ // Transmit as many packets as we can
+ while (true) {
+ PacketEvent current = sendingQueue.peek();
+
+ if (current != null) {
+ AsyncMarker marker = current.getAsyncMarker();
+
+ // Abort if we're not on the main thread
+ if (synchronizeMain) {
+ try {
+ boolean wantAsync = marker.isMinecraftAsync(current);
+ boolean wantSync = !wantAsync;
+
+ // Quit if we haven't fulfilled our promise
+ if ((onMainThread && wantAsync) || (!onMainThread && wantSync))
+ return;
+
+ } catch (FieldAccessException e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+
+ if (marker.isProcessed() || marker.hasExpired()) {
+ if (marker.isProcessed() && !current.isCancelled()) {
+ sendPacket(current);
+ }
+
+ sendingQueue.poll();
+ continue;
+ }
+ }
+
+ // Only repeat when packets are removed
+ break;
+ }
+ }
+
+ /**
+ * Send every packet, regardless of the processing state.
+ */
+ private void forceSend() {
+ while (true) {
+ PacketEvent current = sendingQueue.poll();
+
+ if (current != null) {
+ sendPacket(current);
+ } else {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Whether or not the packet transmission must synchronize with the main thread.
+ * @return TRUE if it must, FALSE otherwise.
+ */
+ public boolean isSynchronizeMain() {
+ return synchronizeMain;
+ }
+
+ /**
+ * Transmit a packet, if it hasn't already.
+ * @param event - the packet to transmit.
+ */
+ private void sendPacket(PacketEvent event) {
+
+ AsyncMarker marker = event.getAsyncMarker();
+
+ try {
+ // Don't send a packet twice
+ if (marker != null && !marker.isTransmitted()) {
+ marker.sendPacket(event);
+ }
+
+ } catch (IOException e) {
+ // Just print the error
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Automatically transmits every delayed packet.
+ */
+ public void cleanupAll() {
+ // Note that the cleanup itself will always occur on the main thread
+ forceSend();
+ }
+}
diff --git a/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java b/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java
new file mode 100644
index 00000000..00741fe2
--- /dev/null
+++ b/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java
@@ -0,0 +1,131 @@
+package com.comphenix.protocol.concurrency;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.comphenix.protocol.events.ListeningWhitelist;
+import com.comphenix.protocol.injector.PrioritizedListener;
+import com.google.common.collect.Iterables;
+
+/**
+ * A thread-safe implementation of a listener multimap.
+ *
+ * @author Kristian
+ */
+public abstract class AbstractConcurrentListenerMultimap {
+
+ // The core of our map
+ private ConcurrentMap>> listeners =
+ new ConcurrentHashMap>>();
+
+ /**
+ * Adds a listener to its requested list of packet recievers.
+ * @param listener - listener with a list of packets to recieve notifcations for.
+ * @param whitelist - the packet whitelist to use.
+ */
+ public void addListener(TListener listener, ListeningWhitelist whitelist) {
+
+ PrioritizedListener prioritized = new PrioritizedListener(listener, whitelist.getPriority());
+
+ for (Integer packetID : whitelist.getWhitelist()) {
+ addListener(packetID, prioritized);
+ }
+ }
+
+ // Add the listener to a specific packet notifcation list
+ private void addListener(Integer packetID, PrioritizedListener listener) {
+
+ SortedCopyOnWriteArray> list = listeners.get(packetID);
+
+ // We don't want to create this for every lookup
+ if (list == null) {
+ // It would be nice if we could use a PriorityBlockingQueue, but it doesn't preseve iterator order,
+ // which is a essential feature for our purposes.
+ final SortedCopyOnWriteArray> value = new SortedCopyOnWriteArray>();
+
+ list = listeners.putIfAbsent(packetID, value);
+
+ // We may end up creating multiple multisets, but we'll agree
+ // on the one to use.
+ if (list == null) {
+ list = value;
+ }
+ }
+
+ // Thread safe
+ list.add(listener);
+ }
+
+ /**
+ * Removes the given listener from the packet event list.
+ * @param listener - listener to remove.
+ * @param whitelist - the packet whitelist that was used.
+ * @return Every packet ID that was removed due to no listeners.
+ */
+ public List removeListener(TListener listener, ListeningWhitelist whitelist) {
+
+ List removedPackets = new ArrayList();
+
+ // Again, not terribly efficient. But adding or removing listeners should be a rare event.
+ for (Integer packetID : whitelist.getWhitelist()) {
+
+ SortedCopyOnWriteArray> list = listeners.get(packetID);
+
+ // Remove any listeners
+ if (list != null) {
+ // Don't remove from newly created lists
+ if (list.size() > 0) {
+ // Remove this listener. Note that priority is generally ignored.
+ list.remove(new PrioritizedListener(listener, whitelist.getPriority()));
+
+ if (list.size() == 0) {
+ listeners.remove(packetID);
+ removedPackets.add(packetID);
+ }
+ }
+ }
+
+ // Move on to the next
+ }
+
+ return removedPackets;
+ }
+
+ /**
+ * Retrieve the registered listeners, in order from the lowest to the highest priority.
+ *
+ * The returned list is thread-safe and doesn't require synchronization.
+ * @param packetID - packet ID.
+ * @return Registered listeners.
+ */
+ public Collection> getListener(int packetID) {
+ return listeners.get(packetID);
+ }
+
+ /**
+ * Retrieve every listener.
+ * @return Every listener.
+ */
+ public Iterable> values() {
+ return Iterables.concat(listeners.values());
+ }
+
+ /**
+ * Retrieve every registered packet ID:
+ * @return Registered packet ID.
+ */
+ public Set keySet() {
+ return listeners.keySet();
+ }
+
+ /**
+ * Remove all packet listeners.
+ */
+ protected void clearListeners() {
+ listeners.clear();
+ }
+}
diff --git a/ProtocolLib/src/com/comphenix/protocol/events/ListeningWhitelist.java b/ProtocolLib/src/com/comphenix/protocol/events/ListeningWhitelist.java
index ee1a3870..cca5740d 100644
--- a/ProtocolLib/src/com/comphenix/protocol/events/ListeningWhitelist.java
+++ b/ProtocolLib/src/com/comphenix/protocol/events/ListeningWhitelist.java
@@ -62,7 +62,7 @@ public class ListeningWhitelist {
* @return TRUE if there are any packets, FALSE otherwise.
*/
public boolean isEnabled() {
- return whitelist != null || whitelist.size() > 0;
+ return whitelist != null && whitelist.size() > 0;
}
/**
@@ -86,6 +86,23 @@ public class ListeningWhitelist {
return Objects.hashCode(priority, whitelist);
}
+ /**
+ * Determine if any of the given IDs can be found in the whitelist.
+ * @param whitelist - whitelist to test.
+ * @param idList - list of packet IDs to find.
+ * @return TRUE if any of the packets in the list can be found in the whitelist, FALSE otherwise.
+ */
+ public static boolean containsAny(ListeningWhitelist whitelist, int... idList) {
+ if (whitelist != null) {
+ for (int i = 0; i < idList.length; i++) {
+ if (whitelist.getWhitelist().contains(idList[i]))
+ return true;
+ }
+ }
+
+ return false;
+ }
+
@Override
public boolean equals(final Object obj){
if(obj instanceof ListeningWhitelist){
diff --git a/ProtocolLib/src/com/comphenix/protocol/events/PacketContainer.java b/ProtocolLib/src/com/comphenix/protocol/events/PacketContainer.java
index 09207da9..34bd0a49 100644
--- a/ProtocolLib/src/com/comphenix/protocol/events/PacketContainer.java
+++ b/ProtocolLib/src/com/comphenix/protocol/events/PacketContainer.java
@@ -17,6 +17,12 @@
package com.comphenix.protocol.events;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -40,13 +46,18 @@ import net.minecraft.server.Packet;
*
* @author Kristian
*/
-public class PacketContainer {
+public class PacketContainer implements Serializable {
- protected Packet handle;
- protected int id;
+ /**
+ * Generated by Eclipse.
+ */
+ private static final long serialVersionUID = 2074805748222377230L;
+ protected int id;
+ protected transient Packet handle;
+
// Current structure modifier
- protected StructureModifier