From 23e676533a749b81a80f64255619e13a51e8c1d5 Mon Sep 17 00:00:00 2001 From: "Kristian S. Stangeland" Date: Sat, 29 Sep 2012 01:00:12 +0200 Subject: [PATCH] Beginning to add support for asynchronous packet listeners. --- .../protocol/async/AsyncFilterManager.java | 17 ++ .../protocol/async/AsyncListener.java | 5 + .../comphenix/protocol/async/AsyncPacket.java | 123 +++++++++++ .../protocol/async/ListenerToken.java | 38 ++++ .../protocol/async/PacketProcessingQueue.java | 74 +++++++ .../AbstractConcurrentListenerMultimap.java | 94 +++++++++ .../injector/ConcurrentListenerMultimap.java | 194 ------------------ .../injector/PacketFilterManager.java | 4 +- .../injector/PrioritizedListener.java | 59 ++++++ .../injector/SortedPacketListenerList.java | 70 +++++++ 10 files changed, 482 insertions(+), 196 deletions(-) create mode 100644 ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java create mode 100644 ProtocolLib/src/com/comphenix/protocol/async/AsyncListener.java create mode 100644 ProtocolLib/src/com/comphenix/protocol/async/AsyncPacket.java create mode 100644 ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java create mode 100644 ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java create mode 100644 ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java delete mode 100644 ProtocolLib/src/com/comphenix/protocol/injector/ConcurrentListenerMultimap.java create mode 100644 ProtocolLib/src/com/comphenix/protocol/injector/PrioritizedListener.java create mode 100644 ProtocolLib/src/com/comphenix/protocol/injector/SortedPacketListenerList.java diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java new file mode 100644 index 00000000..dc8e972a --- /dev/null +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java @@ -0,0 +1,17 @@ +package com.comphenix.protocol.async; + +import java.util.concurrent.Future; + +/** + * Represents a filter manager for asynchronous packets. + * + * @author Kristian + */ +public class AsyncFilterManager { + + + public Future registerAsyncHandler() { + + } + +} diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListener.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListener.java new file mode 100644 index 00000000..01ef4aba --- /dev/null +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListener.java @@ -0,0 +1,5 @@ +package com.comphenix.protocol.async; + +public interface AsyncListener { + public void onAsyncPacket(AsyncPacket packet); +} diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncPacket.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncPacket.java new file mode 100644 index 00000000..e14d4073 --- /dev/null +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncPacket.java @@ -0,0 +1,123 @@ +package com.comphenix.protocol.async; + +import java.io.Serializable; + +import com.comphenix.protocol.events.PacketEvent; +import com.google.common.primitives.Longs; + +/** + * Represents a packet that is being processed by asynchronous listeners. + * + * @author Kristian + */ +public class AsyncPacket implements Serializable, Comparable { + + /** + * Generated by Eclipse. + */ + private static final long serialVersionUID = -2621498096616187384L; + + /** + * Default number of milliseconds until a packet will rejected. + */ + public static final int DEFAULT_TIMEOUT_DETLA = 60000; + + /** + * The original synchronized packet. + */ + private PacketEvent packetEvent; + + // Timeout handling + private long initialTime; + private long timeout; + + // Packet order + private long originalSendingIndex; + private long newSendingIndex; + + + /** + * Create a container for asyncronous packets. + * @param packetEvent - the synchronous packet event. + * @param initialTime - the current time in milliseconds since 01.01.1970 00:00. + */ + public AsyncPacket(PacketEvent packetEvent, long sendingIndex, long initialTime) { + this.packetEvent = packetEvent; + + // Timeout + this.initialTime = initialTime; + this.timeout = initialTime + DEFAULT_TIMEOUT_DETLA; + + // Sending index + this.originalSendingIndex = sendingIndex; + this.newSendingIndex = sendingIndex; + } + + /** + * Retrieve the time the packet was initially queued for asynchronous processing. + * @return The initial time in number of milliseconds since 01.01.1970 00:00. + */ + public long getInitialTime() { + return initialTime; + } + + /** + * Retrieve the time the packet will be forcefully rejected. + * @return The time to reject the packet, in milliseconds since 01.01.1970 00:00. + */ + public long getTimeout() { + return timeout; + } + + /** + * Sets the time the packet will be forcefully rejected. + * @param timeout - the time to reject the packet, in milliseconds since 01.01.1970 00:00. + */ + public void setTimeout(long timeout) { + this.timeout = timeout; + } + + /** + * Retrieve the order the packet was originally transmitted. + * @return The original packet index. + */ + public long getOriginalSendingIndex() { + return originalSendingIndex; + } + + /** + * Retrieve the desired sending order after processing has completed. + *

+ * Higher sending order means lower priority. + * @return Desired sending order. + */ + public long getNewSendingIndex() { + return newSendingIndex; + } + + /** + * Sets the desired sending order after processing has completed. + *

+ * Higher sending order means lower priority. + * @param newSendingIndex - new packet send index. + */ + public void setNewSendingIndex(long newSendingIndex) { + this.newSendingIndex = newSendingIndex; + } + + /** + * Retrieve the original synchronous packet event. + * @return The original packet event. + */ + public PacketEvent getPacketEvent() { + return packetEvent; + } + + @Override + public int compareTo(AsyncPacket o) { + if (o == null) + return 1; + else + return Longs.compare(getNewSendingIndex(), o.getNewSendingIndex()); + } +} diff --git a/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java b/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java new file mode 100644 index 00000000..c3a59b3d --- /dev/null +++ b/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java @@ -0,0 +1,38 @@ +package com.comphenix.protocol.async; + +import java.util.concurrent.ArrayBlockingQueue; + +public class ListenerToken { + + // Cancel the async handler + private volatile boolean cancelled; + + public boolean isCancelled() { + return cancelled; + } + + /** + * Cancel the handler. + */ + public void cancel() { + cancelled = true; + } + + + public void beginListener(AsyncListener asyncListener) { + + try { + AsyncPacket packet = processingQueue.take(); + + // Now, + asyncListener.onAsyncPacket(packet); + + + } catch (InterruptedException e) { + + } + + + + } +} diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java new file mode 100644 index 00000000..620e9b3a --- /dev/null +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java @@ -0,0 +1,74 @@ +package com.comphenix.protocol.async; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Semaphore; + +import com.comphenix.protocol.concurrency.SortedCopyOnWriteArray; + +/** + * Handles the processing of a certain packet type. + * + * @author Kristian + */ +class PacketProcessingQueue { + + + /** + * Default maximum number of packets to process concurrently. + */ + public static final int DEFAULT_MAXIMUM_CONCURRENCY = 5; + + /** + * Default maximum number of packets to queue for processing. + */ + public static final int DEFAULT_QUEUE_LIMIT = 1024 * 60; + + /** + * Number of packets we're processing concurrently. + */ + private final int maximumConcurrency; + private Semaphore concurrentProcessing; + + // Queued packets for being processed + private ArrayBlockingQueue processingQueue; + + // Packet listeners + private SortedCopyOnWriteArray<> + + public PacketProcessingQueue() { + this(DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY); + } + + public PacketProcessingQueue(int queueLimit, int maximumConcurrency) { + this.processingQueue = new ArrayBlockingQueue(queueLimit); + this.maximumConcurrency = maximumConcurrency; + this.concurrentProcessing = new Semaphore(maximumConcurrency); + } + + public boolean queuePacket(AsyncPacket packet) { + try { + processingQueue.add(packet); + + // Begin processing packets + processPacket(); + return true; + } catch (IllegalStateException e) { + return false; + } + } + + public void processPacket() { + if (concurrentProcessing.tryAcquire()) { + AsyncPacket packet = processingQueue.poll(); + + // Any packet queued? + if (packet != null) { + + } + } + } + + public int getMaximumConcurrency() { + return maximumConcurrency; + } +} diff --git a/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java b/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java new file mode 100644 index 00000000..2fd5a1f3 --- /dev/null +++ b/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java @@ -0,0 +1,94 @@ +package com.comphenix.protocol.concurrency; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.comphenix.protocol.events.ListeningWhitelist; +import com.comphenix.protocol.injector.PrioritizedListener; + +/** + * A thread-safe implementation of a listener multimap. + * + * @author Kristian + */ +public abstract class AbstractConcurrentListenerMultimap { + + // The core of our map + protected ConcurrentMap>> listeners = + new ConcurrentHashMap>>(); + + /** + * Adds a listener to its requested list of packet recievers. + * @param listener - listener with a list of packets to recieve notifcations for. + * @param whitelist - the packet whitelist to use. + */ + public void addListener(TListener listener, ListeningWhitelist whitelist) { + + PrioritizedListener prioritized = new PrioritizedListener(listener, whitelist.getPriority()); + + for (Integer packetID : whitelist.getWhitelist()) { + addListener(packetID, prioritized); + } + } + + // Add the listener to a specific packet notifcation list + private void addListener(Integer packetID, PrioritizedListener listener) { + + SortedCopyOnWriteArray> list = listeners.get(packetID); + + // We don't want to create this for every lookup + if (list == null) { + // It would be nice if we could use a PriorityBlockingQueue, but it doesn't preseve iterator order, + // which is a essential feature for our purposes. + final SortedCopyOnWriteArray> value = new SortedCopyOnWriteArray>(); + + list = listeners.putIfAbsent(packetID, value); + + // We may end up creating multiple multisets, but we'll agree + // on the one to use. + if (list == null) { + list = value; + } + } + + // Thread safe + list.add(listener); + } + + /** + * Removes the given listener from the packet event list. + * @param listener - listener to remove. + * @param whitelist - the packet whitelist that was used. + * @return Every packet ID that was removed due to no listeners. + */ + public List removeListener(TListener listener, ListeningWhitelist whitelist) { + + List removedPackets = new ArrayList(); + + // Again, not terribly efficient. But adding or removing listeners should be a rare event. + for (Integer packetID : whitelist.getWhitelist()) { + + SortedCopyOnWriteArray> list = listeners.get(packetID); + + // Remove any listeners + if (list != null) { + // Don't remove from newly created lists + if (list.size() > 0) { + // Remove this listener. Note that priority is generally ignored. + list.remove(new PrioritizedListener(listener, whitelist.getPriority())); + + if (list.size() == 0) { + listeners.remove(packetID); + removedPackets.add(packetID); + } + } + } + + // Move on to the next + } + + return removedPackets; + } +} diff --git a/ProtocolLib/src/com/comphenix/protocol/injector/ConcurrentListenerMultimap.java b/ProtocolLib/src/com/comphenix/protocol/injector/ConcurrentListenerMultimap.java deleted file mode 100644 index 385e4d8c..00000000 --- a/ProtocolLib/src/com/comphenix/protocol/injector/ConcurrentListenerMultimap.java +++ /dev/null @@ -1,194 +0,0 @@ -package com.comphenix.protocol.injector; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.comphenix.protocol.concurrency.SortedCopyOnWriteArray; -import com.comphenix.protocol.events.ListenerPriority; -import com.comphenix.protocol.events.ListeningWhitelist; -import com.comphenix.protocol.events.PacketAdapter; -import com.comphenix.protocol.events.PacketEvent; -import com.comphenix.protocol.events.PacketListener; -import com.google.common.base.Objects; -import com.google.common.primitives.Ints; - -/** - * A thread-safe implementation of a listener multimap. - * - * @author Kristian - */ -public class ConcurrentListenerMultimap { - - // The core of our map - protected ConcurrentMap> listeners = - new ConcurrentHashMap>(); - - /** - * Adds a listener to its requested list of packet recievers. - * @param listener - listener with a list of packets to recieve notifcations for. - * @param whitelist - the packet whitelist to use. - */ - public void addListener(PacketListener listener, ListeningWhitelist whitelist) { - - PrioritizedListener prioritized = new PrioritizedListener(listener, whitelist.getPriority()); - - for (Integer packetID : whitelist.getWhitelist()) { - addListener(packetID, prioritized); - } - } - - // Add the listener to a specific packet notifcation list - private void addListener(Integer packetID, PrioritizedListener listener) { - - SortedCopyOnWriteArray list = listeners.get(packetID); - - // We don't want to create this for every lookup - if (list == null) { - // It would be nice if we could use a PriorityBlockingQueue, but it doesn't preseve iterator order, - // which is a essential feature for our purposes. - final SortedCopyOnWriteArray value = new SortedCopyOnWriteArray(); - - list = listeners.putIfAbsent(packetID, value); - - // We may end up creating multiple multisets, but we'll agree - // on the one to use. - if (list == null) { - list = value; - } - } - - // Thread safe - list.add(listener); - } - - /** - * Removes the given listener from the packet event list. - * @param listener - listener to remove. - * @param whitelist - the packet whitelist that was used. - * @return Every packet ID that was removed due to no listeners. - */ - public List removeListener(PacketListener listener, ListeningWhitelist whitelist) { - - List removedPackets = new ArrayList(); - - // Again, not terribly efficient. But adding or removing listeners should be a rare event. - for (Integer packetID : whitelist.getWhitelist()) { - - SortedCopyOnWriteArray list = listeners.get(packetID); - - // Remove any listeners - if (list != null) { - // Don't remove from newly created lists - if (list.size() > 0) { - // Remove this listener. Note that priority is generally ignored. - list.remove(new PrioritizedListener(listener, whitelist.getPriority())); - - if (list.size() == 0) { - listeners.remove(packetID); - removedPackets.add(packetID); - } - } - } - - // Move on to the next - } - - return removedPackets; - } - - /** - * Invokes the given packet event for every registered listener. - * @param logger - the logger that will be used to inform about listener exceptions. - * @param event - the packet event to invoke. - */ - public void invokePacketRecieving(Logger logger, PacketEvent event) { - SortedCopyOnWriteArray list = listeners.get(event.getPacketID()); - - if (list == null) - return; - - // We have to be careful. Cannot modify the underlying list when sending notifications. - synchronized (list) { - for (PrioritizedListener element : list) { - try { - element.getListener().onPacketReceiving(event); - } catch (Throwable e) { - // Minecraft doesn't want your Exception. - logger.log(Level.SEVERE, - "Exception occured in onPacketReceiving() for " + - PacketAdapter.getPluginName(element.getListener()), e); - } - } - } - } - - /** - * Invokes the given packet event for every registered listener. - * @param logger - the logger that will be used to inform about listener exceptions. - * @param event - the packet event to invoke. - */ - public void invokePacketSending(Logger logger, PacketEvent event) { - SortedCopyOnWriteArray list = listeners.get(event.getPacketID()); - - if (list == null) - return; - - synchronized (list) { - for (PrioritizedListener element : list) { - try { - element.getListener().onPacketSending(event); - } catch (Throwable e) { - // Minecraft doesn't want your Exception. - logger.log(Level.SEVERE, - "Exception occured in onPacketReceiving() for " + - PacketAdapter.getPluginName(element.getListener()), e); - } - } - } - } - - /** - * A listener with an associated priority. - */ - private class PrioritizedListener implements Comparable { - private PacketListener listener; - private ListenerPriority priority; - - public PrioritizedListener(PacketListener listener, ListenerPriority priority) { - this.listener = listener; - this.priority = priority; - } - - @Override - public int compareTo(PrioritizedListener other) { - // This ensures that lower priority listeners are executed first - return Ints.compare(this.getPriority().getSlot(), - other.getPriority().getSlot()); - } - - // Note that this equals() method is NOT consistent with compareTo(). - // But, it's a private class so who cares. - @Override - public boolean equals(Object obj) { - // We only care about the listener - priority itself should not make a difference - if(obj instanceof PrioritizedListener){ - final PrioritizedListener other = (PrioritizedListener) obj; - return Objects.equal(listener, other.listener); - } else { - return false; - } - } - - public PacketListener getListener() { - return listener; - } - - public ListenerPriority getPriority() { - return priority; - } - } -} diff --git a/ProtocolLib/src/com/comphenix/protocol/injector/PacketFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/injector/PacketFilterManager.java index 05fcee54..f4084dbb 100644 --- a/ProtocolLib/src/com/comphenix/protocol/injector/PacketFilterManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/injector/PacketFilterManager.java @@ -97,8 +97,8 @@ public final class PacketFilterManager implements ProtocolManager { private Set sendingFilters = Collections.newSetFromMap(new ConcurrentHashMap()); // The two listener containers - private ConcurrentListenerMultimap recievedListeners = new ConcurrentListenerMultimap(); - private ConcurrentListenerMultimap sendingListeners = new ConcurrentListenerMultimap(); + private SortedPacketListenerList recievedListeners = new SortedPacketListenerList(); + private SortedPacketListenerList sendingListeners = new SortedPacketListenerList(); // Whether or not this class has been closed private boolean hasClosed; diff --git a/ProtocolLib/src/com/comphenix/protocol/injector/PrioritizedListener.java b/ProtocolLib/src/com/comphenix/protocol/injector/PrioritizedListener.java new file mode 100644 index 00000000..017aadea --- /dev/null +++ b/ProtocolLib/src/com/comphenix/protocol/injector/PrioritizedListener.java @@ -0,0 +1,59 @@ +package com.comphenix.protocol.injector; + +import com.comphenix.protocol.events.ListenerPriority; +import com.google.common.base.Objects; +import com.google.common.primitives.Ints; + +/** + * Represents a listener with a priority. + * + * @author Kristian + */ +public class PrioritizedListener implements Comparable> { + + private TListener listener; + private ListenerPriority priority; + + public PrioritizedListener(TListener listener, ListenerPriority priority) { + this.listener = listener; + this.priority = priority; + } + + @Override + public int compareTo(PrioritizedListener other) { + // This ensures that lower priority listeners are executed first + return Ints.compare( + this.getPriority().getSlot(), + other.getPriority().getSlot()); + } + + // Note that this equals() method is NOT consistent with compareTo(). + // But, it's a private class so who cares. + @SuppressWarnings("unchecked") + @Override + public boolean equals(Object obj) { + // We only care about the listener - priority itself should not make a difference + if(obj instanceof PrioritizedListener){ + final PrioritizedListener other = (PrioritizedListener) obj; + return Objects.equal(listener, other.listener); + } else { + return false; + } + } + + /** + * Retrieve the underlying listener. + * @return Underlying listener. + */ + public TListener getListener() { + return listener; + } + + /** + * Retrieve the priority of this listener. + * @return Listener priority. + */ + public ListenerPriority getPriority() { + return priority; + } +} diff --git a/ProtocolLib/src/com/comphenix/protocol/injector/SortedPacketListenerList.java b/ProtocolLib/src/com/comphenix/protocol/injector/SortedPacketListenerList.java new file mode 100644 index 00000000..41c80e70 --- /dev/null +++ b/ProtocolLib/src/com/comphenix/protocol/injector/SortedPacketListenerList.java @@ -0,0 +1,70 @@ +package com.comphenix.protocol.injector; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.comphenix.protocol.concurrency.AbstractConcurrentListenerMultimap; +import com.comphenix.protocol.concurrency.SortedCopyOnWriteArray; +import com.comphenix.protocol.events.PacketAdapter; +import com.comphenix.protocol.events.PacketEvent; +import com.comphenix.protocol.events.PacketListener; + +/** + * A thread-safe implementation of a listener multimap. + * + * @author Kristian + */ +class SortedPacketListenerList extends AbstractConcurrentListenerMultimap { + + /** + * Invokes the given packet event for every registered listener. + * @param logger - the logger that will be used to inform about listener exceptions. + * @param event - the packet event to invoke. + */ + public void invokePacketRecieving(Logger logger, PacketEvent event) { + SortedCopyOnWriteArray> list = listeners.get(event.getPacketID()); + + if (list == null) + return; + + // We have to be careful. Cannot modify the underlying list when sending notifications. + synchronized (list) { + for (PrioritizedListener element : list) { + try { + element.getListener().onPacketReceiving(event); + } catch (Throwable e) { + // Minecraft doesn't want your Exception. + logger.log(Level.SEVERE, + "Exception occured in onPacketReceiving() for " + + PacketAdapter.getPluginName(element.getListener()), e); + } + } + } + } + + /** + * Invokes the given packet event for every registered listener. + * @param logger - the logger that will be used to inform about listener exceptions. + * @param event - the packet event to invoke. + */ + public void invokePacketSending(Logger logger, PacketEvent event) { + SortedCopyOnWriteArray> list = listeners.get(event.getPacketID()); + + if (list == null) + return; + + synchronized (list) { + for (PrioritizedListener element : list) { + try { + element.getListener().onPacketSending(event); + } catch (Throwable e) { + // Minecraft doesn't want your Exception. + logger.log(Level.SEVERE, + "Exception occured in onPacketReceiving() for " + + PacketAdapter.getPluginName(element.getListener()), e); + } + } + } + } + +}