diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/concurrency/BlockingHashMap.java b/ProtocolLib/src/main/java/com/comphenix/protocol/concurrency/BlockingHashMap.java
new file mode 100644
index 00000000..793f3c11
--- /dev/null
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/concurrency/BlockingHashMap.java
@@ -0,0 +1,165 @@
+package com.comphenix.protocol.concurrency;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.MapMaker;
+
+/**
+ * A map that supports blocking on read operations. Null keys are not supported.
+ *
+ * Keys are stored as weak references, and will be automatically removed once they've all been dereferenced.
+ *
+ * @author Kristian
+ *
+ * @param - type of the key.
+ * @param - type of the value.
+ */
+public class BlockingHashMap {
+
+ // Map of values
+ private final ConcurrentMap backingMap;
+
+ // Map of locked objects
+ private final ConcurrentMap locks;
+
+ /**
+ * Initialize a new map.
+ */
+ public BlockingHashMap() {
+ backingMap = new MapMaker().weakKeys().makeMap();
+ locks = new MapMaker().weakKeys().makeMap();
+ }
+
+ /**
+ * Initialize a new map.
+ * @return The created map.
+ */
+ public static BlockingHashMap create() {
+ return new BlockingHashMap();
+ }
+
+ /**
+ * Waits until a value has been associated with the given key, and then retrieves that value.
+ * @param key - the key whose associated value is to be returned
+ * @return The value to which the specified key is mapped.
+ * @throws InterruptedException If the current thread got interrupted while waiting.
+ */
+ public TValue get(TKey key) throws InterruptedException {
+ if (key == null)
+ throw new IllegalArgumentException("key cannot be NULL.");
+
+ TValue value = backingMap.get(key);
+
+ // Only lock if no value is available
+ if (value == null) {
+ final Object lock = getLock(key);
+
+ synchronized (lock) {
+ while (value == null) {
+ lock.wait();
+ value = backingMap.get(key);
+ }
+ }
+ }
+
+ return value;
+ }
+
+ /**
+ * Waits until a value has been associated with the given key, and then retrieves that value.
+ * @param key - the key whose associated value is to be returned
+ * @param timeout - the amount of time to wait until an association has been made.
+ * @param unit - unit of timeout.
+ * @return The value to which the specified key is mapped, or NULL if the timeout elapsed.
+ * @throws InterruptedException If the current thread got interrupted while waiting.
+ */
+ public TValue get(TKey key, long timeout, TimeUnit unit) throws InterruptedException {
+ if (key == null)
+ throw new IllegalArgumentException("key cannot be NULL.");
+ if (unit == null)
+ throw new IllegalArgumentException("Unit cannot be NULL.");
+
+ TValue value = backingMap.get(key);
+
+ // Only lock if no value is available
+ if (value == null) {
+ final Object lock = getLock(key);
+ final long stopTimeNS = System.nanoTime() + unit.toNanos(timeout);
+
+ // Don't exceed the timeout
+ synchronized (lock) {
+ while (value == null) {
+ long remainingTime = stopTimeNS - System.nanoTime();
+
+ if (remainingTime > 0) {
+ TimeUnit.NANOSECONDS.timedWait(lock, remainingTime);
+ value = backingMap.get(key);
+ }
+ }
+ }
+ }
+
+ return value;
+ }
+
+ /**
+ * Associate a given key with the given value.
+ *
+ * Wakes up any blocking getters on this specific key.
+ *
+ * @param key - the key to associate.
+ * @param value - the value.
+ * @return The previously associated value.
+ */
+ public TValue put(TKey key, TValue value) {
+ if (value == null)
+ throw new IllegalArgumentException("This map doesn't support NULL values.");
+
+ final TValue previous = backingMap.put(key, value);
+ final Object lock = getLock(key);
+
+ // Inform our readers about this change
+ synchronized (lock) {
+ lock.notifyAll();
+ return previous;
+ }
+ }
+
+ public int size() {
+ return backingMap.size();
+ }
+
+ public Collection values() {
+ return backingMap.values();
+ }
+
+ public Set keys() {
+ return backingMap.keySet();
+ }
+
+ /**
+ * Atomically retrieve the lock associated with a given key.
+ * @param key - the current key.
+ * @return An asssociated lock.
+ */
+ private Object getLock(TKey key) {
+ Object lock = locks.get(key);
+
+ if (lock == null) {
+ Object created = new Object();
+
+ // Do this atomically
+ lock = locks.putIfAbsent(key, created);
+
+ // If we succeeded, use the latch we created - otherwise, use the already inserted latch
+ if (lock == null) {
+ lock = created;
+ }
+ }
+
+ return lock;
+ }
+}
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketInjector.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketInjector.java
index 7630c360..637ec025 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketInjector.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketInjector.java
@@ -202,13 +202,19 @@ class PacketInjector {
// Called from the ReadPacketModified monitor
PacketEvent packetRecieved(PacketContainer packet, DataInputStream input) {
- Player client = playerInjection.getPlayerByConnection(input);
-
- // Never invoke a event if we don't know where it's from
- if (client != null)
- return packetRecieved(packet, client);
- else
+ try {
+ Player client = playerInjection.getPlayerByConnection(input);
+
+ // Never invoke a event if we don't know where it's from
+ if (client != null)
+ return packetRecieved(packet, client);
+ else
+ return null;
+
+ } catch (InterruptedException e) {
+ reporter.reportDetailed(this, "Thread was interrupted.", e, packet, input);
return null;
+ }
}
/**
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/PlayerInjectionHandler.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/PlayerInjectionHandler.java
index fb2136b3..7d4a6a4a 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/PlayerInjectionHandler.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/PlayerInjectionHandler.java
@@ -24,12 +24,14 @@ import java.net.Socket;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import net.minecraft.server.Packet;
import org.bukkit.Server;
import org.bukkit.entity.Player;
+import com.comphenix.protocol.concurrency.BlockingHashMap;
import com.comphenix.protocol.error.ErrorReporter;
import com.comphenix.protocol.events.PacketAdapter;
import com.comphenix.protocol.events.PacketContainer;
@@ -47,6 +49,10 @@ import com.google.common.collect.Maps;
* @author Kristian
*/
public class PlayerInjectionHandler {
+ /**
+ * The maximum number of milliseconds to wait until a player can be looked up by connection.
+ */
+ private static final long TIMEOUT_PLAYER_LOOKUP = 1000; // ms
/**
* The highest possible packet ID. It's unlikely that this value will ever change.
@@ -64,9 +70,11 @@ public class PlayerInjectionHandler {
// Player injection
private Map addressLookup = Maps.newConcurrentMap();
- private Map dataInputLookup = Maps.newConcurrentMap();
private Map playerInjection = Maps.newConcurrentMap();
+ // Lookup player by connection
+ private BlockingHashMap dataInputLookup = BlockingHashMap.create();
+
// Player injection types
private volatile PlayerInjectHooks loginPlayerHook = PlayerInjectHooks.NETWORK_SERVER_OBJECT;
private volatile PlayerInjectHooks playingPlayerHook = PlayerInjectHooks.NETWORK_SERVER_OBJECT;
@@ -193,26 +201,37 @@ public class PlayerInjectionHandler {
* Retrieve a player by its DataInput connection.
* @param inputStream - the associated DataInput connection.
* @return The player.
+ * @throws InterruptedException If the thread was interrupted during the wait.
*/
- public Player getPlayerByConnection(DataInputStream inputStream) {
- try {
- // Concurrency issue!
- netLoginInjector.getReadLock().lock();
-
- PlayerInjector injector = dataInputLookup.get(inputStream);
-
- if (injector != null) {
- return injector.getPlayer();
- } else {
- reporter.reportWarning(this, "Unable to find stream: " + inputStream);
- return null;
- }
-
- } finally {
- netLoginInjector.getReadLock().unlock();
+ public Player getPlayerByConnection(DataInputStream inputStream) throws InterruptedException {
+ return getPlayerByConnection(inputStream, TIMEOUT_PLAYER_LOOKUP, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Retrieve a player by its DataInput connection.
+ * @param inputStream - the associated DataInput connection.
+ * @param playerTimeout - the amount of time to wait for a result.
+ * @param unit - unit of playerTimeout.
+ * @return The player.
+ * @throws InterruptedException If the thread was interrupted during the wait.
+ */
+ public Player getPlayerByConnection(DataInputStream inputStream, long playerTimeout, TimeUnit unit) throws InterruptedException {
+
+ PlayerInjector injector = dataInputLookup.get(inputStream, playerTimeout, unit);
+
+ if (injector != null) {
+ return injector.getPlayer();
+ } else {
+ reporter.reportWarning(this, "Unable to find stream: " + inputStream);
+ return null;
}
}
+ /**
+ * Helper function that retrieves the injector type of a given player injector.
+ * @param injector - injector type.
+ * @return The injector type.
+ */
private PlayerInjectHooks getInjectorType(PlayerInjector injector) {
return injector != null ? injector.getHookType() : PlayerInjectHooks.NONE;
}
@@ -404,7 +423,6 @@ public class PlayerInjectionHandler {
PlayerInjector injector = playerInjection.remove(player);
if (injector != null) {
- DataInputStream input = injector.getInputStream(true);
InetSocketAddress address = player.getAddress();
injector.cleanupAll();
@@ -423,8 +441,7 @@ public class PlayerInjectionHandler {
// Clean up
if (removeAuxiliary) {
- if (input != null)
- dataInputLookup.remove(input);
+ // Note that the dataInputLookup will clean itself
if (address != null)
addressLookup.remove(address);
}
@@ -571,7 +588,6 @@ public class PlayerInjectionHandler {
}
public void close() {
-
// Guard
if (hasClosed || playerInjection == null)
return;
@@ -593,7 +609,6 @@ public class PlayerInjectionHandler {
hasClosed = true;
playerInjection.clear();
- dataInputLookup.clear();
addressLookup.clear();
invoker = null;
}
@@ -607,12 +622,9 @@ public class PlayerInjectionHandler {
// Update the DataInputStream
if (injector != null) {
- final DataInputStream old = injector.getInputStream(true);
-
injector.scheduleAction(new Runnable() {
@Override
public void run() {
- dataInputLookup.remove(old);
dataInputLookup.put(injector.getInputStream(false), injector);
}
});
diff --git a/ProtocolLib/src/main/resources/plugin.yml b/ProtocolLib/src/main/resources/plugin.yml
index d632e814..d96bf284 100644
--- a/ProtocolLib/src/main/resources/plugin.yml
+++ b/ProtocolLib/src/main/resources/plugin.yml
@@ -1,5 +1,5 @@
name: ProtocolLib
-version: 1.7.0
+version: 1.7.1-SNAPSHOT
description: Provides read/write access to the Minecraft protocol.
author: Comphenix
website: http://www.comphenix.net/ProtocolLib
diff --git a/ProtocolLib/src/test/java/com/comphenix/protocol/concurrency/BlockingHashMapTest.java b/ProtocolLib/src/test/java/com/comphenix/protocol/concurrency/BlockingHashMapTest.java
new file mode 100644
index 00000000..7be7e705
--- /dev/null
+++ b/ProtocolLib/src/test/java/com/comphenix/protocol/concurrency/BlockingHashMapTest.java
@@ -0,0 +1,42 @@
+package com.comphenix.protocol.concurrency;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+
+public class BlockingHashMapTest {
+
+ @Test
+ public void test() throws InterruptedException, ExecutionException {
+
+ final BlockingHashMap map = BlockingHashMap.create();
+
+ ExecutorService service = Executors.newSingleThreadExecutor();
+
+ // Create a reader
+ Future future = service.submit(new Callable() {
+ @Override
+ public String call() throws Exception {
+ // Combine for easy reading
+ return map.get(0) + map.get(1);
+ }
+ });
+
+ // Wait a bit
+ Thread.sleep(50);
+
+ // Insert values
+ map.put(0, "hello ");
+ map.put(1, "world");
+
+ // Wait for the other thread to complete
+ assertEquals(future.get(), "hello world");
+ }
+
+}