From 81321383d5bd8209ac79e6dbf0c51ae31a78499c Mon Sep 17 00:00:00 2001 From: "Kristian S. Stangeland" Date: Sat, 29 Sep 2012 19:33:22 +0200 Subject: [PATCH] Renamed the async marker. Added handling of close. --- .../protocol/async/AsyncFilterManager.java | 8 +-- .../{AsyncPacket.java => AsyncMarker.java} | 21 +++++-- .../protocol/async/ListenerToken.java | 2 +- .../protocol/async/PacketProcessingQueue.java | 2 +- .../protocol/async/PacketSendingQueue.java | 56 +++++++++---------- .../protocol/events/PacketEvent.java | 12 ++-- 6 files changed, 57 insertions(+), 44 deletions(-) rename ProtocolLib/src/com/comphenix/protocol/async/{AsyncPacket.java => AsyncMarker.java} (86%) diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java index 36e6fc01..b6a75576 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java @@ -62,7 +62,7 @@ public class AsyncFilterManager { * @param syncPacket - synchronous packet event. * @param asyncMarker - the asynchronous marker to use. */ - public void enqueueSyncPacket(PacketEvent syncPacket, AsyncPacket asyncMarker) { + public void enqueueSyncPacket(PacketEvent syncPacket, AsyncMarker asyncMarker) { PacketEvent newEvent = PacketEvent.fromSynchronous(syncPacket, asyncMarker); // Start the process @@ -76,14 +76,14 @@ public class AsyncFilterManager { * @param timeoutDelta - how long (in ms) until the packet expire. * @return An async marker. */ - public AsyncPacket createAsyncMarker(long sendingDelta, long timeoutDelta) { + public AsyncMarker createAsyncMarker(long sendingDelta, long timeoutDelta) { return createAsyncMarker(sendingDelta, timeoutDelta, currentSendingIndex.incrementAndGet(), System.currentTimeMillis()); } // Helper method - private AsyncPacket createAsyncMarker(long sendingDelta, long timeoutDelta, long sendingIndex, long currentTime) { - return new AsyncPacket(packetStream, sendingIndex, sendingDelta, System.currentTimeMillis(), timeoutDelta); + private AsyncMarker createAsyncMarker(long sendingDelta, long timeoutDelta, long sendingIndex, long currentTime) { + return new AsyncMarker(packetStream, sendingIndex, sendingDelta, System.currentTimeMillis(), timeoutDelta); } public PacketStream getPacketStream() { diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncPacket.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java similarity index 86% rename from ProtocolLib/src/com/comphenix/protocol/async/AsyncPacket.java rename to ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java index c4c87a9d..dbf5940b 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncPacket.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java @@ -11,11 +11,11 @@ import com.comphenix.protocol.injector.PrioritizedListener; import com.google.common.primitives.Longs; /** - * Represents a packet that is being processed by asynchronous listeners. + * Contains information about the packet that is being processed by asynchronous listeners. * * @author Kristian */ -public class AsyncPacket implements Serializable, Comparable { +public class AsyncMarker implements Serializable, Comparable { /** * Generated by Eclipse. @@ -47,12 +47,15 @@ public class AsyncPacket implements Serializable, Comparable { // Whether or not the packet has been processed by the listeners private volatile boolean processed; + + // Whethre or not the packet has been sent + private volatile boolean transmitted; /** * Create a container for asyncronous packets. * @param initialTime - the current time in milliseconds since 01.01.1970 00:00. */ - AsyncPacket(PacketStream packetStream, long sendingIndex, long sendingDelta, long initialTime, long timeoutDelta) { + AsyncMarker(PacketStream packetStream, long sendingIndex, long sendingDelta, long initialTime, long timeoutDelta) { if (packetStream == null) throw new IllegalArgumentException("packetStream cannot be NULL"); @@ -143,6 +146,14 @@ public class AsyncPacket implements Serializable, Comparable { this.processed = processed; } + /** + * Retrieve whether or not this packet has already been sent. + * @return TRUE if it has been sent before, FALSE otherwise. + */ + public boolean isTransmitted() { + return transmitted; + } + /** * Retrieve iterator for the next listener in line. * @return Next async packet listener iterator. @@ -171,6 +182,8 @@ public class AsyncPacket implements Serializable, Comparable { } else { packetStream.recieveClientPacket(event.getPlayer(), event.getPacket(), false); } + transmitted = true; + } catch (InvocationTargetException e) { throw new IOException("Cannot send packet", e); } catch (IllegalAccessException e) { @@ -179,7 +192,7 @@ public class AsyncPacket implements Serializable, Comparable { } @Override - public int compareTo(AsyncPacket o) { + public int compareTo(AsyncMarker o) { if (o == null) return 1; else diff --git a/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java b/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java index 439902ce..472762af 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java @@ -95,7 +95,7 @@ class ListenerToken { mainLoop: while (!cancelled) { PacketEvent packet = queuedPackets.take(); - AsyncPacket marker = packet.getAsyncMarker(); + AsyncMarker marker = packet.getAsyncMarker(); // Handle cancel requests if (packet == null || marker == null || !packet.isAsynchronous()) { diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java index 33d10389..f1d7c431 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java @@ -77,7 +77,7 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap> list = getListener(packet.getPacketID()); - AsyncPacket marker = packet.getAsyncMarker(); + AsyncMarker marker = packet.getAsyncMarker(); if (list != null) { Iterator> iterator = list.iterator(); diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java index fca9de83..867bdcbe 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java @@ -27,7 +27,18 @@ class PacketSendingQueue { public synchronized void signalPacketUpdate(PacketEvent packetUpdated) { // Mark this packet as finished packetUpdated.getAsyncMarker().setProcessed(true); - signalPacketUpdates(); + + // Transmit as many packets as we can + while (true) { + PacketEvent current = sendingQueue.peek(); + + if (current != null && current.getAsyncMarker().isProcessed()) { + sendPacket(current); + sendingQueue.poll(); + } else { + break; + } + } } /** @@ -38,12 +49,7 @@ class PacketSendingQueue { PacketEvent current = sendingQueue.poll(); if (current != null) { - // Just print the error - try { - current.getAsyncMarker().sendPacket(current); - } catch (IOException e) { - e.printStackTrace(); - } + sendPacket(current); } else { break; } @@ -51,29 +57,23 @@ class PacketSendingQueue { } /** - * Invoked when potentially every packet is finished. + * Transmit a packet, if it hasn't already. + * @param event - the packet to transmit. */ - private void signalPacketUpdates() { - // Transmit as many packets as we can - while (true) { - PacketEvent current = sendingQueue.peek(); - - if (current != null && current.getAsyncMarker().isProcessed()) { - // Just print the error - try { - current.getAsyncMarker().sendPacket(current); - } catch (IOException e) { - e.printStackTrace(); - } - - sendingQueue.poll(); - - } else { - break; - } - } + private void sendPacket(PacketEvent event) { - // And we're done + AsyncMarker marker = event.getAsyncMarker(); + + try { + // Don't send a packet twice + if (marker != null && !marker.isTransmitted()) { + marker.sendPacket(event); + } + + } catch (IOException e) { + // Just print the error + e.printStackTrace(); + } } /** diff --git a/ProtocolLib/src/com/comphenix/protocol/events/PacketEvent.java b/ProtocolLib/src/com/comphenix/protocol/events/PacketEvent.java index 9eedc109..d393b20b 100644 --- a/ProtocolLib/src/com/comphenix/protocol/events/PacketEvent.java +++ b/ProtocolLib/src/com/comphenix/protocol/events/PacketEvent.java @@ -25,7 +25,7 @@ import java.util.EventObject; import org.bukkit.entity.Player; import org.bukkit.event.Cancellable; -import com.comphenix.protocol.async.AsyncPacket; +import com.comphenix.protocol.async.AsyncMarker; public class PacketEvent extends EventObject implements Cancellable { /** @@ -38,7 +38,7 @@ public class PacketEvent extends EventObject implements Cancellable { private boolean serverPacket; private boolean cancel; - private AsyncPacket asyncMarker; + private AsyncMarker asyncMarker; private boolean asynchronous; /** @@ -56,7 +56,7 @@ public class PacketEvent extends EventObject implements Cancellable { this.serverPacket = serverPacket; } - private PacketEvent(PacketEvent origial, AsyncPacket asyncMarker) { + private PacketEvent(PacketEvent origial, AsyncMarker asyncMarker) { super(origial.source); this.packet = origial.packet; this.player = origial.player; @@ -93,7 +93,7 @@ public class PacketEvent extends EventObject implements Cancellable { * @param marker - the asynchronous marker. * @return The new packet event. */ - public static PacketEvent fromSynchronous(PacketEvent event, AsyncPacket marker) { + public static PacketEvent fromSynchronous(PacketEvent event, AsyncMarker marker) { return new PacketEvent(event, marker); } @@ -160,7 +160,7 @@ public class PacketEvent extends EventObject implements Cancellable { * asynchronous event, the marker is used to correctly pass the packet around to the different threads. * @return The current asynchronous marker. */ - public AsyncPacket getAsyncMarker() { + public AsyncMarker getAsyncMarker() { return asyncMarker; } @@ -171,7 +171,7 @@ public class PacketEvent extends EventObject implements Cancellable { * to be processed asynchronously with the given settings. * @param asyncMarker - the new asynchronous marker. */ - public void setAsyncMarker(AsyncPacket asyncMarker) { + public void setAsyncMarker(AsyncMarker asyncMarker) { this.asyncMarker = asyncMarker; }