diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java index 9e6e6886..ecc841d0 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java @@ -53,6 +53,7 @@ public class AsyncFilterManager implements AsynchronousManager { // Server packets are synchronized already this.serverQueue = new PacketSendingQueue(false); + // Client packets must be synchronized this.clientQueue = new PacketSendingQueue(true); diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketEventHolder.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketEventHolder.java new file mode 100644 index 00000000..429d5caf --- /dev/null +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketEventHolder.java @@ -0,0 +1,40 @@ +package com.comphenix.protocol.async; + +import com.comphenix.protocol.events.PacketEvent; +import com.google.common.base.Preconditions; +import com.google.common.collect.ComparisonChain; + +/** + * Provides a comparable to a packet event. + * + * @author Kristian + */ +class PacketEventHolder implements Comparable { + + private PacketEvent event; + + /** + * A wrapper that ensures the packet event is ordered by sending index. + * @param event - packet event to wrap. + */ + public PacketEventHolder(PacketEvent event) { + this.event = Preconditions.checkNotNull(event, "Event must be non-null"); + } + + /** + * Retrieve the stored event. + * @return The stored event. + */ + public PacketEvent getEvent() { + return event; + } + + @Override + public int compareTo(PacketEventHolder other) { + AsyncMarker marker = other != null ? other.getEvent().getAsyncMarker() : null; + + return ComparisonChain.start(). + compare(event.getAsyncMarker(), marker). + result(); + } +} diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java index 37349e83..02a3f93f 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java @@ -2,12 +2,14 @@ package com.comphenix.protocol.async; import java.util.Collection; import java.util.Iterator; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.Queue; import java.util.concurrent.Semaphore; import com.comphenix.protocol.concurrency.AbstractConcurrentListenerMultimap; import com.comphenix.protocol.events.PacketEvent; import com.comphenix.protocol.injector.PrioritizedListener; +import com.google.common.collect.MinMaxPriorityQueue; + /** * Handles the processing of every packet type. @@ -16,6 +18,9 @@ import com.comphenix.protocol.injector.PrioritizedListener; */ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap { + // Initial number of elements + public static final int INITIAL_CAPACITY = 64; + /** * Default maximum number of packets to process concurrently. */ @@ -33,18 +38,23 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap processingQueue; + private Queue processingQueue; // Packets for sending private PacketSendingQueue sendingQueue; public PacketProcessingQueue(PacketSendingQueue sendingQueue) { - this(sendingQueue, DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY); + this(sendingQueue, INITIAL_CAPACITY, DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY); } - public PacketProcessingQueue(PacketSendingQueue sendingQueue, int queueLimit, int maximumConcurrency) { + public PacketProcessingQueue(PacketSendingQueue sendingQueue, int initialSize, int maximumSize, int maximumConcurrency) { super(); - this.processingQueue = new ArrayBlockingQueue(queueLimit); + + this.processingQueue = Synchronization.queue(MinMaxPriorityQueue. + expectedSize(initialSize). + maximumSize(maximumSize). + create(), null); + this.maximumConcurrency = maximumConcurrency; this.concurrentProcessing = new Semaphore(maximumConcurrency); this.sendingQueue = sendingQueue; @@ -58,7 +68,7 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap> list = getListener(packet.getPacketID()); + if (holder != null) { + PacketEvent packet = holder.getEvent(); AsyncMarker marker = packet.getAsyncMarker(); + Collection> list = getListener(packet.getPacketID()); // Yes, removing the marker will cause the chain to stop if (list != null) { diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java index 500cf2c3..d2e9861e 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java @@ -1,7 +1,6 @@ package com.comphenix.protocol.async; import java.io.IOException; -import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -9,7 +8,6 @@ import java.util.concurrent.PriorityBlockingQueue; import com.comphenix.protocol.events.PacketEvent; import com.comphenix.protocol.reflect.FieldAccessException; -import com.google.common.collect.ComparisonChain; /** * Represents packets ready to be transmitted to a client. @@ -17,9 +15,9 @@ import com.google.common.collect.ComparisonChain; */ class PacketSendingQueue { - private static final int INITIAL_CAPACITY = 64; + public static final int INITIAL_CAPACITY = 64; - private PriorityBlockingQueue sendingQueue; + private PriorityBlockingQueue sendingQueue; // Whether or not packet transmission can only occur on the main thread private final boolean synchronizeMain; @@ -29,16 +27,8 @@ class PacketSendingQueue { * @param synchronizeMain - whether or not to synchronize with the main thread. */ public PacketSendingQueue(boolean synchronizeMain) { + this.sendingQueue = new PriorityBlockingQueue(INITIAL_CAPACITY); this.synchronizeMain = synchronizeMain; - this.sendingQueue = new PriorityBlockingQueue(INITIAL_CAPACITY, new Comparator() { - // Compare using the async marker - @Override - public int compare(PacketEvent o1, PacketEvent o2) { - return ComparisonChain.start(). - compare(o1.getAsyncMarker(), o2.getAsyncMarker()). - result(); - } - }); } /** @@ -46,7 +36,7 @@ class PacketSendingQueue { * @param packet */ public void enqueue(PacketEvent packet) { - sendingQueue.add(packet); + sendingQueue.add(new PacketEventHolder(packet)); } /** @@ -70,7 +60,9 @@ class PacketSendingQueue { Set lookup = new HashSet(packetsRemoved); // Note that this is O(n), so it might be expensive - for (PacketEvent event : sendingQueue) { + for (PacketEventHolder holder : sendingQueue) { + PacketEvent event = holder.getEvent(); + if (lookup.contains(event.getPacketID())) { event.getAsyncMarker().setProcessed(true); } @@ -88,9 +80,10 @@ class PacketSendingQueue { // Transmit as many packets as we can while (true) { - PacketEvent current = sendingQueue.peek(); + PacketEventHolder holder = sendingQueue.peek(); - if (current != null) { + if (holder != null) { + PacketEvent current = holder.getEvent(); AsyncMarker marker = current.getAsyncMarker(); // Abort if we're not on the main thread @@ -129,10 +122,10 @@ class PacketSendingQueue { */ private void forceSend() { while (true) { - PacketEvent current = sendingQueue.poll(); + PacketEventHolder holder = sendingQueue.poll(); - if (current != null) { - sendPacket(current); + if (holder != null) { + sendPacket(holder.getEvent()); } else { break; } diff --git a/ProtocolLib/src/com/comphenix/protocol/async/Synchronization.java b/ProtocolLib/src/com/comphenix/protocol/async/Synchronization.java new file mode 100644 index 00000000..71e949e7 --- /dev/null +++ b/ProtocolLib/src/com/comphenix/protocol/async/Synchronization.java @@ -0,0 +1,211 @@ +package com.comphenix.protocol.async; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Iterator; +import java.util.Queue; + +import javax.annotation.Nullable; + +import com.google.common.base.Preconditions; + +/** + * Synchronization views copied from Google Guava. + * + * @author Kristian + */ +class Synchronization { + + /** + * Create a synchronized wrapper for the given queue. + *

+ * This wrapper cannot synchronize the iterator(). Callers are expected + * to synchronize iterators manually. + * @param queue - the queue to synchronize. + * @param mutex - synchronization mutex, or NULL to use the queue. + * @return A synchronization wrapper. + */ + public static Queue queue(Queue queue, @Nullable Object mutex) { + return (queue instanceof SynchronizedQueue) ? + queue : + new SynchronizedQueue(queue, mutex); + } + + private static class SynchronizedObject implements Serializable { + private static final long serialVersionUID = -4408866092364554628L; + + final Object delegate; + final Object mutex; + + SynchronizedObject(Object delegate, @Nullable Object mutex) { + this.delegate = Preconditions.checkNotNull(delegate); + this.mutex = (mutex == null) ? this : mutex; + } + + Object delegate() { + return delegate; + } + + // No equals and hashCode; see ForwardingObject for details. + + @Override + public String toString() { + synchronized (mutex) { + return delegate.toString(); + } + } + } + + private static class SynchronizedCollection extends SynchronizedObject implements Collection { + private static final long serialVersionUID = 5440572373531285692L; + + private SynchronizedCollection(Collection delegate, + @Nullable Object mutex) { + super(delegate, mutex); + } + + @SuppressWarnings("unchecked") + @Override + Collection delegate() { + return (Collection) super.delegate(); + } + + @Override + public boolean add(E e) { + synchronized (mutex) { + return delegate().add(e); + } + } + + @Override + public boolean addAll(Collection c) { + synchronized (mutex) { + return delegate().addAll(c); + } + } + + @Override + public void clear() { + synchronized (mutex) { + delegate().clear(); + } + } + + @Override + public boolean contains(Object o) { + synchronized (mutex) { + return delegate().contains(o); + } + } + + @Override + public boolean containsAll(Collection c) { + synchronized (mutex) { + return delegate().containsAll(c); + } + } + + @Override + public boolean isEmpty() { + synchronized (mutex) { + return delegate().isEmpty(); + } + } + + @Override + public Iterator iterator() { + return delegate().iterator(); // manually synchronized + } + + @Override + public boolean remove(Object o) { + synchronized (mutex) { + return delegate().remove(o); + } + } + + @Override + public boolean removeAll(Collection c) { + synchronized (mutex) { + return delegate().removeAll(c); + } + } + + @Override + public boolean retainAll(Collection c) { + synchronized (mutex) { + return delegate().retainAll(c); + } + } + + @Override + public int size() { + synchronized (mutex) { + return delegate().size(); + } + } + + @Override + public Object[] toArray() { + synchronized (mutex) { + return delegate().toArray(); + } + } + + @Override + public T[] toArray(T[] a) { + synchronized (mutex) { + return delegate().toArray(a); + } + } + + } + + private static class SynchronizedQueue extends SynchronizedCollection implements Queue { + private static final long serialVersionUID = 1961791630386791902L; + + SynchronizedQueue(Queue delegate, @Nullable Object mutex) { + super(delegate, mutex); + } + + @Override + Queue delegate() { + return (Queue) super.delegate(); + } + + @Override + public E element() { + synchronized (mutex) { + return delegate().element(); + } + } + + @Override + public boolean offer(E e) { + synchronized (mutex) { + return delegate().offer(e); + } + } + + @Override + public E peek() { + synchronized (mutex) { + return delegate().peek(); + } + } + + @Override + public E poll() { + synchronized (mutex) { + return delegate().poll(); + } + } + + @Override + public E remove() { + synchronized (mutex) { + return delegate().remove(); + } + } + } +}