2021-06-11 14:02:28 +02:00
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Aikar <aikar@aikar.co>
Date: Wed, 6 May 2020 04:53:35 -0400
Subject: [PATCH] Optimize Network Manager and add advanced packet support
Adds ability for 1 packet to bundle other packets to follow it
Adds ability for a packet to delay sending more packets until a state is ready.
Removes synchronization from sending packets
Removes processing packet queue off of main thread
- for the few cases where it is allowed, order is not necessary nor
should it even be happening concurrently in first place (handshaking/login/status)
Ensures packets sent asynchronously are dispatched on main thread
This helps ensure safety for ProtocolLib as packet listeners
are commonly accessing world state. This will allow you to schedule
a packet to be sent async, but itll be dispatched sync for packet
listeners to process.
This should solve some deadlock risks
Also adds Netty Channel Flush Consolidation to reduce the amount of flushing
Also avoids spamming closed channel exception by rechecking closed state in dispatch
and then catch exceptions and close if they fire.
Part of this commit was authored by: Spottedleaf
diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
2022-10-24 21:43:46 +02:00
index dd9c03611e410e601ba4a7769474fada8c28c104..be571be69b5c3df41531b6c8c7be467afaa4dc55 100644
2021-06-11 14:02:28 +02:00
--- a/src/main/java/net/minecraft/network/Connection.java
+++ b/src/main/java/net/minecraft/network/Connection.java
2022-09-26 10:02:51 +02:00
@@ -115,6 +115,10 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
2021-06-11 14:02:28 +02:00
public int protocolVersion;
public java.net.InetSocketAddress virtualHost;
private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush");
+ // Optimize network
+ public boolean isPending = true;
+ public boolean queueImmunity = false;
+ public ConnectionProtocol protocol;
// Paper end
public Connection(PacketFlow side) {
2022-09-26 10:02:51 +02:00
@@ -138,6 +142,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
2021-06-11 14:02:28 +02:00
}
public void setProtocol(ConnectionProtocol state) {
+ protocol = state; // Paper
this.channel.attr(Connection.ATTRIBUTE_PROTOCOL).set(state);
this.channel.config().setAutoRead(true);
Connection.LOGGER.debug("Enabled auto read");
2022-09-26 10:02:51 +02:00
@@ -216,19 +221,89 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
2021-06-11 14:02:28 +02:00
Validate.notNull(listener, "packetListener", new Object[0]);
this.packetListener = listener;
}
+ // Paper start
2022-07-28 00:00:14 +02:00
+ public @Nullable net.minecraft.server.level.ServerPlayer getPlayer() {
+ if (packetListener instanceof net.minecraft.server.network.ServerGamePacketListenerImpl serverGamePacketListener) {
+ return serverGamePacketListener.player;
2021-06-11 14:02:28 +02:00
+ } else {
+ return null;
+ }
+ }
+ private static class InnerUtil { // Attempt to hide these methods from ProtocolLib so it doesn't accidently pick them up.
+ private static java.util.List<Packet> buildExtraPackets(Packet packet) {
+ java.util.List<Packet> extra = packet.getExtraPackets();
+ if (extra == null || extra.isEmpty()) {
+ return null;
+ }
+ java.util.List<Packet> ret = new java.util.ArrayList<>(1 + extra.size());
+ buildExtraPackets0(extra, ret);
+ return ret;
+ }
+
+ private static void buildExtraPackets0(java.util.List<Packet> extraPackets, java.util.List<Packet> into) {
+ for (Packet extra : extraPackets) {
+ into.add(extra);
+ java.util.List<Packet> extraExtra = extra.getExtraPackets();
+ if (extraExtra != null && !extraExtra.isEmpty()) {
+ buildExtraPackets0(extraExtra, into);
+ }
+ }
+ }
+ // Paper start
+ private static boolean canSendImmediate(Connection networkManager, Packet<?> packet) {
+ return networkManager.isPending || networkManager.protocol != ConnectionProtocol.PLAY ||
2021-06-13 11:41:07 +02:00
+ packet instanceof net.minecraft.network.protocol.game.ClientboundKeepAlivePacket ||
2022-06-08 05:59:40 +02:00
+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundChatPreviewPacket ||
2021-06-13 11:41:07 +02:00
+ packet instanceof net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitleTextPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetSubtitleTextPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetActionBarTextPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitlesAnimationPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundClearTitlesPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundBossEventPacket;
2021-06-11 14:02:28 +02:00
+ }
+ // Paper end
+ }
+ // Paper end
public void send(Packet<?> packet) {
2022-07-27 21:49:24 +02:00
this.send(packet, (PacketSendListener) null);
2021-06-11 14:02:28 +02:00
}
2022-08-06 00:58:34 +02:00
public void send(Packet<?> packet, @Nullable PacketSendListener callbacks) {
2021-06-11 14:02:28 +02:00
- if (this.isConnected()) {
- this.flushQueue();
+ // Paper start - handle oversized packets better
+ boolean connected = this.isConnected();
+ if (!connected && !preparing) {
+ return; // Do nothing
+ }
+ packet.onPacketDispatch(getPlayer());
+ if (connected && (InnerUtil.canSendImmediate(this, packet) || (
2022-10-24 21:43:46 +02:00
+ io.papermc.paper.util.MCUtil.isMainThread() && packet.isReady() && this.queue.isEmpty() &&
2021-06-11 14:02:28 +02:00
+ (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())
+ ))) {
2022-08-06 00:58:34 +02:00
this.sendPacket(packet, callbacks);
2021-06-13 18:25:59 +02:00
- } else {
2022-08-06 00:58:34 +02:00
- this.queue.add(new Connection.PacketHolder(packet, callbacks));
2021-06-11 14:02:28 +02:00
+ return;
}
+ // write the packets to the queue, then flush - antixray hooks there already
+ java.util.List<Packet> extraPackets = InnerUtil.buildExtraPackets(packet);
+ boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty();
+ if (!hasExtraPackets) {
2022-08-06 00:58:34 +02:00
+ this.queue.add(new Connection.PacketHolder(packet, callbacks));
2021-06-11 14:02:28 +02:00
+ } else {
+ java.util.List<Connection.PacketHolder> packets = new java.util.ArrayList<>(1 + extraPackets.size());
+ packets.add(new Connection.PacketHolder(packet, null)); // delay the future listener until the end of the extra packets
2021-06-13 11:41:07 +02:00
2021-06-11 14:02:28 +02:00
+ for (int i = 0, len = extraPackets.size(); i < len;) {
+ Packet extra = extraPackets.get(i);
+ boolean end = ++i == len;
2022-08-06 00:58:34 +02:00
+ packets.add(new Connection.PacketHolder(extra, end ? callbacks : null)); // append listener to the end
2021-06-11 14:02:28 +02:00
+ }
+ this.queue.addAll(packets); // atomic
+ }
2021-06-13 11:41:07 +02:00
+ this.flushQueue();
2021-06-11 14:02:28 +02:00
+ // Paper end
}
2022-08-06 00:58:34 +02:00
private void sendPacket(Packet<?> packet, @Nullable PacketSendListener callbacks) {
2022-09-26 10:02:51 +02:00
@@ -256,6 +331,15 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
2021-11-24 02:53:24 +01:00
this.setProtocol(packetState);
2021-06-11 14:02:28 +02:00
}
2021-06-13 11:41:07 +02:00
+ // Paper start
+ net.minecraft.server.level.ServerPlayer player = getPlayer();
+ if (!isConnected()) {
+ packet.onPacketDispatchFinish(player, null);
+ return;
+ }
2021-06-11 14:02:28 +02:00
+
2021-06-13 11:41:07 +02:00
+ try {
+ // Paper end
ChannelFuture channelfuture = this.channel.writeAndFlush(packet);
2021-06-11 14:02:28 +02:00
2022-08-06 00:58:34 +02:00
if (callbacks != null) {
2022-09-26 10:02:51 +02:00
@@ -274,28 +358,64 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
2022-07-27 21:49:24 +02:00
});
2021-06-11 14:02:28 +02:00
}
2021-06-13 11:41:07 +02:00
+ // Paper start
+ if (packet.hasFinishListener()) {
+ channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture));
+ }
+ // Paper end
channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ // Paper start
+ } catch (Exception e) {
+ LOGGER.error("NetworkException: " + player, e);
2022-06-08 05:59:40 +02:00
+ disconnect(Component.translatable("disconnect.genericReason", "Internal Exception: " + e.getMessage()));
2021-06-13 11:41:07 +02:00
+ packet.onPacketDispatchFinish(player, null);
+ }
+ // Paper end
}
2021-06-11 14:02:28 +02:00
2021-06-13 11:41:07 +02:00
private ConnectionProtocol getCurrentProtocol() {
return (ConnectionProtocol) this.channel.attr(Connection.ATTRIBUTE_PROTOCOL).get();
2021-06-11 14:02:28 +02:00
}
- private void flushQueue() {
2022-09-26 10:02:51 +02:00
- try { // Paper - add pending task queue
2021-06-11 14:02:28 +02:00
- if (this.channel != null && this.channel.isOpen()) {
- Queue queue = this.queue;
-
+ // Paper start - rewrite this to be safer if ran off main thread
2021-06-13 11:41:07 +02:00
+ private boolean flushQueue() { // void -> boolean
2021-06-11 14:02:28 +02:00
+ if (!isConnected()) {
+ return true;
+ }
2022-10-24 21:43:46 +02:00
+ if (io.papermc.paper.util.MCUtil.isMainThread()) {
2021-06-11 14:02:28 +02:00
+ return processQueue();
+ } else if (isPending) {
+ // Should only happen during login/status stages
synchronized (this.queue) {
- Connection.PacketHolder networkmanager_queuedpacket;
-
- while ((networkmanager_queuedpacket = (Connection.PacketHolder) this.queue.poll()) != null) {
- this.sendPacket(networkmanager_queuedpacket.packet, networkmanager_queuedpacket.listener);
- }
+ return this.processQueue();
+ }
+ }
+ return false;
+ }
+ private boolean processQueue() {
2022-09-26 10:02:51 +02:00
+ try { // Paper - add pending task queue
2021-06-11 14:02:28 +02:00
+ if (this.queue.isEmpty()) return true;
+ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
+ // But if we are not on main due to login/status, the parent is synchronized on packetQueue
+ java.util.Iterator<PacketHolder> iterator = this.queue.iterator();
+ while (iterator.hasNext()) {
2021-06-13 11:41:07 +02:00
+ PacketHolder queued = iterator.next(); // poll -> peek
2021-06-11 14:02:28 +02:00
+
+ // Fix NPE (Spigot bug caused by handleDisconnection())
+ if (queued == null) {
+ return true;
+ }
2021-06-13 11:41:07 +02:00
+ Packet<?> packet = queued.packet;
2021-06-11 14:02:28 +02:00
+ if (!packet.isReady()) {
+ return false;
+ } else {
+ iterator.remove();
2021-06-13 11:41:07 +02:00
+ this.sendPacket(packet, queued.listener);
2021-06-11 14:02:28 +02:00
}
}
+ return true;
2022-09-26 10:02:51 +02:00
} finally { // Paper start - add pending task queue
Runnable r;
while ((r = this.pendingTasks.poll()) != null) {
@@ -303,6 +423,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
}
} // Paper end - add pending task queue
2021-06-11 14:02:28 +02:00
}
+ // Paper end
public void tick() {
2021-06-13 11:41:07 +02:00
this.flushQueue();
2022-09-26 10:02:51 +02:00
@@ -339,9 +460,22 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
2021-06-11 14:02:28 +02:00
return this.address;
}
+ // Paper start
+ public void clearPacketQueue() {
2021-06-13 11:41:07 +02:00
+ net.minecraft.server.level.ServerPlayer player = getPlayer();
2021-06-11 14:02:28 +02:00
+ queue.forEach(queuedPacket -> {
2021-06-13 11:41:07 +02:00
+ Packet<?> packet = queuedPacket.packet;
2021-06-11 14:02:28 +02:00
+ if (packet.hasFinishListener()) {
+ packet.onPacketDispatchFinish(player, null);
+ }
+ });
+ queue.clear();
2021-06-13 11:41:07 +02:00
+ }
+ // Paper end
2021-06-11 14:02:28 +02:00
public void disconnect(Component disconnectReason) {
// Spigot Start
this.preparing = false;
+ clearPacketQueue(); // Paper
// Spigot End
if (this.channel.isOpen()) {
this.channel.close(); // We can't wait as this may be called from an event loop.
2022-09-26 10:02:51 +02:00
@@ -459,7 +593,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
2021-06-11 14:02:28 +02:00
public void handleDisconnection() {
if (this.channel != null && !this.channel.isOpen()) {
if (this.disconnectionHandled) {
- Connection.LOGGER.warn("handleDisconnection() called twice");
2021-06-13 11:41:07 +02:00
+ //Connection.LOGGER.warn("handleDisconnection() called twice"); // Paper - Do not log useless message
2021-06-11 14:02:28 +02:00
} else {
this.disconnectionHandled = true;
if (this.getDisconnectedReason() != null) {
2022-09-26 10:02:51 +02:00
@@ -467,7 +601,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
2021-06-11 14:02:28 +02:00
} else if (this.getPacketListener() != null) {
2022-06-08 04:25:49 +02:00
this.getPacketListener().onDisconnect(Component.translatable("multiplayer.disconnect.generic"));
2021-06-11 14:02:28 +02:00
}
- this.queue.clear(); // Free up packet queue.
+ clearPacketQueue(); // Paper
// Paper start - Add PlayerConnectionCloseEvent
final PacketListener packetListener = this.getPacketListener();
2022-07-28 00:04:27 +02:00
if (packetListener instanceof net.minecraft.server.network.ServerGamePacketListenerImpl) {
2021-06-11 14:02:28 +02:00
diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java
2021-06-13 11:41:07 +02:00
index 74bfe0d3942259c45702b099efdc4e101a4e3022..e8fcd56906d26f6dc87959e32c4c7c78cfea9658 100644
2021-06-11 14:02:28 +02:00
--- a/src/main/java/net/minecraft/network/protocol/Packet.java
+++ b/src/main/java/net/minecraft/network/protocol/Packet.java
2021-06-13 11:41:07 +02:00
@@ -9,6 +9,19 @@ public interface Packet<T extends PacketListener> {
2021-06-11 14:02:28 +02:00
void handle(T listener);
// Paper start
+ /**
+ * @param player Null if not at PLAY stage yet
+ */
2021-06-13 11:41:07 +02:00
+ default void onPacketDispatch(@javax.annotation.Nullable net.minecraft.server.level.ServerPlayer player) {}
2021-06-11 14:02:28 +02:00
+
+ /**
+ * @param player Null if not at PLAY stage yet
+ * @param future Can be null if packet was cancelled
+ */
2021-06-13 11:41:07 +02:00
+ default void onPacketDispatchFinish(@javax.annotation.Nullable net.minecraft.server.level.ServerPlayer player, @javax.annotation.Nullable io.netty.channel.ChannelFuture future) {}
2021-06-11 14:02:28 +02:00
+ default boolean hasFinishListener() { return false; }
+ default boolean isReady() { return true; }
+ default java.util.List<Packet> getExtraPackets() { return null; }
2021-06-13 11:41:07 +02:00
default boolean packetTooLarge(net.minecraft.network.Connection manager) {
2021-06-11 14:02:28 +02:00
return false;
}
2021-06-16 13:07:43 +02:00
diff --git a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
2022-07-27 21:49:24 +02:00
index 0e04532b8f1d48116eb46dcef7952bfcc0d11394..a24ef433d0c9d06b86fd612978cfd6d877036791 100644
2021-06-16 13:07:43 +02:00
--- a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
+++ b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
2022-07-27 21:49:24 +02:00
@@ -65,10 +65,12 @@ public class ServerConnectionListener {
2021-06-16 13:07:43 +02:00
final List<Connection> connections = Collections.synchronizedList(Lists.newArrayList());
// Paper start - prevent blocking on adding a new network manager while the server is ticking
private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>();
+ private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper
private final void addPending() {
Connection manager = null;
while ((manager = pending.poll()) != null) {
connections.add(manager);
+ manager.isPending = false;
}
}
// Paper end
2022-07-27 21:49:24 +02:00
@@ -103,6 +105,7 @@ public class ServerConnectionListener {
2021-06-16 13:07:43 +02:00
;
}
+ if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper
channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this)).addLast("splitter", new Varint21FrameDecoder()).addLast("decoder", new PacketDecoder(PacketFlow.SERVERBOUND)).addLast("prepender", new Varint21LengthFieldPrepender()).addLast("encoder", new PacketEncoder(PacketFlow.CLIENTBOUND));
int j = ServerConnectionListener.this.server.getRateLimitPacketsPerSecond();
Object object = j > 0 ? new RateKickingConnection(j) : new Connection(PacketFlow.SERVERBOUND);