From 7186970386ca7b9887fc79e00510154f2b248d86 Mon Sep 17 00:00:00 2001 From: md_5 <md_5@live.com.au> Date: Fri, 21 Jun 2013 18:23:00 +1000 Subject: [PATCH] Netty diff --git a/pom.xml b/pom.xml index 8c9f66b..a33020e 100644 --- a/pom.xml +++ b/pom.xml @@ -132,6 +132,16 @@ <artifactId>trove4j</artifactId> <version>3.0.2</version> </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + <version>4.0.0.CR1</version> + </dependency> + <dependency> + <groupId>org.javassist</groupId> + <artifactId>javassist</artifactId> + <version>3.17.1-GA</version> + </dependency> </dependencies> <!-- This builds a completely 'ready to start' jar with all dependencies inside --> diff --git a/src/main/java/net/minecraft/server/DedicatedServer.java b/src/main/java/net/minecraft/server/DedicatedServer.java index 59444cb..e101f95 100644 --- a/src/main/java/net/minecraft/server/DedicatedServer.java +++ b/src/main/java/net/minecraft/server/DedicatedServer.java @@ -100,7 +100,7 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer this.getLogger().info("Starting Minecraft server on " + (this.getServerIp().length() == 0 ? 
"*" : this.getServerIp()) + ":" + this.G()); try { - this.r = new DedicatedServerConnection(this, inetaddress, this.G()); + this.r = new org.spigotmc.MultiplexingServerConnection(this); // Spigot } catch (Throwable ioexception) { // CraftBukkit - IOException -> Throwable this.getLogger().warning("**** FAILED TO BIND TO PORT!"); this.getLogger().warning("The exception was: {0}", new Object[] { ioexception.toString()}); diff --git a/src/main/java/net/minecraft/server/DedicatedServerConnectionThread.java b/src/main/java/net/minecraft/server/DedicatedServerConnectionThread.java index ef7e10d..5f2e42e 100644 --- a/src/main/java/net/minecraft/server/DedicatedServerConnectionThread.java +++ b/src/main/java/net/minecraft/server/DedicatedServerConnectionThread.java @@ -82,7 +82,7 @@ public class DedicatedServerConnectionThread extends Thread { PendingConnection pendingconnection = new PendingConnection(this.e.d(), socket, "Connection #" + this.c++); - this.a(pendingconnection); + ((org.spigotmc.MultiplexingServerConnection) this.e.d().ae()).register(pendingconnection); // Spigot } catch (IOException ioexception) { this.e.d().getLogger().warning("DSCT: " + ioexception.getMessage()); // CraftBukkit } diff --git a/src/main/java/net/minecraft/server/INetworkManager.java b/src/main/java/net/minecraft/server/INetworkManager.java new file mode 100644 index 0000000..6fcc5d7 --- /dev/null +++ b/src/main/java/net/minecraft/server/INetworkManager.java @@ -0,0 +1,26 @@ +package net.minecraft.server; + +import java.net.SocketAddress; + +public interface INetworkManager { + + void a(Connection connection); + + void queue(Packet packet); + + void a(); + + void b(); + + SocketAddress getSocketAddress(); + + void d(); + + int e(); + + void a(String s, Object... 
aobject); + + java.net.Socket getSocket(); // Spigot + + void setSocketAddress(java.net.SocketAddress address); // Spigot +} diff --git a/src/main/java/net/minecraft/server/NetworkManager.java b/src/main/java/net/minecraft/server/NetworkManager.java index 1862863..5a24f2a 100644 --- a/src/main/java/net/minecraft/server/NetworkManager.java +++ b/src/main/java/net/minecraft/server/NetworkManager.java @@ -24,7 +24,7 @@ public class NetworkManager implements INetworkManager { private final Object h = new Object(); private final IConsoleLogManager i; public Socket socket; // CraftBukkit - private -> public - private final SocketAddress k; + private SocketAddress k; // Spigot - remove final private volatile DataInputStream input; private volatile DataOutputStream output; private volatile boolean n = true; @@ -369,4 +369,6 @@ public class NetworkManager implements INetworkManager { static Thread h(NetworkManager networkmanager) { return networkmanager.u; } + + public void setSocketAddress(SocketAddress address) { k = address; } // Spigot } diff --git a/src/main/java/net/minecraft/server/Packet51MapChunk.java b/src/main/java/net/minecraft/server/Packet51MapChunk.java index efe102e..e488fa8 100644 --- a/src/main/java/net/minecraft/server/Packet51MapChunk.java +++ b/src/main/java/net/minecraft/server/Packet51MapChunk.java @@ -42,7 +42,7 @@ public class Packet51MapChunk extends Packet { this.b = chunk.z; this.e = flag; ChunkMap chunkmap = a(chunk, flag, i); - Deflater deflater = new Deflater(-1); + Deflater deflater = new Deflater(4); this.d = chunkmap.c; this.c = chunkmap.b; diff --git a/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java b/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java index a2cd9b0..f586415 100644 --- a/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java +++ b/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java @@ -24,7 +24,7 @@ public class Packet56MapChunkBulk extends Packet { @Override protected Deflater 
initialValue() { // Don't use higher compression level, slows things down too much - return new Deflater(6); + return new Deflater(4); // Spigot - use lower compression level still } }; // CraftBukkit end diff --git a/src/main/java/net/minecraft/server/PendingConnection.java b/src/main/java/net/minecraft/server/PendingConnection.java index 17cfacc..a945892 100644 --- a/src/main/java/net/minecraft/server/PendingConnection.java +++ b/src/main/java/net/minecraft/server/PendingConnection.java @@ -17,7 +17,7 @@ public class PendingConnection extends Connection { private static Random random = new Random(); private byte[] d; private final MinecraftServer server; - public final NetworkManager networkManager; + public final INetworkManager networkManager; public boolean b = false; private int f = 0; private String g = null; @@ -27,10 +27,15 @@ public class PendingConnection extends Connection { private SecretKey k = null; public String hostname = ""; // CraftBukkit - add field + public PendingConnection(MinecraftServer minecraftserver, org.spigotmc.netty.NettyNetworkManager networkManager) { + this.server = minecraftserver; + this.networkManager = networkManager; + } + public PendingConnection(MinecraftServer minecraftserver, Socket socket, String s) throws java.io.IOException { // CraftBukkit - throws IOException this.server = minecraftserver; this.networkManager = new NetworkManager(minecraftserver.getLogger(), socket, s, this, minecraftserver.F().getPrivate()); - this.networkManager.e = 0; + // this.networkManager.e = 0; } // CraftBukkit start @@ -146,7 +151,7 @@ public class PendingConnection extends Connection { // CraftBukkit org.bukkit.event.server.ServerListPingEvent pingEvent = org.bukkit.craftbukkit.event.CraftEventFactory.callServerListPingEvent(this.server.server, getSocket().getInetAddress(), this.server.getMotd(), playerlist.getPlayerCount(), playerlist.getMaxPlayers()); - if (packet254getinfo.a == 1) { + if (true) { // CraftBukkit start - Fix decompile 
issues, don't create a list from an array Object[] list = new Object[] { 1, 61, this.server.getVersion(), pingEvent.getMotd(), playerlist.getPlayerCount(), pingEvent.getMaxPlayers() }; @@ -173,9 +178,11 @@ public class PendingConnection extends Connection { this.networkManager.queue(new Packet255KickDisconnect(s)); this.networkManager.d(); - if (inetaddress != null && this.server.ae() instanceof DedicatedServerConnection) { - ((DedicatedServerConnection) this.server.ae()).a(inetaddress); + // Spigot start + if (inetaddress != null) { + ((org.spigotmc.MultiplexingServerConnection) this.server.ae()).unThrottle(inetaddress); } + // Spigot end this.b = true; } catch (Exception exception) { diff --git a/src/main/java/org/bukkit/craftbukkit/CraftServer.java b/src/main/java/org/bukkit/craftbukkit/CraftServer.java index 6e6fe1c..68694de 100644 --- a/src/main/java/org/bukkit/craftbukkit/CraftServer.java +++ b/src/main/java/org/bukkit/craftbukkit/CraftServer.java @@ -1369,4 +1369,20 @@ public final class CraftServer implements Server { public CraftScoreboardManager getScoreboardManager() { return scoreboardManager; } + + // Spigot start + @SuppressWarnings("unchecked") + public java.util.Collection<java.net.InetSocketAddress> getSecondaryHosts() { + java.util.Collection<java.net.InetSocketAddress> ret = new java.util.HashSet<java.net.InetSocketAddress>(); + List<?> listeners = configuration.getList("listeners"); + if (listeners != null) { + for (Object o : listeners) { + + Map<String, Object> sect = (Map<String, Object>) o; + ret.add(new java.net.InetSocketAddress((String) sect.get("address"), (Integer) sect.get("port"))); + } + } + return ret; + } + // Spigot end } diff --git a/src/main/java/org/spigotmc/MultiplexingServerConnection.java b/src/main/java/org/spigotmc/MultiplexingServerConnection.java new file mode 100644 index 0000000..386c2f8 --- /dev/null +++ b/src/main/java/org/spigotmc/MultiplexingServerConnection.java @@ -0,0 +1,136 @@ +package org.spigotmc; + +import 
java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.logging.Level; +import net.minecraft.server.DedicatedServerConnection; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.PendingConnection; +import net.minecraft.server.ServerConnection; +import org.bukkit.Bukkit; + +public class MultiplexingServerConnection extends ServerConnection +{ + + private final Collection<ServerConnection> children = new HashSet<ServerConnection>(); + private final List<PendingConnection> pending = Collections.synchronizedList( new ArrayList<PendingConnection>() ); + private final HashMap<InetAddress, Long> throttle = new HashMap<InetAddress, Long>(); + + public MultiplexingServerConnection(MinecraftServer ms) + { + super( ms ); + + for ( SpigotConfig.Listener listener : SpigotConfig.listeners ) + { + try + { + // Calculate address, can't use isEmpty due to Java 5 + InetAddress socketAddress = ( listener.host.length() == 0 ) ? null : InetAddress.getByName( listener.host ); + // Say hello to the log + d().getLogger().info( "Starting listener #" + children.size() + " on " + ( socketAddress == null ? "*" : listener.host ) + ":" + listener.port ); + // Start connection: Netty listener when listener.netty is set, legacy socket listener otherwise + ServerConnection l = ( listener.netty ) ? new org.spigotmc.netty.NettyServerConnection( d(), socketAddress, listener.port ) : new DedicatedServerConnection( d(), socketAddress, listener.port ); + // Register with other connections + children.add( l ); + // Gotta catch em all + } catch ( Throwable t ) + { + // Just print some info to the log + t.printStackTrace(); + d().getLogger().warning( "**** FAILED TO BIND TO PORT!" ); + d().getLogger().warning( "The exception was: {0}", t ); + d().getLogger().warning( "Perhaps a server is already running on that port?" ); + } + } + } + + /** + * close. 
+ */ + @Override + public void a() + { + for ( ServerConnection child : children ) + { + child.a(); + } + } + + /** + * Pulse. This method pulses all connections causing them to update. It is + * called from the main server thread a few times a tick. + */ + @Override + public void b() + { + super.b(); // pulse PlayerConnections + for ( int i = 0; i < pending.size(); ++i ) + { + PendingConnection connection = pending.get( i ); + + try + { + connection.c(); + } catch ( Exception ex ) + { + connection.disconnect( "Internal server error" ); + Bukkit.getServer().getLogger().log( Level.WARNING, "Failed to handle packet: " + ex, ex ); + } + + if ( connection.b ) + { + pending.remove( i-- ); + } + } + } + + /** + * Remove the user from connection throttle. This should fix the server ping + * bugs. + * + * @param address the address to remove + */ + public void unThrottle(InetAddress address) + { + if ( address != null ) + { + synchronized ( throttle ) + { + throttle.remove( address ); + } + } + } + + /** + * Add a connection to the throttle list. 
+ * + * @param address + * @return Whether they must be disconnected + */ + public boolean throttle(InetAddress address) + { + long currentTime = System.currentTimeMillis(); + synchronized ( throttle ) + { + Long value = throttle.get( address ); + if ( value != null && !address.isLoopbackAddress() && currentTime - value < d().server.getConnectionThrottle() ) + { + throttle.put( address, currentTime ); + return true; + } + + throttle.put( address, currentTime ); + } + return false; + } + + public void register(PendingConnection conn) + { + pending.add( conn ); + } +} diff --git a/src/main/java/org/spigotmc/SpigotConfig.java b/src/main/java/org/spigotmc/SpigotConfig.java index b7f3896..910d0de 100644 --- a/src/main/java/org/spigotmc/SpigotConfig.java +++ b/src/main/java/org/spigotmc/SpigotConfig.java @@ -6,6 +6,8 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -148,4 +150,47 @@ public class SpigotConfig commands.put( "restart", new RestartCommand( "restart" ) ); WatchdogThread.doStart( timeoutTime, restartOnCrash ); } + + public static class Listener + { + + public String host; + public int port; + public boolean netty; + public long connectionThrottle; + + public Listener(String host, int port, boolean netty, long connectionThrottle) + { + this.host = host; + this.port = port; + this.netty = netty; + this.connectionThrottle = connectionThrottle; + } + } + public static List<Listener> listeners = new ArrayList<Listener>(); + public static int nettyThreads; + private static void listeners() + { + Map<String, Object> def = new HashMap<String, Object>(); + def.put( "host", "default" ); + def.put( "port", "default" ); + def.put( "netty", true ); + def.put( "throttle", "default" ); + + config.addDefault( "listeners", 
Collections.singletonList( def ) ); + for ( Map<String, Object> info : (List<Map<String, Object>>) config.getList( "listeners" ) ) + { + String host = (String) info.get( "host" ); + if ( host == null || "default".equals( host ) ) // a missing host is treated like "default" + { + host = Bukkit.getIp(); + } + int port = ( info.get( "port" ) instanceof Integer ) ? (Integer) info.get( "port" ) : Bukkit.getPort(); + boolean netty = !Boolean.FALSE.equals( info.get( "netty" ) ); // Netty unless explicitly set to false; a missing key no longer throws on unboxing + long connectionThrottle = ( info.get( "throttle" ) instanceof Number ) ? ( (Number) info.get( "throttle" ) ).longValue() : Bukkit.getConnectionThrottle(); + listeners.add( new Listener( host, port, netty, connectionThrottle ) ); + } + + nettyThreads = getInt( "settings.netty-threads", 3 ); + } } diff --git a/src/main/java/org/spigotmc/netty/CipherBase.java b/src/main/java/org/spigotmc/netty/CipherBase.java new file mode 100644 index 0000000..c75a60f --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherBase.java @@ -0,0 +1,54 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import javax.crypto.Cipher; +import javax.crypto.ShortBufferException; + +/** + * Class to expose an + * {@link #cipher(io.netty.buffer.ByteBuf, io.netty.buffer.ByteBuf)} method to + * aid in the efficient passing of ByteBuffers through a cipher. 
+ */ +class CipherBase +{ + + private final Cipher cipher; + private ThreadLocal<byte[]> heapInLocal = new EmptyByteThreadLocal(); + private ThreadLocal<byte[]> heapOutLocal = new EmptyByteThreadLocal(); + + private static class EmptyByteThreadLocal extends ThreadLocal<byte[]> + { + + @Override + protected byte[] initialValue() + { + return new byte[ 0 ]; + } + } + + protected CipherBase(Cipher cipher) + { + this.cipher = cipher; + } + + protected void cipher(ByteBuf in, ByteBuf out) throws ShortBufferException + { + byte[] heapIn = heapInLocal.get(); + int readableBytes = in.readableBytes(); + if ( heapIn.length < readableBytes ) + { + heapIn = new byte[ readableBytes ]; + heapInLocal.set( heapIn ); + } + in.readBytes( heapIn, 0, readableBytes ); + + byte[] heapOut = heapOutLocal.get(); + int outputSize = cipher.getOutputSize( readableBytes ); + if ( heapOut.length < outputSize ) + { + heapOut = new byte[ outputSize ]; + heapOutLocal.set( heapOut ); + } + out.writeBytes( heapOut, 0, cipher.update( heapIn, 0, readableBytes, heapOut ) ); + } +} diff --git a/src/main/java/org/spigotmc/netty/CipherDecoder.java b/src/main/java/org/spigotmc/netty/CipherDecoder.java new file mode 100644 index 0000000..98dc3a0 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherDecoder.java @@ -0,0 +1,23 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToByteDecoder; +import javax.crypto.Cipher; + +class CipherDecoder extends ByteToByteDecoder +{ + + private final CipherBase cipher; + + public CipherDecoder(Cipher cipher) + { + this.cipher = new CipherBase( cipher ); + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception + { + cipher.cipher( in, out ); + } +} diff --git a/src/main/java/org/spigotmc/netty/CipherEncoder.java b/src/main/java/org/spigotmc/netty/CipherEncoder.java new file mode 100644 index 0000000..4ff943b --- 
/dev/null +++ b/src/main/java/org/spigotmc/netty/CipherEncoder.java @@ -0,0 +1,23 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToByteEncoder; +import javax.crypto.Cipher; + +class CipherEncoder extends ByteToByteEncoder +{ + + private final CipherBase cipher; + + public CipherEncoder(Cipher cipher) + { + this.cipher = new CipherBase( cipher ); + } + + @Override + protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception + { + cipher.cipher( in, out ); + } +} diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java new file mode 100644 index 0000000..fdef0c8 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java @@ -0,0 +1,292 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.channel.socket.SocketChannel; +import java.net.Socket; +import java.net.SocketAddress; +import java.security.PrivateKey; +import java.util.AbstractList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import javax.crypto.Cipher; +import javax.crypto.SecretKey; +import net.minecraft.server.Connection; +import net.minecraft.server.INetworkManager; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.Packet; +import net.minecraft.server.Packet252KeyResponse; +import net.minecraft.server.Packet255KickDisconnect; +import net.minecraft.server.PendingConnection; +import net.minecraft.server.PlayerConnection; +import org.spigotmc.MultiplexingServerConnection; + +/** + 
* This class forms the basis of the Netty integration. It implements + * {@link INetworkManager} and handles all events and inbound messages provided + * by the upstream Netty process. + */ +public class NettyNetworkManager extends ChannelInboundMessageHandlerAdapter<Packet> implements INetworkManager +{ + + private static final ExecutorService threadPool = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat( "Async Packet Handler - %1$d" ).build() ); + private static final MinecraftServer server = MinecraftServer.getServer(); + private static final PrivateKey key = server.F().getPrivate(); + private static final MultiplexingServerConnection serverConnection = (MultiplexingServerConnection) server.ae(); + /*========================================================================*/ + private final Queue<Packet> syncPackets = new ConcurrentLinkedQueue<Packet>(); + private final List<Packet> highPriorityQueue = new AbstractList<Packet>() + { + @Override + public void add(int index, Packet element) + { + // NOP + } + + @Override + public Packet get(int index) + { + throw new UnsupportedOperationException(); + } + + @Override + public int size() + { + return 0; + } + }; + private volatile boolean connected; + private Channel channel; + private SocketAddress address; + Connection connection; + private SecretKey secret; + private String dcReason; + private Object[] dcArgs; + private Socket socketAdaptor; + private long writtenBytes; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception + { + // Channel and address groundwork first + channel = ctx.channel(); + address = channel.remoteAddress(); + // Then the socket adaptor + socketAdaptor = NettySocketAdaptor.adapt( (SocketChannel) channel ); + // Followed by their first handler + connection = new PendingConnection( server, this ); + // Finally register the connection + connected = true; + serverConnection.register( (PendingConnection) connection ); + } + + @Override + 
public void channelInactive(ChannelHandlerContext ctx) throws Exception + { + a( "disconnect.endOfStream", new Object[ 0 ] ); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception + { + // TODO: Remove this once we are more stable + // Bukkit.getServer().getLogger().severe("======================= Start Netty Debug Log ======================="); + // Bukkit.getServer().getLogger().log(Level.SEVERE, "Error caught whilst handling " + channel, cause); + // Bukkit.getServer().getLogger().severe("======================= End Netty Debug Log ======================="); + // Disconnect with generic reason + exception + a( "disconnect.genericReason", new Object[] + { + "Internal exception: " + cause + } ); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, final Packet msg) throws Exception + { + if ( connected ) + { + if ( msg instanceof Packet252KeyResponse ) + { + secret = ( (Packet252KeyResponse) msg ).a( key ); + Cipher decrypt = NettyServerConnection.getCipher( Cipher.DECRYPT_MODE, secret ); + channel.pipeline().addBefore( "decoder", "decrypt", new CipherDecoder( decrypt ) ); + } + + if ( msg.a_() ) + { + threadPool.submit( new Runnable() + { + public void run() + { + Packet packet = PacketListener.callReceived( NettyNetworkManager.this, connection, msg ); + if ( packet != null ) + { + packet.handle( connection ); + } + } + } ); + } else + { + syncPackets.add( msg ); + } + } + } + + public Socket getSocket() + { + return socketAdaptor; + } + + /** + * setHandler. Set the {@link Connection} used to process received packets. + * + * @param nh the new {@link Connection} instance + */ + public void a(Connection nh) + { + connection = nh; + } + + /** + * queue. Queue a packet for sending, or in this case write it + * straight to the channel. 
+ * + * @param packet the packet to queue + */ + public void queue(Packet packet) + { + // Only send if channel is still connected + if ( connected ) + { + // Process packet via handler + packet = PacketListener.callQueued( this, connection, packet ); + // If handler indicates packet send + if ( packet != null ) + { + highPriorityQueue.add( packet ); + + ChannelPromise promise = channel.newPromise(); + if ( packet instanceof Packet255KickDisconnect ) + { + channel.pipeline().get( OutboundManager.class ).lastFlush = 0; + } + + channel.write( packet, promise ); + if ( packet instanceof Packet252KeyResponse ) + { + Cipher encrypt = NettyServerConnection.getCipher( Cipher.ENCRYPT_MODE, secret ); + channel.pipeline().addBefore( "decoder", "encrypt", new CipherEncoder( encrypt ) ); + } + } + } + } + + /** + * wakeThreads. In Vanilla this method will interrupt the network read and + * write threads, thus waking them. + */ + public void a() + { + } + + /** + * processPackets. Remove up to 1000 packets from the queue and process + * them. This method should only be called from the main server thread. + */ + public void b() + { + for ( int i = 1000; !syncPackets.isEmpty() && i >= 0; i-- ) + { + if ( connection instanceof PendingConnection ? ( (PendingConnection) connection ).b : ( (PlayerConnection) connection ).disconnected ) + { + syncPackets.clear(); + break; + } + + Packet packet = PacketListener.callReceived( this, connection, syncPackets.poll() ); + if ( packet != null ) + { + packet.handle( connection ); + } + } + + // Disconnect via the handler - this performs all plugin related cleanup + logging + if ( !connected && ( dcReason != null || dcArgs != null ) ) + { + connection.a( dcReason, dcArgs ); + } + } + + /** + * getSocketAddress. Return the remote address of the connected user. It is + * important that this method returns a value even after disconnect. 
+ * + * @return the remote address of this connection + */ + public SocketAddress getSocketAddress() + { + return address; + } + + public void setSocketAddress(SocketAddress address) + { + this.address = address; + } + + /** + * close. Close and release all resources associated with this connection. + */ + public void d() + { + if ( connected ) + { + connected = false; + channel.close(); + } + } + + /** + * queueSize. Return the number of packets in the low priority queue. In a + * NIO environment this will always be 0. + * + * @return the size of the packet send queue + */ + public int e() + { + return 0; + } + + /** + * networkShutdown. Shuts down this connection, storing the reason and + * parameters, used to notify the current {@link Connection}. + * + * @param reason the main disconnect reason + * @param arguments additional disconnect arguments, for example, the + * exception which triggered the disconnect. + */ + public void a(String reason, Object... arguments) + { + if ( connected ) + { + dcReason = reason; + dcArgs = arguments; + d(); + } + } + + public long getWrittenBytes() + { + return writtenBytes; + } + + public void addWrittenBytes(int written) + { + writtenBytes += written; + } +} diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java new file mode 100644 index 0000000..f232efd --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java @@ -0,0 +1,104 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import 
io.netty.handler.timeout.ReadTimeoutHandler; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.security.GeneralSecurityException; +import java.security.Key; +import javax.crypto.Cipher; +import javax.crypto.spec.IvParameterSpec; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.ServerConnection; +import org.spigotmc.MultiplexingServerConnection; + +/** + * This is the NettyServerConnection class. It implements + * {@link ServerConnection} and is the main interface between the Minecraft + * server and this NIO implementation. It handles starting, stopping and + * processing the Netty backend. + */ +public class NettyServerConnection extends ServerConnection +{ + + private final ChannelFuture socket; + private static EventLoopGroup group; + + public NettyServerConnection(final MinecraftServer ms, InetAddress host, int port) + { + super( ms ); + if ( group == null ) + { + group = new NioEventLoopGroup( org.spigotmc.SpigotConfig.nettyThreads, new ThreadFactoryBuilder().setNameFormat( "Netty IO Thread - %1$d" ).build() ); + } + socket = new ServerBootstrap().channel( NioServerSocketChannel.class ).childHandler( new ChannelInitializer() + { + @Override + public void initChannel(Channel ch) throws Exception + { + // Check the throttle + if ( ( (MultiplexingServerConnection) ms.ae() ).throttle( ( (InetSocketAddress) ch.remoteAddress() ).getAddress() ) ) + { + ch.close(); + return; + } + // Set IP_TOS + try + { + ch.config().setOption( ChannelOption.IP_TOS, 0x18 ); + } catch ( ChannelException ex ) + { + // IP_TOS is not supported (Windows XP / Windows Server 2003) + } + + NettyNetworkManager networkManager = new NettyNetworkManager(); + ch.pipeline() + .addLast( "flusher", new OutboundManager( networkManager ) ) + .addLast( "timer", new ReadTimeoutHandler( 30 ) ) + .addLast( "decoder", new PacketDecoder() ) + .addLast( "encoder", new PacketEncoder( networkManager ) ) + .addLast( "manager", networkManager ); + } + } 
).childOption( ChannelOption.TCP_NODELAY, false ).group( group ).localAddress( host, port ).bind().syncUninterruptibly(); // wait for the bind so a failure is thrown to the caller instead of being lost in the future + } + + /** + * Shutdown. This method is called when the server is shutting down and the + * server socket and all clients should be terminated with no further + * action. + */ + @Override + public void a() + { + socket.channel().close().syncUninterruptibly(); + } + + /** + * Return a Minecraft compatible cipher instance from the specified key. + * + * @param opMode the mode to initialize the cipher in + * @param key the cipher key; its encoded bytes are also used as the initialization vector + * @return the initialized cipher + */ + public static Cipher getCipher(int opMode, Key key) + { + try + { + Cipher cip = Cipher.getInstance( "AES/CFB8/NoPadding" ); + cip.init( opMode, key, new IvParameterSpec( key.getEncoded() ) ); + return cip; + } catch ( GeneralSecurityException ex ) + { + throw new RuntimeException( ex ); + } + } +} diff --git a/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java new file mode 100644 index 0000000..5da8a59 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java @@ -0,0 +1,294 @@ +package org.spigotmc.netty; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.SocketChannel; + +/** + * This class wraps a Netty {@link Channel} in a {@link Socket}. It overrides + * all methods in {@link Socket} to ensure that calls are not mistakenly made + * to the unsupported super socket. All operations that can be sanely applied to + * a {@link Channel} are implemented here. Those which cannot will throw an + * {@link UnsupportedOperationException}. 
+ */ +public class NettySocketAdaptor extends Socket +{ + + private final io.netty.channel.socket.SocketChannel ch; + + private NettySocketAdaptor(io.netty.channel.socket.SocketChannel ch) + { + this.ch = ch; + } + + public static NettySocketAdaptor adapt(io.netty.channel.socket.SocketChannel ch) + { + return new NettySocketAdaptor( ch ); + } + + @Override + public void bind(SocketAddress bindpoint) throws IOException + { + ch.bind( bindpoint ).syncUninterruptibly(); + } + + @Override + public synchronized void close() throws IOException + { + ch.close().syncUninterruptibly(); + } + + @Override + public void connect(SocketAddress endpoint) throws IOException + { + ch.connect( endpoint ).syncUninterruptibly(); + } + + @Override + public void connect(SocketAddress endpoint, int timeout) throws IOException + { + ch.config().setConnectTimeoutMillis( timeout ); + ch.connect( endpoint ).syncUninterruptibly(); + } + + @Override + public boolean equals(Object obj) + { + return obj instanceof NettySocketAdaptor && ch.equals( ( (NettySocketAdaptor) obj ).ch ); + } + + @Override + public SocketChannel getChannel() + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public InetAddress getInetAddress() + { + return ch.remoteAddress().getAddress(); + } + + @Override + public InputStream getInputStream() throws IOException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." 
); + } + + @Override + public boolean getKeepAlive() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_KEEPALIVE ); + } + + @Override + public InetAddress getLocalAddress() + { + return ch.localAddress().getAddress(); + } + + @Override + public int getLocalPort() + { + return ch.localAddress().getPort(); + } + + @Override + public SocketAddress getLocalSocketAddress() + { + return ch.localAddress(); + } + + @Override + public boolean getOOBInline() throws SocketException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public OutputStream getOutputStream() throws IOException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public int getPort() + { + return ch.remoteAddress().getPort(); + } + + @Override + public synchronized int getReceiveBufferSize() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_RCVBUF ); + } + + @Override + public SocketAddress getRemoteSocketAddress() + { + return ch.remoteAddress(); + } + + @Override + public boolean getReuseAddress() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_REUSEADDR ); + } + + @Override + public synchronized int getSendBufferSize() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_SNDBUF ); + } + + @Override + public int getSoLinger() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_LINGER ); + } + + @Override + public synchronized int getSoTimeout() throws SocketException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." 
); + } + + @Override + public boolean getTcpNoDelay() throws SocketException + { + return ch.config().getOption( ChannelOption.TCP_NODELAY ); + } + + @Override + public int getTrafficClass() throws SocketException + { + return ch.config().getOption( ChannelOption.IP_TOS ); + } + + @Override + public int hashCode() + { + return ch.hashCode(); + } + + @Override + public boolean isBound() + { + return ch.localAddress() != null; + } + + @Override + public boolean isClosed() + { + return !ch.isOpen(); + } + + @Override + public boolean isConnected() + { + return ch.isActive(); + } + + @Override + public boolean isInputShutdown() + { + return ch.isInputShutdown(); + } + + @Override + public boolean isOutputShutdown() + { + return ch.isOutputShutdown(); + } + + @Override + public void sendUrgentData(int data) throws IOException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public void setKeepAlive(boolean on) throws SocketException + { + ch.config().setOption( ChannelOption.SO_KEEPALIVE, on ); + } + + @Override + public void setOOBInline(boolean on) throws SocketException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." 
); + } + + @Override + public synchronized void setReceiveBufferSize(int size) throws SocketException + { + ch.config().setOption( ChannelOption.SO_RCVBUF, size ); + } + + @Override + public void setReuseAddress(boolean on) throws SocketException + { + ch.config().setOption( ChannelOption.SO_REUSEADDR, on ); + } + + @Override + public synchronized void setSendBufferSize(int size) throws SocketException + { + ch.config().setOption( ChannelOption.SO_SNDBUF, size ); + } + + @Override + public void setSoLinger(boolean on, int linger) throws SocketException + { + ch.config().setOption( ChannelOption.SO_LINGER, on ? linger : -1 ); /* honour the on flag: a negative value disables SO_LINGER, the same value Socket#getSoLinger reports when the option is off */ + } + + @Override + public synchronized void setSoTimeout(int timeout) throws SocketException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public void setTcpNoDelay(boolean on) throws SocketException + { + ch.config().setOption( ChannelOption.TCP_NODELAY, on ); + } + + @Override + public void setTrafficClass(int tc) throws SocketException + { + ch.config().setOption( ChannelOption.IP_TOS, tc ); + } + + @Override + public void shutdownInput() throws IOException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper."
); + } + + @Override + public void shutdownOutput() throws IOException + { + ch.shutdownOutput().syncUninterruptibly(); + } + + @Override + public String toString() + { + return ch.toString(); + } +} diff --git a/src/main/java/org/spigotmc/netty/OutboundManager.java b/src/main/java/org/spigotmc/netty/OutboundManager.java new file mode 100644 index 0000000..4f37cb3 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/OutboundManager.java @@ -0,0 +1,29 @@ +package org.spigotmc.netty; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOperationHandlerAdapter; +import io.netty.channel.ChannelPromise; +import net.minecraft.server.PendingConnection; + +class OutboundManager extends ChannelOperationHandlerAdapter +{ + + private static final int FLUSH_TIME = 1; + /*========================================================================*/ + public long lastFlush; + private final NettyNetworkManager manager; + + OutboundManager(NettyNetworkManager manager) + { + this.manager = manager; + } + + public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception + { + if ( manager.connection instanceof PendingConnection || System.currentTimeMillis() - lastFlush > FLUSH_TIME ) + { + lastFlush = System.currentTimeMillis(); + ctx.flush( promise ); + } + } +} diff --git a/src/main/java/org/spigotmc/netty/PacketDecoder.java b/src/main/java/org/spigotmc/netty/PacketDecoder.java new file mode 100644 index 0000000..29e344a --- /dev/null +++ b/src/main/java/org/spigotmc/netty/PacketDecoder.java @@ -0,0 +1,68 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.MessageBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ReplayingDecoder; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.Packet; + +/** + * Packet 
decoding class backed by a reusable {@link DataInputStream} which + * wraps the input {@link ByteBuf}. Reads an unsigned byte packet header and + * then decodes the packet accordingly. + */ +public class PacketDecoder extends ReplayingDecoder<ReadState> +{ + + private DataInputStream input; + private Packet packet; + + public PacketDecoder() + { + super( ReadState.HEADER ); + } + + @Override + protected Packet decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception + { + if ( input == null ) + { + input = new DataInputStream( new ByteBufInputStream( in ) ); + } + + while ( true ) + { + switch ( state() ) + { + case HEADER: + short packetId = in.readUnsignedByte(); + packet = Packet.a( MinecraftServer.getServer().getLogger(), packetId ); + if ( packet == null ) + { + throw new IOException( "Bad packet id " + packetId ); + } + checkpoint( ReadState.DATA ); + case DATA: + try + { + packet.a( input ); + } catch ( EOFException ex ) + { + return null; + } + + checkpoint( ReadState.HEADER ); + Packet readPacket = packet; + packet = null; + return readPacket; + default: + throw new IllegalStateException(); + } + } + } +} diff --git a/src/main/java/org/spigotmc/netty/PacketEncoder.java b/src/main/java/org/spigotmc/netty/PacketEncoder.java new file mode 100644 index 0000000..f0880c2 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/PacketEncoder.java @@ -0,0 +1,55 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import java.io.DataOutputStream; +import net.minecraft.server.Packet; + +/** + * Netty encoder which takes a packet and encodes it, and adds a byte packet id + * header.
+ */ +public class PacketEncoder extends MessageToByteEncoder<Packet> +{ + + private ByteBuf outBuf; + private DataOutputStream dataOut; + private final NettyNetworkManager networkManager; + + public PacketEncoder(NettyNetworkManager networkManager) + { + this.networkManager = networkManager; + } + + @Override + public void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception + { + if ( outBuf == null ) + { + outBuf = ctx.alloc().buffer(); + } + if ( dataOut == null ) + { + dataOut = new DataOutputStream( new ByteBufOutputStream( outBuf ) ); + } + + out.writeByte( msg.n() ); + msg.a( dataOut ); + + networkManager.addWrittenBytes( outBuf.readableBytes() ); + out.writeBytes( outBuf ); + outBuf.discardSomeReadBytes(); + } + + @Override + public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception + { + if ( outBuf != null ) + { + outBuf.release(); + outBuf = null; + } + } +} diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java new file mode 100644 index 0000000..965ba12 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/PacketListener.java @@ -0,0 +1,112 @@ +package org.spigotmc.netty; + +import com.google.common.base.Preconditions; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import net.minecraft.server.Connection; +import net.minecraft.server.INetworkManager; +import net.minecraft.server.Packet; +import org.bukkit.Bukkit; +import org.bukkit.plugin.Plugin; + +/** + * This class is used for plugins that wish to register to listen to incoming + * and outgoing packets. To use this class, simply create a new instance, + * override the methods you wish to use, and call + * {@link #register(org.spigotmc.netty.PacketListener, org.bukkit.plugin.Plugin)}. + */ +public class PacketListener +{ + + /** + * A mapping of all registered listeners and their owning plugins. 
+ */ + private static final Map<PacketListener, Plugin> listeners = new HashMap<PacketListener, Plugin>(); + /** + * A baked snapshot of all listeners, for efficiency's sake. Volatile and only ever replaced by a fully populated array, because the call hooks below read it without holding the class lock. + */ + private static volatile PacketListener[] baked = new PacketListener[ 0 ]; + + /** + * Used to register a handler for receiving notifications of packet + * activity. + * + * @param listener the listener to register + * @param plugin the plugin owning this listener + */ + public static synchronized void register(PacketListener listener, Plugin plugin) + { + Preconditions.checkNotNull( listener, "listener" ); + Preconditions.checkNotNull( plugin, "plugin" ); + Preconditions.checkState( !listeners.containsKey( listener ), "listener already registered" ); + int size = listeners.size(); + Preconditions.checkState( baked.length == size ); + listeners.put( listener, plugin ); + PacketListener[] copy = Arrays.copyOf( baked, size + 1 ); + copy[size] = listener; + baked = copy; /* publish only once the new slot is filled, so readers never see a null entry */ + } + + static Packet callReceived(INetworkManager networkManager, Connection connection, Packet packet) + { + for ( PacketListener listener : baked ) + { + try + { + packet = listener.packetReceived( networkManager, connection, packet ); + } catch ( Throwable t ) + { + Bukkit.getServer().getLogger().log( Level.SEVERE, "Error whilst firing receive hook for packet", t ); + } + } + return packet; + } + + static Packet callQueued(INetworkManager networkManager, Connection connection, Packet packet) + { + for ( PacketListener listener : baked ) + { + try + { + packet = listener.packetQueued( networkManager, connection, packet ); + } catch ( Throwable t ) + { + Bukkit.getServer().getLogger().log( Level.SEVERE, "Error whilst firing queued hook for packet", t ); + } + } + return packet; + } + + /** + * Called when a packet has been received and is about to be handled by the + * current {@link Connection}. The returned packet will be the packet passed + * on for handling, or in the case of null being returned, not handled at + * all.
+ * + * @param networkManager the NetworkManager receiving the packet + * @param connection the connection which will handle the packet + * @param packet the received packet + * @return the packet to be handled, or null to cancel + */ + public Packet packetReceived(INetworkManager networkManager, Connection connection, Packet packet) + { + return packet; + } + + /** + * Called when a packet is queued to be sent. The returned packet will be + * the packet sent. In the case of null being returned, the packet will not + * be sent. + * + * @param networkManager the NetworkManager which will send the packet + * @param connection the connection which queued the packet + * @param packet the queued packet + * @return the packet to be sent, or null if the packet will not be sent. + */ + public Packet packetQueued(INetworkManager networkManager, Connection connection, Packet packet) + { + return packet; + } +} diff --git a/src/main/java/org/spigotmc/netty/ReadState.java b/src/main/java/org/spigotmc/netty/ReadState.java new file mode 100644 index 0000000..d3a9cab --- /dev/null +++ b/src/main/java/org/spigotmc/netty/ReadState.java @@ -0,0 +1,17 @@ +package org.spigotmc.netty; + +/** + * Stores the state of the packet currently being read. + */ +public enum ReadState +{ + + /** + * Indicates the byte representing the packet ID is about to be read. + */ + HEADER, + /** + * Shows the packet body is being read. + */ + DATA; +} -- 1.8.1.2