Archiviert
13
0

Client packets are processed on the server, so they must be

synchronized with the main thread.
Dieser Commit ist enthalten in:
Kristian S. Stangeland 2012-09-29 23:48:09 +02:00
Ursprung d58ff1c4c1
Commit 8a26d047b2
3 geänderte Dateien mit 58 neuen und 23 gelöschten Zeilen

Datei anzeigen

@ -44,12 +44,11 @@ public class ProtocolLibrary extends JavaPlugin {
// Structure compiler // Structure compiler
private BackgroundCompiler backgroundCompiler; private BackgroundCompiler backgroundCompiler;
// Used to (mostly) clean up packets that have expired // Used to clean up server packets that have expired.
// But mostly required to simulate recieving client packets.
private int asyncPacketTask = -1; private int asyncPacketTask = -1;
private int tickCounter = 0;
// Number of ticks between each cleanup. We don't need to do this often, private static final int ASYNC_PACKET_DELAY = 1;
// as it's only indeeded to detected timeouts.
private static final int ASYNC_PACKET_DELAY = 10;
@Override @Override
public void onLoad() { public void onLoad() {
@ -100,7 +99,9 @@ public class ProtocolLibrary extends JavaPlugin {
@Override @Override
public void run() { public void run() {
AsyncFilterManager manager = (AsyncFilterManager) protocolManager.getAsynchronousManager(); AsyncFilterManager manager = (AsyncFilterManager) protocolManager.getAsynchronousManager();
manager.sendProcessedPackets();
// We KNOW we're on the main thread at the moment
manager.sendProcessedPackets(tickCounter++, true);
} }
}, ASYNC_PACKET_DELAY, ASYNC_PACKET_DELAY); }, ASYNC_PACKET_DELAY, ASYNC_PACKET_DELAY);

Datei anzeigen

@ -42,8 +42,10 @@ public class AsyncFilterManager implements AsynchronousManager {
private volatile boolean cleaningUp; private volatile boolean cleaningUp;
public AsyncFilterManager(Logger logger, PacketStream packetStream) { public AsyncFilterManager(Logger logger, PacketStream packetStream) {
this.serverQueue = new PacketSendingQueue(); this.serverQueue = new PacketSendingQueue(false);
this.clientQueue = new PacketSendingQueue(); // Client packets must be synchronized
this.clientQueue = new PacketSendingQueue(true);
this.serverProcessingQueue = new PacketProcessingQueue(serverQueue); this.serverProcessingQueue = new PacketProcessingQueue(serverQueue);
this.clientProcessingQueue = new PacketProcessingQueue(clientQueue); this.clientProcessingQueue = new PacketProcessingQueue(clientQueue);
this.packetStream = packetStream; this.packetStream = packetStream;
@ -88,6 +90,7 @@ public class AsyncFilterManager implements AsynchronousManager {
void unregisterAsyncHandlerInternal(AsyncListenerHandler handler) { void unregisterAsyncHandlerInternal(AsyncListenerHandler handler) {
PacketListener listener = handler.getAsyncListener(); PacketListener listener = handler.getAsyncListener();
boolean synchronusOK = onMainThread();
// Just remove it from the queue(s) // Just remove it from the queue(s)
if (hasValidWhitelist(listener.getSendingWhitelist())) { if (hasValidWhitelist(listener.getSendingWhitelist())) {
@ -95,16 +98,24 @@ public class AsyncFilterManager implements AsynchronousManager {
// We're already taking care of this, so don't do anything // We're already taking care of this, so don't do anything
if (!cleaningUp) if (!cleaningUp)
serverQueue.signalPacketUpdate(removed); serverQueue.signalPacketUpdate(removed, synchronusOK);
} }
if (hasValidWhitelist(listener.getReceivingWhitelist())) { if (hasValidWhitelist(listener.getReceivingWhitelist())) {
List<Integer> removed = clientProcessingQueue.removeListener(handler, listener.getReceivingWhitelist()); List<Integer> removed = clientProcessingQueue.removeListener(handler, listener.getReceivingWhitelist());
if (!cleaningUp) if (!cleaningUp)
clientQueue.signalPacketUpdate(removed); clientQueue.signalPacketUpdate(removed, synchronusOK);
} }
} }
/**
* Determine if we're running on the main thread.
* @return TRUE if we are, FALSE otherwise.
*/
private boolean onMainThread() {
return Thread.currentThread().getId() == mainThread.getId();
}
@Override @Override
public void unregisterAsyncHandlers(Plugin plugin) { public void unregisterAsyncHandlers(Plugin plugin) {
unregisterAsyncHandlers(serverProcessingQueue, plugin); unregisterAsyncHandlers(serverProcessingQueue, plugin);
@ -196,7 +207,7 @@ public class AsyncFilterManager implements AsynchronousManager {
* @param packet - packet to signal. * @param packet - packet to signal.
*/ */
public void signalPacketUpdate(PacketEvent packet) { public void signalPacketUpdate(PacketEvent packet) {
getSendingQueue(packet).signalPacketUpdate(packet); getSendingQueue(packet).signalPacketUpdate(packet, onMainThread());
} }
/** /**
@ -228,8 +239,13 @@ public class AsyncFilterManager implements AsynchronousManager {
/** /**
* Send any due packets, or clean up packets that have expired. * Send any due packets, or clean up packets that have expired.
*/ */
public void sendProcessedPackets() { public void sendProcessedPackets(int tickCounter, boolean onMainThread) {
clientQueue.trySendPackets();
serverQueue.trySendPackets(); // The server queue is unlikely to need checking that often
if (tickCounter % 10 == 0) {
serverQueue.trySendPackets(onMainThread);
}
clientQueue.trySendPackets(onMainThread);
} }
} }

Datei anzeigen

@ -20,8 +20,12 @@ class PacketSendingQueue {
private PriorityBlockingQueue<PacketEvent> sendingQueue; private PriorityBlockingQueue<PacketEvent> sendingQueue;
public PacketSendingQueue() { // Whether or not packet transmission can only occur on the main thread
sendingQueue = new PriorityBlockingQueue<PacketEvent>(INITIAL_CAPACITY, new Comparator<PacketEvent>() { private final boolean synchronizeMain;
public PacketSendingQueue(boolean synchronizeMain) {
this.synchronizeMain = synchronizeMain;
this.sendingQueue = new PriorityBlockingQueue<PacketEvent>(INITIAL_CAPACITY, new Comparator<PacketEvent>() {
// Compare using the async marker // Compare using the async marker
@Override @Override
public int compare(PacketEvent o1, PacketEvent o2) { public int compare(PacketEvent o1, PacketEvent o2) {
@ -43,13 +47,13 @@ class PacketSendingQueue {
/** /**
* Invoked when one of the packets have finished processing. * Invoked when one of the packets have finished processing.
*/ */
public synchronized void signalPacketUpdate(PacketEvent packetUpdated) { public synchronized void signalPacketUpdate(PacketEvent packetUpdated, boolean onMainThread) {
// Mark this packet as finished // Mark this packet as finished
packetUpdated.getAsyncMarker().setProcessed(true); packetUpdated.getAsyncMarker().setProcessed(true);
trySendPackets(); trySendPackets(onMainThread);
} }
public synchronized void signalPacketUpdate(List<Integer> packetsRemoved) { public synchronized void signalPacketUpdate(List<Integer> packetsRemoved, boolean onMainThread) {
Set<Integer> lookup = new HashSet<Integer>(packetsRemoved); Set<Integer> lookup = new HashSet<Integer>(packetsRemoved);
@ -61,13 +65,18 @@ class PacketSendingQueue {
} }
// This is likely to have changed the situation a bit // This is likely to have changed the situation a bit
trySendPackets(); trySendPackets(onMainThread);
} }
/** /**
* Attempt to send any remaining packets. * Attempt to send any remaining packets.
*/ */
public synchronized void trySendPackets() { public void trySendPackets(boolean onMainThread) {
// Abort if we're not on the main thread
if (synchronizeMain && !onMainThread)
return;
// Transmit as many packets as we can // Transmit as many packets as we can
while (true) { while (true) {
PacketEvent current = sendingQueue.peek(); PacketEvent current = sendingQueue.peek();
@ -92,7 +101,7 @@ class PacketSendingQueue {
/** /**
* Send every packet, regardless of the processing state. * Send every packet, regardless of the processing state.
*/ */
public synchronized void forceSend() { private void forceSend() {
while (true) { while (true) {
PacketEvent current = sendingQueue.poll(); PacketEvent current = sendingQueue.poll();
@ -104,6 +113,14 @@ class PacketSendingQueue {
} }
} }
/**
* Whether or not the packet transmission must synchronize with the main thread.
* @return TRUE if it must, FALSE otherwise.
*/
public boolean isSynchronizeMain() {
return synchronizeMain;
}
/** /**
* Transmit a packet, if it hasn't already. * Transmit a packet, if it hasn't already.
* @param event - the packet to transmit. * @param event - the packet to transmit.
@ -128,6 +145,7 @@ class PacketSendingQueue {
* Automatically transmits every delayed packet. * Automatically transmits every delayed packet.
*/ */
public void cleanupAll() { public void cleanupAll() {
// Note that the cleanup itself will always occur on the main thread
forceSend(); forceSend();
} }
} }