From 7eea1a3ac694aa9cf4597d2cd7383091d6b9eda7 Mon Sep 17 00:00:00 2001 From: Andrew Steinborn Date: Fri, 3 Aug 2018 02:25:57 -0400 Subject: [PATCH] Introduce a connection pool for Mojang's session servers. This has the potential to cut the time that players spend at the "logging in..." (or "encrypting..." for 1.13+) screen by a fair amount (gains of 200+ ms were noted for my own home connection). While this sounds minor, I really do like to aim for all the details and this is one of them. --- .../connection/http/NettyHttpClient.java | 79 ++++++++++++------- .../http/SimpleHttpResponseCollector.java | 23 ++++-- 2 files changed, 65 insertions(+), 37 deletions(-) diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/http/NettyHttpClient.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/http/NettyHttpClient.java index f13e1931b..7b78f36a8 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/http/NettyHttpClient.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/http/NettyHttpClient.java @@ -1,21 +1,50 @@ package com.velocitypowered.proxy.connection.http; import com.velocitypowered.proxy.VelocityServer; +import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; +import io.netty.channel.pool.*; import io.netty.handler.codec.http.*; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; import javax.net.ssl.SSLEngine; +import java.net.InetSocketAddress; import java.net.URL; import java.util.concurrent.CompletableFuture; public class NettyHttpClient { - private final VelocityServer server; + private final ChannelPoolMap poolMap; public NettyHttpClient(VelocityServer server) { - this.server = server; + Bootstrap bootstrap = server.initializeGenericBootstrap(); + this.poolMap = new AbstractChannelPoolMap() { + @Override + protected SimpleChannelPool newPool(InetSocketAddress key) { + return new FixedChannelPool(bootstrap.remoteAddress(key), new ChannelPoolHandler() { + @Override + public void channelReleased(Channel channel) throws Exception { + channel.pipeline().remove("collector"); + } + + @Override + public void channelAcquired(Channel channel) throws Exception { + System.out.println("ACQUIRED"); + } + + @Override + public void channelCreated(Channel channel) throws Exception { + if (key.getPort() == 443) { + SslContext context = SslContextBuilder.forClient().build(); + SSLEngine engine = context.newEngine(channel.alloc()); + channel.pipeline().addLast("ssl", new SslHandler(engine)); + } + channel.pipeline().addLast("http", new HttpClientCodec()); + } + }, 8); + } + }; } public CompletableFuture get(URL url) { @@ -27,35 +56,25 @@ public class NettyHttpClient { } CompletableFuture reply = new CompletableFuture<>(); - server.initializeGenericBootstrap() - .handler(new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - if (ssl) { - SslContext context = SslContextBuilder.forClient().build(); - SSLEngine engine = context.newEngine(ch.alloc()); - ch.pipeline().addLast(new SslHandler(engine)); - } - ch.pipeline().addLast(new HttpClientCodec()); - ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url.getPath() + "?" + url.getQuery()); - request.headers().add(HttpHeaderNames.HOST, url.getHost()); - request.headers().add(HttpHeaderNames.USER_AGENT, "Velocity"); - ctx.writeAndFlush(request); - } + InetSocketAddress address = new InetSocketAddress(host, port); + poolMap.get(address) + .acquire() + .addListener(future -> { + if (future.isSuccess()) { + Channel channel = (Channel) future.getNow(); + channel.pipeline().addLast("collector", new SimpleHttpResponseCollector(reply)); + + DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url.getPath() + "?" + url.getQuery()); + request.headers().add(HttpHeaderNames.HOST, url.getHost()); + request.headers().add(HttpHeaderNames.USER_AGENT, "Velocity"); + channel.writeAndFlush(request); + + reply.whenComplete((resp, err) -> { + // Make sure to release this connection + poolMap.get(address).release(channel, channel.voidPromise()); }); - ch.pipeline().addLast(new SimpleHttpResponseCollector(reply)); - } - }) - .connect(host, port) - .addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - reply.completeExceptionally(future.cause()); - } + } else { + reply.completeExceptionally(future.cause()); } }); return reply; diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/http/SimpleHttpResponseCollector.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/http/SimpleHttpResponseCollector.java index 236ccb9ef..a2285af6f 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/http/SimpleHttpResponseCollector.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/http/SimpleHttpResponseCollector.java @@ -2,10 +2,7 @@ package com.velocitypowered.proxy.connection.http; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.codec.http.HttpContent; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.*; import io.netty.util.ReferenceCountUtil; import java.nio.charset.StandardCharsets; @@ -14,6 +11,7 @@ import java.util.concurrent.CompletableFuture; class SimpleHttpResponseCollector extends ChannelInboundHandlerAdapter { private final StringBuilder buffer = new StringBuilder(1024); private final CompletableFuture reply; + private boolean canKeepAlive; SimpleHttpResponseCollector(CompletableFuture reply) { this.reply = reply; @@ -23,18 +21,23 @@ class SimpleHttpResponseCollector extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { if (msg instanceof HttpResponse) { - HttpResponseStatus status = ((HttpResponse) msg).status(); + HttpResponse response = (HttpResponse) msg; + HttpResponseStatus status = response.status(); if (status != HttpResponseStatus.OK) { - ctx.close(); reply.completeExceptionally(new RuntimeException("Unexpected status code " + status.code())); + return; } + + this.canKeepAlive = HttpUtil.isKeepAlive(response); } if (msg instanceof HttpContent) { buffer.append(((HttpContent) msg).content().toString(StandardCharsets.UTF_8)); if (msg instanceof LastHttpContent) { - ctx.close(); + if (!canKeepAlive) { + ctx.close(); + } reply.complete(buffer.toString()); } } @@ -42,4 +45,10 @@ class SimpleHttpResponseCollector extends ChannelInboundHandlerAdapter { ReferenceCountUtil.release(msg); } } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + reply.completeExceptionally(cause); + } }