From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 From: Aikar Date: Wed, 6 May 2020 04:53:35 -0400 Subject: [PATCH] Optimize Network Manager and add advanced packet support Adds the ability for one packet to bundle other packets to follow it. Adds the ability for a packet to delay sending more packets until a state is ready. Removes synchronization from sending packets. Removes processing of the packet queue off of the main thread - for the few cases where it is allowed, order is not necessary, nor should it even be happening concurrently in the first place (handshaking/login/status). Ensures packets sent asynchronously are dispatched on the main thread. This helps ensure safety for ProtocolLib, as packet listeners are commonly accessing world state. This will allow you to schedule a packet to be sent async, but it'll be dispatched sync for packet listeners to process. This should solve some deadlock risks. Part of this commit was authored by: Spottedleaf diff --git a/src/main/java/net/minecraft/server/NetworkManager.java b/src/main/java/net/minecraft/server/NetworkManager.java index b1dededc15cce686ead74a99bee64c89ac1de22c..94c25c542dd18396fa792af944794bdb2436d2fd 100644 --- a/src/main/java/net/minecraft/server/NetworkManager.java +++ b/src/main/java/net/minecraft/server/NetworkManager.java @@ -64,6 +64,10 @@ public class NetworkManager extends SimpleChannelInboundHandler> { public int protocolVersion; public java.net.InetSocketAddress virtualHost; private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush"); + // Optimize network + boolean isPending = true; + boolean queueImmunity = false; + EnumProtocol protocol; // Paper end public NetworkManager(EnumProtocolDirection enumprotocoldirection) { @@ -87,6 +91,7 @@ public class NetworkManager extends SimpleChannelInboundHandler> { } public void setProtocol(EnumProtocol enumprotocol) { + protocol = enumprotocol; // Paper this.channel.attr(NetworkManager.c).set(enumprotocol);
this.channel.config().setAutoRead(true); NetworkManager.LOGGER.debug("Enabled auto read"); @@ -158,19 +163,82 @@ public class NetworkManager extends SimpleChannelInboundHandler> { NetworkManager.LOGGER.debug("Set listener of {} to {}", this, packetlistener); this.packetListener = packetlistener; } + // Paper start + private EntityPlayer getPlayer() { + if (packetListener instanceof PlayerConnection) { + return ((PlayerConnection) packetListener).player; + } else { + return null; + } + } + private static class InnerUtil { // Attempt to hide these methods from ProtocolLib so it doesn't accidently pick them up. + private static java.util.List buildExtraPackets(Packet packet) { + java.util.List extra = packet.getExtraPackets(); + if (extra == null || extra.isEmpty()) { + return null; + } + java.util.List ret = new java.util.ArrayList<>(1 + extra.size()); + buildExtraPackets0(extra, ret); + return ret; + } + + private static void buildExtraPackets0(java.util.List extraPackets, java.util.List into) { + for (Packet extra : extraPackets) { + into.add(extra); + java.util.List extraExtra = extra.getExtraPackets(); + if (extraExtra != null && !extraExtra.isEmpty()) { + buildExtraPackets0(extraExtra, into); + } + } + } + // Paper start + private static boolean canSendImmediate(NetworkManager networkManager, Packet packet) { + return networkManager.isPending || networkManager.protocol != EnumProtocol.PLAY || networkManager.queueImmunity || + packet instanceof PacketPlayOutKeepAlive || + packet instanceof PacketPlayOutChat || + packet instanceof PacketPlayOutTabComplete; + } + // Paper end + } + // Paper end public void sendPacket(Packet packet) { this.sendPacket(packet, (GenericFutureListener) null); } public void sendPacket(Packet packet, @Nullable GenericFutureListener> genericfuturelistener) { - if (this.isConnected()) { - this.o(); - this.b(packet, genericfuturelistener); - } else { - this.packetQueue.add(new NetworkManager.QueuedPacket(packet, genericfuturelistener)); + // 
Paper start - handle oversized packets better + boolean connected = this.isConnected(); + if (!connected && !preparing) { + return; // Do nothing + } + packet.onPacketDispatch(getPlayer()); + if (connected && (InnerUtil.canSendImmediate(this, packet) || ( + MCUtil.isMainThread() && packet.isReady() && this.packetQueue.isEmpty() && + (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty()) + ))) { + this.dispatchPacket(packet, genericfuturelistener); + return; } + // write the packets to the queue, then flush - antixray hooks there already + java.util.List extraPackets = InnerUtil.buildExtraPackets(packet); + boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty(); + if (!hasExtraPackets) { + this.packetQueue.add(new NetworkManager.QueuedPacket(packet, genericfuturelistener)); + } else { + java.util.List packets = new java.util.ArrayList<>(1 + extraPackets.size()); + packets.add(new NetworkManager.QueuedPacket(packet, null)); // delay the future listener until the end of the extra packets + for (int i = 0, len = extraPackets.size(); i < len;) { + Packet extra = extraPackets.get(i); + boolean end = ++i == len; + packets.add(new NetworkManager.QueuedPacket(extra, end ? 
genericfuturelistener : null)); // append listener to the end + } + + this.packetQueue.addAll(packets); // atomic + } + this.sendPacketQueue(); + // Paper end } private void dispatchPacket(Packet packet, @Nullable GenericFutureListener> genericFutureListener) { this.b(packet, genericFutureListener); } // Paper - OBFHELPER @@ -194,6 +262,11 @@ public class NetworkManager extends SimpleChannelInboundHandler> { if (genericfuturelistener != null) { channelfuture.addListener(genericfuturelistener); } + // Paper start + if (packet.hasFinishListener()) { + channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(getPlayer(), channelFuture)); + } + // Paper end channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); } else { @@ -207,6 +280,11 @@ public class NetworkManager extends SimpleChannelInboundHandler> { if (genericfuturelistener != null) { channelfuture1.addListener(genericfuturelistener); } + // Paper start + if (packet.hasFinishListener()) { + channelfuture1.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(getPlayer(), channelFuture)); + } + // Paper end channelfuture1.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); }); @@ -214,21 +292,46 @@ public class NetworkManager extends SimpleChannelInboundHandler> { } - private void sendPacketQueue() { this.o(); } // Paper - OBFHELPER - private void o() { - if (this.channel != null && this.channel.isOpen()) { - Queue queue = this.packetQueue; - + // Paper start - rewrite this to be safer on + private boolean sendPacketQueue() { return this.o(); } // OBFHELPER // void -> boolean + private boolean o() { // void -> boolean + if (!isConnected()) { + return true; + } + if (MCUtil.isMainThread()) { + return processQueue(); + } else if (isPending) { + // Should only happen during login/status stages synchronized (this.packetQueue) { - NetworkManager.QueuedPacket networkmanager_queuedpacket; - - while 
((networkmanager_queuedpacket = (NetworkManager.QueuedPacket) this.packetQueue.poll()) != null) { - this.b(networkmanager_queuedpacket.a, networkmanager_queuedpacket.b); - } + return this.processQueue(); + } + } + return false; + } + private boolean processQueue() { + if (this.packetQueue.isEmpty()) return true; + // If we are on main, we are safe here in that nothing else should be processing queue off main anymore + // But if we are not on main due to login/status, the parent is synchronized on packetQueue + java.util.Iterator iterator = this.packetQueue.iterator(); + while (iterator.hasNext()) { + NetworkManager.QueuedPacket queued = iterator.next(); // poll -> peek + + // Fix NPE (Spigot bug caused by handleDisconnection()) + if (queued == null) { + return true; + } + Packet packet = queued.getPacket(); + if (!packet.isReady()) { + return false; + } else { + iterator.remove(); + this.dispatchPacket(packet, queued.getGenericFutureListener()); } } + return true; } + // Paper end public void a() { this.o(); @@ -257,9 +360,21 @@ public class NetworkManager extends SimpleChannelInboundHandler> { return this.socketAddress; } + // Paper start + public void clearPacketQueue() { + EntityPlayer player = getPlayer(); + packetQueue.forEach(queuedPacket -> { + Packet packet = queuedPacket.getPacket(); + if (packet.hasFinishListener()) { + packet.onPacketDispatchFinish(player, null); + } + }); + packetQueue.clear(); + } // Paper end public void close(IChatBaseComponent ichatbasecomponent) { // Spigot Start this.preparing = false; + clearPacketQueue(); // Paper // Spigot End if (this.channel.isOpen()) { this.channel.close(); // We can't wait as this may be called from an event loop. @@ -335,7 +450,7 @@ public class NetworkManager extends SimpleChannelInboundHandler> { } else if (this.i() != null) { this.i().a(new ChatMessage("multiplayer.disconnect.generic", new Object[0])); } - this.packetQueue.clear(); // Free up packet queue. 
+ clearPacketQueue(); // Paper // Paper start - Add PlayerConnectionCloseEvent final PacketListener packetListener = this.i(); if (packetListener instanceof PlayerConnection) { diff --git a/src/main/java/net/minecraft/server/Packet.java b/src/main/java/net/minecraft/server/Packet.java index 2d8e6a2f4a0c3c5d74a647d7164b0028781d3bf5..ffc9a1f7d58d67611c4ab46462ac13a921042313 100644 --- a/src/main/java/net/minecraft/server/Packet.java +++ b/src/main/java/net/minecraft/server/Packet.java @@ -11,6 +11,20 @@ public interface Packet { void a(T t0); // Paper start + + /** + * @param player Null if not at PLAY stage yet + */ + default void onPacketDispatch(@javax.annotation.Nullable EntityPlayer player) {} + + /** + * @param player Null if not at PLAY stage yet + * @param future Can be null if packet was cancelled + */ + default void onPacketDispatchFinish(@javax.annotation.Nullable EntityPlayer player, @javax.annotation.Nullable io.netty.channel.ChannelFuture future) {} + default boolean hasFinishListener() { return false; } + default boolean isReady() { return true; } + default java.util.List getExtraPackets() { return null; } default boolean packetTooLarge(NetworkManager manager) { return false; } diff --git a/src/main/java/net/minecraft/server/PlayerList.java b/src/main/java/net/minecraft/server/PlayerList.java index 5136905b71085445eb6bac00e9200af8cc7fbe27..14c82861158eed8c91336590fb71c69d185d22f8 100644 --- a/src/main/java/net/minecraft/server/PlayerList.java +++ b/src/main/java/net/minecraft/server/PlayerList.java @@ -143,6 +143,7 @@ public abstract class PlayerList { // CraftBukkit - getType() // Spigot - view distance + networkmanager.queueImmunity = true; // Paper playerconnection.sendPacket(new PacketPlayOutLogin(entityplayer.getId(), entityplayer.playerInteractManager.getGameMode(), WorldData.c(worlddata.getSeed()), worlddata.isHardcore(), worldserver.worldProvider.getDimensionManager().getType(), this.getMaxPlayers(), worlddata.getType(), 
worldserver.spigotConfig.viewDistance, flag1, !flag)); entityplayer.getBukkitEntity().sendSupportedChannels(); // CraftBukkit playerconnection.sendPacket(new PacketPlayOutCustomPayload(PacketPlayOutCustomPayload.a, (new PacketDataSerializer(Unpooled.buffer())).a(this.getServer().getServerModName()))); @@ -152,6 +153,7 @@ public abstract class PlayerList { playerconnection.sendPacket(new PacketPlayOutRecipeUpdate(this.server.getCraftingManager().b())); playerconnection.sendPacket(new PacketPlayOutTags(this.server.getTagRegistry())); playerconnection.sendPacket(new PacketPlayOutEntityStatus(entityplayer, (byte) (worldserver.getGameRules().getBoolean(GameRules.REDUCED_DEBUG_INFO) ? 22 : 23))); // Paper - fix this rule not being initialized on the client + networkmanager.queueImmunity = false; // Paper this.d(entityplayer); entityplayer.getStatisticManager().c(); entityplayer.B().a(entityplayer); diff --git a/src/main/java/net/minecraft/server/ServerConnection.java b/src/main/java/net/minecraft/server/ServerConnection.java index 37a22ba6f7a2ac54759428d23d5ea9787bb557f7..06cd29bb9a5d6b67f896c129662c3f493238c758 100644 --- a/src/main/java/net/minecraft/server/ServerConnection.java +++ b/src/main/java/net/minecraft/server/ServerConnection.java @@ -45,6 +45,7 @@ public class ServerConnection { NetworkManager manager = null; while ((manager = pending.poll()) != null) { connectedChannels.add(manager); + manager.isPending = false; } } // Paper end