3
0
Mirror von https://github.com/PaperMC/Velocity.git synchronisiert 2024-11-17 05:20:14 +01:00

Extract channel initialiser and transport type out

Dieser Commit ist enthalten in:
kashike 2018-10-12 16:21:43 -07:00
Ursprung f10a3fa1ac
Commit 46a7b52a13
4 geänderte Dateien mit 197 neuen und 118 gelöschten Zeilen

Datei anzeigen

@ -1,60 +1,43 @@
package com.velocitypowered.proxy.network; package com.velocitypowered.proxy.network;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.velocitypowered.natives.util.Natives; import com.velocitypowered.natives.util.Natives;
import com.velocitypowered.proxy.VelocityServer; import com.velocitypowered.proxy.VelocityServer;
import com.velocitypowered.proxy.connection.MinecraftConnection; import com.velocitypowered.proxy.protocol.netty.GS4QueryHandler;
import com.velocitypowered.proxy.connection.client.HandshakeSessionHandler;
import com.velocitypowered.proxy.protocol.ProtocolConstants;
import com.velocitypowered.proxy.protocol.StateRegistry;
import com.velocitypowered.proxy.protocol.netty.*;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*; import io.netty.channel.Channel;
import io.netty.channel.epoll.*; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.kqueue.*; import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.DatagramChannel; import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.HashSet; import java.util.HashSet;
import java.util.Locale;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static com.velocitypowered.proxy.network.Connections.*;
public final class ConnectionManager { public final class ConnectionManager {
private static final WriteBufferWaterMark SERVER_WRITE_MARK = new WriteBufferWaterMark(1 << 16, 1 << 18); private static final WriteBufferWaterMark SERVER_WRITE_MARK = new WriteBufferWaterMark(1 << 16, 1 << 18);
private static final Logger LOGGER = LogManager.getLogger(ConnectionManager.class);
private static final Logger logger = LogManager.getLogger(ConnectionManager.class);
private final Set<Channel> endpoints = new HashSet<>(); private final Set<Channel> endpoints = new HashSet<>();
private final TransportType transportType; private final TransportType transportType;
private final EventLoopGroup bossGroup; private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup; private final EventLoopGroup workerGroup;
private final VelocityServer server; private final VelocityServer server;
public final ServerChannelInitializerHolder serverChannelInitializer;
public ConnectionManager(VelocityServer server) { public ConnectionManager(final VelocityServer server) {
this.server = server; this.server = server;
this.transportType = TransportType.bestType(); this.transportType = TransportType.bestType();
this.bossGroup = transportType.createEventLoopGroup(true); this.bossGroup = this.transportType.createEventLoopGroup(TransportType.Type.BOSS);
this.workerGroup = transportType.createEventLoopGroup(false); this.workerGroup = this.transportType.createEventLoopGroup(TransportType.Type.WORKER);
this.serverChannelInitializer = new ServerChannelInitializerHolder(new ServerChannelInitializer(this.server));
this.logChannelInformation(); this.logChannelInformation();
} }
private void logChannelInformation() { private void logChannelInformation() {
logger.info("Connections will use {} channels, {} compression, {} ciphers", transportType, Natives.compressor.getLoadedVariant(), Natives.cipher.getLoadedVariant()); LOGGER.info("Connections will use {} channels, {} compression, {} ciphers", this.transportType, Natives.compressor.getLoadedVariant(), Natives.cipher.getLoadedVariant());
} }
public void bind(final InetSocketAddress address) { public void bind(final InetSocketAddress address) {
@ -62,28 +45,7 @@ public final class ConnectionManager {
.channel(this.transportType.serverSocketChannelClass) .channel(this.transportType.serverSocketChannelClass)
.group(this.bossGroup, this.workerGroup) .group(this.bossGroup, this.workerGroup)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, SERVER_WRITE_MARK) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, SERVER_WRITE_MARK)
.childHandler(new ChannelInitializer<Channel>() { .childHandler(this.serverChannelInitializer.get())
@Override
protected void initChannel(final Channel ch) {
ch.pipeline()
.addLast(READ_TIMEOUT, new ReadTimeoutHandler(server.getConfiguration().getReadTimeout(), TimeUnit.SECONDS))
.addLast(LEGACY_PING_DECODER, new LegacyPingDecoder())
.addLast(FRAME_DECODER, new MinecraftVarintFrameDecoder())
.addLast(LEGACY_PING_ENCODER, LegacyPingEncoder.INSTANCE)
.addLast(FRAME_ENCODER, MinecraftVarintLengthEncoder.INSTANCE)
.addLast(MINECRAFT_DECODER, new MinecraftDecoder(ProtocolConstants.Direction.SERVERBOUND))
.addLast(MINECRAFT_ENCODER, new MinecraftEncoder(ProtocolConstants.Direction.CLIENTBOUND));
final MinecraftConnection connection = new MinecraftConnection(ch, server);
connection.setState(StateRegistry.HANDSHAKE);
connection.setSessionHandler(new HandshakeSessionHandler(connection, server));
ch.pipeline().addLast(Connections.HANDLER, connection);
if (server.getConfiguration().isProxyProtocol()) {
ch.pipeline().addFirst(new HAProxyMessageDecoder());
}
}
})
.childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.IP_TOS, 0x18) .childOption(ChannelOption.IP_TOS, 0x18)
.localAddress(address); .localAddress(address);
@ -92,27 +54,27 @@ public final class ConnectionManager {
final Channel channel = future.channel(); final Channel channel = future.channel();
if (future.isSuccess()) { if (future.isSuccess()) {
this.endpoints.add(channel); this.endpoints.add(channel);
logger.info("Listening on {}", channel.localAddress()); LOGGER.info("Listening on {}", channel.localAddress());
} else { } else {
logger.error("Can't bind to {}", address, future.cause()); LOGGER.error("Can't bind to {}", address, future.cause());
} }
}); });
} }
public void queryBind(final String hostname, final int port) { public void queryBind(final String hostname, final int port) {
Bootstrap bootstrap = new Bootstrap() final Bootstrap bootstrap = new Bootstrap()
.channel(transportType.datagramChannelClass) .channel(this.transportType.datagramChannelClass)
.group(this.workerGroup) .group(this.workerGroup)
.handler(new GS4QueryHandler(server)) .handler(new GS4QueryHandler(this.server))
.localAddress(hostname, port); .localAddress(hostname, port);
bootstrap.bind() bootstrap.bind()
.addListener((ChannelFutureListener) future -> { .addListener((ChannelFutureListener) future -> {
final Channel channel = future.channel(); final Channel channel = future.channel();
if (future.isSuccess()) { if (future.isSuccess()) {
this.endpoints.add(channel); this.endpoints.add(channel);
logger.info("Listening for GS4 query on {}", channel.localAddress()); LOGGER.info("Listening for GS4 query on {}", channel.localAddress());
} else { } else {
logger.error("Can't bind to {}", bootstrap.config().localAddress(), future.cause()); LOGGER.error("Can't bind to {}", bootstrap.config().localAddress(), future.cause());
} }
}); });
} }
@ -122,75 +84,21 @@ public final class ConnectionManager {
.channel(this.transportType.socketChannelClass) .channel(this.transportType.socketChannelClass)
.group(this.workerGroup) .group(this.workerGroup)
.option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, server.getConfiguration().getConnectTimeout()); .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.server.getConfiguration().getConnectTimeout());
} }
public void shutdown() { public void shutdown() {
for (final Channel endpoint : this.endpoints) { for (final Channel endpoint : this.endpoints) {
try { try {
logger.info("Closing endpoint {}", endpoint.localAddress()); LOGGER.info("Closing endpoint {}", endpoint.localAddress());
endpoint.close().sync(); endpoint.close().sync();
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
logger.info("Interrupted whilst closing endpoint", e); LOGGER.info("Interrupted whilst closing endpoint", e);
} }
} }
} }
private static ThreadFactory createThreadFactory(final String nameFormat) { public ServerChannelInitializerHolder getServerChannelInitializer() {
return new ThreadFactoryBuilder() return this.serverChannelInitializer;
.setNameFormat(nameFormat)
.setDaemon(true)
.build();
}
private enum TransportType {
NIO(NioServerSocketChannel.class, NioSocketChannel.class, NioDatagramChannel.class) {
@Override
public EventLoopGroup createEventLoopGroup(boolean boss) {
String name = "Netty NIO " + (boss ? "Boss" : "Worker") + " #%d";
return new NioEventLoopGroup(0, createThreadFactory(name));
}
},
EPOLL(EpollServerSocketChannel.class, EpollSocketChannel.class, EpollDatagramChannel.class) {
@Override
public EventLoopGroup createEventLoopGroup(boolean boss) {
String name = "Netty Epoll " + (boss ? "Boss" : "Worker") + " #%d";
return new EpollEventLoopGroup(0, createThreadFactory(name));
}
},
KQUEUE(KQueueServerSocketChannel.class, KQueueSocketChannel.class, KQueueDatagramChannel.class) {
@Override
public EventLoopGroup createEventLoopGroup(boolean boss) {
String name = "Netty KQueue " + (boss ? "Boss" : "Worker") + " #%d";
return new KQueueEventLoopGroup(0, createThreadFactory(name));
}
};
private final Class<? extends ServerSocketChannel> serverSocketChannelClass;
private final Class<? extends SocketChannel> socketChannelClass;
private final Class<? extends DatagramChannel> datagramChannelClass;
TransportType(Class<? extends ServerSocketChannel> serverSocketChannelClass, Class<? extends SocketChannel> socketChannelClass, Class<? extends DatagramChannel> datagramChannelClass) {
this.serverSocketChannelClass = serverSocketChannelClass;
this.socketChannelClass = socketChannelClass;
this.datagramChannelClass = datagramChannelClass;
}
@Override
public String toString() {
return name().toLowerCase(Locale.US);
}
public abstract EventLoopGroup createEventLoopGroup(boolean boss);
public static TransportType bestType() {
if (Epoll.isAvailable()) {
return EPOLL;
} else if (KQueue.isAvailable()) {
return KQUEUE;
} else {
return NIO;
}
}
} }
} }

Datei anzeigen

@ -0,0 +1,57 @@
package com.velocitypowered.proxy.network;
import com.velocitypowered.proxy.VelocityServer;
import com.velocitypowered.proxy.connection.MinecraftConnection;
import com.velocitypowered.proxy.connection.client.HandshakeSessionHandler;
import com.velocitypowered.proxy.protocol.ProtocolConstants;
import com.velocitypowered.proxy.protocol.StateRegistry;
import com.velocitypowered.proxy.protocol.netty.LegacyPingDecoder;
import com.velocitypowered.proxy.protocol.netty.LegacyPingEncoder;
import com.velocitypowered.proxy.protocol.netty.MinecraftDecoder;
import com.velocitypowered.proxy.protocol.netty.MinecraftEncoder;
import com.velocitypowered.proxy.protocol.netty.MinecraftVarintFrameDecoder;
import com.velocitypowered.proxy.protocol.netty.MinecraftVarintLengthEncoder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.util.concurrent.TimeUnit;
import static com.velocitypowered.proxy.network.Connections.FRAME_DECODER;
import static com.velocitypowered.proxy.network.Connections.FRAME_ENCODER;
import static com.velocitypowered.proxy.network.Connections.LEGACY_PING_DECODER;
import static com.velocitypowered.proxy.network.Connections.LEGACY_PING_ENCODER;
import static com.velocitypowered.proxy.network.Connections.MINECRAFT_DECODER;
import static com.velocitypowered.proxy.network.Connections.MINECRAFT_ENCODER;
import static com.velocitypowered.proxy.network.Connections.READ_TIMEOUT;
@SuppressWarnings("WeakerAccess")
public class ServerChannelInitializer extends ChannelInitializer<Channel> {
private final VelocityServer server;
public ServerChannelInitializer(final VelocityServer server) {
this.server = server;
}
@Override
protected void initChannel(final Channel ch) {
ch.pipeline()
.addLast(READ_TIMEOUT, new ReadTimeoutHandler(this.server.getConfiguration().getReadTimeout(), TimeUnit.SECONDS))
.addLast(LEGACY_PING_DECODER, new LegacyPingDecoder())
.addLast(FRAME_DECODER, new MinecraftVarintFrameDecoder())
.addLast(LEGACY_PING_ENCODER, LegacyPingEncoder.INSTANCE)
.addLast(FRAME_ENCODER, MinecraftVarintLengthEncoder.INSTANCE)
.addLast(MINECRAFT_DECODER, new MinecraftDecoder(ProtocolConstants.Direction.SERVERBOUND))
.addLast(MINECRAFT_ENCODER, new MinecraftEncoder(ProtocolConstants.Direction.CLIENTBOUND));
final MinecraftConnection connection = new MinecraftConnection(ch, this.server);
connection.setState(StateRegistry.HANDSHAKE);
connection.setSessionHandler(new HandshakeSessionHandler(connection, this.server));
ch.pipeline().addLast(Connections.HANDLER, connection);
if (ServerChannelInitializer.this.server.getConfiguration().isProxyProtocol()) {
ch.pipeline().addFirst(new HAProxyMessageDecoder());
}
}
}

Datei anzeigen

@ -0,0 +1,28 @@
package com.velocitypowered.proxy.network;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.Supplier;
public class ServerChannelInitializerHolder implements Supplier<ChannelInitializer<Channel>> {
private static final Logger LOGGER = LogManager.getLogger(ConnectionManager.class);
private ChannelInitializer<Channel> initializer;
public ServerChannelInitializerHolder(final ChannelInitializer<Channel> initializer) {
this.initializer = initializer;
}
@Override
public ChannelInitializer<Channel> get() {
return this.initializer;
}
@Deprecated
public void set(final ChannelInitializer<Channel> initializer) {
LOGGER.warn("The server channel initializer has been replaced by {}", Thread.currentThread().getStackTrace()[2]);
this.initializer = initializer;
}
}

Datei anzeigen

@ -0,0 +1,86 @@
package com.velocitypowered.proxy.network;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueDatagramChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.ThreadFactory;
import java.util.function.BiFunction;
enum TransportType {
NIO("NIO", NioServerSocketChannel.class, NioSocketChannel.class, NioDatagramChannel.class, (name, type) -> new NioEventLoopGroup(0, createThreadFactory(name, type))),
EPOLL("epoll", EpollServerSocketChannel.class, EpollSocketChannel.class, EpollDatagramChannel.class, (name, type) -> new EpollEventLoopGroup(0, createThreadFactory(name, type))),
KQUEUE("Kqueue", KQueueServerSocketChannel.class, KQueueSocketChannel.class, KQueueDatagramChannel.class, (name, type) -> new KQueueEventLoopGroup(0, createThreadFactory(name, type)));
final String name;
final Class<? extends ServerSocketChannel> serverSocketChannelClass;
final Class<? extends SocketChannel> socketChannelClass;
final Class<? extends DatagramChannel> datagramChannelClass;
final BiFunction<String, Type, EventLoopGroup> eventLoopGroupFactory;
TransportType(final String name, final Class<? extends ServerSocketChannel> serverSocketChannelClass, final Class<? extends SocketChannel> socketChannelClass, final Class<? extends DatagramChannel> datagramChannelClass, final BiFunction<String, Type, EventLoopGroup> eventLoopGroupFactory) {
this.name = name;
this.serverSocketChannelClass = serverSocketChannelClass;
this.socketChannelClass = socketChannelClass;
this.datagramChannelClass = datagramChannelClass;
this.eventLoopGroupFactory = eventLoopGroupFactory;
}
@Override
public String toString() {
return this.name;
}
public EventLoopGroup createEventLoopGroup(final Type type) {
return this.eventLoopGroupFactory.apply(this.name, type);
}
private static ThreadFactory createThreadFactory(final String name, final Type type) {
return new ThreadFactoryBuilder()
.setNameFormat("Netty " + name + ' ' + type.toString() + " #%d")
.setDaemon(true)
.build();
}
public static TransportType bestType() {
if (Epoll.isAvailable()) {
return EPOLL;
} else if (KQueue.isAvailable()) {
return KQUEUE;
} else {
return NIO;
}
}
public enum Type {
BOSS("Boss"),
WORKER("Worker");
private final String name;
Type(final String name) {
this.name = name;
}
@Override
public String toString() {
return this.name;
}
}
}