diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java index 44f49bba..5fe61857 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java @@ -56,29 +56,36 @@ public class AsyncFilterManager implements AsynchronousManager { private PacketProcessingQueue clientProcessingQueue; // Sending queues - private PlayerSendingHandler playerSendingHandler; + private final PlayerSendingHandler playerSendingHandler; // Report exceptions - private ErrorReporter reporter; + private final ErrorReporter reporter; // The likely main thread - private Thread mainThread; + private final Thread mainThread; // Default scheduler - private BukkitScheduler scheduler; + private final BukkitScheduler scheduler; // Our protocol manager - private ProtocolManager manager; + private final ProtocolManager manager; // Current packet index - private AtomicInteger currentSendingIndex = new AtomicInteger(); + private final AtomicInteger currentSendingIndex = new AtomicInteger(); + /** + * Initialize a asynchronous filter manager. + *

+ * Internal method. Retrieve the global asynchronous manager from the protocol manager instead. + * @param reporter - desired error reporter. + * @param scheduler - task scheduler. + * @param manager - protocol manager. + */ public AsyncFilterManager(ErrorReporter reporter, BukkitScheduler scheduler, ProtocolManager manager) { - // Initialize timeout listeners - serverTimeoutListeners = new SortedPacketListenerList(); - clientTimeoutListeners = new SortedPacketListenerList(); - timeoutListeners = Sets.newSetFromMap(new ConcurrentHashMap()); + this.serverTimeoutListeners = new SortedPacketListenerList(); + this.clientTimeoutListeners = new SortedPacketListenerList(); + this.timeoutListeners = Sets.newSetFromMap(new ConcurrentHashMap()); this.playerSendingHandler = new PlayerSendingHandler(reporter, serverTimeoutListeners, clientTimeoutListeners); this.serverProcessingQueue = new PacketProcessingQueue(playerSendingHandler); @@ -263,12 +270,11 @@ public class AsyncFilterManager implements AsynchronousManager { } /** - * Used to create a default asynchronous task. - * @param plugin - the calling plugin. - * @param runnable - the runnable. + * Retrieve the current task scheduler. + * @return Current task scheduler. */ - public void scheduleAsyncTask(Plugin plugin, Runnable runnable) { - scheduler.scheduleAsyncDelayedTask(plugin, runnable); + public BukkitScheduler getScheduler() { + return scheduler; } @Override diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncListenerHandler.java b/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncListenerHandler.java index 0b1ef3ba..0ccb719c 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncListenerHandler.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncListenerHandler.java @@ -20,6 +20,7 @@ package com.comphenix.protocol.async; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -75,10 +76,19 @@ public class AsyncListenerHandler { private final Set stoppedTasks = new HashSet(); private final Object stopLock = new Object(); + // Processing task on the main thread + private int syncTask = -1; + // Minecraft main thread private Thread mainThread; - public AsyncListenerHandler(Thread mainThread, AsyncFilterManager filterManager, PacketListener listener) { + /** + * Construct a manager for an asynchronous packet handler. + * @param mainThread - the main game thread. + * @param filterManager - the parent filter manager. + * @param listener - the current packet listener. + */ + AsyncListenerHandler(Thread mainThread, AsyncFilterManager filterManager, PacketListener listener) { if (filterManager == null) throw new IllegalArgumentException("filterManager cannot be NULL"); if (listener == null) @@ -89,10 +99,18 @@ public class AsyncListenerHandler { this.listener = listener; } + /** + * Determine whether or not this asynchronous handler has been cancelled. + * @return TRUE if it has been cancelled/stopped, FALSE otherwise. + */ public boolean isCancelled() { return cancelled; } + /** + * Retrieve the current asynchronous packet listener. + * @return Current packet listener. + */ public PacketListener getAsyncListener() { return listener; } @@ -223,7 +241,7 @@ public class AsyncListenerHandler { final AsyncRunnable listenerLoop = getListenerLoop(); - filterManager.scheduleAsyncTask(listener.getPlugin(), new Runnable() { + filterManager.getScheduler().scheduleAsyncDelayedTask(listener.getPlugin(), new Runnable() { @Override public void run() { Thread thread = Thread.currentThread(); @@ -271,7 +289,7 @@ public class AsyncListenerHandler { final AsyncRunnable listenerLoop = getListenerLoop(); final Function delegateCopy = executor; - filterManager.scheduleAsyncTask(listener.getPlugin(), new Runnable() { + filterManager.getScheduler().scheduleAsyncDelayedTask(listener.getPlugin(), new Runnable() { @Override public void run() { delegateCopy.apply(listenerLoop); @@ -308,6 +326,104 @@ public class AsyncListenerHandler { return Joiner.on(", ").join(whitelist.getWhitelist()); } + /** + * Start processing packets on the main thread. + *

+ * This is useful if you need to synchronize with the main thread in your packet listener, but + * you're not performing any expensive processing. + *

+ * Note: Use a asynchronous worker if the packet listener may use more than 0.5 ms + * of processing time on a single packet. Do as much as possible on the worker thread, and schedule synchronous tasks + * to use the Bukkit API instead. + * @return TRUE if the synchronized processing was successfully started, FALSE if it's already running. + * @throws IllegalStateException If we couldn't start the underlying task. + */ + public synchronized boolean syncStart() { + return syncStart(500, TimeUnit.MICROSECONDS); + } + + /** + * Start processing packets on the main thread. + *

+ * This is useful if you need to synchronize with the main thread in your packet listener, but + * you're not performing any expensive processing. + *

+ * The processing time parameter gives the upper bound for the amount of time spent processing pending packets. + * It should be set to a fairly low number, such as 0.5 ms or 1% of a game tick - to reduce the impact + * on the main thread. Never go beyond 50 milliseconds. + *

+ * Note: Use a asynchronous worker if the packet listener may exceed the ideal processing time + * on a single packet. Do as much as possible on the worker thread, and schedule synchronous tasks + * to use the Bukkit API instead. + * + * @param time - the amount of processing time alloted per game tick (20 ticks per second). + * @param unit - the unit of the processingTime argument. + * @return TRUE if the synchronized processing was successfully started, FALSE if it's already running. + * @throws IllegalStateException If we couldn't start the underlying task. + */ + public synchronized boolean syncStart(final long time, final TimeUnit unit) { + if (time <= 0) + throw new IllegalArgumentException("Time must be greater than zero."); + if (unit == null) + throw new IllegalArgumentException("TimeUnit cannot be NULL."); + + final long tickDelay = 1; + final int workerID = nextID.incrementAndGet(); + + if (syncTask < 0) { + syncTask = filterManager.getScheduler().scheduleSyncRepeatingTask(getPlugin(), new Runnable() { + @Override + public void run() { + long stopTime = System.nanoTime() + unit.convert(time, TimeUnit.NANOSECONDS); + + while (!cancelled) { + PacketEvent packet = queuedPackets.poll(); + + if (packet == INTERUPT_PACKET || packet == WAKEUP_PACKET) { + // Sorry, asynchronous threads! + queuedPackets.add(packet); + + // Try again next tick + break; + } else if (packet != null && packet.getAsyncMarker() != null) { + processPacket(workerID, packet, "onSyncPacket()"); + } else { + // No more packets left - wait a tick + break; + } + + // Check time here, ensuring that we at least process one packet + if (System.nanoTime() < stopTime) + break; + } + } + }, tickDelay, tickDelay); + + // This is very bad - force the caller to handle it + if (syncTask < 0) + throw new IllegalStateException("Cannot start synchronous task."); + else + return true; + } else { + return false; + } + } + + /** + * Stop processing packets on the main thread. + * @return TRUE if we stopped any processing tasks, FALSE if it has already been stopped. + */ + public synchronized boolean syncStop() { + if (syncTask > 0) { + filterManager.getScheduler().cancelTask(syncTask); + + syncTask = -1; + return true; + } else { + return false; + } + } + /** * Start multiple worker threads for this listener. * @param count - number of worker threads to start. @@ -386,9 +502,13 @@ public class AsyncListenerHandler { } } - // DO NOT call this method from the main thread + /** + * The main processing loop of asynchronous threads. + *

+ * Note: DO NOT call this method from the main thread + * @param workerID - the current worker ID. + */ private void listenerLoop(int workerID) { - // Danger, danger! if (Thread.currentThread().getId() == mainThread.getId()) throw new IllegalStateException("Do not call this method from the main thread."); @@ -403,16 +523,11 @@ public class AsyncListenerHandler { // Proceed started.incrementAndGet(); - mainLoop: while (!cancelled) { PacketEvent packet = queuedPackets.take(); - AsyncMarker marker = packet.getAsyncMarker(); // Handle cancel requests - if (packet == null || marker == null || packet == INTERUPT_PACKET) { - return; - - } else if (packet == WAKEUP_PACKET) { + if (packet == WAKEUP_PACKET) { // This is a bit slow, but it should be safe synchronized (stopLock) { // Are we the one who is supposed to stop? @@ -421,42 +536,13 @@ public class AsyncListenerHandler { if (waitForStops()) return; } + } else if (packet == INTERUPT_PACKET) { + return; } - // Here's the core of the asynchronous processing - try { - marker.setListenerHandler(this); - marker.setWorkerID(workerID); - - synchronized (marker.getProcessingLock()) { - if (packet.isServerPacket()) - listener.onPacketSending(packet); - else - listener.onPacketReceiving(packet); - } - - } catch (Throwable e) { - // Minecraft doesn't want your Exception. - filterManager.getErrorReporter().reportMinimal(listener.getPlugin(), "onAsyncPacket()", e); + if (packet != null && packet.getAsyncMarker() == null) { + processPacket(workerID, packet, "onAsyncPacket()"); } - - // Now, get the next non-cancelled listener - if (!marker.hasExpired()) { - for (; marker.getListenerTraversal().hasNext(); ) { - AsyncListenerHandler handler = marker.getListenerTraversal().next().getListener(); - - if (!handler.isCancelled()) { - handler.enqueuePacket(packet); - continue mainLoop; - } - } - } - - // There are no more listeners - queue the packet for transmission - filterManager.signalFreeProcessingSlot(packet); - - // Note that listeners can opt to delay the packet transmission - filterManager.signalPacketTransmission(packet); } } catch (InterruptedException e) { @@ -464,16 +550,66 @@ public class AsyncListenerHandler { } finally { // Clean up started.decrementAndGet(); - close(); } } + /** + * Called when a packet is scheduled for processing. + * @param workerID - the current worker ID. + * @param packet - the current packet. + * @param methodName - name of the method. + */ + private void processPacket(int workerID, PacketEvent packet, String methodName) { + AsyncMarker marker = packet.getAsyncMarker(); + + // Here's the core of the asynchronous processing + try { + synchronized (marker.getProcessingLock()) { + marker.setListenerHandler(this); + marker.setWorkerID(workerID); + + if (packet.isServerPacket()) + listener.onPacketSending(packet); + else + listener.onPacketReceiving(packet); + } + + } catch (Throwable e) { + // Minecraft doesn't want your Exception. + filterManager.getErrorReporter().reportMinimal(listener.getPlugin(), methodName, e); + } + + // Now, get the next non-cancelled listener + if (!marker.hasExpired()) { + for (; marker.getListenerTraversal().hasNext(); ) { + AsyncListenerHandler handler = marker.getListenerTraversal().next().getListener(); + + if (!handler.isCancelled()) { + handler.enqueuePacket(packet); + return; + } + } + } + + // There are no more listeners - queue the packet for transmission + filterManager.signalFreeProcessingSlot(packet); + + // Note that listeners can opt to delay the packet transmission + filterManager.signalPacketTransmission(packet); + } + + /** + * Close all worker threads and the handler itself. + */ private synchronized void close() { // Remove the listener itself if (!cancelled) { filterManager.unregisterAsyncHandlerInternal(this); cancelled = true; + // Close processing tasks + syncStop(); + // Tell every uncancelled thread to end stopThreads(); }