diff --git a/ProtocolLib/src/com/comphenix/protocol/AsynchronousManager.java b/ProtocolLib/src/com/comphenix/protocol/AsynchronousManager.java index 55d75f99..9929b6be 100644 --- a/ProtocolLib/src/com/comphenix/protocol/AsynchronousManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/AsynchronousManager.java @@ -15,7 +15,6 @@ import com.comphenix.protocol.events.PacketListener; * @author Kristian */ public interface AsynchronousManager { - /** * Registers an asynchronous packet handler. *

@@ -74,4 +73,13 @@ public interface AsynchronousManager { * Remove listeners, close threads and transmit every delayed packet. */ public abstract void cleanupAll(); + + /** + * Signal that a packet is ready to be transmitted. + *

+ * This should only be called if {@link com.comphenix.protocol.async.AsyncMarker#incrementProcessingDelay() AsyncMarker.incrementProcessingDelay()} + * has been called previously. + * @param packet - packet to signal. + */ + public abstract void signalPacketTransmission(PacketEvent packet); } \ No newline at end of file diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java index ecc841d0..6a3ce7ca 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java @@ -256,14 +256,27 @@ public class AsyncFilterManager implements AsynchronousManager { serverQueue.cleanupAll(); } + @Override + public void signalPacketTransmission(PacketEvent packet) { + signalPacketTransmission(packet, onMainThread()); + } + /** * Signal that a packet is ready to be transmitted. * @param packet - packet to signal. + * @param onMainThread - whether or not this method was run by the main thread. */ - public void signalPacketUpdate(PacketEvent packet) { - getSendingQueue(packet).signalPacketUpdate(packet, onMainThread()); + private void signalPacketTransmission(PacketEvent packet, boolean onMainThread) { + if (packet.getAsyncMarker() == null) + throw new IllegalArgumentException( + "A sync packet cannot be transmitted by the asynchronous manager."); + + // Only send if the packet is ready + if (packet.getAsyncMarker().decrementProcessingDelay() == 0) { + getSendingQueue(packet).signalPacketUpdate(packet, onMainThread); + } } - + /** * Retrieve the sending queue this packet belongs to. * @param packet - the packet. @@ -277,7 +290,7 @@ public class AsyncFilterManager implements AsynchronousManager { * Signal that a packet has finished processing. * @param packet - packet to signal. */ - public void signalProcessingDone(PacketEvent packet) { + public void signalFreeProcessingSlot(PacketEvent packet) { getProcessingQueue(packet).signalProcessingDone(); } diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java index 0859738c..86950c40 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java @@ -371,10 +371,12 @@ public class AsyncListenerHandler { marker.setListenerHandler(this); marker.setWorkerID(workerID); - if (packet.isServerPacket()) - listener.onPacketSending(packet); - else - listener.onPacketReceiving(packet); + synchronized (marker.getProcessingLock()) { + if (packet.isServerPacket()) + listener.onPacketSending(packet); + else + listener.onPacketReceiving(packet); + } } catch (Throwable e) { // Minecraft doesn't want your Exception. @@ -393,8 +395,10 @@ public class AsyncListenerHandler { } // There are no more listeners - queue the packet for transmission - filterManager.signalPacketUpdate(packet); - filterManager.signalProcessingDone(packet); + filterManager.signalFreeProcessingSlot(packet); + + // Note that listeners can opt to delay the packet transmission + filterManager.signalPacketTransmission(packet); } } catch (InterruptedException e) { diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java index a5e15f0e..be1d0ff9 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java @@ -6,6 +6,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import net.minecraft.server.Packet; @@ -61,12 +62,18 @@ public class AsyncMarker implements Serializable, Comparable { // Whether or not the packet has been processed by the listeners private volatile boolean processed; + // Whether or not to delay processing + private AtomicInteger processingDelay = new AtomicInteger(); + // Whether or not the packet has been sent private volatile boolean transmitted; // Whether or not the asynchronous processing itself should be cancelled private volatile boolean asyncCancelled; + // Used to synchronize processing on the shared PacketEvent + private Object processingLock = new Object(); + // Used to identify the asynchronous worker private AsyncListenerHandler listenerHandler; private int workerID; @@ -178,6 +185,55 @@ public class AsyncMarker implements Serializable, Comparable { this.processed = processed; } + /** + * Increment the number of times this packet must be signalled as done before its transmitted. + *

+ * This is useful if an asynchronous listener is waiting for further information before the + * packet can be sent to the user. A packet listener MUST eventually call signalPacketUpdate, + * even if the packet is cancelled, after this method is called. + *

+ * It is recommended that processing outside a packet listener is wrapped in a synchronized block + * using the {@link #getProcessingLock()} method. + *

+ * To decrement the processing delay, call signalPacketUpdate. A thread that calls this method + * multiple times must call signalPacketUpdate at least that many times. + * @return The new processing delay. + */ + public int incrementProcessingDelay() { + return processingDelay.incrementAndGet(); + } + + /** + * Decrement the number of times this packet must be signalled as done before it's transmitted. + * @return The new processing delay. If zero, the packet should be sent. + */ + int decrementProcessingDelay() { + return processingDelay.decrementAndGet(); + } + + /** + * Retrieve the number of times a packet must be signalled to be done before it's sent. + * @return Number of processing delays. + */ + public int getProcessingDelay() { + return processingDelay.get(); + } + + /** + * Processing lock used to synchronize access to the parent PacketEvent and PacketContainer. + *

+ * This lock is automatically acquired for every asynchronous packet listener. It should only be + * used to synchronize access to a PacketEvent if it's processing has been delayed. + * @return A processing lock. + */ + public Object getProcessingLock() { + return processingLock; + } + + public void setProcessingLock(Object processingLock) { + this.processingLock = processingLock; + } + /** * Retrieve whether or not this packet has already been sent. * @return TRUE if it has been sent before, FALSE otherwise. @@ -276,7 +332,7 @@ public class AsyncMarker implements Serializable, Comparable { * @param event - the packet to send. * @throws IOException If the packet couldn't be sent. */ - public void sendPacket(PacketEvent event) throws IOException { + void sendPacket(PacketEvent event) throws IOException { try { if (event.isServerPacket()) { packetStream.sendServerPacket(event.getPlayer(), event.getPacket(), false); diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java index 08ab06ad..ea660076 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java @@ -100,6 +100,8 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap> list = getListener(packet.getPacketID()); + marker.incrementProcessingDelay(); + // Yes, removing the marker will cause the chain to stop if (list != null) { Iterator> iterator = list.iterator(); @@ -112,7 +114,8 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap