Archiviert
13
0

Make it possible to cancel packets for asynchronous processing.

Dieser Commit ist enthalten in:
Kristian S. Stangeland 2013-05-06 18:56:19 +02:00
Ursprung 8d814d2d9c
Commit a2b04e055a
2 geänderte Dateien mit 306 neuen und 304 gelöschten Zeilen

Datei anzeigen

@ -1,304 +1,305 @@
/* /*
* ProtocolLib - Bukkit server library that allows access to the Minecraft protocol. * ProtocolLib - Bukkit server library that allows access to the Minecraft protocol.
* Copyright (C) 2012 Kristian S. Stangeland * Copyright (C) 2012 Kristian S. Stangeland
* *
* This program is free software; you can redistribute it and/or modify it under the terms of the * This program is free software; you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation; either version 2 of * GNU General Public License as published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version. * the License, or (at your option) any later version.
* *
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details. * See the GNU General Public License for more details.
* *
* You should have received a copy of the GNU General Public License along with this program; * You should have received a copy of the GNU General Public License along with this program;
* if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA * if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
* 02111-1307 USA * 02111-1307 USA
*/ */
package com.comphenix.protocol.async; package com.comphenix.protocol.async;
import java.io.IOException; import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import org.bukkit.entity.Player; import org.bukkit.entity.Player;
import com.comphenix.protocol.events.PacketEvent; import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.injector.PlayerLoggedOutException; import com.comphenix.protocol.injector.PlayerLoggedOutException;
import com.comphenix.protocol.reflect.FieldAccessException; import com.comphenix.protocol.reflect.FieldAccessException;
/** /**
* Represents packets ready to be transmitted to a client. * Represents packets ready to be transmitted to a client.
* @author Kristian * @author Kristian
*/ */
abstract class PacketSendingQueue { abstract class PacketSendingQueue {
public static final int INITIAL_CAPACITY = 10; public static final int INITIAL_CAPACITY = 10;
private PriorityBlockingQueue<PacketEventHolder> sendingQueue; private PriorityBlockingQueue<PacketEventHolder> sendingQueue;
// Asynchronous packet sending // Asynchronous packet sending
private Executor asynchronousSender; private Executor asynchronousSender;
// Whether or not packet transmission must occur on a specific thread // Whether or not packet transmission must occur on a specific thread
private final boolean notThreadSafe; private final boolean notThreadSafe;
// Whether or not we've run the cleanup procedure // Whether or not we've run the cleanup procedure
private boolean cleanedUp = false; private boolean cleanedUp = false;
/** /**
* Create a packet sending queue. * Create a packet sending queue.
* @param notThreadSafe - whether or not to synchronize with the main thread or a background thread. * @param notThreadSafe - whether or not to synchronize with the main thread or a background thread.
*/ */
public PacketSendingQueue(boolean notThreadSafe, Executor asynchronousSender) { public PacketSendingQueue(boolean notThreadSafe, Executor asynchronousSender) {
this.sendingQueue = new PriorityBlockingQueue<PacketEventHolder>(INITIAL_CAPACITY); this.sendingQueue = new PriorityBlockingQueue<PacketEventHolder>(INITIAL_CAPACITY);
this.notThreadSafe = notThreadSafe; this.notThreadSafe = notThreadSafe;
this.asynchronousSender = asynchronousSender; this.asynchronousSender = asynchronousSender;
} }
/** /**
* Number of packet events in the queue. * Number of packet events in the queue.
* @return The number of packet events in the queue. * @return The number of packet events in the queue.
*/ */
public int size() { public int size() {
return sendingQueue.size(); return sendingQueue.size();
} }
/** /**
* Enqueue a packet for sending. * Enqueue a packet for sending.
* @param packet - packet to queue. * @param packet - packet to queue.
*/ */
public void enqueue(PacketEvent packet) { public void enqueue(PacketEvent packet) {
sendingQueue.add(new PacketEventHolder(packet)); sendingQueue.add(new PacketEventHolder(packet));
} }
/** /**
* Invoked when one of the packets have finished processing. * Invoked when one of the packets have finished processing.
* @param packetUpdated - the packet that has now been updated. * @param packetUpdated - the packet that has now been updated.
* @param onMainThread - whether or not this is occuring on the main thread. * @param onMainThread - whether or not this is occuring on the main thread.
*/ */
public synchronized void signalPacketUpdate(PacketEvent packetUpdated, boolean onMainThread) { public synchronized void signalPacketUpdate(PacketEvent packetUpdated, boolean onMainThread) {
AsyncMarker marker = packetUpdated.getAsyncMarker(); AsyncMarker marker = packetUpdated.getAsyncMarker();
// Should we reorder the event? // Should we reorder the event?
if (marker.getQueuedSendingIndex() != marker.getNewSendingIndex() && !marker.hasExpired()) { if (marker.getQueuedSendingIndex() != marker.getNewSendingIndex() && !marker.hasExpired()) {
PacketEvent copy = PacketEvent.fromSynchronous(packetUpdated, marker); PacketEvent copy = PacketEvent.fromSynchronous(packetUpdated, marker);
// "Cancel" the original event // "Cancel" the original event
packetUpdated.setCancelled(true); packetUpdated.setReadOnly(false);
packetUpdated.setCancelled(true);
// Enqueue the copy with the new sending index
enqueue(copy); // Enqueue the copy with the new sending index
} enqueue(copy);
}
// Mark this packet as finished
marker.setProcessed(true); // Mark this packet as finished
trySendPackets(onMainThread); marker.setProcessed(true);
} trySendPackets(onMainThread);
}
/***
* Invoked when a list of packet IDs are no longer associated with any listeners. /***
* @param packetsRemoved - packets that no longer have any listeners. * Invoked when a list of packet IDs are no longer associated with any listeners.
* @param onMainThread - whether or not this is occuring on the main thread. * @param packetsRemoved - packets that no longer have any listeners.
*/ * @param onMainThread - whether or not this is occuring on the main thread.
public synchronized void signalPacketUpdate(List<Integer> packetsRemoved, boolean onMainThread) { */
public synchronized void signalPacketUpdate(List<Integer> packetsRemoved, boolean onMainThread) {
Set<Integer> lookup = new HashSet<Integer>(packetsRemoved);
Set<Integer> lookup = new HashSet<Integer>(packetsRemoved);
// Note that this is O(n), so it might be expensive
for (PacketEventHolder holder : sendingQueue) { // Note that this is O(n), so it might be expensive
PacketEvent event = holder.getEvent(); for (PacketEventHolder holder : sendingQueue) {
PacketEvent event = holder.getEvent();
if (lookup.contains(event.getPacketID())) {
event.getAsyncMarker().setProcessed(true); if (lookup.contains(event.getPacketID())) {
} event.getAsyncMarker().setProcessed(true);
} }
}
// This is likely to have changed the situation a bit
trySendPackets(onMainThread); // This is likely to have changed the situation a bit
} trySendPackets(onMainThread);
}
/**
* Attempt to send any remaining packets. /**
* @param onMainThread - whether or not this is occuring on the main thread. * Attempt to send any remaining packets.
*/ * @param onMainThread - whether or not this is occuring on the main thread.
public void trySendPackets(boolean onMainThread) { */
// Whether or not to continue sending packets public void trySendPackets(boolean onMainThread) {
boolean sending = true; // Whether or not to continue sending packets
boolean sending = true;
// Transmit as many packets as we can
while (sending) { // Transmit as many packets as we can
PacketEventHolder holder = sendingQueue.poll(); while (sending) {
PacketEventHolder holder = sendingQueue.poll();
if (holder != null) {
sending = processPacketHolder(onMainThread, holder); if (holder != null) {
sending = processPacketHolder(onMainThread, holder);
if (!sending) {
// Add it back again if (!sending) {
sendingQueue.add(holder); // Add it back again
} sendingQueue.add(holder);
}
} else {
// No more packets to send } else {
sending = false; // No more packets to send
} sending = false;
} }
} }
}
/**
* Invoked when a packet might be ready for transmission. /**
* @param onMainThread - TRUE if we're on the main thread, FALSE otherwise. * Invoked when a packet might be ready for transmission.
* @param holder - packet container. * @param onMainThread - TRUE if we're on the main thread, FALSE otherwise.
* @return TRUE to continue sending packets, FALSE otherwise. * @param holder - packet container.
*/ * @return TRUE to continue sending packets, FALSE otherwise.
private boolean processPacketHolder(boolean onMainThread, final PacketEventHolder holder) { */
PacketEvent current = holder.getEvent(); private boolean processPacketHolder(boolean onMainThread, final PacketEventHolder holder) {
AsyncMarker marker = current.getAsyncMarker(); PacketEvent current = holder.getEvent();
boolean hasExpired = marker.hasExpired(); AsyncMarker marker = current.getAsyncMarker();
boolean hasExpired = marker.hasExpired();
// Guard in cause the queue is closed
if (cleanedUp) { // Guard in cause the queue is closed
return true; if (cleanedUp) {
} return true;
}
// End condition?
if (marker.isProcessed() || hasExpired) { // End condition?
if (hasExpired) { if (marker.isProcessed() || hasExpired) {
// Notify timeout listeners if (hasExpired) {
onPacketTimeout(current); // Notify timeout listeners
onPacketTimeout(current);
// Recompute
marker = current.getAsyncMarker(); // Recompute
hasExpired = marker.hasExpired(); marker = current.getAsyncMarker();
hasExpired = marker.hasExpired();
// Could happen due to the timeout listeners
if (!marker.isProcessed() && !hasExpired) { // Could happen due to the timeout listeners
return false; if (!marker.isProcessed() && !hasExpired) {
} return false;
} }
}
// Is it okay to send the packet?
if (!current.isCancelled() && !hasExpired) { // Is it okay to send the packet?
// Make sure we're on the main thread if (!current.isCancelled() && !hasExpired) {
if (notThreadSafe) { // Make sure we're on the main thread
try { if (notThreadSafe) {
boolean wantAsync = marker.isMinecraftAsync(current); try {
boolean wantSync = !wantAsync; boolean wantAsync = marker.isMinecraftAsync(current);
boolean wantSync = !wantAsync;
// Wait for the next main thread heartbeat if we haven't fulfilled our promise
if (!onMainThread && wantSync) { // Wait for the next main thread heartbeat if we haven't fulfilled our promise
return false; if (!onMainThread && wantSync) {
} return false;
}
// Let's give it what it wants
if (onMainThread && wantAsync) { // Let's give it what it wants
asynchronousSender.execute(new Runnable() { if (onMainThread && wantAsync) {
@Override asynchronousSender.execute(new Runnable() {
public void run() { @Override
// We know this isn't on the main thread public void run() {
processPacketHolder(false, holder); // We know this isn't on the main thread
} processPacketHolder(false, holder);
}); }
});
// Scheduler will do the rest
return true; // Scheduler will do the rest
} return true;
}
} catch (FieldAccessException e) {
e.printStackTrace(); } catch (FieldAccessException e) {
e.printStackTrace();
// Just drop the packet
return true; // Just drop the packet
} return true;
} }
}
// Silently skip players that have logged out
if (isOnline(current.getPlayer())) { // Silently skip players that have logged out
sendPacket(current); if (isOnline(current.getPlayer())) {
} sendPacket(current);
} }
}
// Drop the packet
return true; // Drop the packet
} return true;
}
// Add it back and stop sending
return false; // Add it back and stop sending
} return false;
}
/**
* Invoked when a packet has timed out. /**
* @param event - the timed out packet. * Invoked when a packet has timed out.
*/ * @param event - the timed out packet.
protected abstract void onPacketTimeout(PacketEvent event); */
protected abstract void onPacketTimeout(PacketEvent event);
private boolean isOnline(Player player) {
return player != null && player.isOnline(); private boolean isOnline(Player player) {
} return player != null && player.isOnline();
}
/**
* Send every packet, regardless of the processing state. /**
*/ * Send every packet, regardless of the processing state.
private void forceSend() { */
while (true) { private void forceSend() {
PacketEventHolder holder = sendingQueue.poll(); while (true) {
PacketEventHolder holder = sendingQueue.poll();
if (holder != null) {
sendPacket(holder.getEvent()); if (holder != null) {
} else { sendPacket(holder.getEvent());
break; } else {
} break;
} }
} }
}
/**
* Whether or not the packet transmission must synchronize with the main thread. /**
* @return TRUE if it must, FALSE otherwise. * Whether or not the packet transmission must synchronize with the main thread.
*/ * @return TRUE if it must, FALSE otherwise.
public boolean isSynchronizeMain() { */
return notThreadSafe; public boolean isSynchronizeMain() {
} return notThreadSafe;
}
/**
* Transmit a packet, if it hasn't already. /**
* @param event - the packet to transmit. * Transmit a packet, if it hasn't already.
*/ * @param event - the packet to transmit.
private void sendPacket(PacketEvent event) { */
private void sendPacket(PacketEvent event) {
AsyncMarker marker = event.getAsyncMarker();
AsyncMarker marker = event.getAsyncMarker();
try {
// Don't send a packet twice try {
if (marker != null && !marker.isTransmitted()) { // Don't send a packet twice
marker.sendPacket(event); if (marker != null && !marker.isTransmitted()) {
} marker.sendPacket(event);
}
} catch (PlayerLoggedOutException e) {
System.out.println(String.format( } catch (PlayerLoggedOutException e) {
"[ProtocolLib] Warning: Dropped packet index %s of ID %s", System.out.println(String.format(
marker.getOriginalSendingIndex(), event.getPacketID() "[ProtocolLib] Warning: Dropped packet index %s of ID %s",
)); marker.getOriginalSendingIndex(), event.getPacketID()
));
} catch (IOException e) {
// Just print the error } catch (IOException e) {
e.printStackTrace(); // Just print the error
} e.printStackTrace();
} }
}
/**
* Automatically transmits every delayed packet. /**
*/ * Automatically transmits every delayed packet.
public void cleanupAll() { */
if (!cleanedUp) { public void cleanupAll() {
// Note that the cleanup itself will always occur on the main thread if (!cleanedUp) {
forceSend(); // Note that the cleanup itself will always occur on the main thread
forceSend();
// And we're done
cleanedUp = true; // And we're done
} cleanedUp = true;
} }
} }
}

Datei anzeigen

@ -496,6 +496,7 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
asyncFilterManager.enqueueSyncPacket(event, event.getAsyncMarker()); asyncFilterManager.enqueueSyncPacket(event, event.getAsyncMarker());
// The above makes a copy of the event, so it's safe to cancel it // The above makes a copy of the event, so it's safe to cancel it
event.setReadOnly(false);
event.setCancelled(true); event.setCancelled(true);
} }
} }