diff --git a/proxy/build.gradle b/proxy/build.gradle index 675dd0363..9c3f151b1 100644 --- a/proxy/build.gradle +++ b/proxy/build.gradle @@ -48,7 +48,6 @@ dependencies { compile "io.netty:netty-handler:${nettyVersion}" compile "io.netty:netty-transport-native-epoll:${nettyVersion}" compile "io.netty:netty-transport-native-epoll:${nettyVersion}:linux-x86_64" - compile "io.netty:netty-transport-native-kqueue:${nettyVersion}:osx-x86_64" compile "io.netty:netty-resolver-dns:${nettyVersion}" compile "org.apache.logging.log4j:log4j-api:${log4jVersion}" @@ -65,6 +64,8 @@ dependencies { compile 'net.kyori:event-method-asm:3.0.0' compile 'com.mojang:brigadier:1.0.15' + + compile 'org.asynchttpclient:async-http-client:2.10.1' testCompile "org.junit.jupiter:junit-jupiter-api:${junitVersion}" testCompile "org.junit.jupiter:junit-jupiter-engine:${junitVersion}" diff --git a/proxy/src/main/java/com/velocitypowered/proxy/Metrics.java b/proxy/src/main/java/com/velocitypowered/proxy/Metrics.java index 6f2dfa535..402746171 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/Metrics.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/Metrics.java @@ -3,12 +3,9 @@ package com.velocitypowered.proxy; import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.velocitypowered.proxy.config.VelocityConfiguration; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufOutputStream; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.HttpHeaderNames; import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.Writer; @@ -21,11 +18,14 @@ import java.util.Map; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.zip.GZIPOutputStream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.asynchttpclient.ListenableFuture; +import org.asynchttpclient.Response; /** * bStats collects some data for plugin authors. @@ -185,40 +185,44 @@ public class Metrics { } // Compress the data to save bandwidth - ByteBuf reqBody = createResponseBody(data); - - server.getHttpClient().post(new URL(URL), reqBody, request -> { - request.headers().add(HttpHeaderNames.CONTENT_ENCODING, "gzip"); - request.headers().add(HttpHeaderNames.ACCEPT, "application/json"); - request.headers().add(HttpHeaderNames.CONTENT_TYPE, "application/json"); - }) - .whenCompleteAsync((resp, exc) -> { - if (logFailedRequests) { - if (exc != null) { - logger.error("Unable to send metrics to bStats", exc); - } else if (resp.getCode() != 429) { - logger.error("Got HTTP status code {} when sending metrics to bStats", - resp.getCode()); - } + ListenableFuture future = server.getAsyncHttpClient() + .preparePost(URL) + .addHeader(HttpHeaderNames.CONTENT_ENCODING, "gzip") + .addHeader(HttpHeaderNames.ACCEPT, "application/json") + .addHeader(HttpHeaderNames.CONTENT_TYPE, "application/json") + .setBody(createResponseBody(data)) + .execute(); + future.addListener(() -> { + if (logFailedRequests) { + try { + Response r = future.get(); + if (r.getStatusCode() != 429) { + logger.error("Got HTTP status code {} when sending metrics to bStats", + r.getStatusCode()); } - }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + logger.error("Unable to send metrics to bStats", e); + } + } + }, null); } - private static ByteBuf createResponseBody(JsonObject object) throws IOException { - ByteBuf buf = Unpooled.buffer(); + private static byte[] createResponseBody(JsonObject object) throws IOException { + ByteArrayOutputStream os = new ByteArrayOutputStream(); try (Writer writer = new BufferedWriter( new OutputStreamWriter( - new GZIPOutputStream(new ByteBufOutputStream(buf)), StandardCharsets.UTF_8 + new GZIPOutputStream(os), StandardCharsets.UTF_8 ) ) ) { VelocityServer.GSON.toJson(object, writer); } catch (IOException e) { - buf.release(); throw e; } - return buf; + return os.toByteArray(); } /** diff --git a/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java b/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java index 6e590b3c4..c078ecfb9 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java @@ -30,7 +30,6 @@ import com.velocitypowered.proxy.config.VelocityConfiguration; import com.velocitypowered.proxy.connection.client.ConnectedPlayer; import com.velocitypowered.proxy.console.VelocityConsole; import com.velocitypowered.proxy.network.ConnectionManager; -import com.velocitypowered.proxy.network.http.NettyHttpClient; import com.velocitypowered.proxy.plugin.VelocityEventManager; import com.velocitypowered.proxy.plugin.VelocityPluginManager; import com.velocitypowered.proxy.protocol.packet.Chat; @@ -74,6 +73,7 @@ import net.kyori.text.TranslatableComponent; import net.kyori.text.serializer.gson.GsonComponentSerializer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.asynchttpclient.AsyncHttpClient; import org.checkerframework.checker.nullness.qual.EnsuresNonNull; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.NonNull; @@ -90,7 +90,6 @@ public class VelocityServer implements ProxyServer { private final ConnectionManager cm; private final ProxyOptions options; private @MonotonicNonNull VelocityConfiguration configuration; - private @MonotonicNonNull NettyHttpClient httpClient; private @MonotonicNonNull KeyPair serverKeyPair; private final ServerMap servers; private final VelocityCommandManager commandManager = new VelocityCommandManager(); @@ -202,7 +201,6 @@ public class VelocityServer implements ProxyServer { } ipAttemptLimiter = Ratelimiters.createWithMilliseconds(configuration.getLoginRatelimit()); - httpClient = new NettyHttpClient(this); loadPlugins(); // Go ahead and fire the proxy initialization event. We block since plugins should have a chance @@ -430,8 +428,8 @@ public class VelocityServer implements ProxyServer { thread.start(); } - public NettyHttpClient getHttpClient() { - return ensureInitialized(httpClient); + public AsyncHttpClient getAsyncHttpClient() { + return ensureInitialized(cm).getHttpClient(); } public Ratelimiter getIpAttemptLimiter() { diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/LoginSessionHandler.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/LoginSessionHandler.java index d0707588c..f30b3d218 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/LoginSessionHandler.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/LoginSessionHandler.java @@ -7,6 +7,8 @@ import static com.velocitypowered.proxy.connection.VelocityConstants.EMPTY_BYTE_ import static com.velocitypowered.proxy.connection.VelocityConstants.VELOCITY_IP_FORWARDING_CHANNEL; import static com.velocitypowered.proxy.util.EncryptionUtils.decryptRsa; import static com.velocitypowered.proxy.util.EncryptionUtils.generateServerId; +import static org.asynchttpclient.Dsl.asyncHttpClient; +import static org.asynchttpclient.Dsl.config; import com.google.common.base.Preconditions; import com.google.common.net.UrlEscapers; @@ -42,14 +44,19 @@ import java.security.KeyPair; import java.util.Arrays; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import net.kyori.text.Component; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.asynchttpclient.Dsl; +import org.asynchttpclient.ListenableFuture; +import org.asynchttpclient.Response; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; public class LoginSessionHandler implements MinecraftSessionHandler { + private static final Logger logger = LogManager.getLogger(LoginSessionHandler.class); private static final String MOJANG_HASJOINED_URL = "https://sessionserver.mojang.com/session/minecraft/hasJoined?username=%s&serverId=%s&ip=%s"; @@ -61,6 +68,7 @@ public class LoginSessionHandler implements MinecraftSessionHandler { private byte[] verify = EMPTY_BYTE_ARRAY; private int playerInfoId; private @MonotonicNonNull ConnectedPlayer connectedPlayer; + private final long loginProcessBegan = System.currentTimeMillis(); LoginSessionHandler(VelocityServer server, MinecraftConnection mcConnection, InitialInboundConnection inbound) { @@ -123,46 +131,50 @@ public class LoginSessionHandler implements MinecraftSessionHandler { String url = String.format(MOJANG_HASJOINED_URL, urlFormParameterEscaper().escape(login.getUsername()), serverId, urlFormParameterEscaper().escape(playerIp)); - server.getHttpClient() - .get(new URL(url), mcConnection.eventLoop()) - .thenAcceptAsync(profileResponse -> { - if (mcConnection.isClosed()) { - // The player disconnected after we authenticated them. - return; - } - // Go ahead and enable encryption. Once the client sends EncryptionResponse, encryption - // is enabled. - try { - mcConnection.enableEncryption(decryptedSharedSecret); - } catch (GeneralSecurityException e) { - throw new RuntimeException(e); - } + ListenableFuture hasJoinedResponse = server.getAsyncHttpClient().prepareGet(url) + .execute(); + hasJoinedResponse.addListener(() -> { + if (mcConnection.isClosed()) { + // The player disconnected after we authenticated them. + return; + } - if (profileResponse.getCode() == 200) { - // All went well, initialize the session. - initializePlayer(GSON.fromJson(profileResponse.getBody(), GameProfile.class), true); - } else if (profileResponse.getCode() == 204) { - // Apparently an offline-mode user logged onto this online-mode proxy. - inbound.disconnect(VelocityMessages.ONLINE_MODE_ONLY); - } else { - // Something else went wrong - logger.error( - "Got an unexpected error code {} whilst contacting Mojang to log in {} ({})", - profileResponse.getCode(), login.getUsername(), playerIp); - mcConnection.close(); - } - }, mcConnection.eventLoop()) - .exceptionally(exception -> { - logger.error("Unable to enable encryption", exception); + // Go ahead and enable encryption. Once the client sends EncryptionResponse, encryption + // is enabled. + try { + mcConnection.enableEncryption(decryptedSharedSecret); + } catch (GeneralSecurityException e) { + throw new RuntimeException(e); + } + + try { + Response profileResponse = hasJoinedResponse.get(); + if (profileResponse.getStatusCode() == 200) { + // All went well, initialize the session. + initializePlayer(GSON.fromJson(profileResponse.getResponseBody(), GameProfile.class), + true); + } else if (profileResponse.getStatusCode() == 204) { + // Apparently an offline-mode user logged onto this online-mode proxy. + inbound.disconnect(VelocityMessages.ONLINE_MODE_ONLY); + } else { + // Something else went wrong + logger.error( + "Got an unexpected error code {} whilst contacting Mojang to log in {} ({})", + profileResponse.getStatusCode(), login.getUsername(), playerIp); mcConnection.close(); - return null; - }); + } + } catch (ExecutionException e) { + logger.error("Unable to authenticate with Mojang", e); + mcConnection.close(); + } catch (InterruptedException e) { + // not much we can do usefully + Thread.currentThread().interrupt(); + } + }, mcConnection.eventLoop()); } catch (GeneralSecurityException e) { logger.error("Unable to enable encryption", e); mcConnection.close(); - } catch (MalformedURLException e) { - throw new AssertionError(e); } return true; } @@ -179,6 +191,7 @@ public class LoginSessionHandler implements MinecraftSessionHandler { // The player was disconnected return; } + PreLoginComponentResult result = event.getResult(); Optional disconnectReason = result.getReason(); if (disconnectReason.isPresent()) { @@ -277,7 +290,7 @@ public class LoginSessionHandler implements MinecraftSessionHandler { player.disconnect(VelocityMessages.ALREADY_CONNECTED); return; } - + mcConnection.setSessionHandler(new InitialConnectSessionHandler(player)); server.getEventManager().fire(new PostLoginEvent(player)) .thenRun(() -> player.createConnectionRequest(toTry.get()).fireAndForget()); diff --git a/proxy/src/main/java/com/velocitypowered/proxy/network/ConnectionManager.java b/proxy/src/main/java/com/velocitypowered/proxy/network/ConnectionManager.java index c3b1d195f..9fa8b6b0c 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/network/ConnectionManager.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/network/ConnectionManager.java @@ -1,5 +1,8 @@ package com.velocitypowered.proxy.network; +import static org.asynchttpclient.Dsl.asyncHttpClient; +import static org.asynchttpclient.Dsl.config; + import com.google.common.base.Preconditions; import com.velocitypowered.natives.util.Natives; import com.velocitypowered.proxy.VelocityServer; @@ -18,6 +21,8 @@ import java.util.HashMap; import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.Dsl; import org.checkerframework.checker.nullness.qual.Nullable; public final class ConnectionManager { @@ -36,6 +41,7 @@ public final class ConnectionManager { public final ServerChannelInitializerHolder serverChannelInitializer; private final DnsAddressResolverGroup resolverGroup; + private final AsyncHttpClient httpClient; /** * Initalizes the {@code ConnectionManager}. @@ -55,6 +61,10 @@ public final class ConnectionManager { .negativeTtl(15) .ndots(1) ); + this.httpClient = asyncHttpClient(config() + .setEventLoopGroup(this.workerGroup) + .setUserAgent(server.getVersion().getName() + "/" + server.getVersion().getVersion()) + .build()); } public void logChannelInformation() { @@ -168,4 +178,8 @@ public final class ConnectionManager { public ServerChannelInitializerHolder getServerChannelInitializer() { return this.serverChannelInitializer; } + + public AsyncHttpClient getHttpClient() { + return httpClient; + } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/network/TransportType.java b/proxy/src/main/java/com/velocitypowered/proxy/network/TransportType.java index e382a4856..a1b799494 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/network/TransportType.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/network/TransportType.java @@ -7,11 +7,6 @@ import io.netty.channel.epoll.EpollDatagramChannel; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.epoll.EpollSocketChannel; -import io.netty.channel.kqueue.KQueue; -import io.netty.channel.kqueue.KQueueDatagramChannel; -import io.netty.channel.kqueue.KQueueEventLoopGroup; -import io.netty.channel.kqueue.KQueueServerSocketChannel; -import io.netty.channel.kqueue.KQueueSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.ServerSocketChannel; @@ -27,10 +22,7 @@ enum TransportType { (name, type) -> new NioEventLoopGroup(0, createThreadFactory(name, type))), EPOLL("epoll", EpollServerSocketChannel.class, EpollSocketChannel.class, EpollDatagramChannel.class, - (name, type) -> new EpollEventLoopGroup(0, createThreadFactory(name, type))), - KQUEUE("Kqueue", KQueueServerSocketChannel.class, KQueueSocketChannel.class, - KQueueDatagramChannel.class, - (name, type) -> new KQueueEventLoopGroup(0, createThreadFactory(name, type))); + (name, type) -> new EpollEventLoopGroup(0, createThreadFactory(name, type))); final String name; final Class serverSocketChannelClass; @@ -66,8 +58,6 @@ enum TransportType { public static TransportType bestType() { if (Epoll.isAvailable()) { return EPOLL; - } else if (KQueue.isAvailable()) { - return KQUEUE; } else { return NIO; } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/network/http/NettyHttpClient.java b/proxy/src/main/java/com/velocitypowered/proxy/network/http/NettyHttpClient.java deleted file mode 100644 index 5d0211a09..000000000 --- a/proxy/src/main/java/com/velocitypowered/proxy/network/http/NettyHttpClient.java +++ /dev/null @@ -1,153 +0,0 @@ -package com.velocitypowered.proxy.network.http; - -import com.velocitypowered.proxy.VelocityServer; -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoop; -import io.netty.handler.codec.http.DefaultFullHttpRequest; -import io.netty.handler.codec.http.HttpClientCodec; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; -import io.netty.handler.ssl.SslHandler; -import java.net.InetSocketAddress; -import java.net.URL; -import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; -import javax.net.ssl.SSLEngine; -import org.checkerframework.checker.nullness.qual.Nullable; - -public class NettyHttpClient { - - private final String userAgent; - private final VelocityServer server; - - /** - * Initializes the HTTP client. - * - * @param server the Velocity server - */ - public NettyHttpClient(VelocityServer server) { - this.userAgent = server.getVersion().getName() + "/" + server.getVersion().getVersion(); - this.server = server; - } - - private ChannelFuture establishConnection(URL url, @Nullable EventLoop loop) { - String host = url.getHost(); - int port = url.getPort(); - boolean ssl = url.getProtocol().equals("https"); - if (port == -1) { - port = ssl ? 443 : 80; - } - - InetSocketAddress address = InetSocketAddress.createUnresolved(host, port); - return server.createBootstrap(loop) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - if (ssl) { - SslContext context = SslContextBuilder.forClient().protocols("TLSv1.2").build(); - // Unbelievably, Java doesn't automatically check the CN to make sure we're talking - // to the right host! Therefore, we provide the intended host name and port, along - // with asking Java very nicely if it could check the hostname in the certificate - // for us. - SSLEngine engine = context.newEngine(ch.alloc(), address.getHostString(), - address.getPort()); - engine.getSSLParameters().setEndpointIdentificationAlgorithm("HTTPS"); - ch.pipeline().addLast("ssl", new SslHandler(engine)); - } - ch.pipeline().addLast("http", new HttpClientCodec()); - } - }) - .connect(address); - } - - /** - * Attempts an HTTP GET request to the specified URL. - * @param url the URL to fetch - * @param loop the event loop to use - * @return a future representing the response - */ - public CompletableFuture get(URL url, @Nullable EventLoop loop) { - CompletableFuture reply = new CompletableFuture<>(); - establishConnection(url, loop) - .addListener((ChannelFutureListener) future -> { - if (future.isSuccess()) { - Channel channel = future.channel(); - - channel.pipeline().addLast("collector", new SimpleHttpResponseCollector(reply)); - - String pathAndQuery = url.getPath(); - if (url.getQuery() != null && url.getQuery().length() > 0) { - pathAndQuery += "?" + url.getQuery(); - } - - DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, - HttpMethod.GET, pathAndQuery); - request.headers().add(HttpHeaderNames.HOST, url.getHost()); - request.headers().add(HttpHeaderNames.USER_AGENT, userAgent); - channel.writeAndFlush(request, channel.voidPromise()); - } else { - reply.completeExceptionally(future.cause()); - } - }); - return reply; - } - - /** - * Attempts an HTTP POST request to the specified URL. - * @param url the URL to fetch - * @param body the body to post - * @param decorator a consumer that can modify the request as required - * @return a future representing the response - */ - public CompletableFuture post(URL url, ByteBuf body, - Consumer decorator) { - return post(url, null, body, decorator); - } - - /** - * Attempts an HTTP POST request to the specified URL. - * @param url the URL to fetch - * @param loop the event loop to use - * @param body the body to post - the HTTP client takes ownership of the buffer - * @param decorator a consumer that can modify the request as required - * @return a future representing the response - */ - public CompletableFuture post(URL url, @Nullable EventLoop loop, ByteBuf body, - Consumer decorator) { - CompletableFuture reply = new CompletableFuture<>(); - establishConnection(url, loop) - .addListener((ChannelFutureListener) future -> { - if (future.isSuccess()) { - Channel channel = future.channel(); - - channel.pipeline().addLast("collector", new SimpleHttpResponseCollector(reply)); - - String pathAndQuery = url.getPath(); - if (url.getQuery() != null && url.getQuery().length() > 0) { - pathAndQuery += "?" + url.getQuery(); - } - - DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, - HttpMethod.POST, pathAndQuery, body); - request.headers().add(HttpHeaderNames.HOST, url.getHost()); - request.headers().add(HttpHeaderNames.USER_AGENT, userAgent); - request.headers().add(HttpHeaderNames.CONTENT_LENGTH, body.readableBytes()); - decorator.accept(request); - - channel.writeAndFlush(request, channel.voidPromise()); - } else { - body.release(); - reply.completeExceptionally(future.cause()); - } - }); - return reply; - } -} diff --git a/proxy/src/main/java/com/velocitypowered/proxy/network/http/SimpleHttpResponse.java b/proxy/src/main/java/com/velocitypowered/proxy/network/http/SimpleHttpResponse.java deleted file mode 100644 index 3bcfdc40a..000000000 --- a/proxy/src/main/java/com/velocitypowered/proxy/network/http/SimpleHttpResponse.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.velocitypowered.proxy.network.http; - -public class SimpleHttpResponse { - - private final int code; - private final String body; - - SimpleHttpResponse(int code, String body) { - this.code = code; - this.body = body; - } - - public int getCode() { - return code; - } - - public String getBody() { - return body; - } - - @Override - public String toString() { - return "SimpleHttpResponse{" - + "code=" + code - + ", body='" + body + '\'' - + '}'; - } -} diff --git a/proxy/src/main/java/com/velocitypowered/proxy/network/http/SimpleHttpResponseCollector.java b/proxy/src/main/java/com/velocitypowered/proxy/network/http/SimpleHttpResponseCollector.java deleted file mode 100644 index 533e8170f..000000000 --- a/proxy/src/main/java/com/velocitypowered/proxy/network/http/SimpleHttpResponseCollector.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.velocitypowered.proxy.network.http; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.codec.http.HttpContent; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpUtil; -import io.netty.handler.codec.http.LastHttpContent; -import io.netty.util.ReferenceCountUtil; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.CompletableFuture; - -class SimpleHttpResponseCollector extends ChannelInboundHandlerAdapter { - - private final StringBuilder buffer = new StringBuilder(); - private final CompletableFuture reply; - private int httpCode; - - SimpleHttpResponseCollector(CompletableFuture reply) { - this.reply = reply; - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - try { - if (msg instanceof HttpResponse) { - HttpResponse response = (HttpResponse) msg; - HttpResponseStatus status = response.status(); - this.httpCode = status.code(); - } - - if (msg instanceof HttpContent) { - buffer.append(((HttpContent) msg).content().toString(StandardCharsets.UTF_8)); - - if (msg instanceof LastHttpContent) { - ctx.close(); - reply.complete(new SimpleHttpResponse(httpCode, buffer.toString())); - } - } - } finally { - ReferenceCountUtil.release(msg); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - ctx.close(); - reply.completeExceptionally(cause); - } -}