diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java index 86f9d789..36e6fc01 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java @@ -7,6 +7,7 @@ import org.bukkit.plugin.Plugin; import com.comphenix.protocol.PacketStream; import com.comphenix.protocol.events.PacketEvent; +import com.comphenix.protocol.events.PacketListener; /** * Represents a filter manager for asynchronous packets. @@ -36,7 +37,7 @@ public class AsyncFilterManager { this.mainThread = Thread.currentThread(); } - public ListenerToken registerAsyncHandler(Plugin plugin, AsyncListener listener) { + public ListenerToken registerAsyncHandler(Plugin plugin, PacketListener listener) { ListenerToken token = new ListenerToken(plugin, mainThread, this, listener); processingQueue.addListener(token, listener.getSendingWhitelist()); @@ -56,15 +57,33 @@ public class AsyncFilterManager { processingQueue.removeListener(listenerToken, listenerToken.getAsyncListener().getSendingWhitelist()); } - public void enqueueSyncPacket(PacketEvent syncPacket, int sendingDelta, long timeoutDelta) { - AsyncPacket asyncPacket = new AsyncPacket(packetStream, syncPacket, - currentSendingIndex.getAndIncrement() + sendingDelta, - System.currentTimeMillis(), - timeoutDelta); + /** + * Enqueue a packet for asynchronous processing. + * @param syncPacket - synchronous packet event. + * @param asyncMarker - the asynchronous marker to use. + */ + public void enqueueSyncPacket(PacketEvent syncPacket, AsyncPacket asyncMarker) { + PacketEvent newEvent = PacketEvent.fromSynchronous(syncPacket, asyncMarker); // Start the process - sendingQueue.enqueue(asyncPacket); - processingQueue.enqueuePacket(asyncPacket); + sendingQueue.enqueue(newEvent); + processingQueue.enqueuePacket(newEvent); + } + + /** + * Construct an async marker with the given sending priority delta and timeout delta. + * @param sendingDelta - how many packets we're willing to wait. + * @param timeoutDelta - how long (in ms) until the packet expire. + * @return An async marker. + */ + public AsyncPacket createAsyncMarker(long sendingDelta, long timeoutDelta) { + return createAsyncMarker(sendingDelta, timeoutDelta, + currentSendingIndex.incrementAndGet(), System.currentTimeMillis()); + } + + // Helper method + private AsyncPacket createAsyncMarker(long sendingDelta, long timeoutDelta, long sendingIndex, long currentTime) { + return new AsyncPacket(packetStream, sendingIndex, sendingDelta, System.currentTimeMillis(), timeoutDelta); } public PacketStream getPacketStream() { @@ -83,9 +102,11 @@ public class AsyncFilterManager { return sendingQueue; } + /** + * Remove listeners, close threads and transmit every delayed packet. + */ public void cleanupAll() { - // Remove all listeners - - // We don't necessarily remove packets, as this might be a part of a server reload + processingQueue.cleanupAll(); + sendingQueue.cleanupAll(); } } diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListener.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListener.java deleted file mode 100644 index 49342139..00000000 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListener.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.comphenix.protocol.async; - -import org.bukkit.plugin.Plugin; - -import com.comphenix.protocol.events.ListeningWhitelist; - -public interface AsyncListener { - public void onAsyncPacket(AsyncPacket packet); - - public ListeningWhitelist getSendingWhitelist(); - - /** - * Retrieve the plugin that created this async packet listener. - * @return The plugin, or NULL if not available. - */ - public Plugin getPlugin(); -} diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncPacket.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncPacket.java index 9bcc2e53..c4c87a9d 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncPacket.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncPacket.java @@ -1,5 +1,6 @@ package com.comphenix.protocol.async; +import java.io.IOException; import java.io.Serializable; import java.lang.reflect.InvocationTargetException; import java.util.Iterator; @@ -15,12 +16,7 @@ import com.google.common.primitives.Longs; * @author Kristian */ public class AsyncPacket implements Serializable, Comparable { - - /** - * Signal an end to the packet processing. - */ - static final AsyncPacket INTERUPT_PACKET = new AsyncPacket(); - + /** * Generated by Eclipse. */ @@ -31,11 +27,6 @@ public class AsyncPacket implements Serializable, Comparable { */ public static final int DEFAULT_TIMEOUT_DELTA = 60000; - /** - * The original synchronized packet. - */ - private PacketEvent packetEvent; - /** * The packet stream responsible for transmitting the packet when it's done processing. */ @@ -56,33 +47,16 @@ public class AsyncPacket implements Serializable, Comparable { // Whether or not the packet has been processed by the listeners private volatile boolean processed; - - private AsyncPacket() { - // Used by the poision pill pattern - } - - /** - * Determine whether or not this is a signal for the async listener to interrupt processing. - * @return Interrupt packet processing. - */ - boolean isInteruptPacket() { - // This is only possble if we're dealing with the poision pill packet - return packetEvent == null || packetStream == null; - } - + /** * Create a container for asyncronous packets. - * @param packetEvent - the synchronous packet event. * @param initialTime - the current time in milliseconds since 01.01.1970 00:00. */ - public AsyncPacket(PacketStream packetStream, PacketEvent packetEvent, long sendingIndex, long initialTime, long timeoutDelta) { - if (packetEvent == null) - throw new IllegalArgumentException("packetEvent cannot be NULL"); + AsyncPacket(PacketStream packetStream, long sendingIndex, long sendingDelta, long initialTime, long timeoutDelta) { if (packetStream == null) throw new IllegalArgumentException("packetStream cannot be NULL"); this.packetStream = packetStream; - this.packetEvent = packetEvent; // Timeout this.initialTime = initialTime; @@ -137,22 +111,6 @@ public class AsyncPacket implements Serializable, Comparable { this.newSendingIndex = newSendingIndex; } - /** - * Retrieve the original synchronous packet event. - * @return The original packet event. - */ - public PacketEvent getPacketEvent() { - return packetEvent; - } - - /** - * Retrieve the packet ID of the underlying packet. - * @return Packet ID. - */ - public int getPacketID() { - return packetEvent.getPacketID(); - } - /** * Retrieve the packet stream responsible for transmitting this packet. * @return The packet stream. @@ -173,7 +131,7 @@ public class AsyncPacket implements Serializable, Comparable { * Retrieve whether or not this packet has been processed by the async listeners. * @return TRUE if it has been processed, FALSE otherwise. */ - boolean isProcessed() { + public boolean isProcessed() { return processed; } @@ -189,21 +147,9 @@ public class AsyncPacket implements Serializable, Comparable { * Retrieve iterator for the next listener in line. * @return Next async packet listener iterator. */ - Iterator> getListenerTraversal() { + public Iterator> getListenerTraversal() { return listenerTraversal; } - - /** - * We're done processing. Send the packet. - */ - void sendPacket() { - try { - // We only support server packets at this stage - packetStream.sendServerPacket(packetEvent.getPlayer(), packetEvent.getPacket(), false); - } catch (InvocationTargetException e) { - e.printStackTrace(); - } - } /** * Set the iterator for the next listener. @@ -212,7 +158,26 @@ public class AsyncPacket implements Serializable, Comparable { void setListenerTraversal(Iterator> listenerTraversal) { this.listenerTraversal = listenerTraversal; } - + + /** + * Transmit a given packet to the current packet stream. + * @param event - the packet to send. + * @throws IOException If the packet couldn't be sent. + */ + public void sendPacket(PacketEvent event) throws IOException { + try { + if (event.isServerPacket()) { + packetStream.sendServerPacket(event.getPlayer(), event.getPacket(), false); + } else { + packetStream.recieveClientPacket(event.getPlayer(), event.getPacket(), false); + } + } catch (InvocationTargetException e) { + throw new IOException("Cannot send packet", e); + } catch (IllegalAccessException e) { + throw new IOException("Cannot send packet", e); + } + } + @Override public int compareTo(AsyncPacket o) { if (o == null) diff --git a/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java b/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java index b932b252..439902ce 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java @@ -5,8 +5,16 @@ import java.util.logging.Level; import org.bukkit.plugin.Plugin; -public class ListenerToken { +import com.comphenix.protocol.events.PacketEvent; +import com.comphenix.protocol.events.PacketListener; +class ListenerToken { + + /** + * Signal an end to the packet processing. + */ + private static final PacketEvent INTERUPT_PACKET = new PacketEvent(new Object()); + // Default queue capacity private static int DEFAULT_CAPACITY = 1024; @@ -14,7 +22,7 @@ public class ListenerToken { private volatile boolean cancelled; // The packet listener - private AsyncListener listener; + private PacketListener listener; // The original plugin private Plugin plugin; @@ -23,12 +31,12 @@ public class ListenerToken { private AsyncFilterManager filterManager; // List of queued packets - private ArrayBlockingQueue queuedPackets = new ArrayBlockingQueue(DEFAULT_CAPACITY); + private ArrayBlockingQueue queuedPackets = new ArrayBlockingQueue(DEFAULT_CAPACITY); // Minecraft main thread private Thread mainThread; - public ListenerToken(Plugin plugin, Thread mainThread, AsyncFilterManager filterManager, AsyncListener listener) { + public ListenerToken(Plugin plugin, Thread mainThread, AsyncFilterManager filterManager, PacketListener listener) { if (filterManager == null) throw new IllegalArgumentException("filterManager cannot be NULL"); if (listener == null) @@ -44,7 +52,7 @@ public class ListenerToken { return cancelled; } - public AsyncListener getAsyncListener() { + public PacketListener getAsyncListener() { return listener; } @@ -57,7 +65,7 @@ public class ListenerToken { // Poison Pill Shutdown queuedPackets.clear(); - queuedPackets.add(AsyncPacket.INTERUPT_PACKET); + queuedPackets.add(INTERUPT_PACKET); } /** @@ -65,7 +73,7 @@ public class ListenerToken { * @param packet - a packet for processing. * @throws IllegalStateException If the underlying packet queue is full. */ - public void enqueuePacket(AsyncPacket packet) { + public void enqueuePacket(PacketEvent packet) { if (packet == null) throw new IllegalArgumentException("packet is NULL"); @@ -86,16 +94,21 @@ public class ListenerToken { try { mainLoop: while (!cancelled) { - AsyncPacket packet = queuedPackets.take(); + PacketEvent packet = queuedPackets.take(); + AsyncPacket marker = packet.getAsyncMarker(); // Handle cancel requests - if (packet == null || packet.isInteruptPacket()) { + if (packet == null || marker == null || !packet.isAsynchronous()) { break; } // Here's the core of the asynchronous processing try { - listener.onAsyncPacket(packet); + if (packet.isServerPacket()) + listener.onPacketSending(packet); + else + listener.onPacketReceiving(packet); + } catch (Throwable e) { // Minecraft doesn't want your Exception. filterManager.getLogger().log(Level.SEVERE, @@ -103,8 +116,8 @@ public class ListenerToken { } // Now, get the next non-cancelled listener - for (; packet.getListenerTraversal().hasNext(); ) { - ListenerToken token = packet.getListenerTraversal().next().getListener(); + for (; marker.getListenerTraversal().hasNext(); ) { + ListenerToken token = marker.getListenerTraversal().next().getListener(); if (!token.isCancelled()) { token.enqueuePacket(packet); diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java index 9061f96f..33d10389 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java @@ -6,6 +6,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Semaphore; import com.comphenix.protocol.concurrency.AbstractConcurrentListenerMultimap; +import com.comphenix.protocol.events.PacketEvent; import com.comphenix.protocol.injector.PrioritizedListener; /** @@ -32,7 +33,7 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap processingQueue; + private ArrayBlockingQueue processingQueue; // Packets for sending private PacketSendingQueue sendingQueue; @@ -43,7 +44,7 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap(queueLimit); + this.processingQueue = new ArrayBlockingQueue(queueLimit); this.maximumConcurrency = maximumConcurrency; this.concurrentProcessing = new Semaphore(maximumConcurrency); this.sendingQueue = sendingQueue; @@ -54,7 +55,7 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap> list = getListener(packet.getPacketID()); + AsyncPacket marker = packet.getAsyncMarker(); if (list != null) { Iterator> iterator = list.iterator(); if (iterator.hasNext()) { - packet.setListenerTraversal(iterator); + marker.setListenerTraversal(iterator); iterator.next().getListener().enqueuePacket(packet); continue; } @@ -113,7 +115,15 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap token : ) + public void cleanupAll() { + // Cancel all the threads and every listener + for (PrioritizedListener token : values()) { + if (token != null) { + token.getListener().cancel(); + } + } + + // Remove the rest, just in case + clearListeners(); } } diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java index db2223f4..fca9de83 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java @@ -1,41 +1,85 @@ package com.comphenix.protocol.async; +import java.io.IOException; import java.util.concurrent.PriorityBlockingQueue; +import com.comphenix.protocol.events.PacketEvent; + /** * Represents packets ready to be transmitted to a client. * @author Kristian */ class PacketSendingQueue { - private PriorityBlockingQueue sendingQueue; + private PriorityBlockingQueue sendingQueue; /** * Enqueue a packet for sending. * @param packet */ - public void enqueue(AsyncPacket packet) { + public void enqueue(PacketEvent packet) { sendingQueue.add(packet); } /** * Invoked when one of the packets have finished processing. */ - public synchronized void signalPacketUpdate(AsyncPacket packetUpdated) { - + public synchronized void signalPacketUpdate(PacketEvent packetUpdated) { // Mark this packet as finished - packetUpdated.setProcessed(true); - - // Transmit as many packets as we can + packetUpdated.getAsyncMarker().setProcessed(true); + signalPacketUpdates(); + } + + /** + * Send every packet, regardless of the processing state. + */ + public synchronized void forceSend() { while (true) { - AsyncPacket current = sendingQueue.peek(); + PacketEvent current = sendingQueue.poll(); - if (current != null && current.isProcessed()) { - current.sendPacket(); - sendingQueue.poll(); + if (current != null) { + // Just print the error + try { + current.getAsyncMarker().sendPacket(current); + } catch (IOException e) { + e.printStackTrace(); + } } else { break; } } } + + /** + * Invoked when potentially every packet is finished. + */ + private void signalPacketUpdates() { + // Transmit as many packets as we can + while (true) { + PacketEvent current = sendingQueue.peek(); + + if (current != null && current.getAsyncMarker().isProcessed()) { + // Just print the error + try { + current.getAsyncMarker().sendPacket(current); + } catch (IOException e) { + e.printStackTrace(); + } + + sendingQueue.poll(); + + } else { + break; + } + } + + // And we're done + } + + /** + * Automatically transmits every delayed packet. + */ + public void cleanupAll() { + forceSend(); + } } diff --git a/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java b/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java index ae61cf9b..5154f261 100644 --- a/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java +++ b/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java @@ -8,6 +8,7 @@ import java.util.concurrent.ConcurrentMap; import com.comphenix.protocol.events.ListeningWhitelist; import com.comphenix.protocol.injector.PrioritizedListener; +import com.google.common.collect.Iterables; /** * A thread-safe implementation of a listener multimap. @@ -103,4 +104,19 @@ public abstract class AbstractConcurrentListenerMultimap { public Collection> getListener(int packetID) { return listeners.get(packetID); } + + /** + * Retrieve every listener. + * @return Every listener. + */ + protected Iterable> values() { + return Iterables.concat(listeners.values()); + } + + /** + * Remove all packet listeners. + */ + protected void clearListeners() { + listeners.clear(); + } } diff --git a/ProtocolLib/src/com/comphenix/protocol/events/PacketEvent.java b/ProtocolLib/src/com/comphenix/protocol/events/PacketEvent.java index f3fa4ae2..9eedc109 100644 --- a/ProtocolLib/src/com/comphenix/protocol/events/PacketEvent.java +++ b/ProtocolLib/src/com/comphenix/protocol/events/PacketEvent.java @@ -25,6 +25,8 @@ import java.util.EventObject; import org.bukkit.entity.Player; import org.bukkit.event.Cancellable; +import com.comphenix.protocol.async.AsyncPacket; + public class PacketEvent extends EventObject implements Cancellable { /** * Automatically generated by Eclipse. @@ -35,7 +37,10 @@ public class PacketEvent extends EventObject implements Cancellable { private PacketContainer packet; private boolean serverPacket; private boolean cancel; - + + private AsyncPacket asyncMarker; + private boolean asynchronous; + /** * Use the static constructors to create instances of this event. * @param source - the event source. @@ -50,6 +55,15 @@ public class PacketEvent extends EventObject implements Cancellable { this.player = player; this.serverPacket = serverPacket; } + + private PacketEvent(PacketEvent origial, AsyncPacket asyncMarker) { + super(origial.source); + this.packet = origial.packet; + this.player = origial.player; + this.serverPacket = origial.serverPacket; + this.asyncMarker = asyncMarker; + this.asynchronous = true; + } /** * Creates an event representing a client packet transmission. @@ -73,6 +87,16 @@ public class PacketEvent extends EventObject implements Cancellable { return new PacketEvent(source, packet, recipient, true); } + /** + * Create an asynchronous packet event from a synchronous event and a async marker. + * @param event - the original synchronous event. + * @param marker - the asynchronous marker. + * @return The new packet event. + */ + public static PacketEvent fromSynchronous(PacketEvent event, AsyncPacket marker) { + return new PacketEvent(event, marker); + } + /** * Retrieves the packet that will be sent to the player. * @return Packet to send to the player. @@ -129,6 +153,36 @@ public class PacketEvent extends EventObject implements Cancellable { return serverPacket; } + /** + * Retrieve the asynchronous marker. + *

+ * If the packet is synchronous, this marker will be used to schedule an asynchronous event. In this + * asynchronous event, the marker is used to correctly pass the packet around to the different threads. + * @return The current asynchronous marker. + */ + public AsyncPacket getAsyncMarker() { + return asyncMarker; + } + + /** + * Set the asynchronous marker. + *

+ * If the marker is non-null at the end of an synchronous event processing, the packet will be scheduled + * to be processed asynchronously with the given settings. + * @param asyncMarker - the new asynchronous marker. + */ + public void setAsyncMarker(AsyncPacket asyncMarker) { + this.asyncMarker = asyncMarker; + } + + /** + * Determine if the packet event has been executed asynchronously or not. + * @return TRUE if this packet event is asynchronous, FALSE otherwise. + */ + public boolean isAsynchronous() { + return asynchronous; + } + private void writeObject(ObjectOutputStream output) throws IOException { // Default serialization output.defaultWriteObject(); diff --git a/ProtocolLib/src/com/comphenix/protocol/injector/PacketFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/injector/PacketFilterManager.java index 85329a2e..10c256d9 100644 --- a/ProtocolLib/src/com/comphenix/protocol/injector/PacketFilterManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/injector/PacketFilterManager.java @@ -668,10 +668,7 @@ public final class PacketFilterManager implements ProtocolManager { for (PlayerInjector injection : playerInjection.values()) { injection.cleanupAll(); } - - // Clean up async handlers - asyncFilterManager.cleanupAll(); - + // Remove packet handlers if (packetInjector != null) packetInjector.cleanupAll(); @@ -681,6 +678,9 @@ public final class PacketFilterManager implements ProtocolManager { playerInjection.clear(); connectionLookup.clear(); hasClosed = true; + + // Clean up async handlers. We have to do this last. + asyncFilterManager.cleanupAll(); } @Override