From dde6560a5a177d45a4b4c034c3698176bbf3f0d7 Mon Sep 17 00:00:00 2001 From: Andrew Steinborn Date: Mon, 30 Jul 2018 15:05:24 -0400 Subject: [PATCH] More efficient message handling --- .../backend/BackendPlaySessionHandler.java | 20 ++++--- .../client/ClientPlaySessionHandler.java | 60 ++++++++++--------- .../proxy/protocol/ProtocolUtils.java | 13 ++-- .../proxy/protocol/packets/PluginMessage.java | 12 ++-- .../protocol/util/PluginMessageUtil.java | 28 +++++---- 5 files changed, 74 insertions(+), 59 deletions(-) diff --git a/src/main/java/com/velocitypowered/proxy/connection/backend/BackendPlaySessionHandler.java b/src/main/java/com/velocitypowered/proxy/connection/backend/BackendPlaySessionHandler.java index 81817fce3..271f6fa12 100644 --- a/src/main/java/com/velocitypowered/proxy/connection/backend/BackendPlaySessionHandler.java +++ b/src/main/java/com/velocitypowered/proxy/connection/backend/BackendPlaySessionHandler.java @@ -6,6 +6,7 @@ import com.velocitypowered.proxy.protocol.packets.*; import com.velocitypowered.proxy.connection.MinecraftSessionHandler; import com.velocitypowered.proxy.protocol.util.PluginMessageUtil; import io.netty.buffer.ByteBuf; +import io.netty.util.ReferenceCountUtil; public class BackendPlaySessionHandler implements MinecraftSessionHandler { private final ServerConnection connection; @@ -43,15 +44,20 @@ public class BackendPlaySessionHandler implements MinecraftSessionHandler { connection.getProxyPlayer().getConnection().write(packet); } else if (packet instanceof PluginMessage) { PluginMessage pm = (PluginMessage) packet; - if (!canForwardMessage(pm)) { - return; - } + try { + PluginMessage newPacket = pm; + if (!canForwardMessage(newPacket)) { + return; + } - if (pm.getChannel().equals("MC|Brand")) { - pm = PluginMessageUtil.rewriteMCBrand(pm); - } + if (newPacket.getChannel().equals("MC|Brand")) { + newPacket = PluginMessageUtil.rewriteMCBrand(pm); + } - connection.getProxyPlayer().getConnection().write(pm); + connection.getProxyPlayer().getConnection().write(newPacket); + } finally { + ReferenceCountUtil.release(pm.getData()); + } } else { // Just forward the packet on. We don't have anything to handle at this time. connection.getProxyPlayer().getConnection().write(packet); diff --git a/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java b/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java index ee925bcb4..4ce8953a0 100644 --- a/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java +++ b/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java @@ -10,6 +10,7 @@ import com.velocitypowered.proxy.protocol.util.PluginMessageUtil; import com.velocitypowered.proxy.util.ThrowableUtils; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoop; +import io.netty.util.ReferenceCountUtil; import net.kyori.text.TextComponent; import net.kyori.text.format.TextColor; import org.apache.logging.log4j.LogManager; @@ -173,41 +174,46 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { public void handleClientPluginMessage(PluginMessage packet) { logger.info("Got client plugin message packet {}", packet); - if (packet.getChannel().equals("REGISTER")) { - List actuallyRegistered = new ArrayList<>(); - List channels = PluginMessageUtil.getChannels(packet); - for (String channel : channels) { - if (clientPluginMsgChannels.size() >= MAX_PLUGIN_CHANNELS && - !clientPluginMsgChannels.contains(channel)) { - throw new IllegalStateException("Too many plugin message channels registered"); + PluginMessage original = packet; + try { + if (packet.getChannel().equals("REGISTER")) { + List actuallyRegistered = new ArrayList<>(); + List channels = PluginMessageUtil.getChannels(packet); + for (String channel : channels) { + if (clientPluginMsgChannels.size() >= MAX_PLUGIN_CHANNELS && + !clientPluginMsgChannels.contains(channel)) { + throw new IllegalStateException("Too many plugin message channels registered"); + } + if (clientPluginMsgChannels.add(channel)) { + actuallyRegistered.add(channel); + } } - if (clientPluginMsgChannels.add(channel)) { - actuallyRegistered.add(channel); + + if (actuallyRegistered.size() > 0) { + logger.info("Rewritten register packet: {}", actuallyRegistered); + PluginMessage newRegisterPacket = PluginMessageUtil.constructChannelsPacket("REGISTER", actuallyRegistered); + player.getConnectedServer().getChannel().write(newRegisterPacket); } + + return; } - if (actuallyRegistered.size() > 0) { - logger.info("Rewritten register packet: {}", actuallyRegistered); - PluginMessage newRegisterPacket = PluginMessageUtil.constructChannelsPacket("REGISTER", actuallyRegistered); - player.getConnectedServer().getChannel().write(newRegisterPacket); + if (packet.getChannel().equals("UNREGISTER")) { + List channels = PluginMessageUtil.getChannels(packet); + clientPluginMsgChannels.removeAll(channels); } - return; - } + if (packet.getChannel().equals("MC|Brand")) { + // Rewrite this packet to indicate that Velocity is running. Hurrah! + packet = PluginMessageUtil.rewriteMCBrand(packet); + this.brandMessage = packet; + } - if (packet.getChannel().equals("UNREGISTER")) { - List channels = PluginMessageUtil.getChannels(packet); - clientPluginMsgChannels.removeAll(channels); + // No other special handling? + player.getConnectedServer().getChannel().write(packet); + } finally { + ReferenceCountUtil.release(original.getData()); } - - if (packet.getChannel().equals("MC|Brand")) { - // Rewrite this packet to indicate that Velocity is running. Hurrah! - packet = PluginMessageUtil.rewriteMCBrand(packet); - this.brandMessage = packet; - } - - // No other special handling? - player.getConnectedServer().getChannel().write(packet); } public Set getClientPluginMsgChannels() { diff --git a/src/main/java/com/velocitypowered/proxy/protocol/ProtocolUtils.java b/src/main/java/com/velocitypowered/proxy/protocol/ProtocolUtils.java index 7d21cd649..33a3a0ac5 100644 --- a/src/main/java/com/velocitypowered/proxy/protocol/ProtocolUtils.java +++ b/src/main/java/com/velocitypowered/proxy/protocol/ProtocolUtils.java @@ -2,6 +2,7 @@ package com.velocitypowered.proxy.protocol; import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import java.nio.charset.StandardCharsets; import java.util.UUID; @@ -40,15 +41,15 @@ public enum ProtocolUtils { ; public static String readString(ByteBuf buf, int cap) { int length = readVarInt(buf); Preconditions.checkArgument(length <= cap, "Bad string size (got %s, maximum is %s)", length, cap); - byte[] str = new byte[length]; - buf.readBytes(str); - return new String(str, StandardCharsets.UTF_8); + String str = buf.toString(buf.readerIndex(), length, StandardCharsets.UTF_8); + buf.skipBytes(length); + return str; } public static void writeString(ByteBuf buf, String str) { - byte[] asUtf8 = str.getBytes(StandardCharsets.UTF_8); - writeVarInt(buf, asUtf8.length); - buf.writeBytes(asUtf8); + int size = ByteBufUtil.utf8Bytes(str); + writeVarInt(buf, size); + ByteBufUtil.writeUtf8(buf, str); } public static byte[] readByteArray(ByteBuf buf) { diff --git a/src/main/java/com/velocitypowered/proxy/protocol/packets/PluginMessage.java b/src/main/java/com/velocitypowered/proxy/protocol/packets/PluginMessage.java index b00ea6afd..6c62a1a2e 100644 --- a/src/main/java/com/velocitypowered/proxy/protocol/packets/PluginMessage.java +++ b/src/main/java/com/velocitypowered/proxy/protocol/packets/PluginMessage.java @@ -4,12 +4,13 @@ import com.velocitypowered.proxy.protocol.MinecraftPacket; import com.velocitypowered.proxy.protocol.ProtocolConstants; import com.velocitypowered.proxy.protocol.ProtocolUtils; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import java.util.Arrays; public class PluginMessage implements MinecraftPacket { private String channel; - private byte[] data; + private ByteBuf data; public String getChannel() { return channel; @@ -19,11 +20,11 @@ public class PluginMessage implements MinecraftPacket { this.channel = channel; } - public byte[] getData() { + public ByteBuf getData() { return data; } - public void setData(byte[] data) { + public void setData(ByteBuf data) { this.data = data; } @@ -31,15 +32,14 @@ public class PluginMessage implements MinecraftPacket { public String toString() { return "PluginMessage{" + "channel='" + channel + '\'' + - ", data=" + Arrays.toString(data) + + ", data=" + ByteBufUtil.hexDump(data) + '}'; } @Override public void decode(ByteBuf buf, ProtocolConstants.Direction direction, int protocolVersion) { this.channel = ProtocolUtils.readString(buf, 20); - this.data = new byte[buf.readableBytes()]; - buf.readBytes(this.data); + this.data = buf.readRetainedSlice(buf.readableBytes()); } @Override diff --git a/src/main/java/com/velocitypowered/proxy/protocol/util/PluginMessageUtil.java b/src/main/java/com/velocitypowered/proxy/protocol/util/PluginMessageUtil.java index ea9b6f3b7..e1305631c 100644 --- a/src/main/java/com/velocitypowered/proxy/protocol/util/PluginMessageUtil.java +++ b/src/main/java/com/velocitypowered/proxy/protocol/util/PluginMessageUtil.java @@ -6,6 +6,7 @@ import com.google.common.collect.ImmutableList; import com.velocitypowered.proxy.protocol.ProtocolUtils; import com.velocitypowered.proxy.protocol.packets.PluginMessage; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import java.nio.charset.StandardCharsets; @@ -18,32 +19,33 @@ public enum PluginMessageUtil { public static List getChannels(PluginMessage message) { Preconditions.checkArgument(message.getChannel().equals("REGISTER") || message.getChannel().equals("UNREGISTER"), "Unknown channel type " + message.getChannel()); - return ImmutableList.copyOf(new String(message.getData()).split("\0")); + String channels = message.getData().toString(StandardCharsets.UTF_8); + return ImmutableList.copyOf(channels.split("\0")); } public static PluginMessage constructChannelsPacket(String channel, Collection channels) { PluginMessage message = new PluginMessage(); message.setChannel(channel); - message.setData(Joiner.on("\0").join(channels).getBytes(StandardCharsets.UTF_8)); + + ByteBuf data = Unpooled.buffer(); + for (String s : channels) { + ByteBufUtil.writeUtf8(data, s); + data.writeByte(0); + } + data.writerIndex(data.writerIndex() - 1); + + message.setData(data); return message; } public static PluginMessage rewriteMCBrand(PluginMessage message) { - ByteBuf currentBrandBuf = Unpooled.wrappedBuffer(message.getData()); ByteBuf rewrittenBuf = Unpooled.buffer(); - byte[] rewrittenBrand; - try { - String currentBrand = ProtocolUtils.readString(currentBrandBuf); - ProtocolUtils.writeString(rewrittenBuf, currentBrand + " (Velocity)"); - rewrittenBrand = new byte[rewrittenBuf.readableBytes()]; - rewrittenBuf.readBytes(rewrittenBrand); - } finally { - rewrittenBuf.release(); - } + String currentBrand = ProtocolUtils.readString(message.getData()); + ProtocolUtils.writeString(rewrittenBuf, currentBrand + " (Velocity)"); PluginMessage newMsg = new PluginMessage(); newMsg.setChannel("MC|Brand"); - newMsg.setData(rewrittenBrand); + newMsg.setData(rewrittenBuf); return newMsg; } }