13
0
geforkt von Mirrors/Velocity

Merge pull request #114 from kashike/nwc

Extract channel initialiser and transport type out
Dieser Commit ist enthalten in:
Andrew Steinborn 2018-10-12 19:44:01 -04:00 committet von GitHub
Commit 668725759a
Es konnte kein GPG-Schlüssel zu dieser Signatur gefunden werden
GPG-Schlüssel-ID: 4AEE18F83AFDEB23
4 geänderte Dateien mit 197 neuen und 118 gelöschten Zeilen

Datei anzeigen

@ -1,60 +1,43 @@
package com.velocitypowered.proxy.network;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.velocitypowered.natives.util.Natives;
import com.velocitypowered.proxy.VelocityServer;
import com.velocitypowered.proxy.connection.MinecraftConnection;
import com.velocitypowered.proxy.connection.client.HandshakeSessionHandler;
import com.velocitypowered.proxy.protocol.ProtocolConstants;
import com.velocitypowered.proxy.protocol.StateRegistry;
import com.velocitypowered.proxy.protocol.netty.*;
import com.velocitypowered.proxy.protocol.netty.GS4QueryHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.epoll.*;
import io.netty.channel.kqueue.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static com.velocitypowered.proxy.network.Connections.*;
public final class ConnectionManager {
private static final WriteBufferWaterMark SERVER_WRITE_MARK = new WriteBufferWaterMark(1 << 16, 1 << 18);
private static final Logger logger = LogManager.getLogger(ConnectionManager.class);
private static final Logger LOGGER = LogManager.getLogger(ConnectionManager.class);
private final Set<Channel> endpoints = new HashSet<>();
private final TransportType transportType;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final VelocityServer server;
public final ServerChannelInitializerHolder serverChannelInitializer;
public ConnectionManager(VelocityServer server) {
public ConnectionManager(final VelocityServer server) {
this.server = server;
this.transportType = TransportType.bestType();
this.bossGroup = transportType.createEventLoopGroup(true);
this.workerGroup = transportType.createEventLoopGroup(false);
this.bossGroup = this.transportType.createEventLoopGroup(TransportType.Type.BOSS);
this.workerGroup = this.transportType.createEventLoopGroup(TransportType.Type.WORKER);
this.serverChannelInitializer = new ServerChannelInitializerHolder(new ServerChannelInitializer(this.server));
this.logChannelInformation();
}
private void logChannelInformation() {
logger.info("Connections will use {} channels, {} compression, {} ciphers", transportType, Natives.compressor.getLoadedVariant(), Natives.cipher.getLoadedVariant());
LOGGER.info("Connections will use {} channels, {} compression, {} ciphers", this.transportType, Natives.compressor.getLoadedVariant(), Natives.cipher.getLoadedVariant());
}
public void bind(final InetSocketAddress address) {
@ -62,28 +45,7 @@ public final class ConnectionManager {
.channel(this.transportType.serverSocketChannelClass)
.group(this.bossGroup, this.workerGroup)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, SERVER_WRITE_MARK)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(final Channel ch) {
ch.pipeline()
.addLast(READ_TIMEOUT, new ReadTimeoutHandler(server.getConfiguration().getReadTimeout(), TimeUnit.SECONDS))
.addLast(LEGACY_PING_DECODER, new LegacyPingDecoder())
.addLast(FRAME_DECODER, new MinecraftVarintFrameDecoder())
.addLast(LEGACY_PING_ENCODER, LegacyPingEncoder.INSTANCE)
.addLast(FRAME_ENCODER, MinecraftVarintLengthEncoder.INSTANCE)
.addLast(MINECRAFT_DECODER, new MinecraftDecoder(ProtocolConstants.Direction.SERVERBOUND))
.addLast(MINECRAFT_ENCODER, new MinecraftEncoder(ProtocolConstants.Direction.CLIENTBOUND));
final MinecraftConnection connection = new MinecraftConnection(ch, server);
connection.setState(StateRegistry.HANDSHAKE);
connection.setSessionHandler(new HandshakeSessionHandler(connection, server));
ch.pipeline().addLast(Connections.HANDLER, connection);
if (server.getConfiguration().isProxyProtocol()) {
ch.pipeline().addFirst(new HAProxyMessageDecoder());
}
}
})
.childHandler(this.serverChannelInitializer.get())
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.IP_TOS, 0x18)
.localAddress(address);
@ -92,27 +54,27 @@ public final class ConnectionManager {
final Channel channel = future.channel();
if (future.isSuccess()) {
this.endpoints.add(channel);
logger.info("Listening on {}", channel.localAddress());
LOGGER.info("Listening on {}", channel.localAddress());
} else {
logger.error("Can't bind to {}", address, future.cause());
LOGGER.error("Can't bind to {}", address, future.cause());
}
});
}
public void queryBind(final String hostname, final int port) {
Bootstrap bootstrap = new Bootstrap()
.channel(transportType.datagramChannelClass)
final Bootstrap bootstrap = new Bootstrap()
.channel(this.transportType.datagramChannelClass)
.group(this.workerGroup)
.handler(new GS4QueryHandler(server))
.handler(new GS4QueryHandler(this.server))
.localAddress(hostname, port);
bootstrap.bind()
.addListener((ChannelFutureListener) future -> {
final Channel channel = future.channel();
if (future.isSuccess()) {
this.endpoints.add(channel);
logger.info("Listening for GS4 query on {}", channel.localAddress());
LOGGER.info("Listening for GS4 query on {}", channel.localAddress());
} else {
logger.error("Can't bind to {}", bootstrap.config().localAddress(), future.cause());
LOGGER.error("Can't bind to {}", bootstrap.config().localAddress(), future.cause());
}
});
}
@ -122,75 +84,21 @@ public final class ConnectionManager {
.channel(this.transportType.socketChannelClass)
.group(this.workerGroup)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, server.getConfiguration().getConnectTimeout());
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.server.getConfiguration().getConnectTimeout());
}
public void shutdown() {
for (final Channel endpoint : this.endpoints) {
try {
logger.info("Closing endpoint {}", endpoint.localAddress());
LOGGER.info("Closing endpoint {}", endpoint.localAddress());
endpoint.close().sync();
} catch (final InterruptedException e) {
logger.info("Interrupted whilst closing endpoint", e);
LOGGER.info("Interrupted whilst closing endpoint", e);
}
}
}
private static ThreadFactory createThreadFactory(final String nameFormat) {
return new ThreadFactoryBuilder()
.setNameFormat(nameFormat)
.setDaemon(true)
.build();
}
private enum TransportType {
NIO(NioServerSocketChannel.class, NioSocketChannel.class, NioDatagramChannel.class) {
@Override
public EventLoopGroup createEventLoopGroup(boolean boss) {
String name = "Netty NIO " + (boss ? "Boss" : "Worker") + " #%d";
return new NioEventLoopGroup(0, createThreadFactory(name));
}
},
EPOLL(EpollServerSocketChannel.class, EpollSocketChannel.class, EpollDatagramChannel.class) {
@Override
public EventLoopGroup createEventLoopGroup(boolean boss) {
String name = "Netty Epoll " + (boss ? "Boss" : "Worker") + " #%d";
return new EpollEventLoopGroup(0, createThreadFactory(name));
}
},
KQUEUE(KQueueServerSocketChannel.class, KQueueSocketChannel.class, KQueueDatagramChannel.class) {
@Override
public EventLoopGroup createEventLoopGroup(boolean boss) {
String name = "Netty KQueue " + (boss ? "Boss" : "Worker") + " #%d";
return new KQueueEventLoopGroup(0, createThreadFactory(name));
}
};
private final Class<? extends ServerSocketChannel> serverSocketChannelClass;
private final Class<? extends SocketChannel> socketChannelClass;
private final Class<? extends DatagramChannel> datagramChannelClass;
TransportType(Class<? extends ServerSocketChannel> serverSocketChannelClass, Class<? extends SocketChannel> socketChannelClass, Class<? extends DatagramChannel> datagramChannelClass) {
this.serverSocketChannelClass = serverSocketChannelClass;
this.socketChannelClass = socketChannelClass;
this.datagramChannelClass = datagramChannelClass;
}
@Override
public String toString() {
return name().toLowerCase(Locale.US);
}
public abstract EventLoopGroup createEventLoopGroup(boolean boss);
public static TransportType bestType() {
if (Epoll.isAvailable()) {
return EPOLL;
} else if (KQueue.isAvailable()) {
return KQUEUE;
} else {
return NIO;
}
}
public ServerChannelInitializerHolder getServerChannelInitializer() {
return this.serverChannelInitializer;
}
}

Datei anzeigen

@ -0,0 +1,57 @@
package com.velocitypowered.proxy.network;
import com.velocitypowered.proxy.VelocityServer;
import com.velocitypowered.proxy.connection.MinecraftConnection;
import com.velocitypowered.proxy.connection.client.HandshakeSessionHandler;
import com.velocitypowered.proxy.protocol.ProtocolConstants;
import com.velocitypowered.proxy.protocol.StateRegistry;
import com.velocitypowered.proxy.protocol.netty.LegacyPingDecoder;
import com.velocitypowered.proxy.protocol.netty.LegacyPingEncoder;
import com.velocitypowered.proxy.protocol.netty.MinecraftDecoder;
import com.velocitypowered.proxy.protocol.netty.MinecraftEncoder;
import com.velocitypowered.proxy.protocol.netty.MinecraftVarintFrameDecoder;
import com.velocitypowered.proxy.protocol.netty.MinecraftVarintLengthEncoder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.util.concurrent.TimeUnit;
import static com.velocitypowered.proxy.network.Connections.FRAME_DECODER;
import static com.velocitypowered.proxy.network.Connections.FRAME_ENCODER;
import static com.velocitypowered.proxy.network.Connections.LEGACY_PING_DECODER;
import static com.velocitypowered.proxy.network.Connections.LEGACY_PING_ENCODER;
import static com.velocitypowered.proxy.network.Connections.MINECRAFT_DECODER;
import static com.velocitypowered.proxy.network.Connections.MINECRAFT_ENCODER;
import static com.velocitypowered.proxy.network.Connections.READ_TIMEOUT;
@SuppressWarnings("WeakerAccess")
public class ServerChannelInitializer extends ChannelInitializer<Channel> {
private final VelocityServer server;
public ServerChannelInitializer(final VelocityServer server) {
this.server = server;
}
@Override
protected void initChannel(final Channel ch) {
ch.pipeline()
.addLast(READ_TIMEOUT, new ReadTimeoutHandler(this.server.getConfiguration().getReadTimeout(), TimeUnit.SECONDS))
.addLast(LEGACY_PING_DECODER, new LegacyPingDecoder())
.addLast(FRAME_DECODER, new MinecraftVarintFrameDecoder())
.addLast(LEGACY_PING_ENCODER, LegacyPingEncoder.INSTANCE)
.addLast(FRAME_ENCODER, MinecraftVarintLengthEncoder.INSTANCE)
.addLast(MINECRAFT_DECODER, new MinecraftDecoder(ProtocolConstants.Direction.SERVERBOUND))
.addLast(MINECRAFT_ENCODER, new MinecraftEncoder(ProtocolConstants.Direction.CLIENTBOUND));
final MinecraftConnection connection = new MinecraftConnection(ch, this.server);
connection.setState(StateRegistry.HANDSHAKE);
connection.setSessionHandler(new HandshakeSessionHandler(connection, this.server));
ch.pipeline().addLast(Connections.HANDLER, connection);
if (ServerChannelInitializer.this.server.getConfiguration().isProxyProtocol()) {
ch.pipeline().addFirst(new HAProxyMessageDecoder());
}
}
}

Datei anzeigen

@ -0,0 +1,28 @@
package com.velocitypowered.proxy.network;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.Supplier;
public class ServerChannelInitializerHolder implements Supplier<ChannelInitializer<Channel>> {
private static final Logger LOGGER = LogManager.getLogger(ConnectionManager.class);
private ChannelInitializer<Channel> initializer;
public ServerChannelInitializerHolder(final ChannelInitializer<Channel> initializer) {
this.initializer = initializer;
}
@Override
public ChannelInitializer<Channel> get() {
return this.initializer;
}
@Deprecated
public void set(final ChannelInitializer<Channel> initializer) {
LOGGER.warn("The server channel initializer has been replaced by {}", Thread.currentThread().getStackTrace()[2]);
this.initializer = initializer;
}
}

Datei anzeigen

@ -0,0 +1,86 @@
package com.velocitypowered.proxy.network;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueDatagramChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.ThreadFactory;
import java.util.function.BiFunction;
enum TransportType {
NIO("NIO", NioServerSocketChannel.class, NioSocketChannel.class, NioDatagramChannel.class, (name, type) -> new NioEventLoopGroup(0, createThreadFactory(name, type))),
EPOLL("epoll", EpollServerSocketChannel.class, EpollSocketChannel.class, EpollDatagramChannel.class, (name, type) -> new EpollEventLoopGroup(0, createThreadFactory(name, type))),
KQUEUE("Kqueue", KQueueServerSocketChannel.class, KQueueSocketChannel.class, KQueueDatagramChannel.class, (name, type) -> new KQueueEventLoopGroup(0, createThreadFactory(name, type)));
final String name;
final Class<? extends ServerSocketChannel> serverSocketChannelClass;
final Class<? extends SocketChannel> socketChannelClass;
final Class<? extends DatagramChannel> datagramChannelClass;
final BiFunction<String, Type, EventLoopGroup> eventLoopGroupFactory;
TransportType(final String name, final Class<? extends ServerSocketChannel> serverSocketChannelClass, final Class<? extends SocketChannel> socketChannelClass, final Class<? extends DatagramChannel> datagramChannelClass, final BiFunction<String, Type, EventLoopGroup> eventLoopGroupFactory) {
this.name = name;
this.serverSocketChannelClass = serverSocketChannelClass;
this.socketChannelClass = socketChannelClass;
this.datagramChannelClass = datagramChannelClass;
this.eventLoopGroupFactory = eventLoopGroupFactory;
}
@Override
public String toString() {
return this.name;
}
public EventLoopGroup createEventLoopGroup(final Type type) {
return this.eventLoopGroupFactory.apply(this.name, type);
}
private static ThreadFactory createThreadFactory(final String name, final Type type) {
return new ThreadFactoryBuilder()
.setNameFormat("Netty " + name + ' ' + type.toString() + " #%d")
.setDaemon(true)
.build();
}
public static TransportType bestType() {
if (Epoll.isAvailable()) {
return EPOLL;
} else if (KQueue.isAvailable()) {
return KQUEUE;
} else {
return NIO;
}
}
public enum Type {
BOSS("Boss"),
WORKER("Worker");
private final String name;
Type(final String name) {
this.name = name;
}
@Override
public String toString() {
return this.name;
}
}
}