From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 From: Aikar Date: Wed, 6 May 2020 04:53:35 -0400 Subject: [PATCH] Optimize Network Manager and add advanced packet support Adds ability for 1 packet to bundle other packets to follow it Adds ability for a packet to delay sending more packets until a state is ready. Removes synchronization from sending packets Removes processing packet queue off of main thread - for the few cases where it is allowed, order is not necessary nor should it even be happening concurrently in first place (handshaking/login/status) Ensures packets sent asynchronously are dispatched on main thread This helps ensure safety for ProtocolLib as packet listeners are commonly accessing world state. This will allow you to schedule a packet to be sent async, but itll be dispatched sync for packet listeners to process. This should solve some deadlock risks Also adds Netty Channel Flush Consolidation to reduce the amount of flushing Also avoids spamming closed channel exception by rechecking closed state in dispatch and then catch exceptions and close if they fire. Part of this commit was authored by: Spottedleaf diff --git a/src/main/java/net/minecraft/server/NetworkManager.java b/src/main/java/net/minecraft/server/NetworkManager.java index b1dededc15cce686ead74a99bee64c89ac1de22c..289b17addd71aeab8a392e8f6a53d4c6329cf562 100644 --- a/src/main/java/net/minecraft/server/NetworkManager.java +++ b/src/main/java/net/minecraft/server/NetworkManager.java @@ -64,6 +64,10 @@ public class NetworkManager extends SimpleChannelInboundHandler> { public int protocolVersion; public java.net.InetSocketAddress virtualHost; private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush"); + // Optimize network + boolean isPending = true; + boolean queueImmunity = false; + EnumProtocol protocol; // Paper end public NetworkManager(EnumProtocolDirection enumprotocoldirection) { @@ -87,6 +91,7 @@ public class NetworkManager extends SimpleChannelInboundHandler> { } public void setProtocol(EnumProtocol enumprotocol) { + protocol = enumprotocol; // Paper this.channel.attr(NetworkManager.c).set(enumprotocol); this.channel.config().setAutoRead(true); NetworkManager.LOGGER.debug("Enabled auto read"); @@ -158,19 +163,82 @@ public class NetworkManager extends SimpleChannelInboundHandler> { NetworkManager.LOGGER.debug("Set listener of {} to {}", this, packetlistener); this.packetListener = packetlistener; } + // Paper start + private EntityPlayer getPlayer() { + if (packetListener instanceof PlayerConnection) { + return ((PlayerConnection) packetListener).player; + } else { + return null; + } + } + private static class InnerUtil { // Attempt to hide these methods from ProtocolLib so it doesn't accidently pick them up. + private static java.util.List buildExtraPackets(Packet packet) { + java.util.List extra = packet.getExtraPackets(); + if (extra == null || extra.isEmpty()) { + return null; + } + java.util.List ret = new java.util.ArrayList<>(1 + extra.size()); + buildExtraPackets0(extra, ret); + return ret; + } + + private static void buildExtraPackets0(java.util.List extraPackets, java.util.List into) { + for (Packet extra : extraPackets) { + into.add(extra); + java.util.List extraExtra = extra.getExtraPackets(); + if (extraExtra != null && !extraExtra.isEmpty()) { + buildExtraPackets0(extraExtra, into); + } + } + } + // Paper start + private static boolean canSendImmediate(NetworkManager networkManager, Packet packet) { + return networkManager.isPending || networkManager.protocol != EnumProtocol.PLAY || networkManager.queueImmunity || + packet instanceof PacketPlayOutKeepAlive || + packet instanceof PacketPlayOutChat || + packet instanceof PacketPlayOutTabComplete; + } + // Paper end + } + // Paper end public void sendPacket(Packet packet) { this.sendPacket(packet, (GenericFutureListener) null); } public void sendPacket(Packet packet, @Nullable GenericFutureListener> genericfuturelistener) { - if (this.isConnected()) { - this.o(); - this.b(packet, genericfuturelistener); - } else { - this.packetQueue.add(new NetworkManager.QueuedPacket(packet, genericfuturelistener)); + // Paper start - handle oversized packets better + boolean connected = this.isConnected(); + if (!connected && !preparing) { + return; // Do nothing + } + packet.onPacketDispatch(getPlayer()); + if (connected && (InnerUtil.canSendImmediate(this, packet) || ( + MCUtil.isMainThread() && packet.isReady() && this.packetQueue.isEmpty() && + (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty()) + ))) { + this.dispatchPacket(packet, genericfuturelistener); + return; } + // write the packets to the queue, then flush - antixray hooks there already + java.util.List extraPackets = InnerUtil.buildExtraPackets(packet); + boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty(); + if (!hasExtraPackets) { + this.packetQueue.add(new NetworkManager.QueuedPacket(packet, genericfuturelistener)); + } else { + java.util.List packets = new java.util.ArrayList<>(1 + extraPackets.size()); + packets.add(new NetworkManager.QueuedPacket(packet, null)); // delay the future listener until the end of the extra packets + + for (int i = 0, len = extraPackets.size(); i < len;) { + Packet extra = extraPackets.get(i); + boolean end = ++i == len; + packets.add(new NetworkManager.QueuedPacket(extra, end ? genericfuturelistener : null)); // append listener to the end + } + this.packetQueue.addAll(packets); // atomic + } + this.sendPacketQueue(); + // Paper end } private void dispatchPacket(Packet packet, @Nullable GenericFutureListener> genericFutureListener) { this.b(packet, genericFutureListener); } // Paper - OBFHELPER @@ -184,51 +252,116 @@ public class NetworkManager extends SimpleChannelInboundHandler> { this.channel.config().setAutoRead(false); } + EntityPlayer player = getPlayer(); // Paper if (this.channel.eventLoop().inEventLoop()) { if (enumprotocol != enumprotocol1) { this.setProtocol(enumprotocol); } + // Paper start + if (!isConnected()) { + packet.onPacketDispatchFinish(player, null); + return; + } + try { + // Paper end ChannelFuture channelfuture = this.channel.writeAndFlush(packet); if (genericfuturelistener != null) { channelfuture.addListener(genericfuturelistener); } + // Paper start + if (packet.hasFinishListener()) { + channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture)); + } + // Paper end channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + // Paper start + } catch (Exception e) { + LOGGER.error("NetworkException: " + player, e); + close(new ChatMessage("disconnect.genericReason", "Internal Exception: " + e.getMessage()));; + packet.onPacketDispatchFinish(player, null); + } + // Paper end } else { this.channel.eventLoop().execute(() -> { if (enumprotocol != enumprotocol1) { this.setProtocol(enumprotocol); } + // Paper start + if (!isConnected()) { + packet.onPacketDispatchFinish(player, null); + return; + } + try { + // Paper end ChannelFuture channelfuture1 = this.channel.writeAndFlush(packet); + if (genericfuturelistener != null) { channelfuture1.addListener(genericfuturelistener); } + // Paper start + if (packet.hasFinishListener()) { + channelfuture1.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture)); + } + // Paper end channelfuture1.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + // Paper start + } catch (Exception e) { + LOGGER.error("NetworkException: " + player, e); + close(new ChatMessage("disconnect.genericReason", "Internal Exception: " + e.getMessage()));; + packet.onPacketDispatchFinish(player, null); + } + // Paper end }); } } - private void sendPacketQueue() { this.o(); } // Paper - OBFHELPER - private void o() { - if (this.channel != null && this.channel.isOpen()) { - Queue queue = this.packetQueue; - + // Paper start - rewrite this to be safer on + private boolean sendPacketQueue() { return this.o(); } // OBFHELPER // void -> boolean + private boolean o() { // void -> boolean + if (!isConnected()) { + return true; + } + if (MCUtil.isMainThread()) { + return processQueue(); + } else if (isPending) { + // Should only happen during login/status stages synchronized (this.packetQueue) { - NetworkManager.QueuedPacket networkmanager_queuedpacket; - - while ((networkmanager_queuedpacket = (NetworkManager.QueuedPacket) this.packetQueue.poll()) != null) { - this.b(networkmanager_queuedpacket.a, networkmanager_queuedpacket.b); - } + return this.processQueue(); + } + } + return false; + } + private boolean processQueue() { + if (this.packetQueue.isEmpty()) return true; + // If we are on main, we are safe here in that nothing else should be processing queue off main anymore + // But if we are not on main due to login/status, the parent is synchronized on packetQueue + java.util.Iterator iterator = this.packetQueue.iterator(); + while (iterator.hasNext()) { + NetworkManager.QueuedPacket queued = iterator.next(); // poll -> peek + + // Fix NPE (Spigot bug caused by handleDisconnection()) + if (queued == null) { + return true; + } + Packet packet = queued.getPacket(); + if (!packet.isReady()) { + return false; + } else { + iterator.remove(); + this.dispatchPacket(packet, queued.getGenericFutureListener()); } } + return true; } + // Paper end public void a() { this.o(); @@ -257,9 +390,21 @@ public class NetworkManager extends SimpleChannelInboundHandler> { return this.socketAddress; } + // Paper start + public void clearPacketQueue() { + EntityPlayer player = getPlayer(); + packetQueue.forEach(queuedPacket -> { + Packet packet = queuedPacket.getPacket(); + if (packet.hasFinishListener()) { + packet.onPacketDispatchFinish(player, null); + } + }); + packetQueue.clear(); + } // Paper end public void close(IChatBaseComponent ichatbasecomponent) { // Spigot Start this.preparing = false; + clearPacketQueue(); // Paper // Spigot End if (this.channel.isOpen()) { this.channel.close(); // We can't wait as this may be called from an event loop. @@ -335,7 +480,7 @@ public class NetworkManager extends SimpleChannelInboundHandler> { } else if (this.i() != null) { this.i().a(new ChatMessage("multiplayer.disconnect.generic", new Object[0])); } - this.packetQueue.clear(); // Free up packet queue. + clearPacketQueue(); // Paper // Paper start - Add PlayerConnectionCloseEvent final PacketListener packetListener = this.i(); if (packetListener instanceof PlayerConnection) { diff --git a/src/main/java/net/minecraft/server/Packet.java b/src/main/java/net/minecraft/server/Packet.java index 2d8e6a2f4a0c3c5d74a647d7164b0028781d3bf5..c5edf8c4b01cc7ddac06797133e6fd13ec5b6592 100644 --- a/src/main/java/net/minecraft/server/Packet.java +++ b/src/main/java/net/minecraft/server/Packet.java @@ -11,6 +11,20 @@ public interface Packet { void a(T t0); // Paper start + + /** + * @param player Null if not at PLAY stage yet + */ + default void onPacketDispatch(@javax.annotation.Nullable EntityPlayer player) {} + + /** + * @param player Null if not at PLAY stage yet + * @param future Can be null if packet was cancelled + */ + default void onPacketDispatchFinish(@javax.annotation.Nullable EntityPlayer player, @javax.annotation.Nullable io.netty.channel.ChannelFuture future) {} + default boolean hasFinishListener() { return false; } + default boolean isReady() { return true; } + default java.util.List getExtraPackets() { return null; } default boolean packetTooLarge(NetworkManager manager) { return false; } diff --git a/src/main/java/net/minecraft/server/PlayerList.java b/src/main/java/net/minecraft/server/PlayerList.java index 707b4523536dd6c3192451982e6e33ea4e263053..dc86dae6ff15d38fcc3af2a5f65119aa2daf08f0 100644 --- a/src/main/java/net/minecraft/server/PlayerList.java +++ b/src/main/java/net/minecraft/server/PlayerList.java @@ -143,6 +143,7 @@ public abstract class PlayerList { // CraftBukkit - getType() // Spigot - view distance + networkmanager.queueImmunity = true; // Paper playerconnection.sendPacket(new PacketPlayOutLogin(entityplayer.getId(), entityplayer.playerInteractManager.getGameMode(), WorldData.c(worlddata.getSeed()), worlddata.isHardcore(), worldserver.worldProvider.getDimensionManager().getType(), this.getMaxPlayers(), worlddata.getType(), worldserver.spigotConfig.viewDistance, flag1, !flag)); entityplayer.getBukkitEntity().sendSupportedChannels(); // CraftBukkit playerconnection.sendPacket(new PacketPlayOutCustomPayload(PacketPlayOutCustomPayload.a, (new PacketDataSerializer(Unpooled.buffer())).a(this.getServer().getServerModName()))); @@ -152,6 +153,7 @@ public abstract class PlayerList { playerconnection.sendPacket(new PacketPlayOutRecipeUpdate(this.server.getCraftingManager().b())); playerconnection.sendPacket(new PacketPlayOutTags(this.server.getTagRegistry())); playerconnection.sendPacket(new PacketPlayOutEntityStatus(entityplayer, (byte) (worldserver.getGameRules().getBoolean(GameRules.REDUCED_DEBUG_INFO) ? 22 : 23))); // Paper - fix this rule not being initialized on the client + networkmanager.queueImmunity = false; // Paper this.d(entityplayer); entityplayer.getStatisticManager().c(); entityplayer.B().a(entityplayer); diff --git a/src/main/java/net/minecraft/server/ServerConnection.java b/src/main/java/net/minecraft/server/ServerConnection.java index 37a22ba6f7a2ac54759428d23d5ea9787bb557f7..ceecb78ff879e4d6a3295b05d04deb01b7fb1da6 100644 --- a/src/main/java/net/minecraft/server/ServerConnection.java +++ b/src/main/java/net/minecraft/server/ServerConnection.java @@ -41,10 +41,12 @@ public class ServerConnection { private final List connectedChannels = Collections.synchronizedList(Lists.newArrayList()); // Paper start - prevent blocking on adding a new network manager while the server is ticking private final java.util.Queue pending = new java.util.concurrent.ConcurrentLinkedQueue<>(); + private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper private void addPending() { NetworkManager manager = null; while ((manager = pending.poll()) != null) { connectedChannels.add(manager); + manager.isPending = false; } } // Paper end @@ -79,6 +81,7 @@ public class ServerConnection { ; } + if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyPingHandler(ServerConnection.this)).addLast("splitter", new PacketSplitter()).addLast("decoder", new PacketDecoder(EnumProtocolDirection.SERVERBOUND)).addLast("prepender", new PacketPrepender()).addLast("encoder", new PacketEncoder(EnumProtocolDirection.CLIENTBOUND)); NetworkManager networkmanager = new NetworkManager(EnumProtocolDirection.SERVERBOUND);