13
0
geforkt von Mirrors/Velocity

Improve Velocity networking pipeline when under stress

Dieser Commit ist enthalten in:
Andrew Steinborn 2020-07-16 11:44:55 -04:00
Ursprung f93e227491
Commit 72ce5c86ba
13 geänderte Dateien mit 168 neuen und 92 gelöschten Zeilen

Datei anzeigen

@ -16,6 +16,8 @@ import com.velocitypowered.natives.encryption.VelocityCipher;
import com.velocitypowered.natives.encryption.VelocityCipherFactory;
import com.velocitypowered.natives.util.Natives;
import com.velocitypowered.proxy.VelocityServer;
import com.velocitypowered.proxy.connection.client.HandshakeSessionHandler;
import com.velocitypowered.proxy.connection.client.LoginSessionHandler;
import com.velocitypowered.proxy.connection.client.StatusSessionHandler;
import com.velocitypowered.proxy.protocol.MinecraftPacket;
import com.velocitypowered.proxy.protocol.StateRegistry;
@ -25,7 +27,7 @@ import com.velocitypowered.proxy.protocol.netty.MinecraftCompressDecoder;
import com.velocitypowered.proxy.protocol.netty.MinecraftCompressEncoder;
import com.velocitypowered.proxy.protocol.netty.MinecraftDecoder;
import com.velocitypowered.proxy.protocol.netty.MinecraftEncoder;
import com.velocitypowered.proxy.util.except.QuietException;
import com.velocitypowered.proxy.util.except.QuietDecoderException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
@ -39,7 +41,6 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.GeneralSecurityException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import org.apache.logging.log4j.LogManager;
@ -53,8 +54,6 @@ import org.checkerframework.checker.nullness.qual.Nullable;
public class MinecraftConnection extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(MinecraftConnection.class);
private static final AtomicLong lastQuietError = new AtomicLong();
private static final AtomicLong quietErrorsSent = new AtomicLong();
private final Channel channel;
private SocketAddress remoteAddress;
@ -158,12 +157,15 @@ public class MinecraftConnection extends ChannelInboundHandlerAdapter {
if (cause instanceof ReadTimeoutException) {
logger.error("{}: read timed out", association);
} else {
if (cause instanceof QuietException && willThrottleQuietErrorLogging()) {
// Silence the disconnect
this.knownDisconnect = true;
return;
boolean isQuietDecoderException = cause instanceof QuietDecoderException;
boolean willLogQuietDecoderException = isQuietDecoderException
&& !(sessionHandler instanceof LoginSessionHandler)
&& !(sessionHandler instanceof HandshakeSessionHandler);
if (willLogQuietDecoderException) {
logger.error("{}: exception encountered in {}", association, sessionHandler, cause);
} else if (isQuietDecoderException) {
knownDisconnect = true;
}
logger.error("{}: exception encountered in {}", association, sessionHandler, cause);
}
}
@ -441,16 +443,4 @@ public class MinecraftConnection extends ChannelInboundHandlerAdapter {
this.connectionType = connectionType;
}
private static boolean willThrottleQuietErrorLogging() {
long lastErrorAt = lastQuietError.get();
long now = System.currentTimeMillis();
if (lastErrorAt + 2000 >= now) {
return quietErrorsSent.incrementAndGet() >= 5;
} else {
lastQuietError.set(now);
quietErrorsSent.set(0);
return false;
}
}
}

Datei anzeigen

@ -17,7 +17,7 @@ import com.velocitypowered.proxy.protocol.packet.LoginPluginMessage;
import com.velocitypowered.proxy.protocol.packet.LoginPluginResponse;
import com.velocitypowered.proxy.protocol.packet.ServerLoginSuccess;
import com.velocitypowered.proxy.protocol.packet.SetCompression;
import com.velocitypowered.proxy.util.except.QuietException;
import com.velocitypowered.proxy.util.except.QuietRuntimeException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.security.InvalidKeyException;
@ -113,7 +113,7 @@ public class LoginSessionHandler implements MinecraftSessionHandler {
public void disconnected() {
if (server.getConfiguration().getPlayerInfoForwardingMode() == PlayerInfoForwarding.LEGACY) {
resultFuture.completeExceptionally(
new QuietException("The connection to the remote server was unexpectedly closed.\n"
new QuietRuntimeException("The connection to the remote server was unexpectedly closed.\n"
+ "This is usually because the remote server does not have BungeeCord IP forwarding "
+ "correctly enabled.\nSee "
+ "https://docs.velocitypowered.com/en/latest/users/player-info-forwarding.html "
@ -121,7 +121,7 @@ public class LoginSessionHandler implements MinecraftSessionHandler {
);
} else {
resultFuture.completeExceptionally(
new QuietException("The connection to the remote server was unexpectedly closed.")
new QuietRuntimeException("The connection to the remote server was unexpectedly closed.")
);
}
}

Datei anzeigen

@ -9,13 +9,13 @@ import static com.velocitypowered.proxy.network.Connections.READ_TIMEOUT;
import com.velocitypowered.proxy.VelocityServer;
import com.velocitypowered.proxy.protocol.ProtocolUtils;
import com.velocitypowered.proxy.protocol.netty.AutoReadHolderHandler;
import com.velocitypowered.proxy.protocol.netty.MinecraftDecoder;
import com.velocitypowered.proxy.protocol.netty.MinecraftEncoder;
import com.velocitypowered.proxy.protocol.netty.MinecraftVarintFrameDecoder;
import com.velocitypowered.proxy.protocol.netty.MinecraftVarintLengthEncoder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.flow.FlowControlHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.util.concurrent.TimeUnit;
@ -36,7 +36,7 @@ public class BackendChannelInitializer extends ChannelInitializer<Channel> {
TimeUnit.MILLISECONDS))
.addLast(FRAME_DECODER, new MinecraftVarintFrameDecoder())
.addLast(FRAME_ENCODER, MinecraftVarintLengthEncoder.INSTANCE)
.addLast(FLOW_HANDLER, new FlowControlHandler())
.addLast(FLOW_HANDLER, new AutoReadHolderHandler())
.addLast(MINECRAFT_DECODER,
new MinecraftDecoder(ProtocolUtils.Direction.CLIENTBOUND))
.addLast(MINECRAFT_ENCODER,

Datei anzeigen

@ -7,7 +7,7 @@ import com.google.common.base.Preconditions;
import com.velocitypowered.api.network.ProtocolVersion;
import com.velocitypowered.api.util.GameProfile;
import com.velocitypowered.proxy.protocol.netty.MinecraftDecoder;
import com.velocitypowered.proxy.util.except.QuietException;
import com.velocitypowered.proxy.util.except.QuietDecoderException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
@ -16,8 +16,6 @@ import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.EncoderException;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -27,12 +25,12 @@ import java.util.UUID;
import net.kyori.adventure.text.serializer.gson.GsonComponentSerializer;
import net.kyori.nbt.CompoundTag;
import net.kyori.nbt.TagIO;
import net.kyori.nbt.TagType;
public enum ProtocolUtils {
;
private static final int DEFAULT_MAX_STRING_SIZE = 65536; // 64KiB
private static final QuietException BAD_VARINT_CACHED = new QuietException("Bad varint decoded");
private static final QuietDecoderException BAD_VARINT_CACHED =
new QuietDecoderException("Bad varint decoded");
/**
* Reads a Minecraft-style VarInt from the specified {@code buf}.

Datei anzeigen

@ -0,0 +1,61 @@
package com.velocitypowered.proxy.protocol.netty;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayDeque;
import java.util.Queue;
/**
* A variation on {@link io.netty.handler.flow.FlowControlHandler} that explicitly holds messages
* on {@code channelRead} and only releases them on an explicit read operation.
*/
public class AutoReadHolderHandler extends ChannelDuplexHandler implements ChannelInboundHandler {
private final Queue<Object> queuedMessages;
public AutoReadHolderHandler() {
this.queuedMessages = new ArrayDeque<>();
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
drainQueuedMessages(ctx);
ctx.read();
}
private void drainQueuedMessages(ChannelHandlerContext ctx) {
if (!this.queuedMessages.isEmpty()) {
Object queued;
while ((queued = this.queuedMessages.poll()) != null) {
ctx.fireChannelRead(queued);
}
ctx.fireChannelReadComplete();
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (ctx.channel().config().isAutoRead()) {
ctx.fireChannelRead(msg);
} else {
this.queuedMessages.add(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (this.queuedMessages.isEmpty()) {
ctx.fireChannelReadComplete();
}
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
for (Object message : this.queuedMessages) {
ReferenceCountUtil.release(message);
}
this.queuedMessages.clear();
}
}

Datei anzeigen

@ -22,6 +22,11 @@ public class LegacyPingDecoder extends ByteToMessageDecoder {
return;
}
if (!ctx.channel().isActive()) {
in.skipBytes(in.readableBytes());
return;
}
int originalReaderIndex = in.readerIndex();
short first = in.readUnsignedByte();
if (first == 0xfe) {
@ -39,7 +44,7 @@ public class LegacyPingDecoder extends ByteToMessageDecoder {
// We got a 1.6.x ping. Let's chomp off the stuff we don't need.
out.add(readExtended16Data(in));
} else if (first == 0x02) {
} else if (first == 0x02 && in.isReadable()) {
in.skipBytes(in.readableBytes());
out.add(new LegacyHandshake());
} else {

Datei anzeigen

@ -5,7 +5,7 @@ import com.velocitypowered.api.network.ProtocolVersion;
import com.velocitypowered.proxy.protocol.MinecraftPacket;
import com.velocitypowered.proxy.protocol.ProtocolUtils;
import com.velocitypowered.proxy.protocol.StateRegistry;
import com.velocitypowered.proxy.util.except.QuietException;
import com.velocitypowered.proxy.util.except.QuietDecoderException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@ -14,8 +14,8 @@ import io.netty.handler.codec.CorruptedFrameException;
public class MinecraftDecoder extends ChannelInboundHandlerAdapter {
public static final boolean DEBUG = Boolean.getBoolean("velocity.packet-decode-logging");
private static final QuietException DECODE_FAILED =
new QuietException("A packet did not decode successfully (invalid data). If you are a "
private static final QuietDecoderException DECODE_FAILED =
new QuietDecoderException("A packet did not decode successfully (invalid data). If you are a "
+ "developer, launch Velocity with -Dvelocity.packet-decode-logging=true to see more.");
private final ProtocolUtils.Direction direction;
@ -38,11 +38,7 @@ public class MinecraftDecoder extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
try {
tryDecode(ctx, buf);
} finally {
buf.release();
}
tryDecode(ctx, buf);
} else {
ctx.fireChannelRead(msg);
}
@ -50,6 +46,7 @@ public class MinecraftDecoder extends ChannelInboundHandlerAdapter {
private void tryDecode(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
if (!ctx.channel().isActive()) {
buf.release();
return;
}
@ -58,18 +55,22 @@ public class MinecraftDecoder extends ChannelInboundHandlerAdapter {
MinecraftPacket packet = this.registry.createPacket(packetId);
if (packet == null) {
buf.readerIndex(originalReaderIndex);
ctx.fireChannelRead(buf.retain());
ctx.fireChannelRead(buf);
} else {
try {
packet.decode(buf, direction, registry.version);
} catch (Exception e) {
throw handleDecodeFailure(e, packet, packetId);
}
try {
packet.decode(buf, direction, registry.version);
} catch (Exception e) {
throw handleDecodeFailure(e, packet, packetId);
}
if (buf.isReadable()) {
throw handleNotReadEnough(packet, packetId);
if (buf.isReadable()) {
throw handleNotReadEnough(packet, packetId);
}
ctx.fireChannelRead(packet);
} finally {
buf.release();
}
ctx.fireChannelRead(packet);
}
}

Datei anzeigen

@ -1,6 +1,6 @@
package com.velocitypowered.proxy.protocol.netty;
import com.velocitypowered.proxy.util.except.QuietException;
import com.velocitypowered.proxy.util.except.QuietDecoderException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
@ -9,8 +9,10 @@ import java.util.List;
public class MinecraftVarintFrameDecoder extends ByteToMessageDecoder {
private static final QuietException BAD_LENGTH_CACHED = new QuietException("Bad packet length");
private static final QuietException VARINT_BIG_CACHED = new QuietException("VarInt too big");
private static final QuietDecoderException BAD_LENGTH_CACHED =
new QuietDecoderException("Bad packet length");
private static final QuietDecoderException VARINT_BIG_CACHED =
new QuietDecoderException("VarInt too big");
private final VarintByteDecoder reader = new VarintByteDecoder();
@Override
@ -20,37 +22,32 @@ public class MinecraftVarintFrameDecoder extends ByteToMessageDecoder {
return;
}
while (in.isReadable()) {
reader.reset();
reader.reset();
int varintEnd = in.forEachByte(reader);
if (varintEnd == -1) {
// We tried to go beyond the end of the buffer. This is probably a good sign that the
// buffer was too short to hold a proper varint.
return;
}
int varintEnd = in.forEachByte(reader);
if (varintEnd == -1) {
// We tried to go beyond the end of the buffer. This is probably a good sign that the
// buffer was too short to hold a proper varint.
return;
}
if (reader.result == DecodeResult.SUCCESS) {
if (reader.readVarint < 0) {
throw BAD_LENGTH_CACHED;
} else if (reader.readVarint == 0) {
// skip over the empty packet and ignore it
in.readerIndex(varintEnd + 1);
if (reader.result == DecodeResult.SUCCESS) {
if (reader.readVarint < 0) {
throw BAD_LENGTH_CACHED;
} else if (reader.readVarint == 0) {
// skip over the empty packet and ignore it
in.readerIndex(varintEnd + 1);
} else {
int minimumRead = reader.bytesRead + reader.readVarint;
if (in.isReadable(minimumRead)) {
out.add(in.retainedSlice(varintEnd + 1, reader.readVarint));
in.skipBytes(minimumRead);
} else {
int minimumRead = reader.bytesRead + reader.readVarint;
if (in.isReadable(minimumRead)) {
out.add(in.retainedSlice(varintEnd + 1, reader.readVarint));
in.skipBytes(minimumRead);
} else {
return;
}
return;
}
} else if (reader.result == DecodeResult.TOO_BIG) {
throw VARINT_BIG_CACHED;
} else if (reader.result == DecodeResult.TOO_SHORT) {
// No-op: we couldn't get a useful result.
break;
}
} else if (reader.result == DecodeResult.TOO_BIG) {
throw VARINT_BIG_CACHED;
}
}

Datei anzeigen

@ -6,11 +6,10 @@ import com.velocitypowered.proxy.protocol.ProtocolUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
import io.netty.handler.codec.MessageToByteEncoder;
@ChannelHandler.Sharable
public class MinecraftVarintLengthEncoder extends MessageToMessageEncoder<ByteBuf> {
public class MinecraftVarintLengthEncoder extends MessageToByteEncoder<ByteBuf> {
public static final MinecraftVarintLengthEncoder INSTANCE = new MinecraftVarintLengthEncoder();
private static final boolean IS_JAVA_CIPHER = Natives.cipher.get() == JavaVelocityCipher.FACTORY;
@ -19,11 +18,17 @@ public class MinecraftVarintLengthEncoder extends MessageToMessageEncoder<ByteBu
}
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> list)
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
ProtocolUtils.writeVarInt(out, msg.readableBytes());
out.writeBytes(msg);
}
@Override
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect)
throws Exception {
ByteBuf lengthBuf = IS_JAVA_CIPHER ? ctx.alloc().heapBuffer(5) : ctx.alloc().directBuffer(5);
ProtocolUtils.writeVarInt(lengthBuf, buf.readableBytes());
list.add(lengthBuf);
list.add(buf.retain());
int anticipatedRequiredCapacity = 5 + msg.readableBytes();
return IS_JAVA_CIPHER
? ctx.alloc().heapBuffer(anticipatedRequiredCapacity)
: ctx.alloc().directBuffer(anticipatedRequiredCapacity);
}
}

Datei anzeigen

@ -5,13 +5,13 @@ import com.velocitypowered.api.network.ProtocolVersion;
import com.velocitypowered.proxy.connection.MinecraftSessionHandler;
import com.velocitypowered.proxy.protocol.MinecraftPacket;
import com.velocitypowered.proxy.protocol.ProtocolUtils;
import com.velocitypowered.proxy.util.except.QuietException;
import com.velocitypowered.proxy.util.except.QuietDecoderException;
import io.netty.buffer.ByteBuf;
import org.checkerframework.checker.nullness.qual.Nullable;
public class ServerLogin implements MinecraftPacket {
private static final QuietException EMPTY_USERNAME = new QuietException("Empty username!");
private static final QuietDecoderException EMPTY_USERNAME = new QuietDecoderException("Empty username!");
private @Nullable String username;

Datei anzeigen

@ -2,14 +2,14 @@ package com.velocitypowered.proxy.protocol.util;
import com.google.common.base.Strings;
import com.velocitypowered.proxy.protocol.netty.MinecraftDecoder;
import com.velocitypowered.proxy.util.except.QuietException;
import com.velocitypowered.proxy.util.except.QuietDecoderException;
import io.netty.handler.codec.CorruptedFrameException;
/**
* Extends {@link com.google.common.base.Preconditions} for Netty's {@link CorruptedFrameException}.
*/
public class NettyPreconditions {
private static final QuietException BAD = new QuietException(
private static final QuietDecoderException BAD = new QuietDecoderException(
"Invalid packet received. Launch Velocity with -Dvelocity.packet-decode-logging=true "
+ "to see more.");

Datei anzeigen

@ -0,0 +1,19 @@
package com.velocitypowered.proxy.util.except;
import io.netty.handler.codec.DecoderException;
/**
* A special-purpose exception thrown when we want to indicate an error decoding but do not want
* to see a large stack trace in logs.
*/
public class QuietDecoderException extends DecoderException {
public QuietDecoderException(String message) {
super(message);
}
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
}

Datei anzeigen

@ -1,12 +1,12 @@
package com.velocitypowered.proxy.util.except;
/**
* A special-purpose exception thrown when we want to indicate an error condition but do not want
* A special-purpose exception thrown when we want to indicate an error but do not want
* to see a large stack trace in logs.
*/
public class QuietException extends RuntimeException {
public class QuietRuntimeException extends RuntimeException {
public QuietException(String message) {
public QuietRuntimeException(String message) {
super(message);
}