Archiviert
13
0

Process asynchronous packets on an async thread.

Bukkit complains if we try to send an async packet on the main thread,
so we will have to add a new background thread that can transmit
packets processed by light-weight packet listeners.

In addition, fixed a bug causing the "uncancel" method in 
PacketInjector from not working properly. That bug is as persistent
as a zombie.
Dieser Commit ist enthalten in:
Kristian S. Stangeland 2012-11-21 01:39:43 +01:00
Ursprung 9482818751
Commit 0b200472f1
5 geänderte Dateien mit 155 neuen und 60 gelöschten Zeilen

Datei anzeigen

@ -90,6 +90,7 @@ public class AsyncFilterManager implements AsynchronousManager {
this.playerSendingHandler = new PlayerSendingHandler(reporter, serverTimeoutListeners, clientTimeoutListeners); this.playerSendingHandler = new PlayerSendingHandler(reporter, serverTimeoutListeners, clientTimeoutListeners);
this.serverProcessingQueue = new PacketProcessingQueue(playerSendingHandler); this.serverProcessingQueue = new PacketProcessingQueue(playerSendingHandler);
this.clientProcessingQueue = new PacketProcessingQueue(playerSendingHandler); this.clientProcessingQueue = new PacketProcessingQueue(playerSendingHandler);
this.playerSendingHandler.initializeScheduler();
this.scheduler = scheduler; this.scheduler = scheduler;
this.manager = manager; this.manager = manager;

Datei anzeigen

@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import org.bukkit.entity.Player; import org.bukkit.entity.Player;
@ -39,9 +40,25 @@ abstract class PacketSendingQueue {
private PriorityBlockingQueue<PacketEventHolder> sendingQueue; private PriorityBlockingQueue<PacketEventHolder> sendingQueue;
// Asynchronous packet sending
private Executor asynchronousSender;
// Whether or not packet transmission can only occur on the main thread // Whether or not packet transmission can only occur on the main thread
private final boolean synchronizeMain; private final boolean synchronizeMain;
// Whether or not we've run the cleanup procedure
private boolean cleanedUp = false;
/**
* Create a packet sending queue.
* @param synchronizeMain - whether or not to synchronize with the main thread.
*/
public PacketSendingQueue(boolean synchronizeMain, Executor asynchronousSender) {
this.sendingQueue = new PriorityBlockingQueue<PacketEventHolder>(INITIAL_CAPACITY);
this.synchronizeMain = synchronizeMain;
this.asynchronousSender = asynchronousSender;
}
/** /**
* Number of packet events in the queue. * Number of packet events in the queue.
* @return The number of packet events in the queue. * @return The number of packet events in the queue.
@ -50,15 +67,6 @@ abstract class PacketSendingQueue {
return sendingQueue.size(); return sendingQueue.size();
} }
/**
* Create a packet sending queue.
* @param synchronizeMain - whether or not to synchronize with the main thread.
*/
public PacketSendingQueue(boolean synchronizeMain) {
this.sendingQueue = new PriorityBlockingQueue<PacketEventHolder>(INITIAL_CAPACITY);
this.synchronizeMain = synchronizeMain;
}
/** /**
* Enqueue a packet for sending. * Enqueue a packet for sending.
* @param packet - packet to queue. * @param packet - packet to queue.
@ -119,32 +127,45 @@ abstract class PacketSendingQueue {
* @param onMainThread - whether or not this is occuring on the main thread. * @param onMainThread - whether or not this is occuring on the main thread.
*/ */
public void trySendPackets(boolean onMainThread) { public void trySendPackets(boolean onMainThread) {
// Whether or not to continue sending packets
boolean sending = true;
// Transmit as many packets as we can // Transmit as many packets as we can
while (true) { while (sending) {
PacketEventHolder holder = sendingQueue.peek(); PacketEventHolder holder = sendingQueue.poll();
if (holder != null) { if (holder != null) {
sending = processPacketHolder(onMainThread, holder);
if (!sending) {
// Add it back again
sendingQueue.add(holder);
}
} else {
// No more packets to send
sending = false;
}
}
}
/**
* Invoked when a packet might be ready for transmission.
* @param onMainThread - TRUE if we're on the main thread, FALSE otherwise.
* @param holder - packet container.
* @return TRUE to continue sending packets, FALSE otherwise.
*/
private boolean processPacketHolder(boolean onMainThread, final PacketEventHolder holder) {
PacketEvent current = holder.getEvent(); PacketEvent current = holder.getEvent();
AsyncMarker marker = current.getAsyncMarker(); AsyncMarker marker = current.getAsyncMarker();
boolean hasExpired = marker.hasExpired(); boolean hasExpired = marker.hasExpired();
// Abort if we're not on the main thread // Guard in cause the queue is closed
if (synchronizeMain) { if (cleanedUp) {
try { return true;
boolean wantAsync = marker.isMinecraftAsync(current);
boolean wantSync = !wantAsync;
// Quit if we haven't fulfilled our promise
if ((onMainThread && wantAsync) || (!onMainThread && wantSync))
return;
} catch (FieldAccessException e) {
e.printStackTrace();
return;
}
} }
// End condition?
if (marker.isProcessed() || hasExpired) { if (marker.isProcessed() || hasExpired) {
if (hasExpired) { if (hasExpired) {
// Notify timeout listeners // Notify timeout listeners
@ -154,6 +175,39 @@ abstract class PacketSendingQueue {
marker = current.getAsyncMarker(); marker = current.getAsyncMarker();
hasExpired = marker.hasExpired(); hasExpired = marker.hasExpired();
} }
// Abort if we're not on the main thread
if (synchronizeMain && !hasExpired) {
try {
boolean wantAsync = marker.isMinecraftAsync(current);
boolean wantSync = !wantAsync;
// Wait for the next main thread heartbeat if we haven't fulfilled our promise
if (!onMainThread && wantSync) {
return false;
}
// Let's give it what it wants, then
if (onMainThread && wantAsync) {
asynchronousSender.execute(new Runnable() {
@Override
public void run() {
// We know this isn't on the main thread
processPacketHolder(false, holder);
}
});
// The executor will take it from here
return true;
}
} catch (FieldAccessException e) {
e.printStackTrace();
// Skip this packet
return true;
}
}
if (marker.isProcessed() && !current.isCancelled() && !hasExpired) { if (marker.isProcessed() && !current.isCancelled() && !hasExpired) {
// Silently skip players that have logged out // Silently skip players that have logged out
if (isOnline(current.getPlayer())) { if (isOnline(current.getPlayer())) {
@ -161,13 +215,11 @@ abstract class PacketSendingQueue {
} }
} }
sendingQueue.poll(); return true;
continue;
}
}
// Only repeat when packets are removed } else {
break; // Add it back and stop sending
return false;
} }
} }
@ -234,7 +286,12 @@ abstract class PacketSendingQueue {
* Automatically transmits every delayed packet. * Automatically transmits every delayed packet.
*/ */
public void cleanupAll() { public void cleanupAll() {
if (!cleanedUp) {
// Note that the cleanup itself will always occur on the main thread // Note that the cleanup itself will always occur on the main thread
forceSend(); forceSend();
// And we're done
cleanedUp = true;
}
} }
} }

Datei anzeigen

@ -3,12 +3,16 @@ package com.comphenix.protocol.async;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.bukkit.entity.Player; import org.bukkit.entity.Player;
import com.comphenix.protocol.error.ErrorReporter; import com.comphenix.protocol.error.ErrorReporter;
import com.comphenix.protocol.events.PacketEvent; import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.injector.SortedPacketListenerList; import com.comphenix.protocol.injector.SortedPacketListenerList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* Contains every sending queue for every player. * Contains every sending queue for every player.
@ -24,6 +28,9 @@ class PlayerSendingHandler {
private SortedPacketListenerList serverTimeoutListeners; private SortedPacketListenerList serverTimeoutListeners;
private SortedPacketListenerList clientTimeoutListeners; private SortedPacketListenerList clientTimeoutListeners;
// Asynchronous packet sending
private Executor asynchronousSender;
// Whether or not we're currently cleaning up // Whether or not we're currently cleaning up
private volatile boolean cleaningUp; private volatile boolean cleaningUp;
@ -38,7 +45,7 @@ class PlayerSendingHandler {
public QueueContainer() { public QueueContainer() {
// Server packets are synchronized already // Server packets are synchronized already
serverQueue = new PacketSendingQueue(false) { serverQueue = new PacketSendingQueue(false, asynchronousSender) {
@Override @Override
protected void onPacketTimeout(PacketEvent event) { protected void onPacketTimeout(PacketEvent event) {
if (!cleaningUp) { if (!cleaningUp) {
@ -48,7 +55,7 @@ class PlayerSendingHandler {
}; };
// Client packets must be synchronized // Client packets must be synchronized
clientQueue = new PacketSendingQueue(true) { clientQueue = new PacketSendingQueue(true, asynchronousSender) {
@Override @Override
protected void onPacketTimeout(PacketEvent event) { protected void onPacketTimeout(PacketEvent event) {
if (!cleaningUp) { if (!cleaningUp) {
@ -67,6 +74,12 @@ class PlayerSendingHandler {
} }
} }
/**
* Initialize a packet sending handler.
* @param reporter - error reporter.
* @param serverTimeoutListeners - set of server timeout listeners.
* @param clientTimeoutListeners - set of client timeout listeners.
*/
public PlayerSendingHandler(ErrorReporter reporter, public PlayerSendingHandler(ErrorReporter reporter,
SortedPacketListenerList serverTimeoutListeners, SortedPacketListenerList clientTimeoutListeners) { SortedPacketListenerList serverTimeoutListeners, SortedPacketListenerList clientTimeoutListeners) {
@ -75,7 +88,20 @@ class PlayerSendingHandler {
this.clientTimeoutListeners = clientTimeoutListeners; this.clientTimeoutListeners = clientTimeoutListeners;
// Initialize storage of queues // Initialize storage of queues
playerSendingQueues = new ConcurrentHashMap<String, QueueContainer>(); this.playerSendingQueues = new ConcurrentHashMap<String, QueueContainer>();
}
/**
* Start the asynchronous packet sender.
*/
public synchronized void initializeScheduler() {
if (asynchronousSender == null) {
ThreadFactory factory = new ThreadFactoryBuilder().
setDaemon(true).
setNameFormat("ProtocolLib-AsyncSender %s").
build();
asynchronousSender = Executors.newSingleThreadExecutor(factory);
}
} }
/** /**
@ -202,11 +228,13 @@ class PlayerSendingHandler {
* Send all pending packets and clean up queues. * Send all pending packets and clean up queues.
*/ */
public void cleanupAll() { public void cleanupAll() {
if (!cleaningUp) {
cleaningUp = true; cleaningUp = true;
sendAllPackets(); sendAllPackets();
playerSendingQueues.clear(); playerSendingQueues.clear();
} }
}
/** /**
* Invoked when a player has just logged out. * Invoked when a player has just logged out.

Datei anzeigen

@ -83,8 +83,8 @@ class PacketInjector {
public void undoCancel(Integer id, Packet packet) { public void undoCancel(Integer id, Packet packet) {
ReadPacketModifier modifier = readModifier.get(id); ReadPacketModifier modifier = readModifier.get(id);
// Cancelled packets are represented with NULL // See if this packet has been cancelled before
if (modifier != null && modifier.getOverride(packet) == null) { if (modifier != null && modifier.hasCancelled(packet)) {
modifier.removeOverride(packet); modifier.removeOverride(packet);
} }
} }

Datei anzeigen

@ -74,6 +74,15 @@ class ReadPacketModifier implements MethodInterceptor {
return override.get(packet); return override.get(packet);
} }
/**
* Determine if the given packet has been cancelled before.
* @param packet - the packet to check.
* @return TRUE if it has been cancelled, FALSE otherwise.
*/
public boolean hasCancelled(Packet packet) {
return getOverride(packet) == CANCEL_MARKER;
}
@Override @Override
public Object intercept(Object thisObj, Method method, Object[] args, MethodProxy proxy) throws Throwable { public Object intercept(Object thisObj, Method method, Object[] args, MethodProxy proxy) throws Throwable {