From c6fb01e1e172b08b06adef60752fbf20ba05396c Mon Sep 17 00:00:00 2001 From: "Kristian S. Stangeland" Date: Wed, 3 Oct 2012 23:10:35 +0200 Subject: [PATCH] Adding the ability to use multiple worker threads. --- .../protocol/async/AsyncListenerHandler.java | 79 +++++++++++-------- 1 file changed, 48 insertions(+), 31 deletions(-) diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java index e4cd15fc..047fb120 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java @@ -28,7 +28,7 @@ public class AsyncListenerHandler { private volatile boolean cancelled; // If we've started the listener loop before - private volatile boolean started; + private volatile int started; // The packet listener private PacketListener listener; @@ -78,18 +78,26 @@ public class AsyncListenerHandler { return nullPacketListener; } + private String getPluginName() { + return PacketAdapter.getPluginName(listener); + } + + /** + * Retrieve the plugin associated with this async listener. + * @return The plugin. + */ + public Plugin getPlugin() { + return listener != null ? listener.getPlugin() : null; + } + /** * Cancel the handler. */ public void cancel() { // Remove the listener as quickly as possible close(); - - // Poison Pill Shutdown - queuedPackets.clear(); - queuedPackets.add(INTERUPT_PACKET); } - + /** * Queue a packet for processing. * @param packet - a packet for processing. @@ -116,19 +124,36 @@ public class AsyncListenerHandler { }; } + /** + * Start a singler worker thread handling the asynchronous. + */ + public void start() { + if (listener.getPlugin() == null) + throw new IllegalArgumentException("Cannot start task without a valid plugin."); + + filterManager.scheduleAsyncTask(listener.getPlugin(), getListenerLoop()); + } + + /** + * Start multiple worker threads for this listener. + * @param count - number of worker threads to start. + */ + public void start(int count) { + for (int i = 0; i < count; i++) + start(); + } + // DO NOT call this method from the main thread private void listenerLoop() { // Danger, danger! if (Thread.currentThread().getId() == mainThread.getId()) throw new IllegalStateException("Do not call this method from the main thread."); - if (started) - throw new IllegalStateException("A listener cannot be run by multiple threads. Create a new listener instead."); if (cancelled) throw new IllegalStateException("Listener has been cancelled. Create a new listener instead."); // Proceed - started = true; + started++; try { mainLoop: @@ -171,40 +196,32 @@ public class AsyncListenerHandler { } catch (InterruptedException e) { // We're done + } finally { + // Clean up + started--; + close(); } - - // Clean up - close(); } - private void close() { + private synchronized void close() { // Remove the listener itself if (!cancelled) { filterManager.unregisterAsyncHandlerInternal(this); cancelled = true; - started = false; + + // Tell every uncancelled thread to end + stopThreads(); } } - private String getPluginName() { - return PacketAdapter.getPluginName(listener); - } - /** - * Retrieve the plugin associated with this async listener. - * @return The plugin. + * Use the poision pill method to stop every worker thread. */ - public Plugin getPlugin() { - return listener != null ? listener.getPlugin() : null; - } - - /** - * Start the asynchronous listener using the Bukkit scheduler. - */ - public void start() { - if (listener.getPlugin() == null) - throw new IllegalArgumentException("Cannot start task without a valid plugin."); + private void stopThreads() { + // Poison Pill Shutdown + queuedPackets.clear(); - filterManager.scheduleAsyncTask(listener.getPlugin(), getListenerLoop()); + for (int i = 0; i < started; i++) + queuedPackets.add(INTERUPT_PACKET); } }