13
0
geforkt von Mirrors/Velocity

Various Netty changes.

- Potentially fixed a reference count leak with plugin messages.
- Cleaned up plugin message handling.
- Optimized the pipeline for better throughput by eliminating copying
  in the varint encoder and reduced object churn elsewhere.
Dieser Commit ist enthalten in:
Andrew Steinborn 2018-08-03 00:48:19 -04:00
Ursprung 68d315d1d3
Commit d38c7467d9
8 geänderte Dateien mit 42 neuen und 51 gelöschten Zeilen

Datei anzeigen

@ -46,23 +46,22 @@ public class BackendPlaySessionHandler implements MinecraftSessionHandler {
} else if (packet instanceof PluginMessage) { } else if (packet instanceof PluginMessage) {
PluginMessage pm = (PluginMessage) packet; PluginMessage pm = (PluginMessage) packet;
try { try {
PluginMessage newPacket = pm; if (!canForwardPluginMessage(pm)) {
if (!canForwardPluginMessage(newPacket)) {
return; return;
} }
if (newPacket.getChannel().equals("MC|Brand")) { if (PluginMessageUtil.isMCBrand(pm)) {
newPacket = PluginMessageUtil.rewriteMCBrand(pm); connection.getProxyPlayer().getConnection().write(PluginMessageUtil.rewriteMCBrand(pm));
return;
} }
if (newPacket == pm) { // we'll decrement this twice: once when writing to the server, once just below this block,
// we'll decrement this thrice: once when writing to the server, once just below this block,
// and once in the MinecraftConnection (since this is a slice) // and once in the MinecraftConnection (since this is a slice)
pm.getData().retain(); pm.getData().retain();
}
connection.getProxyPlayer().getConnection().write(newPacket); connection.getProxyPlayer().getConnection().write(pm);
} finally { } finally {
ReferenceCountUtil.release(pm.getData()); pm.getData().release();
} }
} else { } else {
// Just forward the packet on. We don't have anything to handle at this time. // Just forward the packet on. We don't have anything to handle at this time.

Datei anzeigen

@ -9,12 +9,14 @@ import com.velocitypowered.proxy.data.scoreboard.Scoreboard;
import com.velocitypowered.proxy.data.scoreboard.Team; import com.velocitypowered.proxy.data.scoreboard.Team;
import com.velocitypowered.proxy.protocol.MinecraftPacket; import com.velocitypowered.proxy.protocol.MinecraftPacket;
import com.velocitypowered.proxy.protocol.ProtocolConstants; import com.velocitypowered.proxy.protocol.ProtocolConstants;
import com.velocitypowered.proxy.protocol.ProtocolUtils;
import com.velocitypowered.proxy.protocol.packet.*; import com.velocitypowered.proxy.protocol.packet.*;
import com.velocitypowered.proxy.connection.MinecraftSessionHandler; import com.velocitypowered.proxy.connection.MinecraftSessionHandler;
import com.velocitypowered.proxy.protocol.remap.EntityIdRemapper; import com.velocitypowered.proxy.protocol.remap.EntityIdRemapper;
import com.velocitypowered.proxy.protocol.util.PluginMessageUtil; import com.velocitypowered.proxy.protocol.util.PluginMessageUtil;
import com.velocitypowered.proxy.util.ThrowableUtils; import com.velocitypowered.proxy.util.ThrowableUtils;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import net.kyori.text.TextComponent; import net.kyori.text.TextComponent;
@ -38,7 +40,6 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler {
private boolean spawned = false; private boolean spawned = false;
private final List<UUID> serverBossBars = new ArrayList<>(); private final List<UUID> serverBossBars = new ArrayList<>();
private final Set<String> clientPluginMsgChannels = new HashSet<>(); private final Set<String> clientPluginMsgChannels = new HashSet<>();
private PluginMessage brandMessage;
private int currentDimension; private int currentDimension;
private Scoreboard serverScoreboard = new Scoreboard(); private Scoreboard serverScoreboard = new Scoreboard();
private EntityIdRemapper idRemapper; private EntityIdRemapper idRemapper;
@ -110,10 +111,6 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler {
pingTask.cancel(false); pingTask.cancel(false);
pingTask = null; pingTask = null;
} }
if (brandMessage != null) {
brandMessage.getData().release();
}
} }
@Override @Override
@ -179,12 +176,6 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler {
PluginMessageUtil.constructChannelsPacket(channel, clientPluginMsgChannels)); PluginMessageUtil.constructChannelsPacket(channel, clientPluginMsgChannels));
} }
// Tell the server the client's brand
if (brandMessage != null) {
brandMessage.getData().retain();
player.getConnectedServer().getMinecraftConnection().delayedWrite(brandMessage);
}
// Flush everything // Flush everything
player.getConnection().flush(); player.getConnection().flush();
player.getConnectedServer().getMinecraftConnection().flush(); player.getConnectedServer().getMinecraftConnection().flush();
@ -201,7 +192,6 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler {
public void handleClientPluginMessage(PluginMessage packet) { public void handleClientPluginMessage(PluginMessage packet) {
logger.info("Got client plugin message packet {}", packet); logger.info("Got client plugin message packet {}", packet);
PluginMessage original = packet;
try { try {
if (packet.getChannel().equals("REGISTER") || packet.getChannel().equals("minecraft:register")) { if (packet.getChannel().equals("REGISTER") || packet.getChannel().equals("minecraft:register")) {
List<String> actuallyRegistered = new ArrayList<>(); List<String> actuallyRegistered = new ArrayList<>();
@ -230,27 +220,16 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler {
clientPluginMsgChannels.removeAll(channels); clientPluginMsgChannels.removeAll(channels);
} }
if (packet.getChannel().equals("MC|Brand") || packet.getChannel().equals("minecraft:brand")) { if (PluginMessageUtil.isMCBrand(packet)) {
if (this.brandMessage != null) { player.getConnectedServer().getMinecraftConnection().write(PluginMessageUtil.rewriteMCBrand(packet));
// Rewrite this packet to indicate that Velocity is running. Hurrah!
packet = PluginMessageUtil.rewriteMCBrand(packet);
this.brandMessage = packet;
} else {
// Already have the brand packet and don't need this one.
return; return;
} }
}
// No other special handling? // We're going to forward on the original packet.
if (packet == original) {
// we'll decrement this thrice: once when writing to the server, once just below this block,
// and once in the MinecraftConnection (since this is a slice)
packet.getData().retain(); packet.getData().retain();
}
player.getConnectedServer().getMinecraftConnection().write(packet); player.getConnectedServer().getMinecraftConnection().write(packet);
} finally { } finally {
ReferenceCountUtil.release(original.getData()); ReferenceCountUtil.release(packet.getData());
} }
} }

Datei anzeigen

@ -78,10 +78,8 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation {
String error = ThrowableUtils.briefDescription(throwable); String error = ThrowableUtils.briefDescription(throwable);
String userMessage; String userMessage;
if (connectedServer != null && connectedServer.getServerInfo().equals(info)) { if (connectedServer != null && connectedServer.getServerInfo().equals(info)) {
logger.error("{}: exception occurred in connection to {}", this, info.getName(), throwable);
userMessage = "Exception in server " + info.getName(); userMessage = "Exception in server " + info.getName();
} else { } else {
logger.error("{}: unable to connect to server {}", this, info.getName(), throwable);
userMessage = "Exception connecting to server " + info.getName(); userMessage = "Exception connecting to server " + info.getName();
} }
handleConnectionException(info, TextComponent.builder() handleConnectionException(info, TextComponent.builder()

Datei anzeigen

@ -25,7 +25,7 @@ public class MinecraftCompressDecoder extends MessageToMessageDecoder<ByteBuf> {
int uncompressedSize = ProtocolUtils.readVarInt(msg); int uncompressedSize = ProtocolUtils.readVarInt(msg);
if (uncompressedSize == 0) { if (uncompressedSize == 0) {
// Strip the now-useless uncompressed size, this message is already uncompressed. // Strip the now-useless uncompressed size, this message is already uncompressed.
out.add(msg.slice().retain()); out.add(msg.retainedSlice());
msg.skipBytes(msg.readableBytes()); msg.skipBytes(msg.readableBytes());
return; return;
} }

Datei anzeigen

@ -26,7 +26,7 @@ public class MinecraftDecoder extends MessageToMessageDecoder<ByteBuf> {
return; return;
} }
ByteBuf slice = msg.slice().retain(); ByteBuf slice = msg.retainedSlice();
int packetId = ProtocolUtils.readVarInt(msg); int packetId = ProtocolUtils.readVarInt(msg);
MinecraftPacket packet = this.protocolVersion.createPacket(packetId); MinecraftPacket packet = this.protocolVersion.createPacket(packetId);

Datei anzeigen

@ -21,7 +21,6 @@ public class MinecraftVarintFrameDecoder extends ByteToMessageDecoder {
return; return;
} }
out.add(in.slice(in.readerIndex(), packetLength).retain()); out.add(in.readRetainedSlice(packetLength));
in.skipBytes(packetLength);
} }
} }

Datei anzeigen

@ -5,17 +5,21 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class MinecraftVarintLengthEncoder extends MessageToByteEncoder<ByteBuf> { public class MinecraftVarintLengthEncoder extends MessageToMessageEncoder<ByteBuf> {
public static final MinecraftVarintLengthEncoder INSTANCE = new MinecraftVarintLengthEncoder(); public static final MinecraftVarintLengthEncoder INSTANCE = new MinecraftVarintLengthEncoder();
private MinecraftVarintLengthEncoder() { } private MinecraftVarintLengthEncoder() { }
@Override @Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { protected void encode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> list) throws Exception {
out.ensureWritable(msg.readableBytes() + 5); ByteBuf lengthBuf = ctx.alloc().buffer(5); // the maximum size of a varint
ProtocolUtils.writeVarInt(out, msg.readableBytes()); ProtocolUtils.writeVarInt(lengthBuf, buf.readableBytes());
out.writeBytes(msg); list.add(lengthBuf);
list.add(buf.retain());
} }
} }

Datei anzeigen

@ -15,7 +15,13 @@ import java.util.List;
public enum PluginMessageUtil { public enum PluginMessageUtil {
; ;
public static boolean isMCBrand(PluginMessage message) {
Preconditions.checkNotNull(message, "message");
return message.getChannel().equals("MC|Brand") || message.getChannel().equals("minecraft:brand");
}
public static List<String> getChannels(PluginMessage message) { public static List<String> getChannels(PluginMessage message) {
Preconditions.checkNotNull(message, "message");
Preconditions.checkArgument(message.getChannel().equals("REGISTER") || Preconditions.checkArgument(message.getChannel().equals("REGISTER") ||
message.getChannel().equals("UNREGISTER") || message.getChannel().equals("UNREGISTER") ||
message.getChannel().equals("minecraft:register") || message.getChannel().equals("minecraft:register") ||
@ -26,6 +32,9 @@ public enum PluginMessageUtil {
} }
public static PluginMessage constructChannelsPacket(String channel, Collection<String> channels) { public static PluginMessage constructChannelsPacket(String channel, Collection<String> channels) {
Preconditions.checkNotNull(channel, "channel");
Preconditions.checkNotNull(channel, "channels");
PluginMessage message = new PluginMessage(); PluginMessage message = new PluginMessage();
message.setChannel(channel); message.setChannel(channel);
@ -41,6 +50,9 @@ public enum PluginMessageUtil {
} }
public static PluginMessage rewriteMCBrand(PluginMessage message) { public static PluginMessage rewriteMCBrand(PluginMessage message) {
Preconditions.checkNotNull(message, "message");
Preconditions.checkArgument(isMCBrand(message), "message is not a MC Brand plugin message");
ByteBuf rewrittenBuf = Unpooled.buffer(); ByteBuf rewrittenBuf = Unpooled.buffer();
String currentBrand = ProtocolUtils.readString(message.getData()); String currentBrand = ProtocolUtils.readString(message.getData());
ProtocolUtils.writeString(rewrittenBuf, currentBrand + " (Velocity)"); ProtocolUtils.writeString(rewrittenBuf, currentBrand + " (Velocity)");