diff --git a/ProtocolLib/src/com/comphenix/protocol/AsynchronousManager.java b/ProtocolLib/src/com/comphenix/protocol/AsynchronousManager.java
index eca8466a..55d75f99 100644
--- a/ProtocolLib/src/com/comphenix/protocol/AsynchronousManager.java
+++ b/ProtocolLib/src/com/comphenix/protocol/AsynchronousManager.java
@@ -20,7 +20,6 @@ public interface AsynchronousManager {
* Registers an asynchronous packet handler.
*
* To start listening asynchronously, pass the getListenerLoop() runnable to a different thread.
- * @param plugin - the plugin that is registering the handler.
* @param listener - the packet listener that will recieve these asynchronous events.
* @return An asynchrouns handler.
*/
diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java
index 0862bed4..ecc841d0 100644
--- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java
+++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java
@@ -53,6 +53,7 @@ public class AsyncFilterManager implements AsynchronousManager {
// Server packets are synchronized already
this.serverQueue = new PacketSendingQueue(false);
+
// Client packets must be synchronized
this.clientQueue = new PacketSendingQueue(true);
@@ -68,28 +69,40 @@ public class AsyncFilterManager implements AsynchronousManager {
@Override
public AsyncListenerHandler registerAsyncHandler(PacketListener listener) {
+ return registerAsyncHandler(listener, true);
+ }
+
+ /**
+ * Registers an asynchronous packet handler.
+ *
+ * To start listening asynchronously, pass the getListenerLoop() runnable to a different thread.
+ *
+ * Asynchronous events will only be executed if a synchronous listener with the same packets is registered.
+ * If you already have a synchronous event, call this method with autoInject set to FALSE.
+ *
+ * @param listener - the packet listener that will recieve these asynchronous events.
+ * @param autoInject - whether or not to automatically create the corresponding synchronous listener,
+ * @return An asynchrouns handler.
+ */
+ public AsyncListenerHandler registerAsyncHandler(PacketListener listener, boolean autoInject) {
AsyncListenerHandler handler = new AsyncListenerHandler(mainThread, this, listener);
ListeningWhitelist sendingWhitelist = listener.getSendingWhitelist();
ListeningWhitelist receivingWhitelist = listener.getReceivingWhitelist();
- // We need a synchronized listener to get the ball rolling
- boolean hasListener = true;
-
// Add listener to either or both processing queue
if (hasValidWhitelist(sendingWhitelist)) {
PacketFilterManager.verifyWhitelist(listener, sendingWhitelist);
serverProcessingQueue.addListener(handler, sendingWhitelist);
- hasListener &= hasPacketListener(sendingWhitelist);
}
if (hasValidWhitelist(receivingWhitelist)) {
PacketFilterManager.verifyWhitelist(listener, receivingWhitelist);
clientProcessingQueue.addListener(handler, receivingWhitelist);
- hasListener &= hasPacketListener(receivingWhitelist);
}
- if (!hasListener) {
+ // We need a synchronized listener to get the ball rolling
+ if (autoInject) {
handler.setNullPacketListener(new NullPacketListener(listener));
manager.addPacketListener(handler.getNullPacketListener());
}
@@ -97,15 +110,6 @@ public class AsyncFilterManager implements AsynchronousManager {
return handler;
}
- /**
- * Determine if the given packets are represented.
- * @param whitelist - list of packets.
- * @return TRUE if they are all registered, FALSE otherwise.
- */
- private boolean hasPacketListener(ListeningWhitelist whitelist) {
- return manager.getSendingFilters().containsAll(whitelist.getWhitelist());
- }
-
private boolean hasValidWhitelist(ListeningWhitelist whitelist) {
return whitelist != null && whitelist.getWhitelist().size() > 0;
}
diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java
index e4cd15fc..87c09e26 100644
--- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java
+++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java
@@ -1,6 +1,10 @@
package com.comphenix.protocol.async;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import org.bukkit.plugin.Plugin;
@@ -17,18 +21,26 @@ import com.comphenix.protocol.events.PacketListener;
public class AsyncListenerHandler {
/**
- * Signal an end to the packet processing.
+ * Signal an end to packet processing.
*/
private static final PacketEvent INTERUPT_PACKET = new PacketEvent(new Object());
+ /**
+ * Called when the threads have to wake up for something important.
+ */
+ private static final PacketEvent WAKEUP_PACKET = new PacketEvent(new Object());
+
+ // Unique worker ID
+ private static final AtomicInteger nextID = new AtomicInteger();
+
// Default queue capacity
private static int DEFAULT_CAPACITY = 1024;
// Cancel the async handler
private volatile boolean cancelled;
- // If we've started the listener loop before
- private volatile boolean started;
+ // Number of worker threads
+ private final AtomicInteger started = new AtomicInteger();
// The packet listener
private PacketListener listener;
@@ -40,6 +52,10 @@ public class AsyncListenerHandler {
// List of queued packets
private ArrayBlockingQueue queuedPackets = new ArrayBlockingQueue(DEFAULT_CAPACITY);
+ // List of cancelled tasks
+ private final Set stoppedTasks = new HashSet();
+ private final Object stopLock = new Object();
+
// Minecraft main thread
private Thread mainThread;
@@ -78,18 +94,26 @@ public class AsyncListenerHandler {
return nullPacketListener;
}
+ private String getPluginName() {
+ return PacketAdapter.getPluginName(listener);
+ }
+
+ /**
+ * Retrieve the plugin associated with this async listener.
+ * @return The plugin.
+ */
+ public Plugin getPlugin() {
+ return listener != null ? listener.getPlugin() : null;
+ }
+
/**
* Cancel the handler.
*/
public void cancel() {
// Remove the listener as quickly as possible
close();
-
- // Poison Pill Shutdown
- queuedPackets.clear();
- queuedPackets.add(INTERUPT_PACKET);
}
-
+
/**
* Queue a packet for processing.
* @param packet - a packet for processing.
@@ -103,46 +127,208 @@ public class AsyncListenerHandler {
}
/**
- * Create a runnable that will initiate the listener loop.
+ * Create a worker that will initiate the listener loop. Note that using stop() to
+ * close a specific worker is less efficient than stopping an arbitrary worker.
*
* Warning: Never call the run() method in the main thread.
*/
- public Runnable getListenerLoop() {
- return new Runnable() {
+ public AsyncRunnable getListenerLoop() {
+ return new AsyncRunnable() {
+
+ private final AtomicBoolean firstRun = new AtomicBoolean();
+ private final AtomicBoolean finished = new AtomicBoolean();
+ private final int id = nextID.incrementAndGet();
+
+ @Override
+ public int getID() {
+ return id;
+ }
+
@Override
public void run() {
- listenerLoop();
+ // Careful now
+ if (firstRun.compareAndSet(false, true)) {
+ listenerLoop(id);
+
+ synchronized (stopLock) {
+ stoppedTasks.remove(id);
+ stopLock.notifyAll();
+ finished.set(true);
+ }
+
+ } else {
+ if (finished.get())
+ throw new IllegalStateException(
+ "This listener has already been run. Create a new instead.");
+ else
+ throw new IllegalStateException(
+ "This listener loop has already been started. Create a new instead.");
+ }
+ }
+
+ @Override
+ public boolean stop() throws InterruptedException {
+ synchronized (stopLock) {
+ if (!isRunning())
+ return false;
+
+ stoppedTasks.add(id);
+
+ // Wake up threads - we have a listener to stop
+ for (int i = 0; i < getWorkers(); i++) {
+ queuedPackets.offer(WAKEUP_PACKET);
+ }
+
+ finished.set(true);
+ waitForStops();
+ return true;
+ }
+ }
+
+ @Override
+ public boolean isRunning() {
+ return firstRun.get() && !finished.get();
+ }
+
+ @Override
+ public boolean isFinished() {
+ return finished.get();
}
};
}
+ /**
+ * Start a singler worker thread handling the asynchronous.
+ */
+ public synchronized void start() {
+ if (listener.getPlugin() == null)
+ throw new IllegalArgumentException("Cannot start task without a valid plugin.");
+ if (cancelled)
+ throw new IllegalStateException("Cannot start a worker when the listener is closing.");
+
+ filterManager.scheduleAsyncTask(listener.getPlugin(), getListenerLoop());
+ }
+
+ /**
+ * Start multiple worker threads for this listener.
+ * @param count - number of worker threads to start.
+ */
+ public synchronized void start(int count) {
+ for (int i = 0; i < count; i++)
+ start();
+ }
+
+ /**
+ * Stop a worker thread.
+ */
+ public synchronized void stop() {
+ queuedPackets.add(INTERUPT_PACKET);
+ }
+
+ /**
+ * Stop the given amount of worker threads.
+ * @param count - number of threads to stop.
+ */
+ public synchronized void stop(int count) {
+ for (int i = 0; i < count; i++)
+ stop();
+ }
+
+ /**
+ * Set the current number of workers.
+ *
+ * This method can only be called with a count of zero when the listener is closing.
+ * @param count - new number of workers.
+ */
+ public synchronized void setWorkers(int count) {
+ if (count < 0)
+ throw new IllegalArgumentException("Number of workers cannot be less than zero.");
+ if (count > DEFAULT_CAPACITY)
+ throw new IllegalArgumentException("Cannot initiate more than " + DEFAULT_CAPACITY + " workers");
+ if (cancelled && count > 0)
+ throw new IllegalArgumentException("Cannot add workers when the listener is closing.");
+
+ long time = System.currentTimeMillis();
+
+ // Try to get to the correct count
+ while (started.get() != count) {
+ if (started.get() < count)
+ start();
+ else
+ stop();
+
+ // May happen if another thread is doing something similar to "setWorkers"
+ if ((System.currentTimeMillis() - time) > 1000)
+ throw new RuntimeException("Failed to set worker count.");
+ }
+ }
+
+ /**
+ * Retrieve the current number of registered workers.
+ *
+ * Note that the returned value may be out of data.
+ * @return Number of registered workers.
+ */
+ public synchronized int getWorkers() {
+ return started.get();
+ }
+
+ /**
+ * Wait until every tasks scheduled to stop has actually stopped.
+ * @return TRUE if the current listener should stop, FALSE otherwise.
+ * @throws InterruptedException - If the current thread was interrupted.
+ */
+ private boolean waitForStops() throws InterruptedException {
+ synchronized (stopLock) {
+ while (stoppedTasks.size() > 0 && !cancelled) {
+ stopLock.wait();
+ }
+ return cancelled;
+ }
+ }
+
// DO NOT call this method from the main thread
- private void listenerLoop() {
+ private void listenerLoop(int workerID) {
// Danger, danger!
if (Thread.currentThread().getId() == mainThread.getId())
throw new IllegalStateException("Do not call this method from the main thread.");
- if (started)
- throw new IllegalStateException("A listener cannot be run by multiple threads. Create a new listener instead.");
if (cancelled)
throw new IllegalStateException("Listener has been cancelled. Create a new listener instead.");
-
- // Proceed
- started = true;
-
+
try {
+ // Wait if certain threads are stopping
+ if (waitForStops())
+ return;
+
+ // Proceed
+ started.incrementAndGet();
+
mainLoop:
while (!cancelled) {
PacketEvent packet = queuedPackets.take();
AsyncMarker marker = packet.getAsyncMarker();
// Handle cancel requests
- if (packet == null || marker == null || !packet.isAsynchronous()) {
- break;
+ if (packet == null || marker == null || packet == INTERUPT_PACKET) {
+ return;
+
+ } else if (packet == WAKEUP_PACKET) {
+ // This is a bit slow, but it should be safe
+ synchronized (stopLock) {
+ // Are we the one who is supposed to stop?
+ if (stoppedTasks.contains(workerID))
+ return;
+ if (waitForStops())
+ return;
+ }
}
// Here's the core of the asynchronous processing
try {
+ marker.setListenerHandler(this);
+ marker.setWorkerID(workerID);
+
if (packet.isServerPacket())
listener.onPacketSending(packet);
else
@@ -171,40 +357,35 @@ public class AsyncListenerHandler {
} catch (InterruptedException e) {
// We're done
+ } finally {
+ // Clean up
+ started.decrementAndGet();
+ close();
}
-
- // Clean up
- close();
}
- private void close() {
+ private synchronized void close() {
// Remove the listener itself
if (!cancelled) {
filterManager.unregisterAsyncHandlerInternal(this);
cancelled = true;
- started = false;
+
+ // Tell every uncancelled thread to end
+ stopThreads();
}
}
- private String getPluginName() {
- return PacketAdapter.getPluginName(listener);
- }
-
/**
- * Retrieve the plugin associated with this async listener.
- * @return The plugin.
+ * Use the poision pill method to stop every worker thread.
*/
- public Plugin getPlugin() {
- return listener != null ? listener.getPlugin() : null;
- }
-
- /**
- * Start the asynchronous listener using the Bukkit scheduler.
- */
- public void start() {
- if (listener.getPlugin() == null)
- throw new IllegalArgumentException("Cannot start task without a valid plugin.");
+ private void stopThreads() {
+ // Poison Pill Shutdown
+ queuedPackets.clear();
+ stop(started.get());
- filterManager.scheduleAsyncTask(listener.getPlugin(), getListenerLoop());
+ // Individual shut down is irrelevant now
+ synchronized (stopLock) {
+ stopLock.notifyAll();
+ }
}
}
diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java
index 8cecb1c8..a5e15f0e 100644
--- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java
+++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java
@@ -67,6 +67,10 @@ public class AsyncMarker implements Serializable, Comparable {
// Whether or not the asynchronous processing itself should be cancelled
private volatile boolean asyncCancelled;
+ // Used to identify the asynchronous worker
+ private AsyncListenerHandler listenerHandler;
+ private int workerID;
+
// Determine if Minecraft processes this packet asynchronously
private static Method isMinecraftAsync;
private static boolean alwaysSync;
@@ -214,12 +218,48 @@ public class AsyncMarker implements Serializable, Comparable {
public void setAsyncCancelled(boolean asyncCancelled) {
this.asyncCancelled = asyncCancelled;
}
-
+
+ /**
+ * Retrieve the current asynchronous listener handler.
+ * @return Asychronous listener handler, or NULL if this packet is not asynchronous.
+ */
+ public AsyncListenerHandler getListenerHandler() {
+ return listenerHandler;
+ }
+
+ /**
+ * Set the current asynchronous listener handler.
+ *
+ * Used by the worker to update the value.
+ * @param listenerHandler - new listener handler.
+ */
+ void setListenerHandler(AsyncListenerHandler listenerHandler) {
+ this.listenerHandler = listenerHandler;
+ }
+
+ /**
+ * Retrieve the current worker ID.
+ * @return Current worker ID.
+ */
+ public int getWorkerID() {
+ return workerID;
+ }
+
+ /**
+ * Set the current worker ID.
+ *
+ * Used by the worker.
+ * @param workerID - new worker ID.
+ */
+ void setWorkerID(int workerID) {
+ this.workerID = workerID;
+ }
+
/**
* Retrieve iterator for the next listener in line.
* @return Next async packet listener iterator.
*/
- public Iterator> getListenerTraversal() {
+ Iterator> getListenerTraversal() {
return listenerTraversal;
}
diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncRunnable.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncRunnable.java
new file mode 100644
index 00000000..2c0004d9
--- /dev/null
+++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncRunnable.java
@@ -0,0 +1,35 @@
+package com.comphenix.protocol.async;
+
+/**
+ * A runnable representing a asynchronous event listener.
+ *
+ * @author Kristian
+ */
+public interface AsyncRunnable extends Runnable {
+
+ /**
+ * Retrieve a unique worker ID.
+ * @return Unique worker ID.
+ */
+ public int getID();
+
+ /**
+ * Stop the given runnable.
+ *
+ * This may not occur right away.
+ * @return TRUE if the thread was stopped, FALSE if it was already stopped.
+ */
+ public boolean stop() throws InterruptedException;
+
+ /**
+ * Determine if we're running or not.
+ * @return TRUE if we're running, FALSE otherwise.
+ */
+ public boolean isRunning();
+
+ /**
+ * Determine if this runnable has already run its course.
+ * @return TRUE if it has been stopped, FALSE otherwise.
+ */
+ boolean isFinished();
+}
diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketEventHolder.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketEventHolder.java
new file mode 100644
index 00000000..429d5caf
--- /dev/null
+++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketEventHolder.java
@@ -0,0 +1,40 @@
+package com.comphenix.protocol.async;
+
+import com.comphenix.protocol.events.PacketEvent;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
+
+/**
+ * Provides a comparable to a packet event.
+ *
+ * @author Kristian
+ */
+class PacketEventHolder implements Comparable {
+
+ private PacketEvent event;
+
+ /**
+ * A wrapper that ensures the packet event is ordered by sending index.
+ * @param event - packet event to wrap.
+ */
+ public PacketEventHolder(PacketEvent event) {
+ this.event = Preconditions.checkNotNull(event, "Event must be non-null");
+ }
+
+ /**
+ * Retrieve the stored event.
+ * @return The stored event.
+ */
+ public PacketEvent getEvent() {
+ return event;
+ }
+
+ @Override
+ public int compareTo(PacketEventHolder other) {
+ AsyncMarker marker = other != null ? other.getEvent().getAsyncMarker() : null;
+
+ return ComparisonChain.start().
+ compare(event.getAsyncMarker(), marker).
+ result();
+ }
+}
diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java
index 37349e83..08ab06ad 100644
--- a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java
+++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java
@@ -2,12 +2,14 @@ package com.comphenix.protocol.async;
import java.util.Collection;
import java.util.Iterator;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.Queue;
import java.util.concurrent.Semaphore;
import com.comphenix.protocol.concurrency.AbstractConcurrentListenerMultimap;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.injector.PrioritizedListener;
+import com.google.common.collect.MinMaxPriorityQueue;
+
/**
* Handles the processing of every packet type.
@@ -16,6 +18,9 @@ import com.comphenix.protocol.injector.PrioritizedListener;
*/
class PacketProcessingQueue extends AbstractConcurrentListenerMultimap {
+ // Initial number of elements
+ public static final int INITIAL_CAPACITY = 64;
+
/**
* Default maximum number of packets to process concurrently.
*/
@@ -33,18 +38,23 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap processingQueue;
+ private Queue processingQueue;
// Packets for sending
private PacketSendingQueue sendingQueue;
public PacketProcessingQueue(PacketSendingQueue sendingQueue) {
- this(sendingQueue, DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY);
+ this(sendingQueue, INITIAL_CAPACITY, DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY);
}
- public PacketProcessingQueue(PacketSendingQueue sendingQueue, int queueLimit, int maximumConcurrency) {
+ public PacketProcessingQueue(PacketSendingQueue sendingQueue, int initialSize, int maximumSize, int maximumConcurrency) {
super();
- this.processingQueue = new ArrayBlockingQueue(queueLimit);
+
+ this.processingQueue = Synchronization.queue(MinMaxPriorityQueue.
+ expectedSize(initialSize).
+ maximumSize(maximumSize).
+ create(), null);
+
this.maximumConcurrency = maximumConcurrency;
this.concurrentProcessing = new Semaphore(maximumConcurrency);
this.sendingQueue = sendingQueue;
@@ -58,8 +68,8 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap> list = getListener(packet.getPacketID());
+ if (holder != null) {
+ PacketEvent packet = holder.getEvent();
AsyncMarker marker = packet.getAsyncMarker();
+ Collection> list = getListener(packet.getPacketID());
// Yes, removing the marker will cause the chain to stop
if (list != null) {
diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java
index 500cf2c3..0899ed58 100644
--- a/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java
+++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java
@@ -1,15 +1,15 @@
package com.comphenix.protocol.async;
import java.io.IOException;
-import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.PriorityBlockingQueue;
+import org.bukkit.entity.Player;
+
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.reflect.FieldAccessException;
-import com.google.common.collect.ComparisonChain;
/**
* Represents packets ready to be transmitted to a client.
@@ -17,28 +17,28 @@ import com.google.common.collect.ComparisonChain;
*/
class PacketSendingQueue {
- private static final int INITIAL_CAPACITY = 64;
+ public static final int INITIAL_CAPACITY = 64;
- private PriorityBlockingQueue sendingQueue;
+ private PriorityBlockingQueue sendingQueue;
// Whether or not packet transmission can only occur on the main thread
private final boolean synchronizeMain;
+ /**
+ * Number of packet events in the queue.
+ * @return The number of packet events in the queue.
+ */
+ public int size() {
+ return sendingQueue.size();
+ }
+
/**
* Create a packet sending queue.
* @param synchronizeMain - whether or not to synchronize with the main thread.
*/
public PacketSendingQueue(boolean synchronizeMain) {
+ this.sendingQueue = new PriorityBlockingQueue(INITIAL_CAPACITY);
this.synchronizeMain = synchronizeMain;
- this.sendingQueue = new PriorityBlockingQueue(INITIAL_CAPACITY, new Comparator() {
- // Compare using the async marker
- @Override
- public int compare(PacketEvent o1, PacketEvent o2) {
- return ComparisonChain.start().
- compare(o1.getAsyncMarker(), o2.getAsyncMarker()).
- result();
- }
- });
}
/**
@@ -46,7 +46,7 @@ class PacketSendingQueue {
* @param packet
*/
public void enqueue(PacketEvent packet) {
- sendingQueue.add(packet);
+ sendingQueue.add(new PacketEventHolder(packet));
}
/**
@@ -70,7 +70,9 @@ class PacketSendingQueue {
Set lookup = new HashSet(packetsRemoved);
// Note that this is O(n), so it might be expensive
- for (PacketEvent event : sendingQueue) {
+ for (PacketEventHolder holder : sendingQueue) {
+ PacketEvent event = holder.getEvent();
+
if (lookup.contains(event.getPacketID())) {
event.getAsyncMarker().setProcessed(true);
}
@@ -88,9 +90,10 @@ class PacketSendingQueue {
// Transmit as many packets as we can
while (true) {
- PacketEvent current = sendingQueue.peek();
+ PacketEventHolder holder = sendingQueue.peek();
- if (current != null) {
+ if (holder != null) {
+ PacketEvent current = holder.getEvent();
AsyncMarker marker = current.getAsyncMarker();
// Abort if we're not on the main thread
@@ -111,7 +114,10 @@ class PacketSendingQueue {
if (marker.isProcessed() || marker.hasExpired()) {
if (marker.isProcessed() && !current.isCancelled()) {
- sendPacket(current);
+ // Silently skip players that have logged out
+ if (isOnline(current.getPlayer())) {
+ sendPacket(current);
+ }
}
sendingQueue.poll();
@@ -124,15 +130,19 @@ class PacketSendingQueue {
}
}
+ private boolean isOnline(Player player) {
+ return player != null && player.isOnline();
+ }
+
/**
* Send every packet, regardless of the processing state.
*/
private void forceSend() {
while (true) {
- PacketEvent current = sendingQueue.poll();
+ PacketEventHolder holder = sendingQueue.poll();
- if (current != null) {
- sendPacket(current);
+ if (holder != null) {
+ sendPacket(holder.getEvent());
} else {
break;
}
diff --git a/ProtocolLib/src/com/comphenix/protocol/async/Synchronization.java b/ProtocolLib/src/com/comphenix/protocol/async/Synchronization.java
new file mode 100644
index 00000000..71e949e7
--- /dev/null
+++ b/ProtocolLib/src/com/comphenix/protocol/async/Synchronization.java
@@ -0,0 +1,211 @@
+package com.comphenix.protocol.async;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Queue;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Synchronization views copied from Google Guava.
+ *
+ * @author Kristian
+ */
+class Synchronization {
+
+ /**
+ * Create a synchronized wrapper for the given queue.
+ *
+ * This wrapper cannot synchronize the iterator(). Callers are expected
+ * to synchronize iterators manually.
+ * @param queue - the queue to synchronize.
+ * @param mutex - synchronization mutex, or NULL to use the queue.
+ * @return A synchronization wrapper.
+ */
+ public static Queue queue(Queue queue, @Nullable Object mutex) {
+ return (queue instanceof SynchronizedQueue) ?
+ queue :
+ new SynchronizedQueue(queue, mutex);
+ }
+
+ private static class SynchronizedObject implements Serializable {
+ private static final long serialVersionUID = -4408866092364554628L;
+
+ final Object delegate;
+ final Object mutex;
+
+ SynchronizedObject(Object delegate, @Nullable Object mutex) {
+ this.delegate = Preconditions.checkNotNull(delegate);
+ this.mutex = (mutex == null) ? this : mutex;
+ }
+
+ Object delegate() {
+ return delegate;
+ }
+
+ // No equals and hashCode; see ForwardingObject for details.
+
+ @Override
+ public String toString() {
+ synchronized (mutex) {
+ return delegate.toString();
+ }
+ }
+ }
+
+ private static class SynchronizedCollection extends SynchronizedObject implements Collection {
+ private static final long serialVersionUID = 5440572373531285692L;
+
+ private SynchronizedCollection(Collection delegate,
+ @Nullable Object mutex) {
+ super(delegate, mutex);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ Collection delegate() {
+ return (Collection) super.delegate();
+ }
+
+ @Override
+ public boolean add(E e) {
+ synchronized (mutex) {
+ return delegate().add(e);
+ }
+ }
+
+ @Override
+ public boolean addAll(Collection extends E> c) {
+ synchronized (mutex) {
+ return delegate().addAll(c);
+ }
+ }
+
+ @Override
+ public void clear() {
+ synchronized (mutex) {
+ delegate().clear();
+ }
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ synchronized (mutex) {
+ return delegate().contains(o);
+ }
+ }
+
+ @Override
+ public boolean containsAll(Collection> c) {
+ synchronized (mutex) {
+ return delegate().containsAll(c);
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ synchronized (mutex) {
+ return delegate().isEmpty();
+ }
+ }
+
+ @Override
+ public Iterator iterator() {
+ return delegate().iterator(); // manually synchronized
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ synchronized (mutex) {
+ return delegate().remove(o);
+ }
+ }
+
+ @Override
+ public boolean removeAll(Collection> c) {
+ synchronized (mutex) {
+ return delegate().removeAll(c);
+ }
+ }
+
+ @Override
+ public boolean retainAll(Collection> c) {
+ synchronized (mutex) {
+ return delegate().retainAll(c);
+ }
+ }
+
+ @Override
+ public int size() {
+ synchronized (mutex) {
+ return delegate().size();
+ }
+ }
+
+ @Override
+ public Object[] toArray() {
+ synchronized (mutex) {
+ return delegate().toArray();
+ }
+ }
+
+ @Override
+ public T[] toArray(T[] a) {
+ synchronized (mutex) {
+ return delegate().toArray(a);
+ }
+ }
+
+ }
+
+ private static class SynchronizedQueue extends SynchronizedCollection implements Queue {
+ private static final long serialVersionUID = 1961791630386791902L;
+
+ SynchronizedQueue(Queue delegate, @Nullable Object mutex) {
+ super(delegate, mutex);
+ }
+
+ @Override
+ Queue delegate() {
+ return (Queue) super.delegate();
+ }
+
+ @Override
+ public E element() {
+ synchronized (mutex) {
+ return delegate().element();
+ }
+ }
+
+ @Override
+ public boolean offer(E e) {
+ synchronized (mutex) {
+ return delegate().offer(e);
+ }
+ }
+
+ @Override
+ public E peek() {
+ synchronized (mutex) {
+ return delegate().peek();
+ }
+ }
+
+ @Override
+ public E poll() {
+ synchronized (mutex) {
+ return delegate().poll();
+ }
+ }
+
+ @Override
+ public E remove() {
+ synchronized (mutex) {
+ return delegate().remove();
+ }
+ }
+ }
+}
diff --git a/ProtocolLib/src/com/comphenix/protocol/injector/ListenerInvoker.java b/ProtocolLib/src/com/comphenix/protocol/injector/ListenerInvoker.java
new file mode 100644
index 00000000..e5bc0e4b
--- /dev/null
+++ b/ProtocolLib/src/com/comphenix/protocol/injector/ListenerInvoker.java
@@ -0,0 +1,27 @@
+package com.comphenix.protocol.injector;
+
+import net.minecraft.server.Packet;
+
+import com.comphenix.protocol.events.PacketEvent;
+
+public interface ListenerInvoker {
+
+ /**
+ * Invokes the given packet event for every registered listener.
+ * @param event - the packet event to invoke.
+ */
+ public abstract void invokePacketRecieving(PacketEvent event);
+
+ /**
+ * Invokes the given packet event for every registered listener.
+ * @param event - the packet event to invoke.
+ */
+ public abstract void invokePacketSending(PacketEvent event);
+
+ /**
+ * Retrieve the associated ID of a packet.
+ * @param packet - the packet.
+ * @return The packet ID.
+ */
+ public abstract int getPacketID(Packet packet);
+}
\ No newline at end of file
diff --git a/ProtocolLib/src/com/comphenix/protocol/injector/NetworkServerInjector.java b/ProtocolLib/src/com/comphenix/protocol/injector/NetworkServerInjector.java
deleted file mode 100644
index fb901a1a..00000000
--- a/ProtocolLib/src/com/comphenix/protocol/injector/NetworkServerInjector.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package com.comphenix.protocol.injector;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Set;
-
-import net.minecraft.server.Packet;
-import net.sf.cglib.proxy.Enhancer;
-import net.sf.cglib.proxy.Factory;
-import net.sf.cglib.proxy.MethodInterceptor;
-import net.sf.cglib.proxy.MethodProxy;
-
-import org.bukkit.entity.Player;
-
-import com.comphenix.protocol.events.PacketListener;
-import com.comphenix.protocol.reflect.FieldAccessException;
-import com.comphenix.protocol.reflect.FieldUtils;
-import com.comphenix.protocol.reflect.FuzzyReflection;
-import com.comphenix.protocol.reflect.StructureModifier;
-import com.comphenix.protocol.reflect.instances.CollectionGenerator;
-import com.comphenix.protocol.reflect.instances.DefaultInstances;
-import com.comphenix.protocol.reflect.instances.ExistingGenerator;
-import com.comphenix.protocol.reflect.instances.PrimitiveGenerator;
-
-/**
- * Represents a player hook into the NetServerHandler class.
- *
- * @author Kristian
- */
-public class NetworkServerInjector extends PlayerInjector {
-
- private static Method sendPacketMethod;
-
- private StructureModifier