3
0
Mirror von https://github.com/PaperMC/Velocity.git synchronisiert 2024-11-16 21:10:30 +01:00

Move from custom HTTP client to Async HTTP Client

The most significant advantage is that Velocity gets a well-tested
HTTP client implementation which also includes a connection pool,
allowing us to avoid the overhead of TCP and TLS handshakes upon each
login.

Unfortunately, Async HTTP Client does not work with the kqueue transport.
Since almost nobody runs a production Velocity server on macOS, we have
decided to remove kqueue support. The benefits that Async HTTP Client
provides outweigh the disadvantages of not having a macOS native transport.
macOS is adequately supported by the normal NIO transport.
Dieser Commit ist enthalten in:
Andrew Steinborn 2019-07-26 01:29:43 -04:00
Ursprung 7fde18c9f8
Commit d1736bf94c
9 geänderte Dateien mit 97 neuen und 309 gelöschten Zeilen

Datei anzeigen

@ -48,7 +48,6 @@ dependencies {
compile "io.netty:netty-handler:${nettyVersion}"
compile "io.netty:netty-transport-native-epoll:${nettyVersion}"
compile "io.netty:netty-transport-native-epoll:${nettyVersion}:linux-x86_64"
compile "io.netty:netty-transport-native-kqueue:${nettyVersion}:osx-x86_64"
compile "io.netty:netty-resolver-dns:${nettyVersion}"
compile "org.apache.logging.log4j:log4j-api:${log4jVersion}"
@ -65,6 +64,8 @@ dependencies {
compile 'net.kyori:event-method-asm:3.0.0'
compile 'com.mojang:brigadier:1.0.15'
compile 'org.asynchttpclient:async-http-client:2.10.1'
testCompile "org.junit.jupiter:junit-jupiter-api:${junitVersion}"
testCompile "org.junit.jupiter:junit-jupiter-engine:${junitVersion}"

Datei anzeigen

@ -3,12 +3,9 @@ package com.velocitypowered.proxy;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.velocitypowered.proxy.config.VelocityConfiguration;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaderNames;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
@ -21,11 +18,14 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPOutputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Response;
/**
* bStats collects some data for plugin authors.
@ -185,40 +185,44 @@ public class Metrics {
}
// Compress the data to save bandwidth
ByteBuf reqBody = createResponseBody(data);
server.getHttpClient().post(new URL(URL), reqBody, request -> {
request.headers().add(HttpHeaderNames.CONTENT_ENCODING, "gzip");
request.headers().add(HttpHeaderNames.ACCEPT, "application/json");
request.headers().add(HttpHeaderNames.CONTENT_TYPE, "application/json");
})
.whenCompleteAsync((resp, exc) -> {
if (logFailedRequests) {
if (exc != null) {
logger.error("Unable to send metrics to bStats", exc);
} else if (resp.getCode() != 429) {
logger.error("Got HTTP status code {} when sending metrics to bStats",
resp.getCode());
}
ListenableFuture<Response> future = server.getAsyncHttpClient()
.preparePost(URL)
.addHeader(HttpHeaderNames.CONTENT_ENCODING, "gzip")
.addHeader(HttpHeaderNames.ACCEPT, "application/json")
.addHeader(HttpHeaderNames.CONTENT_TYPE, "application/json")
.setBody(createResponseBody(data))
.execute();
future.addListener(() -> {
if (logFailedRequests) {
try {
Response r = future.get();
if (r.getStatusCode() != 429) {
logger.error("Got HTTP status code {} when sending metrics to bStats",
r.getStatusCode());
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
logger.error("Unable to send metrics to bStats", e);
}
}
}, null);
}
private static ByteBuf createResponseBody(JsonObject object) throws IOException {
ByteBuf buf = Unpooled.buffer();
private static byte[] createResponseBody(JsonObject object) throws IOException {
ByteArrayOutputStream os = new ByteArrayOutputStream();
try (Writer writer =
new BufferedWriter(
new OutputStreamWriter(
new GZIPOutputStream(new ByteBufOutputStream(buf)), StandardCharsets.UTF_8
new GZIPOutputStream(os), StandardCharsets.UTF_8
)
)
) {
VelocityServer.GSON.toJson(object, writer);
} catch (IOException e) {
buf.release();
throw e;
}
return buf;
return os.toByteArray();
}
/**

Datei anzeigen

@ -30,7 +30,6 @@ import com.velocitypowered.proxy.config.VelocityConfiguration;
import com.velocitypowered.proxy.connection.client.ConnectedPlayer;
import com.velocitypowered.proxy.console.VelocityConsole;
import com.velocitypowered.proxy.network.ConnectionManager;
import com.velocitypowered.proxy.network.http.NettyHttpClient;
import com.velocitypowered.proxy.plugin.VelocityEventManager;
import com.velocitypowered.proxy.plugin.VelocityPluginManager;
import com.velocitypowered.proxy.protocol.packet.Chat;
@ -74,6 +73,7 @@ import net.kyori.text.TranslatableComponent;
import net.kyori.text.serializer.gson.GsonComponentSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.asynchttpclient.AsyncHttpClient;
import org.checkerframework.checker.nullness.qual.EnsuresNonNull;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;
@ -90,7 +90,6 @@ public class VelocityServer implements ProxyServer {
private final ConnectionManager cm;
private final ProxyOptions options;
private @MonotonicNonNull VelocityConfiguration configuration;
private @MonotonicNonNull NettyHttpClient httpClient;
private @MonotonicNonNull KeyPair serverKeyPair;
private final ServerMap servers;
private final VelocityCommandManager commandManager = new VelocityCommandManager();
@ -202,7 +201,6 @@ public class VelocityServer implements ProxyServer {
}
ipAttemptLimiter = Ratelimiters.createWithMilliseconds(configuration.getLoginRatelimit());
httpClient = new NettyHttpClient(this);
loadPlugins();
// Go ahead and fire the proxy initialization event. We block since plugins should have a chance
@ -430,8 +428,8 @@ public class VelocityServer implements ProxyServer {
thread.start();
}
public NettyHttpClient getHttpClient() {
return ensureInitialized(httpClient);
public AsyncHttpClient getAsyncHttpClient() {
return ensureInitialized(cm).getHttpClient();
}
public Ratelimiter getIpAttemptLimiter() {

Datei anzeigen

@ -7,6 +7,8 @@ import static com.velocitypowered.proxy.connection.VelocityConstants.EMPTY_BYTE_
import static com.velocitypowered.proxy.connection.VelocityConstants.VELOCITY_IP_FORWARDING_CHANNEL;
import static com.velocitypowered.proxy.util.EncryptionUtils.decryptRsa;
import static com.velocitypowered.proxy.util.EncryptionUtils.generateServerId;
import static org.asynchttpclient.Dsl.asyncHttpClient;
import static org.asynchttpclient.Dsl.config;
import com.google.common.base.Preconditions;
import com.google.common.net.UrlEscapers;
@ -42,14 +44,19 @@ import java.security.KeyPair;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import net.kyori.text.Component;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Response;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
public class LoginSessionHandler implements MinecraftSessionHandler {
private static final Logger logger = LogManager.getLogger(LoginSessionHandler.class);
private static final String MOJANG_HASJOINED_URL =
"https://sessionserver.mojang.com/session/minecraft/hasJoined?username=%s&serverId=%s&ip=%s";
@ -61,6 +68,7 @@ public class LoginSessionHandler implements MinecraftSessionHandler {
private byte[] verify = EMPTY_BYTE_ARRAY;
private int playerInfoId;
private @MonotonicNonNull ConnectedPlayer connectedPlayer;
private final long loginProcessBegan = System.currentTimeMillis();
LoginSessionHandler(VelocityServer server, MinecraftConnection mcConnection,
InitialInboundConnection inbound) {
@ -123,46 +131,50 @@ public class LoginSessionHandler implements MinecraftSessionHandler {
String url = String.format(MOJANG_HASJOINED_URL,
urlFormParameterEscaper().escape(login.getUsername()), serverId,
urlFormParameterEscaper().escape(playerIp));
server.getHttpClient()
.get(new URL(url), mcConnection.eventLoop())
.thenAcceptAsync(profileResponse -> {
if (mcConnection.isClosed()) {
// The player disconnected after we authenticated them.
return;
}
// Go ahead and enable encryption. Once the client sends EncryptionResponse, encryption
// is enabled.
try {
mcConnection.enableEncryption(decryptedSharedSecret);
} catch (GeneralSecurityException e) {
throw new RuntimeException(e);
}
ListenableFuture<Response> hasJoinedResponse = server.getAsyncHttpClient().prepareGet(url)
.execute();
hasJoinedResponse.addListener(() -> {
if (mcConnection.isClosed()) {
// The player disconnected after we authenticated them.
return;
}
if (profileResponse.getCode() == 200) {
// All went well, initialize the session.
initializePlayer(GSON.fromJson(profileResponse.getBody(), GameProfile.class), true);
} else if (profileResponse.getCode() == 204) {
// Apparently an offline-mode user logged onto this online-mode proxy.
inbound.disconnect(VelocityMessages.ONLINE_MODE_ONLY);
} else {
// Something else went wrong
logger.error(
"Got an unexpected error code {} whilst contacting Mojang to log in {} ({})",
profileResponse.getCode(), login.getUsername(), playerIp);
mcConnection.close();
}
}, mcConnection.eventLoop())
.exceptionally(exception -> {
logger.error("Unable to enable encryption", exception);
// Go ahead and enable encryption. Once the client sends EncryptionResponse, encryption
// is enabled.
try {
mcConnection.enableEncryption(decryptedSharedSecret);
} catch (GeneralSecurityException e) {
throw new RuntimeException(e);
}
try {
Response profileResponse = hasJoinedResponse.get();
if (profileResponse.getStatusCode() == 200) {
// All went well, initialize the session.
initializePlayer(GSON.fromJson(profileResponse.getResponseBody(), GameProfile.class),
true);
} else if (profileResponse.getStatusCode() == 204) {
// Apparently an offline-mode user logged onto this online-mode proxy.
inbound.disconnect(VelocityMessages.ONLINE_MODE_ONLY);
} else {
// Something else went wrong
logger.error(
"Got an unexpected error code {} whilst contacting Mojang to log in {} ({})",
profileResponse.getStatusCode(), login.getUsername(), playerIp);
mcConnection.close();
return null;
});
}
} catch (ExecutionException e) {
logger.error("Unable to authenticate with Mojang", e);
mcConnection.close();
} catch (InterruptedException e) {
// not much we can do usefully
Thread.currentThread().interrupt();
}
}, mcConnection.eventLoop());
} catch (GeneralSecurityException e) {
logger.error("Unable to enable encryption", e);
mcConnection.close();
} catch (MalformedURLException e) {
throw new AssertionError(e);
}
return true;
}
@ -179,6 +191,7 @@ public class LoginSessionHandler implements MinecraftSessionHandler {
// The player was disconnected
return;
}
PreLoginComponentResult result = event.getResult();
Optional<Component> disconnectReason = result.getReason();
if (disconnectReason.isPresent()) {
@ -277,7 +290,7 @@ public class LoginSessionHandler implements MinecraftSessionHandler {
player.disconnect(VelocityMessages.ALREADY_CONNECTED);
return;
}
mcConnection.setSessionHandler(new InitialConnectSessionHandler(player));
server.getEventManager().fire(new PostLoginEvent(player))
.thenRun(() -> player.createConnectionRequest(toTry.get()).fireAndForget());

Datei anzeigen

@ -1,5 +1,8 @@
package com.velocitypowered.proxy.network;
import static org.asynchttpclient.Dsl.asyncHttpClient;
import static org.asynchttpclient.Dsl.config;
import com.google.common.base.Preconditions;
import com.velocitypowered.natives.util.Natives;
import com.velocitypowered.proxy.VelocityServer;
@ -18,6 +21,8 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Dsl;
import org.checkerframework.checker.nullness.qual.Nullable;
public final class ConnectionManager {
@ -36,6 +41,7 @@ public final class ConnectionManager {
public final ServerChannelInitializerHolder serverChannelInitializer;
private final DnsAddressResolverGroup resolverGroup;
private final AsyncHttpClient httpClient;
/**
* Initalizes the {@code ConnectionManager}.
@ -55,6 +61,10 @@ public final class ConnectionManager {
.negativeTtl(15)
.ndots(1)
);
this.httpClient = asyncHttpClient(config()
.setEventLoopGroup(this.workerGroup)
.setUserAgent(server.getVersion().getName() + "/" + server.getVersion().getVersion())
.build());
}
public void logChannelInformation() {
@ -168,4 +178,8 @@ public final class ConnectionManager {
public ServerChannelInitializerHolder getServerChannelInitializer() {
return this.serverChannelInitializer;
}
public AsyncHttpClient getHttpClient() {
return httpClient;
}
}

Datei anzeigen

@ -7,11 +7,6 @@ import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueDatagramChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
@ -27,10 +22,7 @@ enum TransportType {
(name, type) -> new NioEventLoopGroup(0, createThreadFactory(name, type))),
EPOLL("epoll", EpollServerSocketChannel.class, EpollSocketChannel.class,
EpollDatagramChannel.class,
(name, type) -> new EpollEventLoopGroup(0, createThreadFactory(name, type))),
KQUEUE("Kqueue", KQueueServerSocketChannel.class, KQueueSocketChannel.class,
KQueueDatagramChannel.class,
(name, type) -> new KQueueEventLoopGroup(0, createThreadFactory(name, type)));
(name, type) -> new EpollEventLoopGroup(0, createThreadFactory(name, type)));
final String name;
final Class<? extends ServerSocketChannel> serverSocketChannelClass;
@ -66,8 +58,6 @@ enum TransportType {
public static TransportType bestType() {
if (Epoll.isAvailable()) {
return EPOLL;
} else if (KQueue.isAvailable()) {
return KQUEUE;
} else {
return NIO;
}

Datei anzeigen

@ -1,153 +0,0 @@
package com.velocitypowered.proxy.network.http;
import com.velocitypowered.proxy.VelocityServer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javax.net.ssl.SSLEngine;
import org.checkerframework.checker.nullness.qual.Nullable;
public class NettyHttpClient {
private final String userAgent;
private final VelocityServer server;
/**
* Initializes the HTTP client.
*
* @param server the Velocity server
*/
public NettyHttpClient(VelocityServer server) {
this.userAgent = server.getVersion().getName() + "/" + server.getVersion().getVersion();
this.server = server;
}
private ChannelFuture establishConnection(URL url, @Nullable EventLoop loop) {
String host = url.getHost();
int port = url.getPort();
boolean ssl = url.getProtocol().equals("https");
if (port == -1) {
port = ssl ? 443 : 80;
}
InetSocketAddress address = InetSocketAddress.createUnresolved(host, port);
return server.createBootstrap(loop)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
if (ssl) {
SslContext context = SslContextBuilder.forClient().protocols("TLSv1.2").build();
// Unbelievably, Java doesn't automatically check the CN to make sure we're talking
// to the right host! Therefore, we provide the intended host name and port, along
// with asking Java very nicely if it could check the hostname in the certificate
// for us.
SSLEngine engine = context.newEngine(ch.alloc(), address.getHostString(),
address.getPort());
engine.getSSLParameters().setEndpointIdentificationAlgorithm("HTTPS");
ch.pipeline().addLast("ssl", new SslHandler(engine));
}
ch.pipeline().addLast("http", new HttpClientCodec());
}
})
.connect(address);
}
/**
* Attempts an HTTP GET request to the specified URL.
* @param url the URL to fetch
* @param loop the event loop to use
* @return a future representing the response
*/
public CompletableFuture<SimpleHttpResponse> get(URL url, @Nullable EventLoop loop) {
CompletableFuture<SimpleHttpResponse> reply = new CompletableFuture<>();
establishConnection(url, loop)
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
Channel channel = future.channel();
channel.pipeline().addLast("collector", new SimpleHttpResponseCollector(reply));
String pathAndQuery = url.getPath();
if (url.getQuery() != null && url.getQuery().length() > 0) {
pathAndQuery += "?" + url.getQuery();
}
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, pathAndQuery);
request.headers().add(HttpHeaderNames.HOST, url.getHost());
request.headers().add(HttpHeaderNames.USER_AGENT, userAgent);
channel.writeAndFlush(request, channel.voidPromise());
} else {
reply.completeExceptionally(future.cause());
}
});
return reply;
}
/**
* Attempts an HTTP POST request to the specified URL.
* @param url the URL to fetch
* @param body the body to post
* @param decorator a consumer that can modify the request as required
* @return a future representing the response
*/
public CompletableFuture<SimpleHttpResponse> post(URL url, ByteBuf body,
Consumer<HttpRequest> decorator) {
return post(url, null, body, decorator);
}
/**
* Attempts an HTTP POST request to the specified URL.
* @param url the URL to fetch
* @param loop the event loop to use
* @param body the body to post - the HTTP client takes ownership of the buffer
* @param decorator a consumer that can modify the request as required
* @return a future representing the response
*/
public CompletableFuture<SimpleHttpResponse> post(URL url, @Nullable EventLoop loop, ByteBuf body,
Consumer<HttpRequest> decorator) {
CompletableFuture<SimpleHttpResponse> reply = new CompletableFuture<>();
establishConnection(url, loop)
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
Channel channel = future.channel();
channel.pipeline().addLast("collector", new SimpleHttpResponseCollector(reply));
String pathAndQuery = url.getPath();
if (url.getQuery() != null && url.getQuery().length() > 0) {
pathAndQuery += "?" + url.getQuery();
}
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.POST, pathAndQuery, body);
request.headers().add(HttpHeaderNames.HOST, url.getHost());
request.headers().add(HttpHeaderNames.USER_AGENT, userAgent);
request.headers().add(HttpHeaderNames.CONTENT_LENGTH, body.readableBytes());
decorator.accept(request);
channel.writeAndFlush(request, channel.voidPromise());
} else {
body.release();
reply.completeExceptionally(future.cause());
}
});
return reply;
}
}

Datei anzeigen

@ -1,28 +0,0 @@
package com.velocitypowered.proxy.network.http;
public class SimpleHttpResponse {
private final int code;
private final String body;
SimpleHttpResponse(int code, String body) {
this.code = code;
this.body = body;
}
public int getCode() {
return code;
}
public String getBody() {
return body;
}
@Override
public String toString() {
return "SimpleHttpResponse{"
+ "code=" + code
+ ", body='" + body + '\''
+ '}';
}
}

Datei anzeigen

@ -1,51 +0,0 @@
package com.velocitypowered.proxy.network.http;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
class SimpleHttpResponseCollector extends ChannelInboundHandlerAdapter {
private final StringBuilder buffer = new StringBuilder();
private final CompletableFuture<SimpleHttpResponse> reply;
private int httpCode;
SimpleHttpResponseCollector(CompletableFuture<SimpleHttpResponse> reply) {
this.reply = reply;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
HttpResponseStatus status = response.status();
this.httpCode = status.code();
}
if (msg instanceof HttpContent) {
buffer.append(((HttpContent) msg).content().toString(StandardCharsets.UTF_8));
if (msg instanceof LastHttpContent) {
ctx.close();
reply.complete(new SimpleHttpResponse(httpCode, buffer.toString()));
}
}
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
reply.completeExceptionally(cause);
}
}