From f5cb7ddc7b2384ab7efa2a844c18dc809db234fb Mon Sep 17 00:00:00 2001 From: "Kristian S. Stangeland" Date: Fri, 23 Nov 2012 08:48:08 +0100 Subject: [PATCH] Adding a blocking hash map to use while looking up data connections. --- .../protocol/concurrency/BlockingHashMap.java | 165 ++++++++++++++++++ .../protocol/injector/PacketInjector.java | 18 +- .../player/PlayerInjectionHandler.java | 62 ++++--- ProtocolLib/src/main/resources/plugin.yml | 2 +- .../concurrency/BlockingHashMapTest.java | 42 +++++ 5 files changed, 257 insertions(+), 32 deletions(-) create mode 100644 ProtocolLib/src/main/java/com/comphenix/protocol/concurrency/BlockingHashMap.java create mode 100644 ProtocolLib/src/test/java/com/comphenix/protocol/concurrency/BlockingHashMapTest.java diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/concurrency/BlockingHashMap.java b/ProtocolLib/src/main/java/com/comphenix/protocol/concurrency/BlockingHashMap.java new file mode 100644 index 00000000..793f3c11 --- /dev/null +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/concurrency/BlockingHashMap.java @@ -0,0 +1,165 @@ +package com.comphenix.protocol.concurrency; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.MapMaker; + +/** + * A map that supports blocking on read operations. Null keys are not supported. + *

+ * Keys are stored as weak references, and will be automatically removed once they've all been dereferenced. + *

+ * @author Kristian + * + * @param - type of the key. + * @param - type of the value. + */ +public class BlockingHashMap { + + // Map of values + private final ConcurrentMap backingMap; + + // Map of locked objects + private final ConcurrentMap locks; + + /** + * Initialize a new map. + */ + public BlockingHashMap() { + backingMap = new MapMaker().weakKeys().makeMap(); + locks = new MapMaker().weakKeys().makeMap(); + } + + /** + * Initialize a new map. + * @return The created map. + */ + public static BlockingHashMap create() { + return new BlockingHashMap(); + } + + /** + * Waits until a value has been associated with the given key, and then retrieves that value. + * @param key - the key whose associated value is to be returned + * @return The value to which the specified key is mapped. + * @throws InterruptedException If the current thread got interrupted while waiting. + */ + public TValue get(TKey key) throws InterruptedException { + if (key == null) + throw new IllegalArgumentException("key cannot be NULL."); + + TValue value = backingMap.get(key); + + // Only lock if no value is available + if (value == null) { + final Object lock = getLock(key); + + synchronized (lock) { + while (value == null) { + lock.wait(); + value = backingMap.get(key); + } + } + } + + return value; + } + + /** + * Waits until a value has been associated with the given key, and then retrieves that value. + * @param key - the key whose associated value is to be returned + * @param timeout - the amount of time to wait until an association has been made. + * @param unit - unit of timeout. + * @return The value to which the specified key is mapped, or NULL if the timeout elapsed. + * @throws InterruptedException If the current thread got interrupted while waiting. + */ + public TValue get(TKey key, long timeout, TimeUnit unit) throws InterruptedException { + if (key == null) + throw new IllegalArgumentException("key cannot be NULL."); + if (unit == null) + throw new IllegalArgumentException("Unit cannot be NULL."); + + TValue value = backingMap.get(key); + + // Only lock if no value is available + if (value == null) { + final Object lock = getLock(key); + final long stopTimeNS = System.nanoTime() + unit.toNanos(timeout); + + // Don't exceed the timeout + synchronized (lock) { + while (value == null) { + long remainingTime = stopTimeNS - System.nanoTime(); + + if (remainingTime > 0) { + TimeUnit.NANOSECONDS.timedWait(lock, remainingTime); + value = backingMap.get(key); + } + } + } + } + + return value; + } + + /** + * Associate a given key with the given value. + *

+ * Wakes up any blocking getters on this specific key. + * + * @param key - the key to associate. + * @param value - the value. + * @return The previously associated value. + */ + public TValue put(TKey key, TValue value) { + if (value == null) + throw new IllegalArgumentException("This map doesn't support NULL values."); + + final TValue previous = backingMap.put(key, value); + final Object lock = getLock(key); + + // Inform our readers about this change + synchronized (lock) { + lock.notifyAll(); + return previous; + } + } + + public int size() { + return backingMap.size(); + } + + public Collection values() { + return backingMap.values(); + } + + public Set keys() { + return backingMap.keySet(); + } + + /** + * Atomically retrieve the lock associated with a given key. + * @param key - the current key. + * @return An asssociated lock. + */ + private Object getLock(TKey key) { + Object lock = locks.get(key); + + if (lock == null) { + Object created = new Object(); + + // Do this atomically + lock = locks.putIfAbsent(key, created); + + // If we succeeded, use the latch we created - otherwise, use the already inserted latch + if (lock == null) { + lock = created; + } + } + + return lock; + } +} diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketInjector.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketInjector.java index 7630c360..637ec025 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketInjector.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketInjector.java @@ -202,13 +202,19 @@ class PacketInjector { // Called from the ReadPacketModified monitor PacketEvent packetRecieved(PacketContainer packet, DataInputStream input) { - Player client = playerInjection.getPlayerByConnection(input); - - // Never invoke a event if we don't know where it's from - if (client != null) - return packetRecieved(packet, client); - else + try { + Player client = playerInjection.getPlayerByConnection(input); + + // Never invoke a event if we don't know where it's from + if (client != null) + return packetRecieved(packet, client); + else + return null; + + } catch (InterruptedException e) { + reporter.reportDetailed(this, "Thread was interrupted.", e, packet, input); return null; + } } /** diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/PlayerInjectionHandler.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/PlayerInjectionHandler.java index fb2136b3..7d4a6a4a 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/PlayerInjectionHandler.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/player/PlayerInjectionHandler.java @@ -24,12 +24,14 @@ import java.net.Socket; import java.net.SocketAddress; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import net.minecraft.server.Packet; import org.bukkit.Server; import org.bukkit.entity.Player; +import com.comphenix.protocol.concurrency.BlockingHashMap; import com.comphenix.protocol.error.ErrorReporter; import com.comphenix.protocol.events.PacketAdapter; import com.comphenix.protocol.events.PacketContainer; @@ -47,6 +49,10 @@ import com.google.common.collect.Maps; * @author Kristian */ public class PlayerInjectionHandler { + /** + * The maximum number of milliseconds to wait until a player can be looked up by connection. + */ + private static final long TIMEOUT_PLAYER_LOOKUP = 1000; // ms /** * The highest possible packet ID. It's unlikely that this value will ever change. @@ -64,9 +70,11 @@ public class PlayerInjectionHandler { // Player injection private Map addressLookup = Maps.newConcurrentMap(); - private Map dataInputLookup = Maps.newConcurrentMap(); private Map playerInjection = Maps.newConcurrentMap(); + // Lookup player by connection + private BlockingHashMap dataInputLookup = BlockingHashMap.create(); + // Player injection types private volatile PlayerInjectHooks loginPlayerHook = PlayerInjectHooks.NETWORK_SERVER_OBJECT; private volatile PlayerInjectHooks playingPlayerHook = PlayerInjectHooks.NETWORK_SERVER_OBJECT; @@ -193,26 +201,37 @@ public class PlayerInjectionHandler { * Retrieve a player by its DataInput connection. * @param inputStream - the associated DataInput connection. * @return The player. + * @throws InterruptedException If the thread was interrupted during the wait. */ - public Player getPlayerByConnection(DataInputStream inputStream) { - try { - // Concurrency issue! - netLoginInjector.getReadLock().lock(); - - PlayerInjector injector = dataInputLookup.get(inputStream); - - if (injector != null) { - return injector.getPlayer(); - } else { - reporter.reportWarning(this, "Unable to find stream: " + inputStream); - return null; - } - - } finally { - netLoginInjector.getReadLock().unlock(); + public Player getPlayerByConnection(DataInputStream inputStream) throws InterruptedException { + return getPlayerByConnection(inputStream, TIMEOUT_PLAYER_LOOKUP, TimeUnit.MILLISECONDS); + } + + /** + * Retrieve a player by its DataInput connection. + * @param inputStream - the associated DataInput connection. + * @param playerTimeout - the amount of time to wait for a result. + * @param unit - unit of playerTimeout. + * @return The player. + * @throws InterruptedException If the thread was interrupted during the wait. + */ + public Player getPlayerByConnection(DataInputStream inputStream, long playerTimeout, TimeUnit unit) throws InterruptedException { + + PlayerInjector injector = dataInputLookup.get(inputStream, playerTimeout, unit); + + if (injector != null) { + return injector.getPlayer(); + } else { + reporter.reportWarning(this, "Unable to find stream: " + inputStream); + return null; } } + /** + * Helper function that retrieves the injector type of a given player injector. + * @param injector - injector type. + * @return The injector type. + */ private PlayerInjectHooks getInjectorType(PlayerInjector injector) { return injector != null ? injector.getHookType() : PlayerInjectHooks.NONE; } @@ -404,7 +423,6 @@ public class PlayerInjectionHandler { PlayerInjector injector = playerInjection.remove(player); if (injector != null) { - DataInputStream input = injector.getInputStream(true); InetSocketAddress address = player.getAddress(); injector.cleanupAll(); @@ -423,8 +441,7 @@ public class PlayerInjectionHandler { // Clean up if (removeAuxiliary) { - if (input != null) - dataInputLookup.remove(input); + // Note that the dataInputLookup will clean itself if (address != null) addressLookup.remove(address); } @@ -571,7 +588,6 @@ public class PlayerInjectionHandler { } public void close() { - // Guard if (hasClosed || playerInjection == null) return; @@ -593,7 +609,6 @@ public class PlayerInjectionHandler { hasClosed = true; playerInjection.clear(); - dataInputLookup.clear(); addressLookup.clear(); invoker = null; } @@ -607,12 +622,9 @@ public class PlayerInjectionHandler { // Update the DataInputStream if (injector != null) { - final DataInputStream old = injector.getInputStream(true); - injector.scheduleAction(new Runnable() { @Override public void run() { - dataInputLookup.remove(old); dataInputLookup.put(injector.getInputStream(false), injector); } }); diff --git a/ProtocolLib/src/main/resources/plugin.yml b/ProtocolLib/src/main/resources/plugin.yml index d632e814..d96bf284 100644 --- a/ProtocolLib/src/main/resources/plugin.yml +++ b/ProtocolLib/src/main/resources/plugin.yml @@ -1,5 +1,5 @@ name: ProtocolLib -version: 1.7.0 +version: 1.7.1-SNAPSHOT description: Provides read/write access to the Minecraft protocol. author: Comphenix website: http://www.comphenix.net/ProtocolLib diff --git a/ProtocolLib/src/test/java/com/comphenix/protocol/concurrency/BlockingHashMapTest.java b/ProtocolLib/src/test/java/com/comphenix/protocol/concurrency/BlockingHashMapTest.java new file mode 100644 index 00000000..7be7e705 --- /dev/null +++ b/ProtocolLib/src/test/java/com/comphenix/protocol/concurrency/BlockingHashMapTest.java @@ -0,0 +1,42 @@ +package com.comphenix.protocol.concurrency; + +import static org.junit.Assert.*; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.junit.Test; + +public class BlockingHashMapTest { + + @Test + public void test() throws InterruptedException, ExecutionException { + + final BlockingHashMap map = BlockingHashMap.create(); + + ExecutorService service = Executors.newSingleThreadExecutor(); + + // Create a reader + Future future = service.submit(new Callable() { + @Override + public String call() throws Exception { + // Combine for easy reading + return map.get(0) + map.get(1); + } + }); + + // Wait a bit + Thread.sleep(50); + + // Insert values + map.put(0, "hello "); + map.put(1, "world"); + + // Wait for the other thread to complete + assertEquals(future.get(), "hello world"); + } + +}