From 46a7b52a1371d7449db132b868cd443dd0802a69 Mon Sep 17 00:00:00 2001 From: kashike Date: Fri, 12 Oct 2018 16:21:43 -0700 Subject: [PATCH] Extract channel initialiser and transport type out --- .../proxy/network/ConnectionManager.java | 144 ++++-------------- .../network/ServerChannelInitializer.java | 57 +++++++ .../ServerChannelInitializerHolder.java | 28 ++++ .../proxy/network/TransportType.java | 86 +++++++++++ 4 files changed, 197 insertions(+), 118 deletions(-) create mode 100644 proxy/src/main/java/com/velocitypowered/proxy/network/ServerChannelInitializer.java create mode 100644 proxy/src/main/java/com/velocitypowered/proxy/network/ServerChannelInitializerHolder.java create mode 100644 proxy/src/main/java/com/velocitypowered/proxy/network/TransportType.java diff --git a/proxy/src/main/java/com/velocitypowered/proxy/network/ConnectionManager.java b/proxy/src/main/java/com/velocitypowered/proxy/network/ConnectionManager.java index 584843717..e7cf90874 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/network/ConnectionManager.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/network/ConnectionManager.java @@ -1,60 +1,43 @@ package com.velocitypowered.proxy.network; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.velocitypowered.natives.util.Natives; import com.velocitypowered.proxy.VelocityServer; -import com.velocitypowered.proxy.connection.MinecraftConnection; -import com.velocitypowered.proxy.connection.client.HandshakeSessionHandler; -import com.velocitypowered.proxy.protocol.ProtocolConstants; -import com.velocitypowered.proxy.protocol.StateRegistry; -import com.velocitypowered.proxy.protocol.netty.*; +import com.velocitypowered.proxy.protocol.netty.GS4QueryHandler; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.*; -import io.netty.channel.epoll.*; -import io.netty.channel.kqueue.*; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.ServerSocketChannel; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioDatagramChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.haproxy.HAProxyMessageDecoder; -import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.WriteBufferWaterMark; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.net.InetSocketAddress; import java.util.HashSet; -import java.util.Locale; import java.util.Set; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -import static com.velocitypowered.proxy.network.Connections.*; public final class ConnectionManager { private static final WriteBufferWaterMark SERVER_WRITE_MARK = new WriteBufferWaterMark(1 << 16, 1 << 18); - - private static final Logger logger = LogManager.getLogger(ConnectionManager.class); - + private static final Logger LOGGER = LogManager.getLogger(ConnectionManager.class); private final Set endpoints = new HashSet<>(); private final TransportType transportType; private final EventLoopGroup bossGroup; private final EventLoopGroup workerGroup; private final VelocityServer server; + public final ServerChannelInitializerHolder serverChannelInitializer; - public ConnectionManager(VelocityServer server) { + public ConnectionManager(final VelocityServer server) { this.server = server; this.transportType = TransportType.bestType(); - this.bossGroup = transportType.createEventLoopGroup(true); - this.workerGroup = transportType.createEventLoopGroup(false); + this.bossGroup = this.transportType.createEventLoopGroup(TransportType.Type.BOSS); + this.workerGroup = this.transportType.createEventLoopGroup(TransportType.Type.WORKER); + this.serverChannelInitializer = new ServerChannelInitializerHolder(new ServerChannelInitializer(this.server)); this.logChannelInformation(); } private void logChannelInformation() { - logger.info("Connections will use {} channels, {} compression, {} ciphers", transportType, Natives.compressor.getLoadedVariant(), Natives.cipher.getLoadedVariant()); + LOGGER.info("Connections will use {} channels, {} compression, {} ciphers", this.transportType, Natives.compressor.getLoadedVariant(), Natives.cipher.getLoadedVariant()); } public void bind(final InetSocketAddress address) { @@ -62,28 +45,7 @@ public final class ConnectionManager { .channel(this.transportType.serverSocketChannelClass) .group(this.bossGroup, this.workerGroup) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, SERVER_WRITE_MARK) - .childHandler(new ChannelInitializer() { - @Override - protected void initChannel(final Channel ch) { - ch.pipeline() - .addLast(READ_TIMEOUT, new ReadTimeoutHandler(server.getConfiguration().getReadTimeout(), TimeUnit.SECONDS)) - .addLast(LEGACY_PING_DECODER, new LegacyPingDecoder()) - .addLast(FRAME_DECODER, new MinecraftVarintFrameDecoder()) - .addLast(LEGACY_PING_ENCODER, LegacyPingEncoder.INSTANCE) - .addLast(FRAME_ENCODER, MinecraftVarintLengthEncoder.INSTANCE) - .addLast(MINECRAFT_DECODER, new MinecraftDecoder(ProtocolConstants.Direction.SERVERBOUND)) - .addLast(MINECRAFT_ENCODER, new MinecraftEncoder(ProtocolConstants.Direction.CLIENTBOUND)); - - final MinecraftConnection connection = new MinecraftConnection(ch, server); - connection.setState(StateRegistry.HANDSHAKE); - connection.setSessionHandler(new HandshakeSessionHandler(connection, server)); - ch.pipeline().addLast(Connections.HANDLER, connection); - - if (server.getConfiguration().isProxyProtocol()) { - ch.pipeline().addFirst(new HAProxyMessageDecoder()); - } - } - }) + .childHandler(this.serverChannelInitializer.get()) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.IP_TOS, 0x18) .localAddress(address); @@ -92,27 +54,27 @@ public final class ConnectionManager { final Channel channel = future.channel(); if (future.isSuccess()) { this.endpoints.add(channel); - logger.info("Listening on {}", channel.localAddress()); + LOGGER.info("Listening on {}", channel.localAddress()); } else { - logger.error("Can't bind to {}", address, future.cause()); + LOGGER.error("Can't bind to {}", address, future.cause()); } }); } public void queryBind(final String hostname, final int port) { - Bootstrap bootstrap = new Bootstrap() - .channel(transportType.datagramChannelClass) + final Bootstrap bootstrap = new Bootstrap() + .channel(this.transportType.datagramChannelClass) .group(this.workerGroup) - .handler(new GS4QueryHandler(server)) + .handler(new GS4QueryHandler(this.server)) .localAddress(hostname, port); bootstrap.bind() .addListener((ChannelFutureListener) future -> { final Channel channel = future.channel(); if (future.isSuccess()) { this.endpoints.add(channel); - logger.info("Listening for GS4 query on {}", channel.localAddress()); + LOGGER.info("Listening for GS4 query on {}", channel.localAddress()); } else { - logger.error("Can't bind to {}", bootstrap.config().localAddress(), future.cause()); + LOGGER.error("Can't bind to {}", bootstrap.config().localAddress(), future.cause()); } }); } @@ -122,75 +84,21 @@ public final class ConnectionManager { .channel(this.transportType.socketChannelClass) .group(this.workerGroup) .option(ChannelOption.TCP_NODELAY, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, server.getConfiguration().getConnectTimeout()); + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.server.getConfiguration().getConnectTimeout()); } public void shutdown() { for (final Channel endpoint : this.endpoints) { try { - logger.info("Closing endpoint {}", endpoint.localAddress()); + LOGGER.info("Closing endpoint {}", endpoint.localAddress()); endpoint.close().sync(); } catch (final InterruptedException e) { - logger.info("Interrupted whilst closing endpoint", e); + LOGGER.info("Interrupted whilst closing endpoint", e); } } } - private static ThreadFactory createThreadFactory(final String nameFormat) { - return new ThreadFactoryBuilder() - .setNameFormat(nameFormat) - .setDaemon(true) - .build(); - } - - private enum TransportType { - NIO(NioServerSocketChannel.class, NioSocketChannel.class, NioDatagramChannel.class) { - @Override - public EventLoopGroup createEventLoopGroup(boolean boss) { - String name = "Netty NIO " + (boss ? "Boss" : "Worker") + " #%d"; - return new NioEventLoopGroup(0, createThreadFactory(name)); - } - }, - EPOLL(EpollServerSocketChannel.class, EpollSocketChannel.class, EpollDatagramChannel.class) { - @Override - public EventLoopGroup createEventLoopGroup(boolean boss) { - String name = "Netty Epoll " + (boss ? "Boss" : "Worker") + " #%d"; - return new EpollEventLoopGroup(0, createThreadFactory(name)); - } - }, - KQUEUE(KQueueServerSocketChannel.class, KQueueSocketChannel.class, KQueueDatagramChannel.class) { - @Override - public EventLoopGroup createEventLoopGroup(boolean boss) { - String name = "Netty KQueue " + (boss ? "Boss" : "Worker") + " #%d"; - return new KQueueEventLoopGroup(0, createThreadFactory(name)); - } - }; - - private final Class serverSocketChannelClass; - private final Class socketChannelClass; - private final Class datagramChannelClass; - - TransportType(Class serverSocketChannelClass, Class socketChannelClass, Class datagramChannelClass) { - this.serverSocketChannelClass = serverSocketChannelClass; - this.socketChannelClass = socketChannelClass; - this.datagramChannelClass = datagramChannelClass; - } - - @Override - public String toString() { - return name().toLowerCase(Locale.US); - } - - public abstract EventLoopGroup createEventLoopGroup(boolean boss); - - public static TransportType bestType() { - if (Epoll.isAvailable()) { - return EPOLL; - } else if (KQueue.isAvailable()) { - return KQUEUE; - } else { - return NIO; - } - } + public ServerChannelInitializerHolder getServerChannelInitializer() { + return this.serverChannelInitializer; } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/network/ServerChannelInitializer.java b/proxy/src/main/java/com/velocitypowered/proxy/network/ServerChannelInitializer.java new file mode 100644 index 000000000..46572fc5d --- /dev/null +++ b/proxy/src/main/java/com/velocitypowered/proxy/network/ServerChannelInitializer.java @@ -0,0 +1,57 @@ +package com.velocitypowered.proxy.network; + +import com.velocitypowered.proxy.VelocityServer; +import com.velocitypowered.proxy.connection.MinecraftConnection; +import com.velocitypowered.proxy.connection.client.HandshakeSessionHandler; +import com.velocitypowered.proxy.protocol.ProtocolConstants; +import com.velocitypowered.proxy.protocol.StateRegistry; +import com.velocitypowered.proxy.protocol.netty.LegacyPingDecoder; +import com.velocitypowered.proxy.protocol.netty.LegacyPingEncoder; +import com.velocitypowered.proxy.protocol.netty.MinecraftDecoder; +import com.velocitypowered.proxy.protocol.netty.MinecraftEncoder; +import com.velocitypowered.proxy.protocol.netty.MinecraftVarintFrameDecoder; +import com.velocitypowered.proxy.protocol.netty.MinecraftVarintLengthEncoder; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.handler.codec.haproxy.HAProxyMessageDecoder; +import io.netty.handler.timeout.ReadTimeoutHandler; + +import java.util.concurrent.TimeUnit; + +import static com.velocitypowered.proxy.network.Connections.FRAME_DECODER; +import static com.velocitypowered.proxy.network.Connections.FRAME_ENCODER; +import static com.velocitypowered.proxy.network.Connections.LEGACY_PING_DECODER; +import static com.velocitypowered.proxy.network.Connections.LEGACY_PING_ENCODER; +import static com.velocitypowered.proxy.network.Connections.MINECRAFT_DECODER; +import static com.velocitypowered.proxy.network.Connections.MINECRAFT_ENCODER; +import static com.velocitypowered.proxy.network.Connections.READ_TIMEOUT; + +@SuppressWarnings("WeakerAccess") +public class ServerChannelInitializer extends ChannelInitializer { + private final VelocityServer server; + + public ServerChannelInitializer(final VelocityServer server) { + this.server = server; + } + + @Override + protected void initChannel(final Channel ch) { + ch.pipeline() + .addLast(READ_TIMEOUT, new ReadTimeoutHandler(this.server.getConfiguration().getReadTimeout(), TimeUnit.SECONDS)) + .addLast(LEGACY_PING_DECODER, new LegacyPingDecoder()) + .addLast(FRAME_DECODER, new MinecraftVarintFrameDecoder()) + .addLast(LEGACY_PING_ENCODER, LegacyPingEncoder.INSTANCE) + .addLast(FRAME_ENCODER, MinecraftVarintLengthEncoder.INSTANCE) + .addLast(MINECRAFT_DECODER, new MinecraftDecoder(ProtocolConstants.Direction.SERVERBOUND)) + .addLast(MINECRAFT_ENCODER, new MinecraftEncoder(ProtocolConstants.Direction.CLIENTBOUND)); + + final MinecraftConnection connection = new MinecraftConnection(ch, this.server); + connection.setState(StateRegistry.HANDSHAKE); + connection.setSessionHandler(new HandshakeSessionHandler(connection, this.server)); + ch.pipeline().addLast(Connections.HANDLER, connection); + + if (ServerChannelInitializer.this.server.getConfiguration().isProxyProtocol()) { + ch.pipeline().addFirst(new HAProxyMessageDecoder()); + } + } +} diff --git a/proxy/src/main/java/com/velocitypowered/proxy/network/ServerChannelInitializerHolder.java b/proxy/src/main/java/com/velocitypowered/proxy/network/ServerChannelInitializerHolder.java new file mode 100644 index 000000000..2370397bd --- /dev/null +++ b/proxy/src/main/java/com/velocitypowered/proxy/network/ServerChannelInitializerHolder.java @@ -0,0 +1,28 @@ +package com.velocitypowered.proxy.network; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.function.Supplier; + +public class ServerChannelInitializerHolder implements Supplier> { + private static final Logger LOGGER = LogManager.getLogger(ConnectionManager.class); + private ChannelInitializer initializer; + + public ServerChannelInitializerHolder(final ChannelInitializer initializer) { + this.initializer = initializer; + } + + @Override + public ChannelInitializer get() { + return this.initializer; + } + + @Deprecated + public void set(final ChannelInitializer initializer) { + LOGGER.warn("The server channel initializer has been replaced by {}", Thread.currentThread().getStackTrace()[2]); + this.initializer = initializer; + } +} diff --git a/proxy/src/main/java/com/velocitypowered/proxy/network/TransportType.java b/proxy/src/main/java/com/velocitypowered/proxy/network/TransportType.java new file mode 100644 index 000000000..5b300ffb8 --- /dev/null +++ b/proxy/src/main/java/com/velocitypowered/proxy/network/TransportType.java @@ -0,0 +1,86 @@ +package com.velocitypowered.proxy.network; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollDatagramChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.kqueue.KQueue; +import io.netty.channel.kqueue.KQueueDatagramChannel; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.kqueue.KQueueServerSocketChannel; +import io.netty.channel.kqueue.KQueueSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; + +import java.util.concurrent.ThreadFactory; +import java.util.function.BiFunction; + +enum TransportType { + NIO("NIO", NioServerSocketChannel.class, NioSocketChannel.class, NioDatagramChannel.class, (name, type) -> new NioEventLoopGroup(0, createThreadFactory(name, type))), + EPOLL("epoll", EpollServerSocketChannel.class, EpollSocketChannel.class, EpollDatagramChannel.class, (name, type) -> new EpollEventLoopGroup(0, createThreadFactory(name, type))), + KQUEUE("Kqueue", KQueueServerSocketChannel.class, KQueueSocketChannel.class, KQueueDatagramChannel.class, (name, type) -> new KQueueEventLoopGroup(0, createThreadFactory(name, type))); + + final String name; + final Class serverSocketChannelClass; + final Class socketChannelClass; + final Class datagramChannelClass; + final BiFunction eventLoopGroupFactory; + + TransportType(final String name, final Class serverSocketChannelClass, final Class socketChannelClass, final Class datagramChannelClass, final BiFunction eventLoopGroupFactory) { + this.name = name; + this.serverSocketChannelClass = serverSocketChannelClass; + this.socketChannelClass = socketChannelClass; + this.datagramChannelClass = datagramChannelClass; + this.eventLoopGroupFactory = eventLoopGroupFactory; + } + + @Override + public String toString() { + return this.name; + } + + public EventLoopGroup createEventLoopGroup(final Type type) { + return this.eventLoopGroupFactory.apply(this.name, type); + } + + private static ThreadFactory createThreadFactory(final String name, final Type type) { + return new ThreadFactoryBuilder() + .setNameFormat("Netty " + name + ' ' + type.toString() + " #%d") + .setDaemon(true) + .build(); + } + + public static TransportType bestType() { + if (Epoll.isAvailable()) { + return EPOLL; + } else if (KQueue.isAvailable()) { + return KQUEUE; + } else { + return NIO; + } + } + + public enum Type { + BOSS("Boss"), + WORKER("Worker"); + + private final String name; + + Type(final String name) { + this.name = name; + } + + @Override + public String toString() { + return this.name; + } + } +}