From 7ed0bc82dd609e827b659666a6f314590f1e757d Mon Sep 17 00:00:00 2001 From: "Kristian S. Stangeland" Date: Wed, 10 Oct 2012 04:41:07 +0200 Subject: [PATCH] Make it possible for threads to delay packet transmission. Threads can now increment a shared counter indicating that a packet should not be transmitted after the default packet listener processing. This can be useful if a packet listener needs information from additional packets before it can complete. Packet listeners that whish to use this method begin by calling incrementPacketDelay(). It is then responsible for calling signalPacketTransmission() when it's done waiting. All processing on PacketEvents outside packet listeners must be synchronized with getProcessingLock(). --- .../protocol/AsynchronousManager.java | 10 +++- .../protocol/async/AsyncFilterManager.java | 21 +++++-- .../protocol/async/AsyncListenerHandler.java | 16 +++-- .../comphenix/protocol/async/AsyncMarker.java | 58 ++++++++++++++++++- .../protocol/async/PacketProcessingQueue.java | 5 +- .../protocol/async/PacketSendingQueue.java | 2 +- 6 files changed, 98 insertions(+), 14 deletions(-) diff --git a/ProtocolLib/src/com/comphenix/protocol/AsynchronousManager.java b/ProtocolLib/src/com/comphenix/protocol/AsynchronousManager.java index 55d75f99..9929b6be 100644 --- a/ProtocolLib/src/com/comphenix/protocol/AsynchronousManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/AsynchronousManager.java @@ -15,7 +15,6 @@ import com.comphenix.protocol.events.PacketListener; * @author Kristian */ public interface AsynchronousManager { - /** * Registers an asynchronous packet handler. *

@@ -74,4 +73,13 @@ public interface AsynchronousManager { * Remove listeners, close threads and transmit every delayed packet. */ public abstract void cleanupAll(); + + /** + * Signal that a packet is ready to be transmitted. + *

+ * This should only be called if {@link com.comphenix.protocol.async.AsyncMarker#incrementProcessingDelay() AsyncMarker.incrementProcessingDelay()} + * has been called previously. + * @param packet - packet to signal. + */ + public abstract void signalPacketTransmission(PacketEvent packet); } \ No newline at end of file diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java index ecc841d0..6a3ce7ca 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java @@ -256,14 +256,27 @@ public class AsyncFilterManager implements AsynchronousManager { serverQueue.cleanupAll(); } + @Override + public void signalPacketTransmission(PacketEvent packet) { + signalPacketTransmission(packet, onMainThread()); + } + /** * Signal that a packet is ready to be transmitted. * @param packet - packet to signal. + * @param onMainThread - whether or not this method was run by the main thread. */ - public void signalPacketUpdate(PacketEvent packet) { - getSendingQueue(packet).signalPacketUpdate(packet, onMainThread()); + private void signalPacketTransmission(PacketEvent packet, boolean onMainThread) { + if (packet.getAsyncMarker() == null) + throw new IllegalArgumentException( + "A sync packet cannot be transmitted by the asynchronous manager."); + + // Only send if the packet is ready + if (packet.getAsyncMarker().decrementProcessingDelay() == 0) { + getSendingQueue(packet).signalPacketUpdate(packet, onMainThread); + } } - + /** * Retrieve the sending queue this packet belongs to. * @param packet - the packet. @@ -277,7 +290,7 @@ public class AsyncFilterManager implements AsynchronousManager { * Signal that a packet has finished processing. * @param packet - packet to signal. */ - public void signalProcessingDone(PacketEvent packet) { + public void signalFreeProcessingSlot(PacketEvent packet) { getProcessingQueue(packet).signalProcessingDone(); } diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java index 0859738c..86950c40 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java @@ -371,10 +371,12 @@ public class AsyncListenerHandler { marker.setListenerHandler(this); marker.setWorkerID(workerID); - if (packet.isServerPacket()) - listener.onPacketSending(packet); - else - listener.onPacketReceiving(packet); + synchronized (marker.getProcessingLock()) { + if (packet.isServerPacket()) + listener.onPacketSending(packet); + else + listener.onPacketReceiving(packet); + } } catch (Throwable e) { // Minecraft doesn't want your Exception. @@ -393,8 +395,10 @@ public class AsyncListenerHandler { } // There are no more listeners - queue the packet for transmission - filterManager.signalPacketUpdate(packet); - filterManager.signalProcessingDone(packet); + filterManager.signalFreeProcessingSlot(packet); + + // Note that listeners can opt to delay the packet transmission + filterManager.signalPacketTransmission(packet); } } catch (InterruptedException e) { diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java index a5e15f0e..be1d0ff9 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java @@ -6,6 +6,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import net.minecraft.server.Packet; @@ -61,12 +62,18 @@ public class AsyncMarker implements Serializable, Comparable { // Whether or not the packet has been processed by the listeners private volatile boolean processed; + // Whether or not to delay processing + private AtomicInteger processingDelay = new AtomicInteger(); + // Whether or not the packet has been sent private volatile boolean transmitted; // Whether or not the asynchronous processing itself should be cancelled private volatile boolean asyncCancelled; + // Used to synchronize processing on the shared PacketEvent + private Object processingLock = new Object(); + // Used to identify the asynchronous worker private AsyncListenerHandler listenerHandler; private int workerID; @@ -178,6 +185,55 @@ public class AsyncMarker implements Serializable, Comparable { this.processed = processed; } + /** + * Increment the number of times this packet must be signalled as done before its transmitted. + *

+ * This is useful if an asynchronous listener is waiting for further information before the + * packet can be sent to the user. A packet listener MUST eventually call signalPacketUpdate, + * even if the packet is cancelled, after this method is called. + *

+ * It is recommended that processing outside a packet listener is wrapped in a synchronized block + * using the {@link #getProcessingLock()} method. + *

+ * To decrement the processing delay, call signalPacketUpdate. A thread that calls this method + * multiple times must call signalPacketUpdate at least that many times. + * @return The new processing delay. + */ + public int incrementProcessingDelay() { + return processingDelay.incrementAndGet(); + } + + /** + * Decrement the number of times this packet must be signalled as done before it's transmitted. + * @return The new processing delay. If zero, the packet should be sent. + */ + int decrementProcessingDelay() { + return processingDelay.decrementAndGet(); + } + + /** + * Retrieve the number of times a packet must be signalled to be done before it's sent. + * @return Number of processing delays. + */ + public int getProcessingDelay() { + return processingDelay.get(); + } + + /** + * Processing lock used to synchronize access to the parent PacketEvent and PacketContainer. + *

+ * This lock is automatically acquired for every asynchronous packet listener. It should only be + * used to synchronize access to a PacketEvent if it's processing has been delayed. + * @return A processing lock. + */ + public Object getProcessingLock() { + return processingLock; + } + + public void setProcessingLock(Object processingLock) { + this.processingLock = processingLock; + } + /** * Retrieve whether or not this packet has already been sent. * @return TRUE if it has been sent before, FALSE otherwise. @@ -276,7 +332,7 @@ public class AsyncMarker implements Serializable, Comparable { * @param event - the packet to send. * @throws IOException If the packet couldn't be sent. */ - public void sendPacket(PacketEvent event) throws IOException { + void sendPacket(PacketEvent event) throws IOException { try { if (event.isServerPacket()) { packetStream.sendServerPacket(event.getPlayer(), event.getPacket(), false); diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java index 08ab06ad..ea660076 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java @@ -100,6 +100,8 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap> list = getListener(packet.getPacketID()); + marker.incrementProcessingDelay(); + // Yes, removing the marker will cause the chain to stop if (list != null) { Iterator> iterator = list.iterator(); @@ -112,7 +114,8 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap