diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java index 5668647f..f5385fad 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java @@ -1,6 +1,9 @@ package com.comphenix.protocol.async; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; @@ -18,18 +21,26 @@ import com.comphenix.protocol.events.PacketListener; public class AsyncListenerHandler { /** - * Signal an end to the packet processing. + * Signal an end to packet processing. */ private static final PacketEvent INTERUPT_PACKET = new PacketEvent(new Object()); + /** + * Called when the threads have to wake up for something important. + */ + private static final PacketEvent WAKEUP_PACKET = new PacketEvent(new Object()); + // Default queue capacity private static int DEFAULT_CAPACITY = 1024; // Cancel the async handler private volatile boolean cancelled; - // If we've started the listener loop before - private AtomicInteger started = new AtomicInteger(); + // Number of worker threads + private final AtomicInteger started = new AtomicInteger(); + + // Unique local worker ID + private final AtomicInteger nextID = new AtomicInteger(); // The packet listener private PacketListener listener; @@ -41,6 +52,10 @@ public class AsyncListenerHandler { // List of queued packets private ArrayBlockingQueue queuedPackets = new ArrayBlockingQueue(DEFAULT_CAPACITY); + // List of cancelled tasks + private final Set stoppedTasks = new HashSet(); + private final Object stopLock = new Object(); + // Minecraft main thread private Thread mainThread; @@ -112,15 +127,57 @@ public class AsyncListenerHandler { } /** - * Create a runnable that will initiate the listener loop. + * Create a worker that will initiate the listener loop. Note that using stop() to + * close a specific worker is less efficient than stopping an arbitrary worker. *

* Warning: Never call the run() method in the main thread. */ public Runnable getListenerLoop() { - return new Runnable() { + return new AsyncRunnable() { + + private final AtomicBoolean running = new AtomicBoolean(); + private volatile int id; + @Override public void run() { - listenerLoop(); + // Careful now + if (running.compareAndSet(false, true)) { + id = nextID.incrementAndGet(); + listenerLoop(id); + + synchronized (stopLock) { + stoppedTasks.remove(id); + notifyAll(); + running.set(false); + } + + } else { + throw new IllegalStateException( + "This listener loop has already been started. Create a new instead."); + } + } + + @Override + public boolean stop() throws InterruptedException { + synchronized (stopLock) { + if (!running.get()) + return false; + + stoppedTasks.add(id); + + // Wake up threads - we have a listener to stop + for (int i = 0; i < getWorkers(); i++) { + queuedPackets.offer(WAKEUP_PACKET); + } + + waitForStops(); + return true; + } + } + + @Override + public boolean isRunning() { + return running.get(); } }; } @@ -128,9 +185,11 @@ public class AsyncListenerHandler { /** * Start a singler worker thread handling the asynchronous. */ - public void start() { + public synchronized void start() { if (listener.getPlugin() == null) throw new IllegalArgumentException("Cannot start task without a valid plugin."); + if (cancelled) + throw new IllegalStateException("Cannot start a worker when the listener is closing."); filterManager.scheduleAsyncTask(listener.getPlugin(), getListenerLoop()); } @@ -139,32 +198,115 @@ public class AsyncListenerHandler { * Start multiple worker threads for this listener. * @param count - number of worker threads to start. */ - public void start(int count) { + public synchronized void start(int count) { for (int i = 0; i < count; i++) start(); } + /** + * Stop a worker thread. + */ + public synchronized void stop() { + queuedPackets.add(INTERUPT_PACKET); + } + + /** + * Stop the given amount of worker threads. + * @param count - number of threads to stop. + */ + public synchronized void stop(int count) { + for (int i = 0; i < count; i++) + stop(); + } + + /** + * Set the current number of workers. + *

+ * This method can only be called with a count of zero when the listener is closing. + * @param count - new number of workers. + */ + public synchronized void setWorkers(int count) { + if (count < 0) + throw new IllegalArgumentException("Number of workers cannot be less than zero."); + if (count > DEFAULT_CAPACITY) + throw new IllegalArgumentException("Cannot initiate more than " + DEFAULT_CAPACITY + " workers"); + if (cancelled && count > 0) + throw new IllegalArgumentException("Cannot add workers when the listener is closing."); + + long time = System.currentTimeMillis(); + + // Try to get to the correct count + while (started.get() != count) { + if (started.get() < count) + start(); + else + stop(); + + // May happen if another thread is doing something similar to "setWorkers" + if ((System.currentTimeMillis() - time) > 1000) + throw new RuntimeException("Failed to set worker count."); + } + } + + /** + * Retrieve the current number of registered workers. + *

+ * Note that the returned value may be out of data. + * @return Number of registered workers. + */ + public synchronized int getWorkers() { + return started.get(); + } + + /** + * Wait until every tasks scheduled to stop has actually stopped. + * @return TRUE if the current listener should stop, FALSE otherwise. + * @throws InterruptedException - If the current thread was interrupted. + */ + private boolean waitForStops() throws InterruptedException { + while (stoppedTasks.size() > 0 && !cancelled) { + wait(); + } + return cancelled; + } + // DO NOT call this method from the main thread - private void listenerLoop() { + private void listenerLoop(int taskID) { // Danger, danger! if (Thread.currentThread().getId() == mainThread.getId()) throw new IllegalStateException("Do not call this method from the main thread."); if (cancelled) throw new IllegalStateException("Listener has been cancelled. Create a new listener instead."); - - // Proceed - started.incrementAndGet(); - + try { + // Wait if certain threads are stopping + synchronized (stopLock) { + if (waitForStops()) + return; + } + + // Proceed + started.incrementAndGet(); + mainLoop: while (!cancelled) { PacketEvent packet = queuedPackets.take(); AsyncMarker marker = packet.getAsyncMarker(); // Handle cancel requests - if (packet == null || marker == null || !packet.isAsynchronous()) { - break; + if (packet == null || marker == null || packet == INTERUPT_PACKET) { + return; + + } else if (packet == WAKEUP_PACKET) { + // This is a bit slow, but it should be safe + synchronized (stopLock) { + // Are we the one who is supposed to stop? + if (stoppedTasks.contains(taskID)) + return; + if (waitForStops()) + return; + } } // Here's the core of the asynchronous processing @@ -221,8 +363,11 @@ public class AsyncListenerHandler { private void stopThreads() { // Poison Pill Shutdown queuedPackets.clear(); + stop(started.get()); - for (int i = 0; i < started.get(); i++) - queuedPackets.add(INTERUPT_PACKET); + // Individual shut down is irrelevant now + synchronized (stopLock) { + notifyAll(); + } } } diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncRunnable.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncRunnable.java new file mode 100644 index 00000000..281b71be --- /dev/null +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncRunnable.java @@ -0,0 +1,23 @@ +package com.comphenix.protocol.async; + +/** + * A runnable representing a asynchronous event listener. + * + * @author Kristian + */ +public interface AsyncRunnable extends Runnable { + + /** + * Stop the given runnable. + *

+ * This may not occur right away. + * @return TRUE if the thread was stopped, FALSE if it was already stopped. + */ + public boolean stop() throws InterruptedException; + + /** + * Determine if we're running or not. + * @return TRUE if we're running, FALSE otherwise. + */ + public boolean isRunning(); +}