From d38c7467d9e7f38701306fb6387125719960172c Mon Sep 17 00:00:00 2001 From: Andrew Steinborn Date: Fri, 3 Aug 2018 00:48:19 -0400 Subject: [PATCH] Various Netty changes. - Potentially fixed a reference count leak with plugin messages. - Cleaned up plugin message handling. - Optimized the pipeline for better throughput by eliminating copying in the varint encoder and reduced object churn elsewhere. --- .../backend/BackendPlaySessionHandler.java | 21 +++++------ .../client/ClientPlaySessionHandler.java | 37 ++++--------------- .../connection/client/ConnectedPlayer.java | 2 - .../netty/MinecraftCompressDecoder.java | 2 +- .../protocol/netty/MinecraftDecoder.java | 2 +- .../netty/MinecraftVarintFrameDecoder.java | 3 +- .../netty/MinecraftVarintLengthEncoder.java | 14 ++++--- .../protocol/util/PluginMessageUtil.java | 12 ++++++ 8 files changed, 42 insertions(+), 51 deletions(-) diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/BackendPlaySessionHandler.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/BackendPlaySessionHandler.java index 8348fa5b7..a887da94d 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/BackendPlaySessionHandler.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/BackendPlaySessionHandler.java @@ -46,23 +46,22 @@ public class BackendPlaySessionHandler implements MinecraftSessionHandler { } else if (packet instanceof PluginMessage) { PluginMessage pm = (PluginMessage) packet; try { - PluginMessage newPacket = pm; - if (!canForwardPluginMessage(newPacket)) { + if (!canForwardPluginMessage(pm)) { return; } - if (newPacket.getChannel().equals("MC|Brand")) { - newPacket = PluginMessageUtil.rewriteMCBrand(pm); + if (PluginMessageUtil.isMCBrand(pm)) { + connection.getProxyPlayer().getConnection().write(PluginMessageUtil.rewriteMCBrand(pm)); + return; } - if (newPacket == pm) { - // we'll decrement this thrice: once when writing to the server, once just below this block, - // and once in the MinecraftConnection (since this is a slice) - pm.getData().retain(); - } - connection.getProxyPlayer().getConnection().write(newPacket); + // we'll decrement this twice: once when writing to the server, once just below this block, + // and once in the MinecraftConnection (since this is a slice) + pm.getData().retain(); + + connection.getProxyPlayer().getConnection().write(pm); } finally { - ReferenceCountUtil.release(pm.getData()); + pm.getData().release(); } } else { // Just forward the packet on. We don't have anything to handle at this time. diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java index 662ab4403..a3f5904ae 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java @@ -9,12 +9,14 @@ import com.velocitypowered.proxy.data.scoreboard.Scoreboard; import com.velocitypowered.proxy.data.scoreboard.Team; import com.velocitypowered.proxy.protocol.MinecraftPacket; import com.velocitypowered.proxy.protocol.ProtocolConstants; +import com.velocitypowered.proxy.protocol.ProtocolUtils; import com.velocitypowered.proxy.protocol.packet.*; import com.velocitypowered.proxy.connection.MinecraftSessionHandler; import com.velocitypowered.proxy.protocol.remap.EntityIdRemapper; import com.velocitypowered.proxy.protocol.util.PluginMessageUtil; import com.velocitypowered.proxy.util.ThrowableUtils; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.EventLoop; import io.netty.util.ReferenceCountUtil; import net.kyori.text.TextComponent; @@ -38,7 +40,6 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { private boolean spawned = false; private final List serverBossBars = new ArrayList<>(); private final Set clientPluginMsgChannels = new HashSet<>(); - private PluginMessage brandMessage; private int currentDimension; private Scoreboard serverScoreboard = new Scoreboard(); private EntityIdRemapper idRemapper; @@ -110,10 +111,6 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { pingTask.cancel(false); pingTask = null; } - - if (brandMessage != null) { - brandMessage.getData().release(); - } } @Override @@ -179,12 +176,6 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { PluginMessageUtil.constructChannelsPacket(channel, clientPluginMsgChannels)); } - // Tell the server the client's brand - if (brandMessage != null) { - brandMessage.getData().retain(); - player.getConnectedServer().getMinecraftConnection().delayedWrite(brandMessage); - } - // Flush everything player.getConnection().flush(); player.getConnectedServer().getMinecraftConnection().flush(); @@ -201,7 +192,6 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { public void handleClientPluginMessage(PluginMessage packet) { logger.info("Got client plugin message packet {}", packet); - PluginMessage original = packet; try { if (packet.getChannel().equals("REGISTER") || packet.getChannel().equals("minecraft:register")) { List actuallyRegistered = new ArrayList<>(); @@ -230,27 +220,16 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { clientPluginMsgChannels.removeAll(channels); } - if (packet.getChannel().equals("MC|Brand") || packet.getChannel().equals("minecraft:brand")) { - if (this.brandMessage != null) { - // Rewrite this packet to indicate that Velocity is running. Hurrah! - packet = PluginMessageUtil.rewriteMCBrand(packet); - this.brandMessage = packet; - } else { - // Already have the brand packet and don't need this one. - return; - } - } - - // No other special handling? - if (packet == original) { - // we'll decrement this thrice: once when writing to the server, once just below this block, - // and once in the MinecraftConnection (since this is a slice) - packet.getData().retain(); + if (PluginMessageUtil.isMCBrand(packet)) { + player.getConnectedServer().getMinecraftConnection().write(PluginMessageUtil.rewriteMCBrand(packet)); + return; } + // We're going to forward on the original packet. + packet.getData().retain(); player.getConnectedServer().getMinecraftConnection().write(packet); } finally { - ReferenceCountUtil.release(original.getData()); + ReferenceCountUtil.release(packet.getData()); } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java index 4824860bf..658b0a21f 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java @@ -78,10 +78,8 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation { String error = ThrowableUtils.briefDescription(throwable); String userMessage; if (connectedServer != null && connectedServer.getServerInfo().equals(info)) { - logger.error("{}: exception occurred in connection to {}", this, info.getName(), throwable); userMessage = "Exception in server " + info.getName(); } else { - logger.error("{}: unable to connect to server {}", this, info.getName(), throwable); userMessage = "Exception connecting to server " + info.getName(); } handleConnectionException(info, TextComponent.builder() diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftCompressDecoder.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftCompressDecoder.java index 2f6ace563..61893ff44 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftCompressDecoder.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftCompressDecoder.java @@ -25,7 +25,7 @@ public class MinecraftCompressDecoder extends MessageToMessageDecoder { int uncompressedSize = ProtocolUtils.readVarInt(msg); if (uncompressedSize == 0) { // Strip the now-useless uncompressed size, this message is already uncompressed. - out.add(msg.slice().retain()); + out.add(msg.retainedSlice()); msg.skipBytes(msg.readableBytes()); return; } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftDecoder.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftDecoder.java index 74cf1faba..d8768987a 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftDecoder.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftDecoder.java @@ -26,7 +26,7 @@ public class MinecraftDecoder extends MessageToMessageDecoder { return; } - ByteBuf slice = msg.slice().retain(); + ByteBuf slice = msg.retainedSlice(); int packetId = ProtocolUtils.readVarInt(msg); MinecraftPacket packet = this.protocolVersion.createPacket(packetId); diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftVarintFrameDecoder.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftVarintFrameDecoder.java index f3c00cde8..0afa0637d 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftVarintFrameDecoder.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftVarintFrameDecoder.java @@ -21,7 +21,6 @@ public class MinecraftVarintFrameDecoder extends ByteToMessageDecoder { return; } - out.add(in.slice(in.readerIndex(), packetLength).retain()); - in.skipBytes(packetLength); + out.add(in.readRetainedSlice(packetLength)); } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftVarintLengthEncoder.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftVarintLengthEncoder.java index 5686b13f8..7a0d61c92 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftVarintLengthEncoder.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftVarintLengthEncoder.java @@ -5,17 +5,21 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.handler.codec.MessageToMessageEncoder; + +import java.util.List; @ChannelHandler.Sharable -public class MinecraftVarintLengthEncoder extends MessageToByteEncoder { +public class MinecraftVarintLengthEncoder extends MessageToMessageEncoder { public static final MinecraftVarintLengthEncoder INSTANCE = new MinecraftVarintLengthEncoder(); private MinecraftVarintLengthEncoder() { } @Override - protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { - out.ensureWritable(msg.readableBytes() + 5); - ProtocolUtils.writeVarInt(out, msg.readableBytes()); - out.writeBytes(msg); + protected void encode(ChannelHandlerContext ctx, ByteBuf buf, List list) throws Exception { + ByteBuf lengthBuf = ctx.alloc().buffer(5); // the maximum size of a varint + ProtocolUtils.writeVarInt(lengthBuf, buf.readableBytes()); + list.add(lengthBuf); + list.add(buf.retain()); } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/util/PluginMessageUtil.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/util/PluginMessageUtil.java index 92eb04e87..a71172654 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/util/PluginMessageUtil.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/util/PluginMessageUtil.java @@ -15,7 +15,13 @@ import java.util.List; public enum PluginMessageUtil { ; + public static boolean isMCBrand(PluginMessage message) { + Preconditions.checkNotNull(message, "message"); + return message.getChannel().equals("MC|Brand") || message.getChannel().equals("minecraft:brand"); + } + public static List getChannels(PluginMessage message) { + Preconditions.checkNotNull(message, "message"); Preconditions.checkArgument(message.getChannel().equals("REGISTER") || message.getChannel().equals("UNREGISTER") || message.getChannel().equals("minecraft:register") || @@ -26,6 +32,9 @@ public enum PluginMessageUtil { } public static PluginMessage constructChannelsPacket(String channel, Collection channels) { + Preconditions.checkNotNull(channel, "channel"); + Preconditions.checkNotNull(channel, "channels"); + PluginMessage message = new PluginMessage(); message.setChannel(channel); @@ -41,6 +50,9 @@ public enum PluginMessageUtil { } public static PluginMessage rewriteMCBrand(PluginMessage message) { + Preconditions.checkNotNull(message, "message"); + Preconditions.checkArgument(isMCBrand(message), "message is not a MC Brand plugin message"); + ByteBuf rewrittenBuf = Unpooled.buffer(); String currentBrand = ProtocolUtils.readString(message.getData()); ProtocolUtils.writeString(rewrittenBuf, currentBrand + " (Velocity)");