+ * Use {@link AsyncMarker#incrementProcessingDelay()} to delay a packet until its ready to be transmitted.
+ *
* To start listening asynchronously, pass the getListenerLoop() runnable to a different thread.
* @param listener - the packet listener that will recieve these asynchronous events.
* @return An asynchrouns handler.
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/CommandBase.java b/ProtocolLib/src/main/java/com/comphenix/protocol/CommandBase.java
index 3128e6c5..7cda5346 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/CommandBase.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/CommandBase.java
@@ -19,7 +19,8 @@ abstract class CommandBase implements CommandExecutor {
private String permission;
private String name;
private int minimumArgumentCount;
- private ErrorReporter reporter;
+
+ protected ErrorReporter reporter;
public CommandBase(ErrorReporter reporter, String permission, String name) {
this(reporter, permission, name, 0);
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/CommandPacket.java b/ProtocolLib/src/main/java/com/comphenix/protocol/CommandPacket.java
index dbe85114..3b8245c9 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/CommandPacket.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/CommandPacket.java
@@ -65,7 +65,6 @@ class CommandPacket extends CommandBase {
private Plugin plugin;
private Logger logger;
- private ErrorReporter reporter;
private ProtocolManager manager;
private ChatExtensions chatter;
@@ -460,9 +459,9 @@ class CommandPacket extends CommandBase {
// The interval tree will automatically remove the listeners for us
if (side.isForClient())
- result.addAll(clientListeners.remove(idStart, idStop));
+ result.addAll(clientListeners.remove(idStart, idStop, true));
if (side.isForServer())
- result.addAll(serverListeners.remove(idStart, idStop));
+ result.addAll(serverListeners.remove(idStart, idStop, true));
return result;
}
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/ProtocolConfig.java b/ProtocolLib/src/main/java/com/comphenix/protocol/ProtocolConfig.java
index 078262c7..7e771e0b 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/ProtocolConfig.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/ProtocolConfig.java
@@ -18,6 +18,8 @@ class ProtocolConfig {
private static final String METRICS_ENABLED = "metrics";
+ private static final String BACKGROUND_COMPILER_ENABLED = "background compiler";
+
private static final String UPDATER_NOTIFY = "notify";
private static final String UPDATER_DOWNLAD = "download";
private static final String UPDATER_DELAY = "delay";
@@ -166,6 +168,25 @@ class ProtocolConfig {
global.set(METRICS_ENABLED, enabled);
}
+ /**
+ * Retrieve whether or not the background compiler for structure modifiers is enabled or not.
+ * @return TRUE if it is enabled, FALSE otherwise.
+ */
+ public boolean isBackgroundCompilerEnabled() {
+ return global.getBoolean(BACKGROUND_COMPILER_ENABLED, true);
+ }
+
+ /**
+ * Set whether or not the background compiler for structure modifiers is enabled or not.
+ *
+ * This setting will take effect next time ProtocolLib is started.
+ *
+ * @param enabled - TRUE if is enabled/running, FALSE otherwise.
+ */
+ public void setBackgroundCompilerEnabled(boolean enabled) {
+ global.set(BACKGROUND_COMPILER_ENABLED, enabled);
+ }
+
/**
* Set the last time we updated, in seconds since 1970.01.01 00:00.
* @param lastTimeSeconds - new last update time.
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/ProtocolLibrary.java b/ProtocolLib/src/main/java/com/comphenix/protocol/ProtocolLibrary.java
index b97b35f9..ffe6b34d 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/ProtocolLibrary.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/ProtocolLibrary.java
@@ -79,6 +79,7 @@ public class ProtocolLibrary extends JavaPlugin {
// Logger
private Logger logger;
+ private Handler redirectHandler;
// Commands
private CommandProtocol commandProtocol;
@@ -90,39 +91,43 @@ public class ProtocolLibrary extends JavaPlugin {
logger = getLoggerSafely();
// Add global parameters
- DetailedErrorReporter reporter = new DetailedErrorReporter();
+ DetailedErrorReporter detailedReporter = new DetailedErrorReporter(this);
updater = new Updater(this, logger, "protocollib", getFile(), "protocol.info");
+ reporter = detailedReporter;
try {
config = new ProtocolConfig(this);
} catch (Exception e) {
- reporter.reportWarning(this, "Cannot load configuration", e);
+ detailedReporter.reportWarning(this, "Cannot load configuration", e);
// Load it again
- deleteConfig();
- config = new ProtocolConfig(this);
+ if (deleteConfig()) {
+ config = new ProtocolConfig(this);
+ } else {
+ reporter.reportWarning(this, "Cannot delete old ProtocolLib configuration.");
+ }
}
try {
unhookTask = new DelayedSingleTask(this);
- protocolManager = new PacketFilterManager(getClassLoader(), getServer(), unhookTask, reporter);
- reporter.addGlobalParameter("manager", protocolManager);
+ protocolManager = new PacketFilterManager(getClassLoader(), getServer(), unhookTask, detailedReporter);
+ detailedReporter.addGlobalParameter("manager", protocolManager);
// Initialize command handlers
- commandProtocol = new CommandProtocol(reporter, this, updater, config);
- commandPacket = new CommandPacket(reporter, this, logger, protocolManager);
+ commandProtocol = new CommandProtocol(detailedReporter, this, updater, config);
+ commandPacket = new CommandPacket(detailedReporter, this, logger, protocolManager);
// Send logging information to player listeners too
broadcastUsers(PERMISSION_INFO);
} catch (Throwable e) {
- reporter.reportDetailed(this, "Cannot load ProtocolLib.", e, protocolManager);
+ detailedReporter.reportDetailed(this, "Cannot load ProtocolLib.", e, protocolManager);
disablePlugin();
}
}
- private void deleteConfig() {
- config.getFile().delete();
+ private boolean deleteConfig() {
+ return config.getFile().delete();
}
@Override
@@ -136,8 +141,12 @@ public class ProtocolLibrary extends JavaPlugin {
}
private void broadcastUsers(final String permission) {
- // Broadcast information to every user too
- logger.addHandler(new Handler() {
+ // Guard against multiple calls
+ if (redirectHandler != null)
+ return;
+
+ // Broadcast information to every user too
+ redirectHandler = new Handler() {
@Override
public void publish(LogRecord record) {
commandPacket.broadcastMessageSilently(record.getMessage(), permission);
@@ -152,7 +161,9 @@ public class ProtocolLibrary extends JavaPlugin {
public void close() throws SecurityException {
// Do nothing.
}
- });
+ };
+
+ logger.addHandler(redirectHandler);
}
@Override
@@ -166,9 +177,13 @@ public class ProtocolLibrary extends JavaPlugin {
return;
// Initialize background compiler
- if (backgroundCompiler == null) {
- backgroundCompiler = new BackgroundCompiler(getClassLoader());
+ if (backgroundCompiler == null && config.isBackgroundCompilerEnabled()) {
+ backgroundCompiler = new BackgroundCompiler(getClassLoader(), reporter);
BackgroundCompiler.setInstance(backgroundCompiler);
+
+ logger.info("Started structure compiler thread.");
+ } else {
+ logger.info("Structure compiler thread has been disabled.");
}
// Set up command handlers
@@ -288,6 +303,11 @@ public class ProtocolLibrary extends JavaPlugin {
asyncPacketTask = -1;
}
+ // And redirect handler too
+ if (redirectHandler != null) {
+ logger.removeHandler(redirectHandler);
+ }
+
unhookTask.close();
protocolManager.close();
protocolManager = null;
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/ProtocolManager.java b/ProtocolLib/src/main/java/com/comphenix/protocol/ProtocolManager.java
index cbdc93f1..3e319249 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/ProtocolManager.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/ProtocolManager.java
@@ -17,6 +17,7 @@
package com.comphenix.protocol;
+import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Set;
@@ -25,6 +26,7 @@ import org.bukkit.entity.Entity;
import org.bukkit.entity.Player;
import org.bukkit.plugin.Plugin;
+import com.comphenix.protocol.async.AsyncMarker;
import com.comphenix.protocol.events.PacketContainer;
import com.comphenix.protocol.events.PacketListener;
import com.comphenix.protocol.injector.PacketConstructor;
@@ -37,6 +39,37 @@ import com.google.common.collect.ImmutableSet;
*/
public interface ProtocolManager extends PacketStream {
+ /**
+ * Send a packet to the given player.
+ *
+ * Re-sending a previously cancelled packet is discuraged. Use {@link AsyncMarker#incrementProcessingDelay()}
+ * to delay a packet until a certain condition has been met.
+ *
+ * @param reciever - the reciever.
+ * @param packet - packet to send.
+ * @param filters - whether or not to invoke any packet filters.
+ * @throws InvocationTargetException - if an error occured when sending the packet.
+ */
+ @Override
+ public void sendServerPacket(Player reciever, PacketContainer packet, boolean filters)
+ throws InvocationTargetException;
+
+ /**
+ * Simulate recieving a certain packet from a given player.
+ *
+ * Receiving a previously cancelled packet is discuraged. Use {@link AsyncMarker#incrementProcessingDelay()}
+ * to delay a packet until a certain condition has been met.
+ *
+ * @param sender - the sender.
+ * @param packet - the packet that was sent.
+ * @param filters - whether or not to invoke any packet filters.
+ * @throws InvocationTargetException If the reflection machinery failed.
+ * @throws IllegalAccessException If the underlying method caused an error.
+ */
+ @Override
+ public void recieveClientPacket(Player sender, PacketContainer packet, boolean filters)
+ throws IllegalAccessException, InvocationTargetException;
+
/**
* Retrieves a list of every registered packet listener.
* @return Every registered packet listener.
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java
index 7dd7b504..1b3bd8cc 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java
@@ -23,6 +23,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import org.bukkit.entity.Player;
import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitScheduler;
@@ -42,7 +43,9 @@ import com.google.common.collect.Sets;
/**
* Represents a filter manager for asynchronous packets.
- *
+ *
+ * By using {@link AsyncMarker#incrementProcessingDelay()}, a packet can be delayed without having to block the
+ * processing thread.
* @author Kristian
*/
public class AsyncFilterManager implements AsynchronousManager {
@@ -52,58 +55,44 @@ public class AsyncFilterManager implements AsynchronousManager {
private Set timeoutListeners;
private PacketProcessingQueue serverProcessingQueue;
- private PacketSendingQueue serverQueue;
-
-
private PacketProcessingQueue clientProcessingQueue;
- private PacketSendingQueue clientQueue;
+
+ // Sending queues
+ private final PlayerSendingHandler playerSendingHandler;
- private ErrorReporter reporter;
+ // Report exceptions
+ private final ErrorReporter reporter;
// The likely main thread
- private Thread mainThread;
+ private final Thread mainThread;
// Default scheduler
- private BukkitScheduler scheduler;
+ private final BukkitScheduler scheduler;
// Our protocol manager
- private ProtocolManager manager;
+ private final ProtocolManager manager;
// Current packet index
- private AtomicInteger currentSendingIndex = new AtomicInteger();
-
- // Whether or not we're currently cleaning up
- private volatile boolean cleaningUp;
+ private final AtomicInteger currentSendingIndex = new AtomicInteger();
+ /**
+ * Initialize a asynchronous filter manager.
+ *
+ * Internal method. Retrieve the global asynchronous manager from the protocol manager instead.
+ * @param reporter - desired error reporter.
+ * @param scheduler - task scheduler.
+ * @param manager - protocol manager.
+ */
public AsyncFilterManager(ErrorReporter reporter, BukkitScheduler scheduler, ProtocolManager manager) {
-
// Initialize timeout listeners
- serverTimeoutListeners = new SortedPacketListenerList();
- clientTimeoutListeners = new SortedPacketListenerList();
- timeoutListeners = Sets.newSetFromMap(new ConcurrentHashMap());
-
- // Server packets are synchronized already
- this.serverQueue = new PacketSendingQueue(false) {
- @Override
- protected void onPacketTimeout(PacketEvent event) {
- if (!cleaningUp) {
- serverTimeoutListeners.invokePacketSending(AsyncFilterManager.this.reporter, event);
- }
- }
- };
-
- // Client packets must be synchronized
- this.clientQueue = new PacketSendingQueue(true) {
- @Override
- protected void onPacketTimeout(PacketEvent event) {
- if (!cleaningUp) {
- clientTimeoutListeners.invokePacketSending(AsyncFilterManager.this.reporter, event);
- }
- }
- };
-
- this.serverProcessingQueue = new PacketProcessingQueue(serverQueue);
- this.clientProcessingQueue = new PacketProcessingQueue(clientQueue);
+ this.serverTimeoutListeners = new SortedPacketListenerList();
+ this.clientTimeoutListeners = new SortedPacketListenerList();
+ this.timeoutListeners = Sets.newSetFromMap(new ConcurrentHashMap());
+
+ this.playerSendingHandler = new PlayerSendingHandler(reporter, serverTimeoutListeners, clientTimeoutListeners);
+ this.serverProcessingQueue = new PacketProcessingQueue(playerSendingHandler);
+ this.clientProcessingQueue = new PacketProcessingQueue(playerSendingHandler);
+ this.playerSendingHandler.initializeScheduler();
this.scheduler = scheduler;
this.manager = manager;
@@ -141,6 +130,8 @@ public class AsyncFilterManager implements AsynchronousManager {
/**
* Registers an asynchronous packet handler.
*
+ * Use {@link AsyncMarker#incrementProcessingDelay()} to delay a packet until its ready to be transmitted.
+ *
* To start listening asynchronously, pass the getListenerLoop() runnable to a different thread.
*
* Asynchronous events will only be executed if a synchronous listener with the same packets is registered.
@@ -219,15 +210,12 @@ public class AsyncFilterManager implements AsynchronousManager {
List removed = serverProcessingQueue.removeListener(handler, listener.getSendingWhitelist());
// We're already taking care of this, so don't do anything
- if (!cleaningUp)
- serverQueue.signalPacketUpdate(removed, synchronusOK);
+ playerSendingHandler.sendServerPackets(removed, synchronusOK);
}
if (hasValidWhitelist(listener.getReceivingWhitelist())) {
List removed = clientProcessingQueue.removeListener(handler, listener.getReceivingWhitelist());
-
- if (!cleaningUp)
- clientQueue.signalPacketUpdate(removed, synchronusOK);
+ playerSendingHandler.sendClientPackets(removed, synchronusOK);
}
}
@@ -287,12 +275,11 @@ public class AsyncFilterManager implements AsynchronousManager {
}
/**
- * Used to create a default asynchronous task.
- * @param plugin - the calling plugin.
- * @param runnable - the runnable.
+ * Retrieve the current task scheduler.
+ * @return Current task scheduler.
*/
- public void scheduleAsyncTask(Plugin plugin, Runnable runnable) {
- scheduler.scheduleAsyncDelayedTask(plugin, runnable);
+ public BukkitScheduler getScheduler() {
+ return scheduler;
}
@Override
@@ -337,15 +324,14 @@ public class AsyncFilterManager implements AsynchronousManager {
@Override
public void cleanupAll() {
- cleaningUp = true;
serverProcessingQueue.cleanupAll();
- serverQueue.cleanupAll();
-
+ playerSendingHandler.cleanupAll();
timeoutListeners.clear();
+
serverTimeoutListeners = null;
clientTimeoutListeners = null;
}
-
+
@Override
public void signalPacketTransmission(PacketEvent packet) {
signalPacketTransmission(packet, onMainThread());
@@ -366,8 +352,12 @@ public class AsyncFilterManager implements AsynchronousManager {
"A packet must have been queued before it can be transmitted.");
// Only send if the packet is ready
- if (marker.decrementProcessingDelay() == 0) {
- getSendingQueue(packet).signalPacketUpdate(packet, onMainThread);
+ if (marker.decrementProcessingDelay() == 0) {
+ PacketSendingQueue queue = getSendingQueue(packet, false);
+
+ // No need to create a new queue if the player has logged out
+ if (queue != null)
+ queue.signalPacketUpdate(packet, onMainThread);
}
}
@@ -376,8 +366,27 @@ public class AsyncFilterManager implements AsynchronousManager {
* @param packet - the packet.
* @return The server or client sending queue the packet belongs to.
*/
- private PacketSendingQueue getSendingQueue(PacketEvent packet) {
- return packet.isServerPacket() ? serverQueue : clientQueue;
+ public PacketSendingQueue getSendingQueue(PacketEvent packet) {
+ return playerSendingHandler.getSendingQueue(packet);
+ }
+
+ /**
+ * Retrieve the sending queue this packet belongs to.
+ * @param packet - the packet.
+ * @param createNew - if TRUE, create a new queue if it hasn't already been created.
+ * @return The server or client sending queue the packet belongs to.
+ */
+ public PacketSendingQueue getSendingQueue(PacketEvent packet, boolean createNew) {
+ return playerSendingHandler.getSendingQueue(packet, createNew);
+ }
+
+ /**
+ * Retrieve the processing queue this packet belongs to.
+ * @param packet - the packet.
+ * @return The server or client sending processing the packet belongs to.
+ */
+ public PacketProcessingQueue getProcessingQueue(PacketEvent packet) {
+ return packet.isServerPacket() ? serverProcessingQueue : clientProcessingQueue;
}
/**
@@ -388,24 +397,23 @@ public class AsyncFilterManager implements AsynchronousManager {
getProcessingQueue(packet).signalProcessingDone();
}
- /**
- * Retrieve the processing queue this packet belongs to.
- * @param packet - the packet.
- * @return The server or client sending processing the packet belongs to.
- */
- private PacketProcessingQueue getProcessingQueue(PacketEvent packet) {
- return packet.isServerPacket() ? serverProcessingQueue : clientProcessingQueue;
- }
-
/**
* Send any due packets, or clean up packets that have expired.
*/
public void sendProcessedPackets(int tickCounter, boolean onMainThread) {
// The server queue is unlikely to need checking that often
if (tickCounter % 10 == 0) {
- serverQueue.trySendPackets(onMainThread);
+ playerSendingHandler.trySendServerPackets(onMainThread);
}
+
+ playerSendingHandler.trySendClientPackets(onMainThread);
+ }
- clientQueue.trySendPackets(onMainThread);
+ /**
+ * Clean up after a given player has logged out.
+ * @param player - the player that has just logged out.
+ */
+ public void removePlayer(Player player) {
+ playerSendingHandler.removePlayer(player);
}
}
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncListenerHandler.java b/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncListenerHandler.java
index 0b1ef3ba..f37511f8 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncListenerHandler.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncListenerHandler.java
@@ -20,6 +20,7 @@ package com.comphenix.protocol.async;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -34,7 +35,8 @@ import com.google.common.base.Joiner;
/**
* Represents a handler for an asynchronous event.
- *
+ *
+ * Use {@link AsyncMarker#incrementProcessingDelay()} to delay a packet until a certain condition has been met.
* @author Kristian
*/
public class AsyncListenerHandler {
@@ -75,10 +77,19 @@ public class AsyncListenerHandler {
private final Set stoppedTasks = new HashSet();
private final Object stopLock = new Object();
+ // Processing task on the main thread
+ private int syncTask = -1;
+
// Minecraft main thread
private Thread mainThread;
- public AsyncListenerHandler(Thread mainThread, AsyncFilterManager filterManager, PacketListener listener) {
+ /**
+ * Construct a manager for an asynchronous packet handler.
+ * @param mainThread - the main game thread.
+ * @param filterManager - the parent filter manager.
+ * @param listener - the current packet listener.
+ */
+ AsyncListenerHandler(Thread mainThread, AsyncFilterManager filterManager, PacketListener listener) {
if (filterManager == null)
throw new IllegalArgumentException("filterManager cannot be NULL");
if (listener == null)
@@ -89,10 +100,18 @@ public class AsyncListenerHandler {
this.listener = listener;
}
+ /**
+ * Determine whether or not this asynchronous handler has been cancelled.
+ * @return TRUE if it has been cancelled/stopped, FALSE otherwise.
+ */
public boolean isCancelled() {
return cancelled;
}
+ /**
+ * Retrieve the current asynchronous packet listener.
+ * @return Current packet listener.
+ */
public PacketListener getAsyncListener() {
return listener;
}
@@ -223,7 +242,7 @@ public class AsyncListenerHandler {
final AsyncRunnable listenerLoop = getListenerLoop();
- filterManager.scheduleAsyncTask(listener.getPlugin(), new Runnable() {
+ filterManager.getScheduler().scheduleAsyncDelayedTask(listener.getPlugin(), new Runnable() {
@Override
public void run() {
Thread thread = Thread.currentThread();
@@ -271,7 +290,7 @@ public class AsyncListenerHandler {
final AsyncRunnable listenerLoop = getListenerLoop();
final Function delegateCopy = executor;
- filterManager.scheduleAsyncTask(listener.getPlugin(), new Runnable() {
+ filterManager.getScheduler().scheduleAsyncDelayedTask(listener.getPlugin(), new Runnable() {
@Override
public void run() {
delegateCopy.apply(listenerLoop);
@@ -308,6 +327,104 @@ public class AsyncListenerHandler {
return Joiner.on(", ").join(whitelist.getWhitelist());
}
+ /**
+ * Start processing packets on the main thread.
+ *
+ * This is useful if you need to synchronize with the main thread in your packet listener, but
+ * you're not performing any expensive processing.
+ *
+ * Note: Use a asynchronous worker if the packet listener may use more than 0.5 ms
+ * of processing time on a single packet. Do as much as possible on the worker thread, and schedule synchronous tasks
+ * to use the Bukkit API instead.
+ * @return TRUE if the synchronized processing was successfully started, FALSE if it's already running.
+ * @throws IllegalStateException If we couldn't start the underlying task.
+ */
+ public synchronized boolean syncStart() {
+ return syncStart(500, TimeUnit.MICROSECONDS);
+ }
+
+ /**
+ * Start processing packets on the main thread.
+ *
+ * This is useful if you need to synchronize with the main thread in your packet listener, but
+ * you're not performing any expensive processing.
+ *
+ * The processing time parameter gives the upper bound for the amount of time spent processing pending packets.
+ * It should be set to a fairly low number, such as 0.5 ms or 1% of a game tick - to reduce the impact
+ * on the main thread. Never go beyond 50 milliseconds.
+ *
+ * Note: Use a asynchronous worker if the packet listener may exceed the ideal processing time
+ * on a single packet. Do as much as possible on the worker thread, and schedule synchronous tasks
+ * to use the Bukkit API instead.
+ *
+ * @param time - the amount of processing time alloted per game tick (20 ticks per second).
+ * @param unit - the unit of the processingTime argument.
+ * @return TRUE if the synchronized processing was successfully started, FALSE if it's already running.
+ * @throws IllegalStateException If we couldn't start the underlying task.
+ */
+ public synchronized boolean syncStart(final long time, final TimeUnit unit) {
+ if (time <= 0)
+ throw new IllegalArgumentException("Time must be greater than zero.");
+ if (unit == null)
+ throw new IllegalArgumentException("TimeUnit cannot be NULL.");
+
+ final long tickDelay = 1;
+ final int workerID = nextID.incrementAndGet();
+
+ if (syncTask < 0) {
+ syncTask = filterManager.getScheduler().scheduleSyncRepeatingTask(getPlugin(), new Runnable() {
+ @Override
+ public void run() {
+ long stopTime = System.nanoTime() + unit.convert(time, TimeUnit.NANOSECONDS);
+
+ while (!cancelled) {
+ PacketEvent packet = queuedPackets.poll();
+
+ if (packet == INTERUPT_PACKET || packet == WAKEUP_PACKET) {
+ // Sorry, asynchronous threads!
+ queuedPackets.add(packet);
+
+ // Try again next tick
+ break;
+ } else if (packet != null && packet.getAsyncMarker() != null) {
+ processPacket(workerID, packet, "onSyncPacket()");
+ } else {
+ // No more packets left - wait a tick
+ break;
+ }
+
+ // Check time here, ensuring that we at least process one packet
+ if (System.nanoTime() < stopTime)
+ break;
+ }
+ }
+ }, tickDelay, tickDelay);
+
+ // This is very bad - force the caller to handle it
+ if (syncTask < 0)
+ throw new IllegalStateException("Cannot start synchronous task.");
+ else
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Stop processing packets on the main thread.
+ * @return TRUE if we stopped any processing tasks, FALSE if it has already been stopped.
+ */
+ public synchronized boolean syncStop() {
+ if (syncTask > 0) {
+ filterManager.getScheduler().cancelTask(syncTask);
+
+ syncTask = -1;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
/**
* Start multiple worker threads for this listener.
* @param count - number of worker threads to start.
@@ -386,9 +503,13 @@ public class AsyncListenerHandler {
}
}
- // DO NOT call this method from the main thread
+ /**
+ * The main processing loop of asynchronous threads.
+ *
+ * Note: DO NOT call this method from the main thread
+ * @param workerID - the current worker ID.
+ */
private void listenerLoop(int workerID) {
-
// Danger, danger!
if (Thread.currentThread().getId() == mainThread.getId())
throw new IllegalStateException("Do not call this method from the main thread.");
@@ -403,16 +524,11 @@ public class AsyncListenerHandler {
// Proceed
started.incrementAndGet();
- mainLoop:
while (!cancelled) {
PacketEvent packet = queuedPackets.take();
- AsyncMarker marker = packet.getAsyncMarker();
// Handle cancel requests
- if (packet == null || marker == null || packet == INTERUPT_PACKET) {
- return;
-
- } else if (packet == WAKEUP_PACKET) {
+ if (packet == WAKEUP_PACKET) {
// This is a bit slow, but it should be safe
synchronized (stopLock) {
// Are we the one who is supposed to stop?
@@ -421,42 +537,13 @@ public class AsyncListenerHandler {
if (waitForStops())
return;
}
+ } else if (packet == INTERUPT_PACKET) {
+ return;
}
- // Here's the core of the asynchronous processing
- try {
- marker.setListenerHandler(this);
- marker.setWorkerID(workerID);
-
- synchronized (marker.getProcessingLock()) {
- if (packet.isServerPacket())
- listener.onPacketSending(packet);
- else
- listener.onPacketReceiving(packet);
- }
-
- } catch (Throwable e) {
- // Minecraft doesn't want your Exception.
- filterManager.getErrorReporter().reportMinimal(listener.getPlugin(), "onAsyncPacket()", e);
+ if (packet != null && packet.getAsyncMarker() != null) {
+ processPacket(workerID, packet, "onAsyncPacket()");
}
-
- // Now, get the next non-cancelled listener
- if (!marker.hasExpired()) {
- for (; marker.getListenerTraversal().hasNext(); ) {
- AsyncListenerHandler handler = marker.getListenerTraversal().next().getListener();
-
- if (!handler.isCancelled()) {
- handler.enqueuePacket(packet);
- continue mainLoop;
- }
- }
- }
-
- // There are no more listeners - queue the packet for transmission
- filterManager.signalFreeProcessingSlot(packet);
-
- // Note that listeners can opt to delay the packet transmission
- filterManager.signalPacketTransmission(packet);
}
} catch (InterruptedException e) {
@@ -464,16 +551,66 @@ public class AsyncListenerHandler {
} finally {
// Clean up
started.decrementAndGet();
- close();
}
}
+ /**
+ * Called when a packet is scheduled for processing.
+ * @param workerID - the current worker ID.
+ * @param packet - the current packet.
+ * @param methodName - name of the method.
+ */
+ private void processPacket(int workerID, PacketEvent packet, String methodName) {
+ AsyncMarker marker = packet.getAsyncMarker();
+
+ // Here's the core of the asynchronous processing
+ try {
+ synchronized (marker.getProcessingLock()) {
+ marker.setListenerHandler(this);
+ marker.setWorkerID(workerID);
+
+ if (packet.isServerPacket())
+ listener.onPacketSending(packet);
+ else
+ listener.onPacketReceiving(packet);
+ }
+
+ } catch (Throwable e) {
+ // Minecraft doesn't want your Exception.
+ filterManager.getErrorReporter().reportMinimal(listener.getPlugin(), methodName, e);
+ }
+
+ // Now, get the next non-cancelled listener
+ if (!marker.hasExpired()) {
+ for (; marker.getListenerTraversal().hasNext(); ) {
+ AsyncListenerHandler handler = marker.getListenerTraversal().next().getListener();
+
+ if (!handler.isCancelled()) {
+ handler.enqueuePacket(packet);
+ return;
+ }
+ }
+ }
+
+ // There are no more listeners - queue the packet for transmission
+ filterManager.signalFreeProcessingSlot(packet);
+
+ // Note that listeners can opt to delay the packet transmission
+ filterManager.signalPacketTransmission(packet);
+ }
+
+ /**
+ * Close all worker threads and the handler itself.
+ */
private synchronized void close() {
// Remove the listener itself
if (!cancelled) {
filterManager.unregisterAsyncHandlerInternal(this);
cancelled = true;
+ // Close processing tasks
+ syncStop();
+
// Tell every uncancelled thread to end
stopThreads();
}
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncMarker.java b/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncMarker.java
index febbbbcf..7b6b038b 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncMarker.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/async/AsyncMarker.java
@@ -99,8 +99,8 @@ public class AsyncMarker implements Serializable, Comparable {
private transient int workerID;
// Determine if Minecraft processes this packet asynchronously
- private static Method isMinecraftAsync;
- private static boolean alwaysSync;
+ private volatile static Method isMinecraftAsync;
+ private volatile static boolean alwaysSync;
/**
* Create a container for asyncronous packets.
@@ -206,7 +206,7 @@ public class AsyncMarker implements Serializable, Comparable {
}
/**
- * Increment the number of times this packet must be signalled as done before its transmitted.
+ * Increment the number of times the current packet must be signalled as done before its transmitted.
*
* This is useful if an asynchronous listener is waiting for further information before the
* packet can be sent to the user. A packet listener MUST eventually call
@@ -215,9 +215,7 @@ public class AsyncMarker implements Serializable, Comparable {
*
* It is recommended that processing outside a packet listener is wrapped in a synchronized block
* using the {@link #getProcessingLock()} method.
- *
- * To decrement the processing delay, call signalPacketUpdate. A thread that calls this method
- * multiple times must call signalPacketUpdate at least that many times.
+ *
* @return The new processing delay.
*/
public int incrementProcessingDelay() {
@@ -447,4 +445,20 @@ public class AsyncMarker implements Serializable, Comparable {
else
return Longs.compare(getNewSendingIndex(), o.getNewSendingIndex());
}
+
+ @Override
+ public boolean equals(Object other) {
+ // Standard equals
+ if (other == this)
+ return true;
+ if (other instanceof AsyncMarker)
+ return getNewSendingIndex() == ((AsyncMarker) other).getNewSendingIndex();
+ else
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Longs.hashCode(getNewSendingIndex());
+ }
}
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/async/PacketEventHolder.java b/ProtocolLib/src/main/java/com/comphenix/protocol/async/PacketEventHolder.java
index da0ed7e8..5dce81a9 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/async/PacketEventHolder.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/async/PacketEventHolder.java
@@ -20,6 +20,7 @@ package com.comphenix.protocol.async;
import com.comphenix.protocol.events.PacketEvent;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
+import com.google.common.primitives.Longs;
/**
* Provides a comparable to a packet event.
@@ -56,4 +57,20 @@ class PacketEventHolder implements Comparable {
compare(sendingIndex, other.sendingIndex).
result();
}
+
+ @Override
+ public boolean equals(Object other) {
+ // Standard equals
+ if (other == this)
+ return true;
+ if (other instanceof PacketEventHolder)
+ return sendingIndex == ((PacketEventHolder) other).sendingIndex;
+ else
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Longs.hashCode(sendingIndex);
+ }
}
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/async/PacketProcessingQueue.java b/ProtocolLib/src/main/java/com/comphenix/protocol/async/PacketProcessingQueue.java
index 23defbf2..364b6c1f 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/async/PacketProcessingQueue.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/async/PacketProcessingQueue.java
@@ -58,13 +58,13 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap processingQueue;
// Packets for sending
- private PacketSendingQueue sendingQueue;
-
- public PacketProcessingQueue(PacketSendingQueue sendingQueue) {
- this(sendingQueue, INITIAL_CAPACITY, DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY);
+ private PlayerSendingHandler sendingHandler;
+
+ public PacketProcessingQueue(PlayerSendingHandler sendingHandler) {
+ this(sendingHandler, INITIAL_CAPACITY, DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY);
}
- public PacketProcessingQueue(PacketSendingQueue sendingQueue, int initialSize, int maximumSize, int maximumConcurrency) {
+ public PacketProcessingQueue(PlayerSendingHandler sendingHandler, int initialSize, int maximumSize, int maximumConcurrency) {
super();
this.processingQueue = Synchronization.queue(MinMaxPriorityQueue.
@@ -74,7 +74,7 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap sendingQueue;
- // Whether or not packet transmission can only occur on the main thread
- private final boolean synchronizeMain;
+ // Asynchronous packet sending
+ private Executor asynchronousSender;
+
+ // Whether or not packet transmission must occur on a specific thread
+ private final boolean notThreadSafe;
+
+ // Whether or not we've run the cleanup procedure
+ private boolean cleanedUp = false;
+
+ /**
+ * Create a packet sending queue.
+ * @param notThreadSafe - whether or not to synchronize with the main thread or a background thread.
+ */
+ public PacketSendingQueue(boolean notThreadSafe, Executor asynchronousSender) {
+ this.sendingQueue = new PriorityBlockingQueue(INITIAL_CAPACITY);
+ this.notThreadSafe = notThreadSafe;
+ this.asynchronousSender = asynchronousSender;
+ }
/**
* Number of packet events in the queue.
@@ -50,15 +67,6 @@ abstract class PacketSendingQueue {
return sendingQueue.size();
}
- /**
- * Create a packet sending queue.
- * @param synchronizeMain - whether or not to synchronize with the main thread.
- */
- public PacketSendingQueue(boolean synchronizeMain) {
- this.sendingQueue = new PriorityBlockingQueue(INITIAL_CAPACITY);
- this.synchronizeMain = synchronizeMain;
- }
-
/**
* Enqueue a packet for sending.
* @param packet - packet to queue.
@@ -119,56 +127,107 @@ abstract class PacketSendingQueue {
* @param onMainThread - whether or not this is occuring on the main thread.
*/
public void trySendPackets(boolean onMainThread) {
-
+ // Whether or not to continue sending packets
+ boolean sending = true;
+
// Transmit as many packets as we can
- while (true) {
- PacketEventHolder holder = sendingQueue.peek();
-
+ while (sending) {
+ PacketEventHolder holder = sendingQueue.poll();
+
if (holder != null) {
- PacketEvent current = holder.getEvent();
- AsyncMarker marker = current.getAsyncMarker();
- boolean hasExpired = marker.hasExpired();
+ sending = processPacketHolder(onMainThread, holder);
+
+ if (!sending) {
+ // Add it back again
+ sendingQueue.add(holder);
+ }
- // Abort if we're not on the main thread
- if (synchronizeMain) {
+ } else {
+ // No more packets to send
+ sending = false;
+ }
+ }
+ }
+
+ /**
+ * Invoked when a packet might be ready for transmission.
+ * @param onMainThread - TRUE if we're on the main thread, FALSE otherwise.
+ * @param holder - packet container.
+ * @return TRUE to continue sending packets, FALSE otherwise.
+ */
+ private boolean processPacketHolder(boolean onMainThread, final PacketEventHolder holder) {
+ PacketEvent current = holder.getEvent();
+ AsyncMarker marker = current.getAsyncMarker();
+ boolean hasExpired = marker.hasExpired();
+
+ // Guard in cause the queue is closed
+ if (cleanedUp) {
+ return true;
+ }
+
+ // End condition?
+ if (marker.isProcessed() || hasExpired) {
+ if (hasExpired) {
+ // Notify timeout listeners
+ onPacketTimeout(current);
+
+ // Recompute
+ marker = current.getAsyncMarker();
+ hasExpired = marker.hasExpired();
+
+ // Could happen due to the timeout listeners
+ if (!marker.isProcessed() && !hasExpired) {
+ return false;
+ }
+ }
+
+ // Is it okay to send the packet?
+ if (!current.isCancelled() && !hasExpired) {
+ // Make sure we're on the main thread
+ if (notThreadSafe) {
try {
boolean wantAsync = marker.isMinecraftAsync(current);
boolean wantSync = !wantAsync;
- // Quit if we haven't fulfilled our promise
- if ((onMainThread && wantAsync) || (!onMainThread && wantSync))
- return;
+ // Wait for the next main thread heartbeat if we haven't fulfilled our promise
+ if (!onMainThread && wantSync) {
+ return false;
+ }
+
+ // Let's give it what it wants
+ if (onMainThread && wantAsync) {
+ asynchronousSender.execute(new Runnable() {
+ @Override
+ public void run() {
+ // We know this isn't on the main thread
+ processPacketHolder(false, holder);
+ }
+ });
+
+ // Scheduler will do the rest
+ return true;
+ }
} catch (FieldAccessException e) {
e.printStackTrace();
- return;
- }
- }
-
- if (marker.isProcessed() || hasExpired) {
- if (hasExpired) {
- // Notify timeout listeners
- onPacketTimeout(current);
- // Recompute
- marker = current.getAsyncMarker();
- hasExpired = marker.hasExpired();
- }
- if (marker.isProcessed() && !current.isCancelled() && !hasExpired) {
- // Silently skip players that have logged out
- if (isOnline(current.getPlayer())) {
- sendPacket(current);
- }
+ // Just drop the packet
+ return true;
}
-
- sendingQueue.poll();
- continue;
+ }
+
+ // Silently skip players that have logged out
+ if (isOnline(current.getPlayer())) {
+ sendPacket(current);
}
- }
+ }
- // Only repeat when packets are removed
- break;
+ // Drop the packet
+ return true;
}
+
+ // Add it back and stop sending
+ return false;
}
/**
@@ -201,7 +260,7 @@ abstract class PacketSendingQueue {
* @return TRUE if it must, FALSE otherwise.
*/
public boolean isSynchronizeMain() {
- return synchronizeMain;
+ return notThreadSafe;
}
/**
@@ -234,7 +293,12 @@ abstract class PacketSendingQueue {
* Automatically transmits every delayed packet.
*/
public void cleanupAll() {
- // Note that the cleanup itself will always occur on the main thread
- forceSend();
+ if (!cleanedUp) {
+ // Note that the cleanup itself will always occur on the main thread
+ forceSend();
+
+ // And we're done
+ cleanedUp = true;
+ }
}
}
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/async/PlayerSendingHandler.java b/ProtocolLib/src/main/java/com/comphenix/protocol/async/PlayerSendingHandler.java
new file mode 100644
index 00000000..8dc4d0be
--- /dev/null
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/async/PlayerSendingHandler.java
@@ -0,0 +1,249 @@
+package com.comphenix.protocol.async;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import org.bukkit.entity.Player;
+
+import com.comphenix.protocol.error.ErrorReporter;
+import com.comphenix.protocol.events.PacketEvent;
+import com.comphenix.protocol.injector.SortedPacketListenerList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Contains every sending queue for every player.
+ *
+ * @author Kristian
+ */
+class PlayerSendingHandler {
+
+ private ErrorReporter reporter;
+ private ConcurrentHashMap playerSendingQueues;
+
+ // Timeout listeners
+ private SortedPacketListenerList serverTimeoutListeners;
+ private SortedPacketListenerList clientTimeoutListeners;
+
+ // Asynchronous packet sending
+ private Executor asynchronousSender;
+
+ // Whether or not we're currently cleaning up
+ private volatile boolean cleaningUp;
+
+ /**
+ * Sending queues for a given player.
+ *
+ * @author Kristian
+ */
+ private class QueueContainer {
+ private PacketSendingQueue serverQueue;
+ private PacketSendingQueue clientQueue;
+
+ public QueueContainer() {
+ // Server packets can be sent concurrently
+ serverQueue = new PacketSendingQueue(false, asynchronousSender) {
+ @Override
+ protected void onPacketTimeout(PacketEvent event) {
+ if (!cleaningUp) {
+ serverTimeoutListeners.invokePacketSending(reporter, event);
+ }
+ }
+ };
+
+ // Client packets must be synchronized
+ clientQueue = new PacketSendingQueue(true, asynchronousSender) {
+ @Override
+ protected void onPacketTimeout(PacketEvent event) {
+ if (!cleaningUp) {
+ clientTimeoutListeners.invokePacketSending(reporter, event);
+ }
+ }
+ };
+ }
+
+ public PacketSendingQueue getServerQueue() {
+ return serverQueue;
+ }
+
+ public PacketSendingQueue getClientQueue() {
+ return clientQueue;
+ }
+ }
+
+ /**
+ * Initialize a packet sending handler.
+ * @param reporter - error reporter.
+ * @param serverTimeoutListeners - set of server timeout listeners.
+ * @param clientTimeoutListeners - set of client timeout listeners.
+ */
+ public PlayerSendingHandler(ErrorReporter reporter,
+ SortedPacketListenerList serverTimeoutListeners, SortedPacketListenerList clientTimeoutListeners) {
+
+ this.reporter = reporter;
+ this.serverTimeoutListeners = serverTimeoutListeners;
+ this.clientTimeoutListeners = clientTimeoutListeners;
+
+ // Initialize storage of queues
+ this.playerSendingQueues = new ConcurrentHashMap();
+ }
+
+ /**
+ * Start the asynchronous packet sender.
+ */
+ public synchronized void initializeScheduler() {
+ if (asynchronousSender == null) {
+ ThreadFactory factory = new ThreadFactoryBuilder().
+ setDaemon(true).
+ setNameFormat("ProtocolLib-AsyncSender %s").
+ build();
+ asynchronousSender = Executors.newSingleThreadExecutor(factory);
+ }
+ }
+
+ /**
+ * Retrieve the sending queue this packet belongs to.
+ * @param packet - the packet.
+ * @return The server or client sending queue the packet belongs to.
+ */
+ public PacketSendingQueue getSendingQueue(PacketEvent packet) {
+ return getSendingQueue(packet, true);
+ }
+
+ /**
+ * Retrieve the sending queue this packet belongs to.
+ * @param packet - the packet.
+ * @param createNew - if TRUE, create a new queue if it hasn't already been created.
+ * @return The server or client sending queue the packet belongs to.
+ */
+ public PacketSendingQueue getSendingQueue(PacketEvent packet, boolean createNew) {
+ String name = packet.getPlayer().getName();
+ QueueContainer queues = playerSendingQueues.get(name);
+
+ // Safe concurrent initialization
+ if (queues == null && createNew) {
+ final QueueContainer newContainer = new QueueContainer();
+
+ // Attempt to map the queue
+ queues = playerSendingQueues.putIfAbsent(name, newContainer);
+
+ if (queues == null) {
+ queues = newContainer;
+ }
+ }
+
+ // Check for NULL again
+ if (queues != null)
+ return packet.isServerPacket() ? queues.getServerQueue() : queues.getClientQueue();
+ else
+ return null;
+ }
+
+ /**
+ * Send all pending packets.
+ */
+ public void sendAllPackets() {
+ if (!cleaningUp) {
+ for (QueueContainer queues : playerSendingQueues.values()) {
+ queues.getClientQueue().cleanupAll();
+ queues.getServerQueue().cleanupAll();
+ }
+ }
+ }
+
+ /**
+ * Immediately send every server packet with the given list of IDs.
+ * @param ids - ID of every packet to send immediately.
+ * @param synchronusOK - whether or not we're running on the main thread.
+ */
+ public void sendServerPackets(List ids, boolean synchronusOK) {
+ if (!cleaningUp) {
+ for (QueueContainer queue : playerSendingQueues.values()) {
+ queue.getServerQueue().signalPacketUpdate(ids, synchronusOK);
+ }
+ }
+ }
+
+ /**
+ * Immediately send every client packet with the given list of IDs.
+ * @param ids - ID of every packet to send immediately.
+ * @param synchronusOK - whether or not we're running on the main thread.
+ */
+ public void sendClientPackets(List ids, boolean synchronusOK) {
+ if (!cleaningUp) {
+ for (QueueContainer queue : playerSendingQueues.values()) {
+ queue.getClientQueue().signalPacketUpdate(ids, synchronusOK);
+ }
+ }
+ }
+
+ /**
+ * Send any outstanding server packets.
+ * @param onMainThread - whether or not this is occuring on the main thread.
+ */
+ public void trySendServerPackets(boolean onMainThread) {
+ for (QueueContainer queue : playerSendingQueues.values()) {
+ queue.getServerQueue().trySendPackets(onMainThread);
+ }
+ }
+
+ /**
+ * Send any outstanding server packets.
+ * @param onMainThread - whether or not this is occuring on the main thread.
+ */
+ public void trySendClientPackets(boolean onMainThread) {
+ for (QueueContainer queue : playerSendingQueues.values()) {
+ queue.getClientQueue().trySendPackets(onMainThread);
+ }
+ }
+
+ /**
+ * Retrieve every server packet queue for every player.
+ * @return Every sever packet queue.
+ */
+ public List getServerQueues() {
+ List result = new ArrayList();
+
+ for (QueueContainer queue : playerSendingQueues.values())
+ result.add(queue.getServerQueue());
+ return result;
+ }
+
+ /**
+ * Retrieve every client packet queue for every player.
+ * @return Every client packet queue.
+ */
+ public List getClientQueues() {
+ List result = new ArrayList();
+
+ for (QueueContainer queue : playerSendingQueues.values())
+ result.add(queue.getClientQueue());
+ return result;
+ }
+
+ /**
+ * Send all pending packets and clean up queues.
+ */
+ public void cleanupAll() {
+ if (!cleaningUp) {
+ cleaningUp = true;
+
+ sendAllPackets();
+ playerSendingQueues.clear();
+ }
+ }
+
+ /**
+ * Invoked when a player has just logged out.
+ * @param player - the player that just logged out.
+ */
+ public void removePlayer(Player player) {
+ String name = player.getName();
+
+ // Every packet will be dropped - there's nothing we can do
+ playerSendingQueues.remove(name);
+ }
+}
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/concurrency/AbstractIntervalTree.java b/ProtocolLib/src/main/java/com/comphenix/protocol/concurrency/AbstractIntervalTree.java
index e2748cbc..571083d5 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/concurrency/AbstractIntervalTree.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/concurrency/AbstractIntervalTree.java
@@ -6,6 +6,7 @@ import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
+import com.google.common.base.Objects;
import com.google.common.collect.Range;
import com.google.common.collect.Ranges;
@@ -32,24 +33,25 @@ public abstract class AbstractIntervalTree, TValue
* Represents a range and a value in this interval tree.
*/
public class Entry implements Map.Entry, TValue> {
- private final Range key;
private EndPoint left;
private EndPoint right;
- Entry(Range key, EndPoint left, EndPoint right) {
+ Entry(EndPoint left, EndPoint right) {
if (left == null)
throw new IllegalAccessError("left cannot be NUll");
if (right == null)
throw new IllegalAccessError("right cannot be NUll");
+ if (left.key.compareTo(right.key) > 0)
+ throw new IllegalArgumentException(
+ "Left key (" + left.key + ") cannot be greater than the right key (" + right.key + ")");
- this.key = key;
this.left = left;
this.right = right;
}
@Override
public Range getKey() {
- return key;
+ return Ranges.closed(left.key, right.key);
}
@Override
@@ -66,6 +68,31 @@ public abstract class AbstractIntervalTree, TValue
right.value = value;
return old;
}
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public boolean equals(Object obj) {
+ // Quick equality check
+ if (obj == this) {
+ return true;
+ } else if (obj instanceof AbstractIntervalTree.Entry) {
+ return Objects.equal(left.key, ((AbstractIntervalTree.Entry) obj).left.key) &&
+ Objects.equal(right.key, ((AbstractIntervalTree.Entry) obj).right.key) &&
+ Objects.equal(left.value, ((AbstractIntervalTree.Entry) obj).left.value);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(left.key, right.key, left.value);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Value %s at [%s, %s]", left.value, left.key, right.key);
+ }
}
/**
@@ -78,9 +105,13 @@ public abstract class AbstractIntervalTree, TValue
// The value this range contains
public TValue value;
+
+ // The key of this end point
+ public TKey key;
- public EndPoint(State state, TValue value) {
+ public EndPoint(State state, TKey key, TValue value) {
this.state = state;
+ this.key = key;
this.value = value;
}
}
@@ -96,7 +127,7 @@ public abstract class AbstractIntervalTree, TValue
public Set remove(TKey lowerBound, TKey upperBound) {
return remove(lowerBound, upperBound, false);
}
-
+
/**
* Removes every interval that intersects with the given range.
* @param lowerBound - lowest value to remove.
@@ -107,31 +138,46 @@ public abstract class AbstractIntervalTree, TValue
checkBounds(lowerBound, upperBound);
NavigableMap range = bounds.subMap(lowerBound, true, upperBound, true);
- boolean emptyRange = range.isEmpty();
- TKey first = !emptyRange ? range.firstKey() : null;
- TKey last = !emptyRange ? range.lastKey() : null;
-
+ EndPoint first = getNextEndPoint(lowerBound, true);
+ EndPoint last = getPreviousEndPoint(upperBound, true);
+
+ // Used while resizing intervals
+ EndPoint previous = null;
+ EndPoint next = null;
+
Set resized = new HashSet();
Set removed = new HashSet();
// Remove the previous element too. A close end-point must be preceded by an OPEN end-point.
- if (first != null && range.get(first).state == State.CLOSE) {
- TKey key = bounds.floorKey(first);
- EndPoint removedPoint = removeIfNonNull(key);
+ if (first != null && first.state == State.CLOSE) {
+ previous = getPreviousEndPoint(first.key, false);
// Add the interval back
- if (removedPoint != null && preserveDifference) {
- resized.add(putUnsafe(key, decrementKey(lowerBound), removedPoint.value));
+ if (previous != null) {
+ removed.add(getEntry(previous, first));
}
}
// Get the closing element too.
- if (last != null && range.get(last).state == State.OPEN) {
- TKey key = bounds.ceilingKey(last);
- EndPoint removedPoint = removeIfNonNull(key);
+ if (last != null && last.state == State.OPEN) {
+ next = getNextEndPoint(last.key, false);
- if (removedPoint != null && preserveDifference) {
- resized.add(putUnsafe(incrementKey(upperBound), key, removedPoint.value));
+ if (next != null) {
+ removed.add(getEntry(last, next));
+ }
+ }
+
+ // Now remove both ranges
+ removeEntrySafely(previous, first);
+ removeEntrySafely(last, next);
+
+ // Add new resized intervals
+ if (preserveDifference) {
+ if (previous != null) {
+ resized.add(putUnsafe(previous.key, decrementKey(lowerBound), previous.value));
+ }
+ if (next != null) {
+ resized.add(putUnsafe(incrementKey(upperBound), next.key, next.value));
}
}
@@ -140,7 +186,6 @@ public abstract class AbstractIntervalTree, TValue
invokeEntryRemoved(removed);
if (preserveDifference) {
- invokeEntryRemoved(resized);
invokeEntryAdded(resized);
}
@@ -149,12 +194,30 @@ public abstract class AbstractIntervalTree, TValue
return removed;
}
- // Helper
- private EndPoint removeIfNonNull(TKey key) {
- if (key != null) {
- return bounds.remove(key);
+ /**
+ * Retrieve the entry from a given set of end points.
+ * @param left - leftmost end point.
+ * @param right - rightmost end point.
+ * @return The associated entry.
+ */
+ protected Entry getEntry(EndPoint left, EndPoint right) {
+ if (left == null)
+ throw new IllegalArgumentException("left endpoint cannot be NULL.");
+ if (right == null)
+ throw new IllegalArgumentException("right endpoint cannot be NULL.");
+
+ // Make sure the order is correct
+ if (right.key.compareTo(left.key) < 0) {
+ return getEntry(right, left);
} else {
- return null;
+ return new Entry(left, right);
+ }
+ }
+
+ private void removeEntrySafely(EndPoint left, EndPoint right) {
+ if (left != null && right != null) {
+ bounds.remove(left.key);
+ bounds.remove(right.key);
}
}
@@ -165,7 +228,7 @@ public abstract class AbstractIntervalTree, TValue
if (endPoint != null) {
endPoint.state = State.BOTH;
} else {
- endPoint = new EndPoint(state, value);
+ endPoint = new EndPoint(state, key, value);
bounds.put(key, endPoint);
}
return endPoint;
@@ -198,9 +261,8 @@ public abstract class AbstractIntervalTree, TValue
if (value != null) {
EndPoint left = addEndPoint(lowerBound, value, State.OPEN);
EndPoint right = addEndPoint(upperBound, value, State.CLOSE);
-
- Range range = Ranges.closed(lowerBound, upperBound);
- return new Entry(range, left, right);
+
+ return new Entry(left, right);
} else {
return null;
}
@@ -257,15 +319,16 @@ public abstract class AbstractIntervalTree, TValue
private void getEntries(Set destination, NavigableMap map) {
Map.Entry last = null;
- for (Map.Entry entry : bounds.entrySet()) {
+ for (Map.Entry entry : map.entrySet()) {
switch (entry.getValue().state) {
case BOTH:
EndPoint point = entry.getValue();
- destination.add(new Entry(Ranges.singleton(entry.getKey()), point, point));
+ destination.add(new Entry(point, point));
break;
case CLOSE:
- Range range = Ranges.closed(last.getKey(), entry.getKey());
- destination.add(new Entry(range, last.getValue(), entry.getValue()));
+ if (last != null) {
+ destination.add(new Entry(last.getValue(), entry.getValue()));
+ }
break;
case OPEN:
// We don't know the full range yet
@@ -284,7 +347,7 @@ public abstract class AbstractIntervalTree, TValue
public void putAll(AbstractIntervalTree other) {
// Naively copy every range.
for (Entry entry : other.entrySet()) {
- put(entry.key.lowerEndpoint(), entry.key.upperEndpoint(), entry.getValue());
+ put(entry.left.key, entry.right.key, entry.getValue());
}
}
@@ -303,7 +366,7 @@ public abstract class AbstractIntervalTree, TValue
}
/**
- * Get the end-point composite associated with this key.
+ * Get the left-most end-point associated with this key.
* @param key - key to search for.
* @return The end point found, or NULL.
*/
@@ -311,22 +374,60 @@ public abstract class AbstractIntervalTree, TValue
EndPoint ends = bounds.get(key);
if (ends != null) {
- // This is a piece of cake
- return ends;
- } else {
+ // Always return the end point to the left
+ if (ends.state == State.CLOSE) {
+ Map.Entry left = bounds.floorEntry(decrementKey(key));
+ return left != null ? left.getValue() : null;
+
+ } else {
+ return ends;
+ }
+ } else {
// We need to determine if the point intersects with a range
- TKey left = bounds.floorKey(key);
+ Map.Entry left = bounds.floorEntry(key);
// We only need to check to the left
- if (left != null && bounds.get(left).state == State.OPEN) {
- return bounds.get(left);
+ if (left != null && left.getValue().state == State.OPEN) {
+ return left.getValue();
} else {
return null;
}
}
}
+ /**
+ * Get the previous end point of a given key.
+ * @param point - the point to search with.
+ * @param inclusive - whether or not to include the current point in the search.
+ * @return The previous end point of a given given key, or NULL if not found.
+ */
+ protected EndPoint getPreviousEndPoint(TKey point, boolean inclusive) {
+ if (point != null) {
+ Map.Entry previous = bounds.floorEntry(inclusive ? point : decrementKey(point));
+
+ if (previous != null)
+ return previous.getValue();
+ }
+ return null;
+ }
+
+ /**
+ * Get the next end point of a given key.
+ * @param point - the point to search with.
+ * @param inclusive - whether or not to include the current point in the search.
+ * @return The next end point of a given given key, or NULL if not found.
+ */
+ protected EndPoint getNextEndPoint(TKey point, boolean inclusive) {
+ if (point != null) {
+ Map.Entry next = bounds.ceilingEntry(inclusive ? point : incrementKey(point));
+
+ if (next != null)
+ return next.getValue();
+ }
+ return null;
+ }
+
private void invokeEntryAdded(Entry added) {
if (added != null) {
onEntryAdded(added);
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/error/DetailedErrorReporter.java b/ProtocolLib/src/main/java/com/comphenix/protocol/error/DetailedErrorReporter.java
index bceb7dca..2257cbca 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/error/DetailedErrorReporter.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/error/DetailedErrorReporter.java
@@ -2,6 +2,7 @@ package com.comphenix.protocol.error;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -40,6 +41,8 @@ public class DetailedErrorReporter implements ErrorReporter {
protected int errorCount;
protected int maxErrorCount;
protected Logger logger;
+
+ protected WeakReference pluginReference;
// Whether or not Apache Commons is not present
protected boolean apacheCommonsMissing;
@@ -50,17 +53,18 @@ public class DetailedErrorReporter implements ErrorReporter {
/**
* Create a default error reporting system.
*/
- public DetailedErrorReporter() {
- this(DEFAULT_PREFIX, DEFAULT_SUPPORT_URL);
+ public DetailedErrorReporter(Plugin plugin) {
+ this(plugin, DEFAULT_PREFIX, DEFAULT_SUPPORT_URL);
}
/**
* Create a central error reporting system.
+ * @param plugin - the plugin owner.
* @param prefix - default line prefix.
* @param supportURL - URL to report the error.
*/
- public DetailedErrorReporter(String prefix, String supportURL) {
- this(prefix, supportURL, DEFAULT_MAX_ERROR_COUNT, getBukkitLogger());
+ public DetailedErrorReporter(Plugin plugin, String prefix, String supportURL) {
+ this(plugin, prefix, supportURL, DEFAULT_MAX_ERROR_COUNT, getBukkitLogger());
}
// Attempt to get the logger.
@@ -74,12 +78,17 @@ public class DetailedErrorReporter implements ErrorReporter {
/**
* Create a central error reporting system.
+ * @param plugin - the plugin owner.
* @param prefix - default line prefix.
* @param supportURL - URL to report the error.
* @param maxErrorCount - number of errors to print before giving up.
* @param logger - current logger.
*/
- public DetailedErrorReporter(String prefix, String supportURL, int maxErrorCount, Logger logger) {
+ public DetailedErrorReporter(Plugin plugin, String prefix, String supportURL, int maxErrorCount, Logger logger) {
+ if (plugin == null)
+ throw new IllegalArgumentException("Plugin cannot be NULL.");
+
+ this.pluginReference = new WeakReference(plugin);
this.prefix = prefix;
this.supportURL = supportURL;
this.maxErrorCount = maxErrorCount;
@@ -112,6 +121,8 @@ public class DetailedErrorReporter implements ErrorReporter {
@Override
public void reportDetailed(Object sender, String message, Throwable error, Object... parameters) {
+ final Plugin plugin = pluginReference.get();
+
// Do not overtly spam the server!
if (++errorCount > maxErrorCount) {
String maxReached = String.format("Reached maxmimum error count. Cannot pass error %s from %s.", error, sender);
@@ -157,6 +168,12 @@ public class DetailedErrorReporter implements ErrorReporter {
writer.println("Sender:");
writer.println(addPrefix(getStringDescription(sender), SECOND_LEVEL_PREFIX));
+ // And plugin
+ if (plugin != null) {
+ writer.println("Version:");
+ writer.println(addPrefix(plugin.toString(), SECOND_LEVEL_PREFIX));
+ }
+
// Add the server version too
if (Bukkit.getServer() != null) {
writer.println("Server:");
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/events/PacketEvent.java b/ProtocolLib/src/main/java/com/comphenix/protocol/events/PacketEvent.java
index c0580967..c35330cb 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/events/PacketEvent.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/events/PacketEvent.java
@@ -20,6 +20,7 @@ package com.comphenix.protocol.events;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.lang.ref.WeakReference;
import java.util.EventObject;
import org.bukkit.entity.Player;
@@ -32,8 +33,10 @@ public class PacketEvent extends EventObject implements Cancellable {
* Automatically generated by Eclipse.
*/
private static final long serialVersionUID = -5360289379097430620L;
-
- private transient Player player;
+
+ private transient WeakReference playerReference;
+ private transient Player offlinePlayer;
+
private PacketContainer packet;
private boolean serverPacket;
private boolean cancel;
@@ -52,14 +55,14 @@ public class PacketEvent extends EventObject implements Cancellable {
private PacketEvent(Object source, PacketContainer packet, Player player, boolean serverPacket) {
super(source);
this.packet = packet;
- this.player = player;
+ this.playerReference = new WeakReference(player);
this.serverPacket = serverPacket;
}
private PacketEvent(PacketEvent origial, AsyncMarker asyncMarker) {
super(origial.source);
this.packet = origial.packet;
- this.player = origial.player;
+ this.playerReference = origial.playerReference;
this.cancel = origial.cancel;
this.serverPacket = origial.serverPacket;
this.asyncMarker = asyncMarker;
@@ -131,7 +134,16 @@ public class PacketEvent extends EventObject implements Cancellable {
}
/**
- * Sets whether or not the packet should be cancelled.
+ * Sets whether or not the packet should be cancelled. Uncancelling is possible.
+ *
+ * Warning: A cancelled packet should never be re-transmitted. Use the asynchronous
+ * packet manager if you need to perform extensive processing. It should also be used
+ * if you need to synchronize with the main thread.
+ *
+ * This ensures that other plugins can work with the same packet.
+ *
+ * An asynchronous listener can also delay a packet indefinitely without having to block its thread.
+ *
* @param cancel - TRUE if it should be cancelled, FALSE otherwise.
*/
public void setCancelled(boolean cancel) {
@@ -143,7 +155,7 @@ public class PacketEvent extends EventObject implements Cancellable {
* @return The player associated with this event.
*/
public Player getPlayer() {
- return player;
+ return playerReference.get();
}
/**
@@ -197,18 +209,20 @@ public class PacketEvent extends EventObject implements Cancellable {
output.defaultWriteObject();
// Write the name of the player (or NULL if it's not set)
- output.writeObject(player != null ? new SerializedOfflinePlayer(player) : null);
+ output.writeObject(playerReference.get() != null ? new SerializedOfflinePlayer(playerReference.get()) : null);
}
private void readObject(ObjectInputStream input) throws ClassNotFoundException, IOException {
// Default deserialization
input.defaultReadObject();
- final SerializedOfflinePlayer offlinePlayer = (SerializedOfflinePlayer) input.readObject();
+ final SerializedOfflinePlayer serialized = (SerializedOfflinePlayer) input.readObject();
- if (offlinePlayer != null) {
- // Better than nothing
- player = offlinePlayer.getPlayer();
+ // Better than nothing
+ if (serialized != null) {
+ // Store it, to prevent weak reference from cleaning up the reference
+ offlinePlayer = serialized.getPlayer();
+ playerReference = new WeakReference(offlinePlayer);
}
}
}
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketFilterManager.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketFilterManager.java
index 988e773b..a8706e90 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketFilterManager.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketFilterManager.java
@@ -598,46 +598,21 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
try {
manager.registerEvents(new Listener() {
- @EventHandler(priority = EventPriority.LOWEST, ignoreCancelled = true)
+ @EventHandler(priority = EventPriority.LOWEST)
public void onPrePlayerJoin(PlayerJoinEvent event) {
- try {
- // Let's clean up the other injection first.
- playerInjection.uninjectPlayer(event.getPlayer().getAddress());
- } catch (Exception e) {
- reporter.reportDetailed(PacketFilterManager.this, "Unable to uninject net handler for player.", e, event);
- }
+ PacketFilterManager.this.onPrePlayerJoin(event);
}
-
- @EventHandler(priority = EventPriority.MONITOR, ignoreCancelled = true)
+ @EventHandler(priority = EventPriority.MONITOR)
public void onPlayerJoin(PlayerJoinEvent event) {
- try {
- // This call will be ignored if no listeners are registered
- playerInjection.injectPlayer(event.getPlayer());
- } catch (Exception e) {
- reporter.reportDetailed(PacketFilterManager.this, "Unable to inject player.", e, event);
- }
+ PacketFilterManager.this.onPlayerJoin(event);
}
-
- @EventHandler(priority = EventPriority.MONITOR, ignoreCancelled = true)
+ @EventHandler(priority = EventPriority.MONITOR)
public void onPlayerQuit(PlayerQuitEvent event) {
- try {
- playerInjection.handleDisconnect(event.getPlayer());
- playerInjection.uninjectPlayer(event.getPlayer());
- } catch (Exception e) {
- reporter.reportDetailed(PacketFilterManager.this, "Unable to uninject logged off player.", e, event);
- }
+ PacketFilterManager.this.onPlayerQuit(event);
}
-
- @EventHandler(priority = EventPriority.MONITOR, ignoreCancelled = true)
+ @EventHandler(priority = EventPriority.MONITOR)
public void onPluginDisabled(PluginDisableEvent event) {
- try {
- // Clean up in case the plugin forgets
- if (event.getPlugin() != plugin) {
- removePacketListeners(event.getPlugin());
- }
- } catch (Exception e) {
- reporter.reportDetailed(PacketFilterManager.this, "Unable handle disabled plugin.", e, event);
- }
+ PacketFilterManager.this.onPluginDisabled(event, plugin);
}
}, plugin);
@@ -648,6 +623,47 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
}
}
+ private void onPrePlayerJoin(PlayerJoinEvent event) {
+ try {
+ // Let's clean up the other injection first.
+ playerInjection.uninjectPlayer(event.getPlayer().getAddress());
+ } catch (Exception e) {
+ reporter.reportDetailed(PacketFilterManager.this, "Unable to uninject net handler for player.", e, event);
+ }
+ }
+
+ private void onPlayerJoin(PlayerJoinEvent event) {
+ try {
+ // This call will be ignored if no listeners are registered
+ playerInjection.injectPlayer(event.getPlayer());
+ } catch (Exception e) {
+ reporter.reportDetailed(PacketFilterManager.this, "Unable to inject player.", e, event);
+ }
+ }
+
+ private void onPlayerQuit(PlayerQuitEvent event) {
+ try {
+ Player player = event.getPlayer();
+
+ asyncFilterManager.removePlayer(player);
+ playerInjection.handleDisconnect(player);
+ playerInjection.uninjectPlayer(player);
+ } catch (Exception e) {
+ reporter.reportDetailed(PacketFilterManager.this, "Unable to uninject logged off player.", e, event);
+ }
+ }
+
+ private void onPluginDisabled(PluginDisableEvent event, Plugin protocolLibrary) {
+ try {
+ // Clean up in case the plugin forgets
+ if (event.getPlugin() != protocolLibrary) {
+ removePacketListeners(event.getPlugin());
+ }
+ } catch (Exception e) {
+ reporter.reportDetailed(PacketFilterManager.this, "Unable handle disabled plugin.", e, event);
+ }
+ }
+
/**
* Retrieve the number of listeners that expect packets during playing.
* @return Number of listeners.
@@ -689,7 +705,7 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
// Yes, this is crazy.
@SuppressWarnings({ "unchecked", "rawtypes" })
- private void registerOld(PluginManager manager, Plugin plugin) {
+ private void registerOld(PluginManager manager, final Plugin plugin) {
try {
ClassLoader loader = manager.getClass().getClassLoader();
@@ -699,6 +715,7 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
Class eventPriority = loader.loadClass("org.bukkit.event.Event$Priority");
// Get the priority
+ Object priorityLowest = Enum.valueOf(eventPriority, "Lowest");
Object priorityMonitor = Enum.valueOf(eventPriority, "Monitor");
// Get event types
@@ -714,26 +731,40 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
Method registerEvent = FuzzyReflection.fromObject(manager).getMethodByParameters("registerEvent",
eventTypes, Listener.class, eventPriority, Plugin.class);
+ Enhancer playerLow = new Enhancer();
Enhancer playerEx = new Enhancer();
Enhancer serverEx = new Enhancer();
- playerEx.setSuperclass(playerListener);
- playerEx.setClassLoader(classLoader);
- playerEx.setCallback(new MethodInterceptor() {
+ playerLow.setSuperclass(playerListener);
+ playerLow.setClassLoader(classLoader);
+ playerLow.setCallback(new MethodInterceptor() {
@Override
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
// Must have a parameter
+ if (args.length == 1) {
+ Object event = args[0];
+
+ if (event instanceof PlayerJoinEvent) {
+ onPrePlayerJoin((PlayerJoinEvent) event);
+ }
+ }
+ return null;
+ }
+ });
+
+ playerEx.setSuperclass(playerListener);
+ playerEx.setClassLoader(classLoader);
+ playerEx.setCallback(new MethodInterceptor() {
+ @Override
+ public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
if (args.length == 1) {
Object event = args[0];
// Check for the correct event
if (event instanceof PlayerJoinEvent) {
- Player player = ((PlayerJoinEvent) event).getPlayer();
- playerInjection.injectPlayer(player);
+ onPlayerJoin((PlayerJoinEvent) event);
} else if (event instanceof PlayerQuitEvent) {
- Player player = ((PlayerQuitEvent) event).getPlayer();
- playerInjection.handleDisconnect(player);
- playerInjection.uninjectPlayer(player);
+ onPlayerQuit((PlayerQuitEvent) event);
}
}
return null;
@@ -751,16 +782,18 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
Object event = args[0];
if (event instanceof PluginDisableEvent)
- removePacketListeners(((PluginDisableEvent) event).getPlugin());
+ onPluginDisabled((PluginDisableEvent) event, plugin);
}
return null;
}
});
// Create our listener
+ Object playerProxyLow = playerLow.create();
Object playerProxy = playerEx.create();
Object serverProxy = serverEx.create();
+ registerEvent.invoke(manager, playerJoinType, playerProxyLow, priorityLowest, plugin);
registerEvent.invoke(manager, playerJoinType, playerProxy, priorityMonitor, plugin);
registerEvent.invoke(manager, playerQuitType, playerProxy, priorityMonitor, plugin);
registerEvent.invoke(manager, pluginDisabledType, serverProxy, priorityMonitor, plugin);
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketInjector.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketInjector.java
index 951e4d79..7630c360 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketInjector.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketInjector.java
@@ -83,8 +83,8 @@ class PacketInjector {
public void undoCancel(Integer id, Packet packet) {
ReadPacketModifier modifier = readModifier.get(id);
- // Cancelled packets are represented with NULL
- if (modifier != null && modifier.getOverride(packet) == null) {
+ // See if this packet has been cancelled before
+ if (modifier != null && modifier.hasCancelled(packet)) {
modifier.removeOverride(packet);
}
}
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/ReadPacketModifier.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/ReadPacketModifier.java
index 6e1967e1..ee32ddc2 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/ReadPacketModifier.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/ReadPacketModifier.java
@@ -74,6 +74,15 @@ class ReadPacketModifier implements MethodInterceptor {
return override.get(packet);
}
+ /**
+ * Determine if the given packet has been cancelled before.
+ * @param packet - the packet to check.
+ * @return TRUE if it has been cancelled, FALSE otherwise.
+ */
+ public boolean hasCancelled(Packet packet) {
+ return getOverride(packet) == CANCEL_MARKER;
+ }
+
@Override
public Object intercept(Object thisObj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/InjectedArrayList.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/InjectedArrayList.java
index aa50212f..fd5a0b80 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/InjectedArrayList.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/InjectedArrayList.java
@@ -42,9 +42,9 @@ class InjectedArrayList extends ArrayList {
*/
private static final long serialVersionUID = -1173865905404280990L;
- private PlayerInjector injector;
- private Set ignoredPackets;
- private ClassLoader classLoader;
+ private transient PlayerInjector injector;
+ private transient Set ignoredPackets;
+ private transient ClassLoader classLoader;
public InjectedArrayList(ClassLoader classLoader, PlayerInjector injector, Set ignoredPackets) {
this.classLoader = classLoader;
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/NetworkServerInjector.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/NetworkServerInjector.java
index c97d9f50..3a346dfd 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/NetworkServerInjector.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/NetworkServerInjector.java
@@ -53,8 +53,8 @@ public class NetworkServerInjector extends PlayerInjector {
private volatile static CallbackFilter callbackFilter;
- private static Field disconnectField;
- private static Method sendPacketMethod;
+ private volatile static Field disconnectField;
+ private volatile static Method sendPacketMethod;
private InjectedServerConnection serverInjection;
// Determine if we're listening
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/PlayerInjectionHandler.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/PlayerInjectionHandler.java
index 373df511..fb2136b3 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/PlayerInjectionHandler.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/PlayerInjectionHandler.java
@@ -604,10 +604,11 @@ public class PlayerInjectionHandler {
*/
public void scheduleDataInputRefresh(Player player) {
final PlayerInjector injector = getInjector(player);
- final DataInputStream old = injector.getInputStream(true);
-
+
// Update the DataInputStream
if (injector != null) {
+ final DataInputStream old = injector.getInputStream(true);
+
injector.scheduleAction(new Runnable() {
@Override
public void run() {
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/metrics/Updater.java b/ProtocolLib/src/main/java/com/comphenix/protocol/metrics/Updater.java
index eaea4a7b..50ea9583 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/metrics/Updater.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/metrics/Updater.java
@@ -1,5 +1,8 @@
package com.comphenix.protocol.metrics;
+// EXTENSIVELY MODIFIED BY AADNK/COMPHENIX
+// CHECK GIT FOR DETAILS
+
/*
* Updater for Bukkit.
*
@@ -9,12 +12,10 @@ import java.io.*;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
-import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
+import java.util.logging.Level;
import java.util.logging.Logger;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
@@ -23,6 +24,8 @@ import javax.xml.stream.events.XMLEvent;
import org.bukkit.configuration.file.YamlConfiguration;
import org.bukkit.plugin.Plugin;
+import com.google.common.base.Preconditions;
+
/**
* Check dev.bukkit.org to find updates for a given plugin, and download the updates if needed.
*
@@ -211,6 +214,13 @@ public class Updater
*/
public Updater(Plugin plugin, Logger logger, String slug, File file, String permission)
{
+ // I hate NULL
+ Preconditions.checkNotNull(plugin, "plugin");
+ Preconditions.checkNotNull(logger, "logger");
+ Preconditions.checkNotNull(slug, "slug");
+ Preconditions.checkNotNull(file, "file");
+ Preconditions.checkNotNull(permission, "permission");
+
this.plugin = plugin;
this.file = file;
this.slug = slug;
@@ -253,20 +263,30 @@ public class Updater
String fileLink = getFile(versionLink);
if(fileLink != null && type != UpdateType.NO_DOWNLOAD)
{
- String name = file.getName();
- // If it's a zip file, it shouldn't be downloaded as the plugin's name
- if(fileLink.endsWith(".zip"))
- {
- String [] split = fileLink.split("/");
- name = split[split.length-1];
- }
+ String [] split = fileLink.split("/");
+ String name = split[split.length-1];
+ logger.info("Downloading " + fileLink);
+
// Never download the same file twice
- if (!downloadedVersion.equalsIgnoreCase(versionLink)) {
- saveFile(new File("plugins/" + updateFolder), name, fileLink);
+ if (downloadedVersion == null || !downloadedVersion.equalsIgnoreCase(versionLink)) {
+ File path = new File("plugins/");
+
+ // We can update the JAR in place as we're using different JAR file names
+ saveFile(path, name, fileLink);
downloadedVersion = versionLink;
result = UpdateResult.SUCCESS;
+ // ProtocolLib - try to remove the current version
+ try {
+ if (!file.delete()) {
+ File zeroCurrentJar = new File(path, updateFolder + "/" + file.getName());
+ zeroCurrentJar.createNewFile();
+ }
+ } catch (IOException e) {
+ logger.warning("Cannot delete old ProtocolLib version: " + file.getName());
+ }
+
} else {
result = UpdateResult.UPDATE_AVAILABLE;
}
@@ -338,26 +358,14 @@ public class Updater
logger.info("Downloading update: " + percent + "% of " + fileLength + " bytes.");
}
}
- //Just a quick check to make sure we didn't leave any files from last time...
- for(File xFile : new File("plugins/" + updateFolder).listFiles())
- {
- if(xFile.getName().endsWith(".zip"))
- {
- xFile.delete();
- }
- }
- // Check to see if it's a zip file, if it is, unzip it.
- File dFile = new File(folder.getAbsolutePath() + "/" + file);
- if(dFile.getName().endsWith(".zip"))
- {
- // Unzip
- unzip(dFile.getCanonicalPath());
- }
- if(announce) logger.info("Finished updating.");
+
+ if(announce)
+ logger.info("Finished updating.");
}
catch (Exception ex)
{
- logger.warning("The auto-updater tried to download a new update, but was unsuccessful.");
+ logger.warning("The auto-updater tried to download a new update, but was unsuccessful.");
+ logger.log(Level.INFO, "Error message to submit as a ticket.", ex);
result = Updater.UpdateResult.FAIL_DOWNLOAD;
}
finally
@@ -379,99 +387,6 @@ public class Updater
}
}
- /**
- * Part of Zip-File-Extractor, modified by H31IX for use with Bukkit
- */
- private void unzip(String file)
- {
- try
- {
- File fSourceZip = new File(file);
- String zipPath = file.substring(0, file.length()-4);
- ZipFile zipFile = new ZipFile(fSourceZip);
- Enumeration extends ZipEntry> e = zipFile.entries();
- while(e.hasMoreElements())
- {
- ZipEntry entry = (ZipEntry)e.nextElement();
- File destinationFilePath = new File(zipPath,entry.getName());
- destinationFilePath.getParentFile().mkdirs();
- if(entry.isDirectory())
- {
- continue;
- }
- else
- {
- BufferedInputStream bis = new BufferedInputStream(zipFile.getInputStream(entry));
- int b;
- byte buffer[] = new byte[BYTE_SIZE];
- FileOutputStream fos = new FileOutputStream(destinationFilePath);
- BufferedOutputStream bos = new BufferedOutputStream(fos, BYTE_SIZE);
- while((b = bis.read(buffer, 0, BYTE_SIZE)) != -1)
- {
- bos.write(buffer, 0, b);
- }
- bos.flush();
- bos.close();
- bis.close();
- String name = destinationFilePath.getName();
- if(name.endsWith(".jar") && pluginFile(name))
- {
- destinationFilePath.renameTo(new File("plugins/" + updateFolder + "/" + name));
- }
- }
- entry = null;
- destinationFilePath = null;
- }
- e = null;
- zipFile.close();
- zipFile = null;
- // Move any plugin data folders that were included to the right place, Bukkit won't do this for us.
- for(File dFile : new File(zipPath).listFiles())
- {
- if(dFile.isDirectory())
- {
- if(pluginFile(dFile.getName()))
- {
- File oFile = new File("plugins/" + dFile.getName()); // Get current dir
- File [] contents = oFile.listFiles(); // List of existing files in the current dir
- for(File cFile : dFile.listFiles()) // Loop through all the files in the new dir
- {
- boolean found = false;
- for(File xFile : contents) // Loop through contents to see if it exists
- {
- if(xFile.getName().equals(cFile.getName()))
- {
- found = true;
- break;
- }
- }
- if(!found)
- {
- // Move the new file into the current dir
- cFile.renameTo(new File(oFile.getCanonicalFile() + "/" + cFile.getName()));
- }
- else
- {
- // This file already exists, so we don't need it anymore.
- cFile.delete();
- }
- }
- }
- }
- dFile.delete();
- }
- new File(zipPath).delete();
- fSourceZip.delete();
- }
- catch(IOException ex)
- {
- ex.printStackTrace();
- logger.warning("The auto-updater tried to unzip a new update file, but was unsuccessful.");
- result = Updater.UpdateResult.FAIL_DOWNLOAD;
- }
- new File(file).delete();
- }
-
/**
* Check if the name of a jar is one of the plugins currently installed, used for extracting the correct files out of a zip.
*/
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/reflect/StructureModifier.java b/ProtocolLib/src/main/java/com/comphenix/protocol/reflect/StructureModifier.java
index 9323065d..87a309fa 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/reflect/StructureModifier.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/reflect/StructureModifier.java
@@ -202,7 +202,7 @@ public class StructureModifier {
*/
public boolean isReadOnly(int fieldIndex) {
if (fieldIndex < 0 || fieldIndex >= data.size())
- new IllegalArgumentException("Index parameter is not within [0 - " + data.size() + ")");
+ throw new IllegalArgumentException("Index parameter is not within [0 - " + data.size() + ")");
return Modifier.isFinal(data.get(fieldIndex).getModifiers());
}
@@ -219,7 +219,7 @@ public class StructureModifier {
*/
public void setReadOnly(int fieldIndex, boolean value) throws FieldAccessException {
if (fieldIndex < 0 || fieldIndex >= data.size())
- new IllegalArgumentException("Index parameter is not within [0 - " + data.size() + ")");
+ throw new IllegalArgumentException("Index parameter is not within [0 - " + data.size() + ")");
try {
StructureModifier.setFinalState(data.get(fieldIndex), value);
@@ -400,7 +400,7 @@ public class StructureModifier {
if (a == null)
return b == null;
else if (b == null)
- return a == null;
+ return false;
else
return a.getSpecificType().equals(b.getSpecificType());
}
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/reflect/compiler/BackgroundCompiler.java b/ProtocolLib/src/main/java/com/comphenix/protocol/reflect/compiler/BackgroundCompiler.java
index 647206a9..6f636c49 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/reflect/compiler/BackgroundCompiler.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/reflect/compiler/BackgroundCompiler.java
@@ -22,11 +22,16 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.annotation.Nullable;
+
+import com.comphenix.protocol.error.ErrorReporter;
import com.comphenix.protocol.reflect.StructureModifier;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Compiles structure modifiers on a background thread.
@@ -37,6 +42,11 @@ import com.comphenix.protocol.reflect.StructureModifier;
*/
public class BackgroundCompiler {
+ /**
+ * The default format for the name of new worker threads.
+ */
+ public static final String THREAD_FORMAT = "ProtocolLib-StructureCompiler %s";
+
// How long to wait for a shutdown
public static final int SHUTDOWN_DELAY_MS = 2000;
@@ -48,6 +58,7 @@ public class BackgroundCompiler {
private boolean shuttingDown;
private ExecutorService executor;
+ private ErrorReporter reporter;
/**
* Retrieves the current background compiler.
@@ -64,27 +75,41 @@ public class BackgroundCompiler {
public static void setInstance(BackgroundCompiler backgroundCompiler) {
BackgroundCompiler.backgroundCompiler = backgroundCompiler;
}
-
+
/**
* Initialize a background compiler.
+ *
+ * Uses the default {@link #THREAD_FORMAT} to name worker threads.
* @param loader - class loader from Bukkit.
+ * @param reporter - current error reporter.
*/
- public BackgroundCompiler(ClassLoader loader) {
- this(loader, Executors.newSingleThreadExecutor());
+ public BackgroundCompiler(ClassLoader loader, ErrorReporter reporter) {
+ ThreadFactory factory = new ThreadFactoryBuilder().
+ setDaemon(true).
+ setNameFormat(THREAD_FORMAT).
+ build();
+ initializeCompiler(loader, reporter, Executors.newSingleThreadExecutor(factory));
}
-
+
/**
* Initialize a background compiler utilizing the given thread pool.
* @param loader - class loader from Bukkit.
+ * @param reporter - current error reporter.
* @param executor - thread pool we'll use.
*/
- public BackgroundCompiler(ClassLoader loader, ExecutorService executor) {
+ public BackgroundCompiler(ClassLoader loader, ErrorReporter reporter, ExecutorService executor) {
+ initializeCompiler(loader, reporter, executor);
+ }
+
+ // Avoid "Constructor call must be the first statement".
+ private void initializeCompiler(ClassLoader loader, @Nullable ErrorReporter reporter, ExecutorService executor) {
if (loader == null)
throw new IllegalArgumentException("loader cannot be NULL");
if (executor == null)
throw new IllegalArgumentException("executor cannot be NULL");
this.compiler = new StructureCompiler(loader);
+ this.reporter = reporter;
this.executor = executor;
this.enabled = true;
}
@@ -129,15 +154,30 @@ public class BackgroundCompiler {
executor.submit(new Callable