Archiviert
13
0

Adding a blocking hash map to use while looking up data connections.

Dieser Commit ist enthalten in:
Kristian S. Stangeland 2012-11-23 08:48:08 +01:00
Ursprung a4f79ccb3f
Commit f5cb7ddc7b
5 geänderte Dateien mit 257 neuen und 32 gelöschten Zeilen

Datei anzeigen

@ -0,0 +1,165 @@
package com.comphenix.protocol.concurrency;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.MapMaker;
/**
* A map that supports blocking on read operations. Null keys are not supported.
* <p>
* Keys are stored as weak references, and will be automatically removed once they've all been dereferenced.
* <p>
* @author Kristian
*
* @param <TKey> - type of the key.
* @param <TValue> - type of the value.
*/
public class BlockingHashMap<TKey, TValue> {
// Map of values
private final ConcurrentMap<TKey, TValue> backingMap;
// Map of locked objects
private final ConcurrentMap<TKey, Object> locks;
/**
* Initialize a new map.
*/
public BlockingHashMap() {
backingMap = new MapMaker().weakKeys().makeMap();
locks = new MapMaker().weakKeys().makeMap();
}
/**
* Initialize a new map.
* @return The created map.
*/
public static <TKey, TValue> BlockingHashMap<TKey, TValue> create() {
return new BlockingHashMap<TKey, TValue>();
}
/**
* Waits until a value has been associated with the given key, and then retrieves that value.
* @param key - the key whose associated value is to be returned
* @return The value to which the specified key is mapped.
* @throws InterruptedException If the current thread got interrupted while waiting.
*/
public TValue get(TKey key) throws InterruptedException {
if (key == null)
throw new IllegalArgumentException("key cannot be NULL.");
TValue value = backingMap.get(key);
// Only lock if no value is available
if (value == null) {
final Object lock = getLock(key);
synchronized (lock) {
while (value == null) {
lock.wait();
value = backingMap.get(key);
}
}
}
return value;
}
/**
* Waits until a value has been associated with the given key, and then retrieves that value.
* @param key - the key whose associated value is to be returned
* @param timeout - the amount of time to wait until an association has been made.
* @param unit - unit of timeout.
* @return The value to which the specified key is mapped, or NULL if the timeout elapsed.
* @throws InterruptedException If the current thread got interrupted while waiting.
*/
public TValue get(TKey key, long timeout, TimeUnit unit) throws InterruptedException {
if (key == null)
throw new IllegalArgumentException("key cannot be NULL.");
if (unit == null)
throw new IllegalArgumentException("Unit cannot be NULL.");
TValue value = backingMap.get(key);
// Only lock if no value is available
if (value == null) {
final Object lock = getLock(key);
final long stopTimeNS = System.nanoTime() + unit.toNanos(timeout);
// Don't exceed the timeout
synchronized (lock) {
while (value == null) {
long remainingTime = stopTimeNS - System.nanoTime();
if (remainingTime > 0) {
TimeUnit.NANOSECONDS.timedWait(lock, remainingTime);
value = backingMap.get(key);
}
}
}
}
return value;
}
/**
* Associate a given key with the given value.
* <p>
* Wakes up any blocking getters on this specific key.
*
* @param key - the key to associate.
* @param value - the value.
* @return The previously associated value.
*/
public TValue put(TKey key, TValue value) {
if (value == null)
throw new IllegalArgumentException("This map doesn't support NULL values.");
final TValue previous = backingMap.put(key, value);
final Object lock = getLock(key);
// Inform our readers about this change
synchronized (lock) {
lock.notifyAll();
return previous;
}
}
public int size() {
return backingMap.size();
}
public Collection<TValue> values() {
return backingMap.values();
}
public Set<TKey> keys() {
return backingMap.keySet();
}
/**
* Atomically retrieve the lock associated with a given key.
* @param key - the current key.
* @return An asssociated lock.
*/
private Object getLock(TKey key) {
Object lock = locks.get(key);
if (lock == null) {
Object created = new Object();
// Do this atomically
lock = locks.putIfAbsent(key, created);
// If we succeeded, use the latch we created - otherwise, use the already inserted latch
if (lock == null) {
lock = created;
}
}
return lock;
}
}

Datei anzeigen

@ -202,13 +202,19 @@ class PacketInjector {
// Called from the ReadPacketModified monitor // Called from the ReadPacketModified monitor
PacketEvent packetRecieved(PacketContainer packet, DataInputStream input) { PacketEvent packetRecieved(PacketContainer packet, DataInputStream input) {
Player client = playerInjection.getPlayerByConnection(input); try {
Player client = playerInjection.getPlayerByConnection(input);
// Never invoke a event if we don't know where it's from // Never invoke a event if we don't know where it's from
if (client != null) if (client != null)
return packetRecieved(packet, client); return packetRecieved(packet, client);
else else
return null;
} catch (InterruptedException e) {
reporter.reportDetailed(this, "Thread was interrupted.", e, packet, input);
return null; return null;
}
} }
/** /**

Datei anzeigen

@ -24,12 +24,14 @@ import java.net.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import net.minecraft.server.Packet; import net.minecraft.server.Packet;
import org.bukkit.Server; import org.bukkit.Server;
import org.bukkit.entity.Player; import org.bukkit.entity.Player;
import com.comphenix.protocol.concurrency.BlockingHashMap;
import com.comphenix.protocol.error.ErrorReporter; import com.comphenix.protocol.error.ErrorReporter;
import com.comphenix.protocol.events.PacketAdapter; import com.comphenix.protocol.events.PacketAdapter;
import com.comphenix.protocol.events.PacketContainer; import com.comphenix.protocol.events.PacketContainer;
@ -47,6 +49,10 @@ import com.google.common.collect.Maps;
* @author Kristian * @author Kristian
*/ */
public class PlayerInjectionHandler { public class PlayerInjectionHandler {
/**
* The maximum number of milliseconds to wait until a player can be looked up by connection.
*/
private static final long TIMEOUT_PLAYER_LOOKUP = 1000; // ms
/** /**
* The highest possible packet ID. It's unlikely that this value will ever change. * The highest possible packet ID. It's unlikely that this value will ever change.
@ -64,9 +70,11 @@ public class PlayerInjectionHandler {
// Player injection // Player injection
private Map<SocketAddress, PlayerInjector> addressLookup = Maps.newConcurrentMap(); private Map<SocketAddress, PlayerInjector> addressLookup = Maps.newConcurrentMap();
private Map<DataInputStream, PlayerInjector> dataInputLookup = Maps.newConcurrentMap();
private Map<Player, PlayerInjector> playerInjection = Maps.newConcurrentMap(); private Map<Player, PlayerInjector> playerInjection = Maps.newConcurrentMap();
// Lookup player by connection
private BlockingHashMap<DataInputStream, PlayerInjector> dataInputLookup = BlockingHashMap.create();
// Player injection types // Player injection types
private volatile PlayerInjectHooks loginPlayerHook = PlayerInjectHooks.NETWORK_SERVER_OBJECT; private volatile PlayerInjectHooks loginPlayerHook = PlayerInjectHooks.NETWORK_SERVER_OBJECT;
private volatile PlayerInjectHooks playingPlayerHook = PlayerInjectHooks.NETWORK_SERVER_OBJECT; private volatile PlayerInjectHooks playingPlayerHook = PlayerInjectHooks.NETWORK_SERVER_OBJECT;
@ -193,26 +201,37 @@ public class PlayerInjectionHandler {
* Retrieve a player by its DataInput connection. * Retrieve a player by its DataInput connection.
* @param inputStream - the associated DataInput connection. * @param inputStream - the associated DataInput connection.
* @return The player. * @return The player.
* @throws InterruptedException If the thread was interrupted during the wait.
*/ */
public Player getPlayerByConnection(DataInputStream inputStream) { public Player getPlayerByConnection(DataInputStream inputStream) throws InterruptedException {
try { return getPlayerByConnection(inputStream, TIMEOUT_PLAYER_LOOKUP, TimeUnit.MILLISECONDS);
// Concurrency issue! }
netLoginInjector.getReadLock().lock();
PlayerInjector injector = dataInputLookup.get(inputStream); /**
* Retrieve a player by its DataInput connection.
* @param inputStream - the associated DataInput connection.
* @param playerTimeout - the amount of time to wait for a result.
* @param unit - unit of playerTimeout.
* @return The player.
* @throws InterruptedException If the thread was interrupted during the wait.
*/
public Player getPlayerByConnection(DataInputStream inputStream, long playerTimeout, TimeUnit unit) throws InterruptedException {
if (injector != null) { PlayerInjector injector = dataInputLookup.get(inputStream, playerTimeout, unit);
return injector.getPlayer();
} else {
reporter.reportWarning(this, "Unable to find stream: " + inputStream);
return null;
}
} finally { if (injector != null) {
netLoginInjector.getReadLock().unlock(); return injector.getPlayer();
} else {
reporter.reportWarning(this, "Unable to find stream: " + inputStream);
return null;
} }
} }
/**
* Helper function that retrieves the injector type of a given player injector.
* @param injector - injector type.
* @return The injector type.
*/
private PlayerInjectHooks getInjectorType(PlayerInjector injector) { private PlayerInjectHooks getInjectorType(PlayerInjector injector) {
return injector != null ? injector.getHookType() : PlayerInjectHooks.NONE; return injector != null ? injector.getHookType() : PlayerInjectHooks.NONE;
} }
@ -404,7 +423,6 @@ public class PlayerInjectionHandler {
PlayerInjector injector = playerInjection.remove(player); PlayerInjector injector = playerInjection.remove(player);
if (injector != null) { if (injector != null) {
DataInputStream input = injector.getInputStream(true);
InetSocketAddress address = player.getAddress(); InetSocketAddress address = player.getAddress();
injector.cleanupAll(); injector.cleanupAll();
@ -423,8 +441,7 @@ public class PlayerInjectionHandler {
// Clean up // Clean up
if (removeAuxiliary) { if (removeAuxiliary) {
if (input != null) // Note that the dataInputLookup will clean itself
dataInputLookup.remove(input);
if (address != null) if (address != null)
addressLookup.remove(address); addressLookup.remove(address);
} }
@ -571,7 +588,6 @@ public class PlayerInjectionHandler {
} }
public void close() { public void close() {
// Guard // Guard
if (hasClosed || playerInjection == null) if (hasClosed || playerInjection == null)
return; return;
@ -593,7 +609,6 @@ public class PlayerInjectionHandler {
hasClosed = true; hasClosed = true;
playerInjection.clear(); playerInjection.clear();
dataInputLookup.clear();
addressLookup.clear(); addressLookup.clear();
invoker = null; invoker = null;
} }
@ -607,12 +622,9 @@ public class PlayerInjectionHandler {
// Update the DataInputStream // Update the DataInputStream
if (injector != null) { if (injector != null) {
final DataInputStream old = injector.getInputStream(true);
injector.scheduleAction(new Runnable() { injector.scheduleAction(new Runnable() {
@Override @Override
public void run() { public void run() {
dataInputLookup.remove(old);
dataInputLookup.put(injector.getInputStream(false), injector); dataInputLookup.put(injector.getInputStream(false), injector);
} }
}); });

Datei anzeigen

@ -1,5 +1,5 @@
name: ProtocolLib name: ProtocolLib
version: 1.7.0 version: 1.7.1-SNAPSHOT
description: Provides read/write access to the Minecraft protocol. description: Provides read/write access to the Minecraft protocol.
author: Comphenix author: Comphenix
website: http://www.comphenix.net/ProtocolLib website: http://www.comphenix.net/ProtocolLib

Datei anzeigen

@ -0,0 +1,42 @@
package com.comphenix.protocol.concurrency;
import static org.junit.Assert.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.Test;
public class BlockingHashMapTest {
@Test
public void test() throws InterruptedException, ExecutionException {
final BlockingHashMap<Integer, String> map = BlockingHashMap.create();
ExecutorService service = Executors.newSingleThreadExecutor();
// Create a reader
Future<String> future = service.submit(new Callable<String>() {
@Override
public String call() throws Exception {
// Combine for easy reading
return map.get(0) + map.get(1);
}
});
// Wait a bit
Thread.sleep(50);
// Insert values
map.put(0, "hello ");
map.put(1, "world");
// Wait for the other thread to complete
assertEquals(future.get(), "hello world");
}
}