From cf68d229b0219501cf543e6803c6bb64257e3de2 Mon Sep 17 00:00:00 2001 From: "Kristian S. Stangeland" Date: Wed, 10 Oct 2012 05:42:45 +0200 Subject: [PATCH] Honor the sending index when the packet has finished processing. --- .../protocol/async/AsyncFilterManager.java | 14 ++- .../comphenix/protocol/async/AsyncMarker.java | 40 +++++++- .../protocol/async/PacketEventHolder.java | 10 +- .../protocol/async/PacketSendingQueue.java | 16 +++- .../protocol/injector/BukkitUnwrapper.java | 96 +++++++++++++++++++ .../protocol/injector/EntityUtilities.java | 1 - .../protocol/injector/PacketConstructor.java | 90 ++--------------- 7 files changed, 174 insertions(+), 93 deletions(-) create mode 100644 ProtocolLib/src/com/comphenix/protocol/injector/BukkitUnwrapper.java diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java index 6a3ce7ca..f7933f0d 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java @@ -180,14 +180,18 @@ public class AsyncFilterManager implements AsynchronousManager { * @param syncPacket - synchronous packet event. * @param asyncMarker - the asynchronous marker to use. */ - public void enqueueSyncPacket(PacketEvent syncPacket, AsyncMarker asyncMarker) { + public synchronized void enqueueSyncPacket(PacketEvent syncPacket, AsyncMarker asyncMarker) { PacketEvent newEvent = PacketEvent.fromSynchronous(syncPacket, asyncMarker); + if (asyncMarker.isQueued() || asyncMarker.isTransmitted()) + throw new IllegalArgumentException("Cannot queue a packet that has already been queued."); + // Start the process getSendingQueue(syncPacket).enqueue(newEvent); // We know this is occuring on the main thread, so pass TRUE getProcessingQueue(syncPacket).enqueue(newEvent, true); + asyncMarker.setQueuedSendingIndex(asyncMarker.getNewSendingIndex()); } @Override @@ -267,12 +271,16 @@ public class AsyncFilterManager implements AsynchronousManager { * @param onMainThread - whether or not this method was run by the main thread. */ private void signalPacketTransmission(PacketEvent packet, boolean onMainThread) { - if (packet.getAsyncMarker() == null) + AsyncMarker marker = packet.getAsyncMarker(); + if (marker == null) throw new IllegalArgumentException( "A sync packet cannot be transmitted by the asynchronous manager."); + if (!marker.isQueued()) + throw new IllegalArgumentException( + "A packet must have been queued before it can be transmitted."); // Only send if the packet is ready - if (packet.getAsyncMarker().decrementProcessingDelay() == 0) { + if (marker.decrementProcessingDelay() == 0) { getSendingQueue(packet).signalPacketUpdate(packet, onMainThread); } } diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java index be1d0ff9..9bd6e6bd 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java @@ -59,18 +59,21 @@ public class AsyncMarker implements Serializable, Comparable { private long originalSendingIndex; private long newSendingIndex; + // Used to determine if a packet must be reordered in the sending queue + private Long queuedSendingIndex; + // Whether or not the packet has been processed by the listeners private volatile boolean processed; - // Whether or not to delay processing - private AtomicInteger processingDelay = new AtomicInteger(); - // Whether or not the packet has been sent private volatile boolean transmitted; // Whether or not the asynchronous processing itself should be cancelled private volatile boolean asyncCancelled; + // Whether or not to delay processing + private AtomicInteger processingDelay = new AtomicInteger(); + // Used to synchronize processing on the shared PacketEvent private Object processingLock = new Object(); @@ -189,7 +192,8 @@ public class AsyncMarker implements Serializable, Comparable { * Increment the number of times this packet must be signalled as done before its transmitted. *

* This is useful if an asynchronous listener is waiting for further information before the - * packet can be sent to the user. A packet listener MUST eventually call signalPacketUpdate, + * packet can be sent to the user. A packet listener MUST eventually call + * {@link AsyncFilterManager#signalPacketTransmission(PacketEvent)}, * even if the packet is cancelled, after this method is called. *

* It is recommended that processing outside a packet listener is wrapped in a synchronized block @@ -219,6 +223,30 @@ public class AsyncMarker implements Serializable, Comparable { return processingDelay.get(); } + /** + * Whether or not this packet is or has been queued for processing. + * @return TRUE if it has, FALSE otherwise. + */ + public boolean isQueued() { + return queuedSendingIndex != null; + } + + /** + * Retrieve the sending index when the packet was queued. + * @return Queued sending index. + */ + public long getQueuedSendingIndex() { + return queuedSendingIndex != null ? queuedSendingIndex : 0; + } + + /** + * Set the sending index when the packet was queued. + * @param queuedSendingIndex - sending index. + */ + void setQueuedSendingIndex(Long queuedSendingIndex) { + this.queuedSendingIndex = queuedSendingIndex; + } + /** * Processing lock used to synchronize access to the parent PacketEvent and PacketContainer. *

@@ -269,6 +297,10 @@ public class AsyncMarker implements Serializable, Comparable { /** * Set whether or not the asynchronous handling should be cancelled. + *

+ * This is only relevant during the synchronous processing. Asynchronous + * listeners should use the normal cancel-field to cancel a PacketEvent. + * * @param asyncCancelled - TRUE to cancel it, FALSE otherwise. */ public void setAsyncCancelled(boolean asyncCancelled) { diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketEventHolder.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketEventHolder.java index 429d5caf..36cc7eec 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/PacketEventHolder.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketEventHolder.java @@ -12,13 +12,17 @@ import com.google.common.collect.ComparisonChain; class PacketEventHolder implements Comparable { private PacketEvent event; - + private long sendingIndex = 0; + /** * A wrapper that ensures the packet event is ordered by sending index. * @param event - packet event to wrap. */ public PacketEventHolder(PacketEvent event) { this.event = Preconditions.checkNotNull(event, "Event must be non-null"); + + if (event.getAsyncMarker() != null) + this.sendingIndex = event.getAsyncMarker().getNewSendingIndex(); } /** @@ -31,10 +35,8 @@ class PacketEventHolder implements Comparable { @Override public int compareTo(PacketEventHolder other) { - AsyncMarker marker = other != null ? other.getEvent().getAsyncMarker() : null; - return ComparisonChain.start(). - compare(event.getAsyncMarker(), marker). + compare(sendingIndex, other.sendingIndex). result(); } } diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java index e7fd318f..c90f5b9d 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java @@ -55,8 +55,22 @@ class PacketSendingQueue { * @param onMainThread - whether or not this is occuring on the main thread. */ public synchronized void signalPacketUpdate(PacketEvent packetUpdated, boolean onMainThread) { + + AsyncMarker marker = packetUpdated.getAsyncMarker(); + + // Should we reorder the event? + if (marker.getQueuedSendingIndex() != marker.getNewSendingIndex()) { + PacketEvent copy = PacketEvent.fromSynchronous(packetUpdated, marker); + + // "Cancel" the original event + packetUpdated.setCancelled(true); + + // Enqueue the copy with the new sending index + enqueue(copy); + } + // Mark this packet as finished - packetUpdated.getAsyncMarker().setProcessed(true); + marker.setProcessed(true); trySendPackets(onMainThread); } diff --git a/ProtocolLib/src/com/comphenix/protocol/injector/BukkitUnwrapper.java b/ProtocolLib/src/com/comphenix/protocol/injector/BukkitUnwrapper.java new file mode 100644 index 00000000..aaf8ecf8 --- /dev/null +++ b/ProtocolLib/src/com/comphenix/protocol/injector/BukkitUnwrapper.java @@ -0,0 +1,96 @@ +package com.comphenix.protocol.injector; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import com.comphenix.protocol.injector.PacketConstructor.Unwrapper; +import com.comphenix.protocol.reflect.instances.DefaultInstances; + +/** + * Represents an object capable of converting wrapped Bukkit objects into NMS objects. + *

+ * Typical conversions include: + *

    + *
  • org.bukkit.entity.Player -> net.minecraft.server.EntityPlayer
  • + *
  • org.bukkit.World -> net.minecraft.server.WorldServer
  • + *
+ * + * @author Kristian + */ +public class BukkitUnwrapper implements Unwrapper { + + private static Map, Method> cache = new ConcurrentHashMap, Method>(); + + @SuppressWarnings("unchecked") + @Override + public Object unwrapItem(Object wrappedObject) { + + // Special case + if (wrappedObject instanceof Collection) { + return handleCollection((Collection) wrappedObject); + } + + Class currentClass = wrappedObject.getClass(); + Method cachedMethod = initializeCache(currentClass); + + try { + // Retrieve the handle + if (cachedMethod != null) + return cachedMethod.invoke(wrappedObject); + else + return null; + + } catch (IllegalArgumentException e) { + // Impossible + return null; + } catch (IllegalAccessException e) { + return null; + } catch (InvocationTargetException e) { + // This is REALLY bad + throw new RuntimeException("Minecraft error.", e); + } + } + + private Object handleCollection(Collection wrappedObject) { + + @SuppressWarnings("unchecked") + Collection copy = DefaultInstances.DEFAULT.getDefault(wrappedObject.getClass()); + + if (copy != null) { + // Unwrap every element + for (Object element : wrappedObject) { + copy.add(unwrapItem(element)); + } + return copy; + + } else { + // Impossible + return null; + } + } + + private Method initializeCache(Class type) { + + // See if we're already determined this + if (cache.containsKey(type)) { + // We will never remove from the cache, so this ought to be thread safe + return cache.get(type); + } + + try { + Method find = type.getMethod("getHandle"); + + // It's thread safe, as getMethod should return the same handle + cache.put(type, find); + return find; + + } catch (SecurityException e) { + return null; + } catch (NoSuchMethodException e) { + return null; + } + } +} diff --git a/ProtocolLib/src/com/comphenix/protocol/injector/EntityUtilities.java b/ProtocolLib/src/com/comphenix/protocol/injector/EntityUtilities.java index e3c4a2c9..3b73f761 100644 --- a/ProtocolLib/src/com/comphenix/protocol/injector/EntityUtilities.java +++ b/ProtocolLib/src/com/comphenix/protocol/injector/EntityUtilities.java @@ -15,7 +15,6 @@ import org.bukkit.craftbukkit.CraftWorld; import org.bukkit.entity.Entity; import org.bukkit.entity.Player; -import com.comphenix.protocol.injector.PacketConstructor.BukkitUnwrapper; import com.comphenix.protocol.reflect.FieldAccessException; import com.comphenix.protocol.reflect.FieldUtils; import com.comphenix.protocol.reflect.FuzzyReflection; diff --git a/ProtocolLib/src/com/comphenix/protocol/injector/PacketConstructor.java b/ProtocolLib/src/com/comphenix/protocol/injector/PacketConstructor.java index 73d6c160..c25b863b 100644 --- a/ProtocolLib/src/com/comphenix/protocol/injector/PacketConstructor.java +++ b/ProtocolLib/src/com/comphenix/protocol/injector/PacketConstructor.java @@ -2,17 +2,12 @@ package com.comphenix.protocol.injector; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import net.minecraft.server.Packet; import com.comphenix.protocol.events.PacketContainer; import com.comphenix.protocol.reflect.FieldAccessException; -import com.comphenix.protocol.reflect.instances.DefaultInstances; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -176,82 +171,17 @@ public class PacketConstructor { return false; } - public static class BukkitUnwrapper implements Unwrapper { - - private static Map, Method> cache = new ConcurrentHashMap, Method>(); - - @SuppressWarnings("unchecked") - @Override - public Object unwrapItem(Object wrappedObject) { - - // Special case - if (wrappedObject instanceof Collection) { - return handleCollection((Collection) wrappedObject); - } - - Class currentClass = wrappedObject.getClass(); - Method cachedMethod = initializeCache(currentClass); - - try { - // Retrieve the handle - if (cachedMethod != null) - return cachedMethod.invoke(wrappedObject); - else - return null; - - } catch (IllegalArgumentException e) { - // Impossible - return null; - } catch (IllegalAccessException e) { - return null; - } catch (InvocationTargetException e) { - // This is REALLY bad - throw new RuntimeException("Minecraft error.", e); - } - } - - private Object handleCollection(Collection wrappedObject) { - - @SuppressWarnings("unchecked") - Collection copy = DefaultInstances.DEFAULT.getDefault(wrappedObject.getClass()); - - if (copy != null) { - // Unwrap every element - for (Object element : wrappedObject) { - copy.add(unwrapItem(element)); - } - return copy; - - } else { - // Impossible - return null; - } - } - - private Method initializeCache(Class type) { - - // See if we're already determined this - if (cache.containsKey(type)) { - // We will never remove from the cache, so this ought to be thread safe - return cache.get(type); - } - - try { - Method find = type.getMethod("getHandle"); - - // It's thread safe, as getMethod should return the same handle - cache.put(type, find); - return find; - - } catch (SecurityException e) { - return null; - } catch (NoSuchMethodException e) { - return null; - } - } - } - + /** + * Represents a unwrapper for a constructor parameter. + * + * @author Kristian + */ public static interface Unwrapper { + /** + * Convert the given wrapped object to the equivalent net.minecraft.server object. + * @param wrappedObject - wrapped object. + * @return The net.minecraft.server object. + */ public Object unwrapItem(Object wrappedObject); } }