geforkt von Mirrors/Paper
1345 Zeilen
50 KiB
Diff
1345 Zeilen
50 KiB
Diff
From 73f2cb6cf3b18d0cca0ee1057d4563ebc46fbded Mon Sep 17 00:00:00 2001
|
|
From: md_5 <md_5@live.com.au>
|
|
Date: Tue, 23 Apr 2013 11:47:32 +1000
|
|
Subject: [PATCH] Netty
|
|
|
|
Implement an uber efficient network engine based on the
|
|
Java NIO framework Netty. This is basically a complete rewrite of the
|
|
Minecraft network engine with many distinct advantages. First and foremost,
|
|
there will no longer be the horrid, and redundant case of 2, or even at
|
|
times, 3 threads per a connection. Instead low level select/epoll based NIO
|
|
is used. The number of threads used for network reading and writing will
|
|
scale automatically to the number of cores for use on your server. In most
|
|
cases this will be around 8 threads for a 4 core server, much better than the
|
|
up to 1000 threads that could be in use at one time with the old engine. To
|
|
facilitate asynchronous packet sending or receiving (currently only chat), a
|
|
thread pool of 16 threads is kept handy. == Plugin incompatibilities As a
|
|
side effect of this change, plugins which rely on very specific
|
|
implementation level details within Minecraft are broken. At this point in
|
|
time, TagAPI and ProtocolLib are affected. If you are a user of ProtocolLib
|
|
you are advised to update to the latest build, where full support is enabled.
|
|
If you are a user of TagAPI, support has not yet been added, so you will need
|
|
to install the updated ProtocolLib so that TagAPI may use its functions. ==
|
|
Stability The code within this commit has been very lightly tested in
|
|
production (300 players for approximately 24 hours), however it is not
|
|
guaranteed to be free from all bugs. If you experence weird connection
|
|
behaviour, reporting the bug and steps to reproduce are advised. You are also
|
|
free to downgrade to the latest recommend build, which is guaranteed to be
|
|
stable. == Summary This commit provides a reduction in threads, which gives
|
|
the CPU / operating system more time to allocate to the main server threads,
|
|
as well as various other side benefits such as chat thread pooling and a
|
|
slight reduction in latency. This commit is licensed under the Creative
|
|
Commons Attribution-ShareAlike 3.0 Unported license.
|
|
|
|
diff --git a/pom.xml b/pom.xml
|
|
index 3e02cc3..f2b6e35 100644
|
|
--- a/pom.xml
|
|
+++ b/pom.xml
|
|
@@ -132,6 +132,11 @@
|
|
<artifactId>trove4j</artifactId>
|
|
<version>3.0.2</version>
|
|
</dependency>
|
|
+ <dependency>
|
|
+ <groupId>io.netty</groupId>
|
|
+ <artifactId>netty-all</artifactId>
|
|
+ <version>4.0.0.CR1</version>
|
|
+ </dependency>
|
|
</dependencies>
|
|
|
|
<!-- This builds a completely 'ready to start' jar with all dependencies inside -->
|
|
diff --git a/src/main/java/net/minecraft/server/DedicatedServer.java b/src/main/java/net/minecraft/server/DedicatedServer.java
|
|
index bd7e41c..b04d8a1 100644
|
|
--- a/src/main/java/net/minecraft/server/DedicatedServer.java
|
|
+++ b/src/main/java/net/minecraft/server/DedicatedServer.java
|
|
@@ -91,10 +91,10 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer
|
|
|
|
this.getLogger().info("Generating keypair");
|
|
this.a(MinecraftEncryption.b());
|
|
- this.getLogger().info("Starting Minecraft server on " + (this.getServerIp().length() == 0 ? "*" : this.getServerIp()) + ":" + this.G());
|
|
+ this.a((PlayerList) (new DedicatedPlayerList(this))); // CraftBukkit
|
|
|
|
try {
|
|
- this.r = new DedicatedServerConnection(this, inetaddress, this.G());
|
|
+ this.r = new org.spigotmc.MultiplexingServerConnection(this); // Spigot
|
|
} catch (Throwable ioexception) { // CraftBukkit - IOException -> Throwable
|
|
this.getLogger().warning("**** FAILED TO BIND TO PORT!");
|
|
this.getLogger().warning("The exception was: {0}", new Object[] { ioexception.toString()});
|
|
@@ -102,8 +102,6 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer
|
|
return false;
|
|
}
|
|
|
|
- this.a((PlayerList) (new DedicatedPlayerList(this))); // CraftBukkit
|
|
-
|
|
if (!this.getOnlineMode()) {
|
|
this.getLogger().warning("**** SERVER IS RUNNING IN OFFLINE/INSECURE MODE!");
|
|
this.getLogger().warning("The server will make no attempt to authenticate usernames. Beware.");
|
|
diff --git a/src/main/java/net/minecraft/server/DedicatedServerConnectionThread.java b/src/main/java/net/minecraft/server/DedicatedServerConnectionThread.java
|
|
index ef7e10d..5f2e42e 100644
|
|
--- a/src/main/java/net/minecraft/server/DedicatedServerConnectionThread.java
|
|
+++ b/src/main/java/net/minecraft/server/DedicatedServerConnectionThread.java
|
|
@@ -82,7 +82,7 @@ public class DedicatedServerConnectionThread extends Thread {
|
|
|
|
PendingConnection pendingconnection = new PendingConnection(this.e.d(), socket, "Connection #" + this.c++);
|
|
|
|
- this.a(pendingconnection);
|
|
+ ((org.spigotmc.MultiplexingServerConnection) this.e.d().ae()).register(pendingconnection); // Spigot
|
|
} catch (IOException ioexception) {
|
|
this.e.d().getLogger().warning("DSCT: " + ioexception.getMessage()); // CraftBukkit
|
|
}
|
|
diff --git a/src/main/java/net/minecraft/server/INetworkManager.java b/src/main/java/net/minecraft/server/INetworkManager.java
|
|
new file mode 100644
|
|
index 0000000..6fcc5d7
|
|
--- /dev/null
|
|
+++ b/src/main/java/net/minecraft/server/INetworkManager.java
|
|
@@ -0,0 +1,26 @@
|
|
+package net.minecraft.server;
|
|
+
|
|
+import java.net.SocketAddress;
|
|
+
|
|
+public interface INetworkManager {
|
|
+
|
|
+ void a(Connection connection);
|
|
+
|
|
+ void queue(Packet packet);
|
|
+
|
|
+ void a();
|
|
+
|
|
+ void b();
|
|
+
|
|
+ SocketAddress getSocketAddress();
|
|
+
|
|
+ void d();
|
|
+
|
|
+ int e();
|
|
+
|
|
+ void a(String s, Object... aobject);
|
|
+
|
|
+ java.net.Socket getSocket(); // Spigot
|
|
+
|
|
+ void setSocketAddress(java.net.SocketAddress address); // Spigot
|
|
+}
|
|
diff --git a/src/main/java/net/minecraft/server/NetworkManager.java b/src/main/java/net/minecraft/server/NetworkManager.java
|
|
index 1862863..5a24f2a 100644
|
|
--- a/src/main/java/net/minecraft/server/NetworkManager.java
|
|
+++ b/src/main/java/net/minecraft/server/NetworkManager.java
|
|
@@ -24,7 +24,7 @@ public class NetworkManager implements INetworkManager {
|
|
private final Object h = new Object();
|
|
private final IConsoleLogManager i;
|
|
public Socket socket; // CraftBukkit - private -> public
|
|
- private final SocketAddress k;
|
|
+ private SocketAddress k; // Spigot - remove final
|
|
private volatile DataInputStream input;
|
|
private volatile DataOutputStream output;
|
|
private volatile boolean n = true;
|
|
@@ -369,4 +369,6 @@ public class NetworkManager implements INetworkManager {
|
|
static Thread h(NetworkManager networkmanager) {
|
|
return networkmanager.u;
|
|
}
|
|
+
|
|
+ public void setSocketAddress(SocketAddress address) { k = address; } // Spigot
|
|
}
|
|
diff --git a/src/main/java/net/minecraft/server/Packet51MapChunk.java b/src/main/java/net/minecraft/server/Packet51MapChunk.java
|
|
index 84dbb88..617b474 100644
|
|
--- a/src/main/java/net/minecraft/server/Packet51MapChunk.java
|
|
+++ b/src/main/java/net/minecraft/server/Packet51MapChunk.java
|
|
@@ -42,7 +42,7 @@ public class Packet51MapChunk extends Packet {
|
|
this.b = chunk.z;
|
|
this.e = flag;
|
|
ChunkMap chunkmap = a(chunk, flag, i);
|
|
- Deflater deflater = new Deflater(-1);
|
|
+ Deflater deflater = new Deflater(4);
|
|
|
|
this.d = chunkmap.c;
|
|
this.c = chunkmap.b;
|
|
diff --git a/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java b/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java
|
|
index 9f8afe3..a89c9c9 100644
|
|
--- a/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java
|
|
+++ b/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java
|
|
@@ -24,7 +24,7 @@ public class Packet56MapChunkBulk extends Packet {
|
|
@Override
|
|
protected Deflater initialValue() {
|
|
// Don't use higher compression level, slows things down too much
|
|
- return new Deflater(6);
|
|
+ return new Deflater(4); // Spigot - use lower compression level still
|
|
}
|
|
};
|
|
// CraftBukkit end
|
|
diff --git a/src/main/java/net/minecraft/server/PendingConnection.java b/src/main/java/net/minecraft/server/PendingConnection.java
|
|
index eb474f5..836ad94 100644
|
|
--- a/src/main/java/net/minecraft/server/PendingConnection.java
|
|
+++ b/src/main/java/net/minecraft/server/PendingConnection.java
|
|
@@ -17,7 +17,7 @@ public class PendingConnection extends Connection {
|
|
private static Random random = new Random();
|
|
private byte[] d;
|
|
private final MinecraftServer server;
|
|
- public final NetworkManager networkManager;
|
|
+ public final INetworkManager networkManager;
|
|
public boolean b = false;
|
|
private int f = 0;
|
|
private String g = null;
|
|
@@ -27,10 +27,15 @@ public class PendingConnection extends Connection {
|
|
private SecretKey k = null;
|
|
public String hostname = ""; // CraftBukkit - add field
|
|
|
|
+ public PendingConnection(MinecraftServer minecraftserver, org.spigotmc.netty.NettyNetworkManager networkManager) {
|
|
+ this.server = minecraftserver;
|
|
+ this.networkManager = networkManager;
|
|
+ }
|
|
+
|
|
public PendingConnection(MinecraftServer minecraftserver, Socket socket, String s) throws java.io.IOException { // CraftBukkit - throws IOException
|
|
this.server = minecraftserver;
|
|
this.networkManager = new NetworkManager(minecraftserver.getLogger(), socket, s, this, minecraftserver.F().getPrivate());
|
|
- this.networkManager.e = 0;
|
|
+ // this.networkManager.e = 0;
|
|
}
|
|
|
|
// CraftBukkit start
|
|
@@ -146,7 +151,7 @@ public class PendingConnection extends Connection {
|
|
// CraftBukkit
|
|
org.bukkit.event.server.ServerListPingEvent pingEvent = org.bukkit.craftbukkit.event.CraftEventFactory.callServerListPingEvent(this.server.server, getSocket().getInetAddress(), this.server.getMotd(), playerlist.getPlayerCount(), playerlist.getMaxPlayers());
|
|
|
|
- if (packet254getinfo.a == 1) {
|
|
+ if (true) {
|
|
// CraftBukkit start - Fix decompile issues, don't create a list from an array
|
|
Object[] list = new Object[] { 1, 60, this.server.getVersion(), pingEvent.getMotd(), playerlist.getPlayerCount(), pingEvent.getMaxPlayers() };
|
|
|
|
@@ -173,9 +178,11 @@ public class PendingConnection extends Connection {
|
|
|
|
this.networkManager.queue(new Packet255KickDisconnect(s));
|
|
this.networkManager.d();
|
|
- if (inetaddress != null && this.server.ae() instanceof DedicatedServerConnection) {
|
|
- ((DedicatedServerConnection) this.server.ae()).a(inetaddress);
|
|
+ // Spigot start
|
|
+ if (inetaddress != null) {
|
|
+ ((org.spigotmc.MultiplexingServerConnection) this.server.ae()).unThrottle(inetaddress);
|
|
}
|
|
+ // Spigot end
|
|
|
|
this.b = true;
|
|
} catch (Exception exception) {
|
|
diff --git a/src/main/java/org/bukkit/craftbukkit/CraftServer.java b/src/main/java/org/bukkit/craftbukkit/CraftServer.java
|
|
index 20d38b4..f7ec7bb 100644
|
|
--- a/src/main/java/org/bukkit/craftbukkit/CraftServer.java
|
|
+++ b/src/main/java/org/bukkit/craftbukkit/CraftServer.java
|
|
@@ -1372,4 +1372,20 @@ public final class CraftServer implements Server {
|
|
public CraftScoreboardManager getScoreboardManager() {
|
|
return scoreboardManager;
|
|
}
|
|
+
|
|
+ // Spigot start
|
|
+ @SuppressWarnings("unchecked")
|
|
+ public java.util.Collection<java.net.InetSocketAddress> getSecondaryHosts() {
|
|
+ java.util.Collection<java.net.InetSocketAddress> ret = new java.util.HashSet<java.net.InetSocketAddress>();
|
|
+ List<?> listeners = configuration.getList("listeners");
|
|
+ if (listeners != null) {
|
|
+ for (Object o : listeners) {
|
|
+
|
|
+ Map<String, Object> sect = (Map<String, Object>) o;
|
|
+ ret.add(new java.net.InetSocketAddress((String) sect.get("address"), (Integer) sect.get("port")));
|
|
+ }
|
|
+ }
|
|
+ return ret;
|
|
+ }
|
|
+ // Spigot end
|
|
}
|
|
diff --git a/src/main/java/org/spigotmc/MultiplexingServerConnection.java b/src/main/java/org/spigotmc/MultiplexingServerConnection.java
|
|
new file mode 100644
|
|
index 0000000..c8ea80a
|
|
--- /dev/null
|
|
+++ b/src/main/java/org/spigotmc/MultiplexingServerConnection.java
|
|
@@ -0,0 +1,126 @@
|
|
+package org.spigotmc;
|
|
+
|
|
+import java.net.InetAddress;
|
|
+import java.net.InetSocketAddress;
|
|
+import java.util.ArrayList;
|
|
+import java.util.Collection;
|
|
+import java.util.Collections;
|
|
+import java.util.HashMap;
|
|
+import java.util.HashSet;
|
|
+import java.util.List;
|
|
+import java.util.logging.Level;
|
|
+import net.minecraft.server.DedicatedServerConnection;
|
|
+import net.minecraft.server.MinecraftServer;
|
|
+import net.minecraft.server.PendingConnection;
|
|
+import net.minecraft.server.ServerConnection;
|
|
+import org.bukkit.Bukkit;
|
|
+
|
|
+public class MultiplexingServerConnection extends ServerConnection {
|
|
+
|
|
+ private static final boolean NETTY_DISABLED = Boolean.getBoolean("org.spigotmc.netty.disabled");
|
|
+ private final Collection<ServerConnection> children = new HashSet<ServerConnection>();
|
|
+ private final List<PendingConnection> pending = Collections.synchronizedList(new ArrayList<PendingConnection>());
|
|
+ private final HashMap<InetAddress, Long> throttle = new HashMap<InetAddress, Long>();
|
|
+
|
|
+ public MultiplexingServerConnection(MinecraftServer ms) {
|
|
+ super(ms);
|
|
+
|
|
+ // Add primary connection
|
|
+ start(ms.server.getIp(), ms.server.getPort());
|
|
+ // Add all other connections
|
|
+ for (InetSocketAddress address : ms.server.getSecondaryHosts()) {
|
|
+ start(address.getAddress().getHostAddress(), address.getPort());
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private void start(String ipAddress, int port) {
|
|
+ try {
|
|
+ // Calculate address, can't use isEmpty due to Java 5
|
|
+ InetAddress socketAddress = (ipAddress.length() == 0) ? null : InetAddress.getByName(ipAddress);
|
|
+ // Say hello to the log
|
|
+ d().getLogger().info("Starting listener #" + children.size() + " on " + (socketAddress == null ? "*" : ipAddress) + ":" + port);
|
|
+ // Start connection: Netty / non Netty
|
|
+ ServerConnection listener = (NETTY_DISABLED) ? new DedicatedServerConnection(d(), socketAddress, port) : new org.spigotmc.netty.NettyServerConnection(d(), socketAddress, port);
|
|
+ // Register with other connections
|
|
+ children.add(listener);
|
|
+ // Gotta catch em all
|
|
+ } catch (Throwable t) {
|
|
+ // Just print some info to the log
|
|
+ t.printStackTrace();
|
|
+ d().getLogger().warning("**** FAILED TO BIND TO PORT!");
|
|
+ d().getLogger().warning("The exception was: {0}", t);
|
|
+ d().getLogger().warning("Perhaps a server is already running on that port?");
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * close.
|
|
+ */
|
|
+ @Override
|
|
+ public void a() {
|
|
+ for (ServerConnection child : children) {
|
|
+ child.a();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Pulse. This method pulses all connections causing them to update. It is
|
|
+ * called from the main server thread a few times a tick.
|
|
+ */
|
|
+ @Override
|
|
+ public void b() {
|
|
+ super.b(); // pulse PlayerConnections
|
|
+ for (int i = 0; i < pending.size(); ++i) {
|
|
+ PendingConnection connection = pending.get(i);
|
|
+
|
|
+ try {
|
|
+ connection.c();
|
|
+ } catch (Exception ex) {
|
|
+ connection.disconnect("Internal server error");
|
|
+ Bukkit.getServer().getLogger().log(Level.WARNING, "Failed to handle packet: " + ex, ex);
|
|
+ }
|
|
+
|
|
+ if (connection.b) {
|
|
+ pending.remove(i--);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Remove the user from connection throttle. This should fix the server ping
|
|
+ * bugs.
|
|
+ *
|
|
+ * @param address the address to remove
|
|
+ */
|
|
+ public void unThrottle(InetAddress address) {
|
|
+ if (address != null) {
|
|
+ synchronized (throttle) {
|
|
+ throttle.remove(address);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Add a connection to the throttle list.
|
|
+ *
|
|
+ * @param address
|
|
+ * @return Whether they must be disconnected
|
|
+ */
|
|
+ public boolean throttle(InetAddress address) {
|
|
+ long currentTime = System.currentTimeMillis();
|
|
+ synchronized (throttle) {
|
|
+ Long value = throttle.get(address);
|
|
+ if (value != null && !address.isLoopbackAddress() && currentTime - value < d().server.getConnectionThrottle()) {
|
|
+ throttle.put(address, currentTime);
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ throttle.put(address, currentTime);
|
|
+ }
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ public void register(PendingConnection conn) {
|
|
+ pending.add(conn);
|
|
+ }
|
|
+}
|
|
diff --git a/src/main/java/org/spigotmc/netty/CipherCodec.java b/src/main/java/org/spigotmc/netty/CipherCodec.java
|
|
new file mode 100644
|
|
index 0000000..2dbbf6c
|
|
--- /dev/null
|
|
+++ b/src/main/java/org/spigotmc/netty/CipherCodec.java
|
|
@@ -0,0 +1,67 @@
|
|
+package org.spigotmc.netty;
|
|
+
|
|
+import io.netty.buffer.ByteBuf;
|
|
+import io.netty.channel.ChannelHandlerContext;
|
|
+import io.netty.handler.codec.ByteToByteCodec;
|
|
+import javax.crypto.Cipher;
|
|
+import javax.crypto.ShortBufferException;
|
|
+import net.minecraft.server.Packet252KeyResponse;
|
|
+
|
|
+/**
|
|
+ * This class is a complete solution for encrypting and decoding bytes in a
|
|
+ * Netty stream. It takes two {@link Cipher} instances, used for encryption and
|
|
+ * decryption respectively.
|
|
+ */
|
|
+public class CipherCodec extends ByteToByteCodec {
|
|
+
|
|
+ private Cipher encrypt;
|
|
+ private Cipher decrypt;
|
|
+ private Packet252KeyResponse responsePacket;
|
|
+ private ThreadLocal<byte[]> heapInLocal = new EmptyByteThreadLocal();
|
|
+ private ThreadLocal<byte[]> heapOutLocal = new EmptyByteThreadLocal();
|
|
+
|
|
+ private static class EmptyByteThreadLocal extends ThreadLocal<byte[]> {
|
|
+
|
|
+ @Override
|
|
+ protected byte[] initialValue() {
|
|
+ return new byte[0];
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public CipherCodec(Cipher encrypt, Cipher decrypt, Packet252KeyResponse responsePacket) {
|
|
+ this.encrypt = encrypt;
|
|
+ this.decrypt = decrypt;
|
|
+ this.responsePacket = responsePacket;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
|
|
+ ctx.channel().write(responsePacket);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
|
|
+ cipher(in, out, encrypt);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void decode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
|
|
+ cipher(in, out, decrypt);
|
|
+ }
|
|
+
|
|
+ private void cipher(ByteBuf in, ByteBuf out, Cipher cipher) throws ShortBufferException {
|
|
+ byte[] heapIn = heapInLocal.get();
|
|
+ int readableBytes = in.readableBytes();
|
|
+ if (heapIn.length < readableBytes) {
|
|
+ heapIn = new byte[readableBytes];
|
|
+ }
|
|
+ in.readBytes(heapIn, 0, readableBytes);
|
|
+
|
|
+ byte[] heapOut = heapOutLocal.get();
|
|
+ int outputSize = cipher.getOutputSize(readableBytes);
|
|
+ if (heapOut.length < outputSize) {
|
|
+ heapOut = new byte[outputSize];
|
|
+ }
|
|
+ out.writeBytes(heapOut, 0, cipher.update(heapIn, 0, readableBytes, heapOut));
|
|
+ }
|
|
+}
|
|
diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
|
|
new file mode 100644
|
|
index 0000000..0e1b1fd
|
|
--- /dev/null
|
|
+++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
|
|
@@ -0,0 +1,253 @@
|
|
+package org.spigotmc.netty;
|
|
+
|
|
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
+import io.netty.channel.Channel;
|
|
+import io.netty.channel.ChannelHandlerContext;
|
|
+import io.netty.channel.ChannelInboundMessageHandlerAdapter;
|
|
+import io.netty.channel.socket.SocketChannel;
|
|
+import java.net.InetSocketAddress;
|
|
+import java.net.Socket;
|
|
+import java.net.SocketAddress;
|
|
+import java.security.PrivateKey;
|
|
+import java.util.AbstractList;
|
|
+import java.util.List;
|
|
+import java.util.Queue;
|
|
+import java.util.concurrent.ConcurrentLinkedQueue;
|
|
+import java.util.concurrent.ExecutorService;
|
|
+import java.util.concurrent.Executors;
|
|
+import javax.crypto.Cipher;
|
|
+import javax.crypto.SecretKey;
|
|
+import net.minecraft.server.Connection;
|
|
+import net.minecraft.server.INetworkManager;
|
|
+import net.minecraft.server.MinecraftServer;
|
|
+import net.minecraft.server.Packet;
|
|
+import net.minecraft.server.Packet252KeyResponse;
|
|
+import net.minecraft.server.PendingConnection;
|
|
+import net.minecraft.server.PlayerConnection;
|
|
+import org.spigotmc.MultiplexingServerConnection;
|
|
+
|
|
+/**
|
|
+ * This class forms the basis of the Netty integration. It implements
|
|
+ * {@link INetworkManager} and handles all events and inbound messages provided
|
|
+ * by the upstream Netty process.
|
|
+ */
|
|
+public class NettyNetworkManager extends ChannelInboundMessageHandlerAdapter<Packet> implements INetworkManager {
|
|
+
|
|
+ private static final ExecutorService threadPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("Async Packet Handler - %1$d").build());
|
|
+ private static final MinecraftServer server = MinecraftServer.getServer();
|
|
+ private static final PrivateKey key = server.F().getPrivate();
|
|
+ private static final MultiplexingServerConnection serverConnection = (MultiplexingServerConnection) server.ae();
|
|
+ /*========================================================================*/
|
|
+ private final Queue<Packet> syncPackets = new ConcurrentLinkedQueue<Packet>();
|
|
+ private final List<Packet> highPriorityQueue = new AbstractList<Packet>() {
|
|
+ @Override
|
|
+ public void add(int index, Packet element) {
|
|
+ // NOP
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public Packet get(int index) {
|
|
+ throw new UnsupportedOperationException();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public int size() {
|
|
+ return 0;
|
|
+ }
|
|
+ };
|
|
+ private volatile boolean connected;
|
|
+ private Channel channel;
|
|
+ private SocketAddress address;
|
|
+ private Connection connection;
|
|
+ private SecretKey secret;
|
|
+ private String dcReason;
|
|
+ private Object[] dcArgs;
|
|
+ private Socket socketAdaptor;
|
|
+ private long writtenBytes;
|
|
+
|
|
+ @Override
|
|
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
|
+ // Channel and address groundwork first
|
|
+ channel = ctx.channel();
|
|
+ address = channel.remoteAddress();
|
|
+ // Check the throttle
|
|
+ if (serverConnection.throttle(((InetSocketAddress) channel.remoteAddress()).getAddress())) {
|
|
+ channel.close();
|
|
+ }
|
|
+ // Then the socket adaptor
|
|
+ socketAdaptor = NettySocketAdaptor.adapt((SocketChannel) channel);
|
|
+ // Followed by their first handler
|
|
+ connection = new PendingConnection(server, this);
|
|
+ // Finally register the connection
|
|
+ connected = true;
|
|
+ serverConnection.register((PendingConnection) connection);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
|
+ a("disconnect.endOfStream", new Object[0]);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
|
+ // TODO: Remove this once we are more stable
|
|
+ // Bukkit.getServer().getLogger().severe("======================= Start Netty Debug Log =======================");
|
|
+ // Bukkit.getServer().getLogger().log(Level.SEVERE, "Error caught whilst handling " + channel, cause);
|
|
+ // Bukkit.getServer().getLogger().severe("======================= End Netty Debug Log =======================");
|
|
+ // Disconnect with generic reason + exception
|
|
+ a("disconnect.genericReason", new Object[]{"Internal exception: " + cause});
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void messageReceived(ChannelHandlerContext ctx, final Packet msg) throws Exception {
|
|
+ if (connected) {
|
|
+ if (msg instanceof Packet252KeyResponse) {
|
|
+ secret = ((Packet252KeyResponse) msg).a(key);
|
|
+ }
|
|
+
|
|
+ if (msg.a_()) {
|
|
+ threadPool.submit(new Runnable() {
|
|
+ public void run() {
|
|
+ Packet packet = PacketListener.callReceived(NettyNetworkManager.this, connection, msg);
|
|
+ if (packet != null) {
|
|
+ packet.handle(connection);
|
|
+ }
|
|
+ }
|
|
+ });
|
|
+ } else {
|
|
+ syncPackets.add(msg);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public Socket getSocket() {
|
|
+ return socketAdaptor;
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * setHandler. Set the {@link NetHandler} used to process received packets.
|
|
+ *
|
|
+ * @param nh the new {@link NetHandler} instance
|
|
+ */
|
|
+ public void a(Connection nh) {
|
|
+ connection = nh;
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * queue. Queue a packet for sending, or in this case send it to be write it
|
|
+ * straight to the channel.
|
|
+ *
|
|
+ * @param packet the packet to queue
|
|
+ */
|
|
+ public void queue(Packet packet) {
|
|
+ // Only send if channel is still connected
|
|
+ if (connected) {
|
|
+ // Process packet via handler
|
|
+ packet = PacketListener.callQueued(this, connection, packet);
|
|
+ // If handler indicates packet send
|
|
+ if (packet != null) {
|
|
+ highPriorityQueue.add(packet);
|
|
+
|
|
+ // If needed, check and prepare encryption phase
|
|
+ // We don't send the packet here as it is sent just before the cipher handler has been added to ensure we can safeguard from any race conditions
|
|
+ // Which are caused by the slow first initialization of the cipher SPI
|
|
+ if (packet instanceof Packet252KeyResponse) {
|
|
+ Cipher encrypt = NettyServerConnection.getCipher(Cipher.ENCRYPT_MODE, secret);
|
|
+ Cipher decrypt = NettyServerConnection.getCipher(Cipher.DECRYPT_MODE, secret);
|
|
+ CipherCodec codec = new CipherCodec(encrypt, decrypt, (Packet252KeyResponse) packet);
|
|
+ channel.pipeline().addBefore("decoder", "cipher", codec);
|
|
+ } else {
|
|
+ channel.write(packet);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * wakeThreads. In Vanilla this method will interrupt the network read and
|
|
+ * write threads, thus waking them.
|
|
+ */
|
|
+ public void a() {
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * processPackets. Remove up to 1000 packets from the queue and process
|
|
+ * them. This method should only be called from the main server thread.
|
|
+ */
|
|
+ public void b() {
|
|
+ for (int i = 1000; !syncPackets.isEmpty() && i >= 0; i--) {
|
|
+ if (connection instanceof PendingConnection ? ((PendingConnection) connection).b : ((PlayerConnection) connection).disconnected) {
|
|
+ syncPackets.clear();
|
|
+ break;
|
|
+ }
|
|
+
|
|
+ Packet packet = PacketListener.callReceived(this, connection, syncPackets.poll());
|
|
+ if (packet != null) {
|
|
+ packet.handle(connection);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ // Disconnect via the handler - this performs all plugin related cleanup + logging
|
|
+ if (!connected && (dcReason != null || dcArgs != null)) {
|
|
+ connection.a(dcReason, dcArgs);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * getSocketAddress. Return the remote address of the connected user. It is
|
|
+ * important that this method returns a value even after disconnect.
|
|
+ *
|
|
+ * @return the remote address of this connection
|
|
+ */
|
|
+ public SocketAddress getSocketAddress() {
|
|
+ return address;
|
|
+ }
|
|
+
|
|
+ public void setSocketAddress(SocketAddress address) {
|
|
+ this.address = address;
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * close. Close and release all resources associated with this connection.
|
|
+ */
|
|
+ public void d() {
|
|
+ if (connected) {
|
|
+ connected = false;
|
|
+ channel.close();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * queueSize. Return the number of packets in the low priority queue. In a
|
|
+ * NIO environment this will always be 0.
|
|
+ *
|
|
+ * @return the size of the packet send queue
|
|
+ */
|
|
+ public int e() {
|
|
+ return 0;
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * networkShutdown. Shuts down this connection, storing the reason and
|
|
+ * parameters, used to notify the current {@link Connection}.
|
|
+ *
|
|
+ * @param reason the main disconnect reason
|
|
+ * @param arguments additional disconnect arguments, for example, the
|
|
+ * exception which triggered the disconnect.
|
|
+ */
|
|
+ public void a(String reason, Object... arguments) {
|
|
+ if (connected) {
|
|
+ dcReason = reason;
|
|
+ dcArgs = arguments;
|
|
+ d();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public long getWrittenBytes() {
|
|
+ return writtenBytes;
|
|
+ }
|
|
+
|
|
+ public void addWrittenBytes(int written) {
|
|
+ writtenBytes += written;
|
|
+ }
|
|
+}
|
|
diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
|
|
new file mode 100644
|
|
index 0000000..cb58bd2
|
|
--- /dev/null
|
|
+++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
|
|
@@ -0,0 +1,81 @@
|
|
+package org.spigotmc.netty;
|
|
+
|
|
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
+import io.netty.bootstrap.ServerBootstrap;
|
|
+import io.netty.channel.Channel;
|
|
+import io.netty.channel.ChannelException;
|
|
+import io.netty.channel.ChannelFuture;
|
|
+import io.netty.channel.ChannelInitializer;
|
|
+import io.netty.channel.ChannelOption;
|
|
+import io.netty.channel.nio.NioEventLoopGroup;
|
|
+import io.netty.channel.socket.nio.NioServerSocketChannel;
|
|
+import io.netty.handler.timeout.ReadTimeoutHandler;
|
|
+import java.net.InetAddress;
|
|
+import java.security.GeneralSecurityException;
|
|
+import java.security.Key;
|
|
+import javax.crypto.Cipher;
|
|
+import javax.crypto.spec.IvParameterSpec;
|
|
+import net.minecraft.server.MinecraftServer;
|
|
+import net.minecraft.server.ServerConnection;
|
|
+
|
|
+/**
|
|
+ * This is the NettyServerConnection class. It implements
|
|
+ * {@link ServerConnection} and is the main interface between the Minecraft
|
|
+ * server and this NIO implementation. It handles starting, stopping and
|
|
+ * processing the Netty backend.
|
|
+ */
|
|
+public class NettyServerConnection extends ServerConnection {
|
|
+
|
|
+ private final ChannelFuture socket;
|
|
+
|
|
+ public NettyServerConnection(MinecraftServer ms, InetAddress host, int port) {
|
|
+ super(ms);
|
|
+ int threads = Integer.getInteger("org.spigotmc.netty.threads", 3);
|
|
+ socket = new ServerBootstrap().channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() {
|
|
+ @Override
|
|
+ public void initChannel(Channel ch) throws Exception {
|
|
+ try {
|
|
+ ch.config().setOption(ChannelOption.IP_TOS, 0x18);
|
|
+ } catch (ChannelException ex) {
|
|
+ // IP_TOS is not supported (Windows XP / Windows Server 2003)
|
|
+ }
|
|
+
|
|
+ NettyNetworkManager networkManager = new NettyNetworkManager();
|
|
+ ch.pipeline()
|
|
+ .addLast("flusher", new OutboundManager())
|
|
+ .addLast("timer", new ReadTimeoutHandler(30))
|
|
+ .addLast("decoder", new PacketDecoder())
|
|
+ .addLast("encoder", new PacketEncoder(networkManager))
|
|
+ .addLast("manager", networkManager);
|
|
+ }
|
|
+ }).childOption(ChannelOption.TCP_NODELAY, false).group(new NioEventLoopGroup(threads, new ThreadFactoryBuilder().setNameFormat("Netty IO Thread - %1$d").build())).localAddress(host, port).bind();
|
|
+ MinecraftServer.getServer().getLogger().info("Using Netty NIO with " + threads + " threads for network connections.");
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Shutdown. This method is called when the server is shutting down and the
|
|
+ * server socket and all clients should be terminated with no further
|
|
+ * action.
|
|
+ */
|
|
+ @Override
|
|
+ public void a() {
|
|
+ socket.channel().close().syncUninterruptibly();
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Return a Minecraft compatible cipher instance from the specified key.
|
|
+ *
|
|
+ * @param opMode the mode to initialize the cipher in
|
|
+ * @param key to use as the initial vector
|
|
+ * @return the initialized cipher
|
|
+ */
|
|
+ public static Cipher getCipher(int opMode, Key key) {
|
|
+ try {
|
|
+ Cipher cip = Cipher.getInstance("AES/CFB8/NoPadding");
|
|
+ cip.init(opMode, key, new IvParameterSpec(key.getEncoded()));
|
|
+ return cip;
|
|
+ } catch (GeneralSecurityException ex) {
|
|
+ throw new RuntimeException(ex);
|
|
+ }
|
|
+ }
|
|
+}
|
|
diff --git a/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java
|
|
new file mode 100644
|
|
index 0000000..a3b86b8
|
|
--- /dev/null
|
|
+++ b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java
|
|
@@ -0,0 +1,248 @@
|
|
+package org.spigotmc.netty;
|
|
+
|
|
+import io.netty.channel.Channel;
|
|
+import io.netty.channel.ChannelOption;
|
|
+import java.io.IOException;
|
|
+import java.io.InputStream;
|
|
+import java.io.OutputStream;
|
|
+import java.net.InetAddress;
|
|
+import java.net.Socket;
|
|
+import java.net.SocketAddress;
|
|
+import java.net.SocketException;
|
|
+import java.nio.channels.SocketChannel;
|
|
+
|
|
+/**
|
|
+ * This class wraps a Netty {@link Channel} in a {@link Socket}. It overrides
|
|
+ * all methods in {@link Socket} to ensure that calls are not mistakingly made
|
|
+ * to the unsupported super socket. All operations that can be sanely applied to
|
|
+ * a {@link Channel} are implemented here. Those which cannot will throw an
|
|
+ * {@link UnsupportedOperationException}.
|
|
+ */
|
|
+public class NettySocketAdaptor extends Socket {
|
|
+
|
|
+ private final io.netty.channel.socket.SocketChannel ch;
|
|
+
|
|
+ private NettySocketAdaptor(io.netty.channel.socket.SocketChannel ch) {
|
|
+ this.ch = ch;
|
|
+ }
|
|
+
|
|
+ public static NettySocketAdaptor adapt(io.netty.channel.socket.SocketChannel ch) {
|
|
+ return new NettySocketAdaptor(ch);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void bind(SocketAddress bindpoint) throws IOException {
|
|
+ ch.bind(bindpoint).syncUninterruptibly();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public synchronized void close() throws IOException {
|
|
+ ch.close().syncUninterruptibly();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void connect(SocketAddress endpoint) throws IOException {
|
|
+ ch.connect(endpoint).syncUninterruptibly();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void connect(SocketAddress endpoint, int timeout) throws IOException {
|
|
+ ch.config().setConnectTimeoutMillis(timeout);
|
|
+ ch.connect(endpoint).syncUninterruptibly();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public boolean equals(Object obj) {
|
|
+ return obj instanceof NettySocketAdaptor && ch.equals(((NettySocketAdaptor) obj).ch);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public SocketChannel getChannel() {
|
|
+ throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public InetAddress getInetAddress() {
|
|
+ return ch.remoteAddress().getAddress();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public InputStream getInputStream() throws IOException {
|
|
+ throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public boolean getKeepAlive() throws SocketException {
|
|
+ return ch.config().getOption(ChannelOption.SO_KEEPALIVE);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public InetAddress getLocalAddress() {
|
|
+ return ch.localAddress().getAddress();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public int getLocalPort() {
|
|
+ return ch.localAddress().getPort();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public SocketAddress getLocalSocketAddress() {
|
|
+ return ch.localAddress();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public boolean getOOBInline() throws SocketException {
|
|
+ throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public OutputStream getOutputStream() throws IOException {
|
|
+ throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public int getPort() {
|
|
+ return ch.remoteAddress().getPort();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public synchronized int getReceiveBufferSize() throws SocketException {
|
|
+ return ch.config().getOption(ChannelOption.SO_RCVBUF);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public SocketAddress getRemoteSocketAddress() {
|
|
+ return ch.remoteAddress();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public boolean getReuseAddress() throws SocketException {
|
|
+ return ch.config().getOption(ChannelOption.SO_REUSEADDR);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public synchronized int getSendBufferSize() throws SocketException {
|
|
+ return ch.config().getOption(ChannelOption.SO_SNDBUF);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public int getSoLinger() throws SocketException {
|
|
+ return ch.config().getOption(ChannelOption.SO_LINGER);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public synchronized int getSoTimeout() throws SocketException {
|
|
+ throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public boolean getTcpNoDelay() throws SocketException {
|
|
+ return ch.config().getOption(ChannelOption.TCP_NODELAY);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public int getTrafficClass() throws SocketException {
|
|
+ return ch.config().getOption(ChannelOption.IP_TOS);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public int hashCode() {
|
|
+ return ch.hashCode();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public boolean isBound() {
|
|
+ return ch.localAddress() != null;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public boolean isClosed() {
|
|
+ return !ch.isOpen();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public boolean isConnected() {
|
|
+ return ch.isActive();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public boolean isInputShutdown() {
|
|
+ return ch.isInputShutdown();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public boolean isOutputShutdown() {
|
|
+ return ch.isOutputShutdown();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void sendUrgentData(int data) throws IOException {
|
|
+ throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void setKeepAlive(boolean on) throws SocketException {
|
|
+ ch.config().setOption(ChannelOption.SO_KEEPALIVE, on);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void setOOBInline(boolean on) throws SocketException {
|
|
+ throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
|
|
+ throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public synchronized void setReceiveBufferSize(int size) throws SocketException {
|
|
+ ch.config().setOption(ChannelOption.SO_RCVBUF, size);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void setReuseAddress(boolean on) throws SocketException {
|
|
+ ch.config().setOption(ChannelOption.SO_REUSEADDR, on);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public synchronized void setSendBufferSize(int size) throws SocketException {
|
|
+ ch.config().setOption(ChannelOption.SO_SNDBUF, size);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void setSoLinger(boolean on, int linger) throws SocketException {
|
|
+ ch.config().setOption(ChannelOption.SO_LINGER, linger);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public synchronized void setSoTimeout(int timeout) throws SocketException {
|
|
+ throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void setTcpNoDelay(boolean on) throws SocketException {
|
|
+ ch.config().setOption(ChannelOption.TCP_NODELAY, on);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void setTrafficClass(int tc) throws SocketException {
|
|
+ ch.config().setOption(ChannelOption.IP_TOS, tc);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void shutdownInput() throws IOException {
|
|
+ throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void shutdownOutput() throws IOException {
|
|
+ ch.shutdownOutput().syncUninterruptibly();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public String toString() {
|
|
+ return ch.toString();
|
|
+ }
|
|
+}
|
|
diff --git a/src/main/java/org/spigotmc/netty/OutboundManager.java b/src/main/java/org/spigotmc/netty/OutboundManager.java
|
|
new file mode 100644
|
|
index 0000000..80205ed
|
|
--- /dev/null
|
|
+++ b/src/main/java/org/spigotmc/netty/OutboundManager.java
|
|
@@ -0,0 +1,18 @@
|
|
+package org.spigotmc.netty;
|
|
+
|
|
+import io.netty.channel.ChannelHandlerContext;
|
|
+import io.netty.channel.ChannelOperationHandlerAdapter;
|
|
+import io.netty.channel.ChannelPromise;
|
|
+
|
|
+public class OutboundManager extends ChannelOperationHandlerAdapter {
|
|
+
|
|
+ private static final int FLUSH_TIME = 10;
|
|
+ private long lastFlush;
|
|
+
|
|
+ public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
|
+ if (System.currentTimeMillis() - lastFlush > FLUSH_TIME) {
|
|
+ lastFlush = System.currentTimeMillis();
|
|
+ ctx.flush(promise);
|
|
+ }
|
|
+ }
|
|
+}
|
|
diff --git a/src/main/java/org/spigotmc/netty/PacketDecoder.java b/src/main/java/org/spigotmc/netty/PacketDecoder.java
|
|
new file mode 100644
|
|
index 0000000..65074d2
|
|
--- /dev/null
|
|
+++ b/src/main/java/org/spigotmc/netty/PacketDecoder.java
|
|
@@ -0,0 +1,64 @@
|
|
+package org.spigotmc.netty;
|
|
+
|
|
+import io.netty.buffer.ByteBuf;
|
|
+import io.netty.buffer.ByteBufInputStream;
|
|
+import io.netty.channel.ChannelHandlerContext;
|
|
+import io.netty.handler.codec.ReplayingDecoder;
|
|
+import java.io.DataInputStream;
|
|
+import java.io.EOFException;
|
|
+import java.io.IOException;
|
|
+import net.minecraft.server.MinecraftServer;
|
|
+import net.minecraft.server.Packet;
|
|
+
|
|
+/**
|
|
+ * Packet decoding class backed by a reusable {@link DataInputStream} which
|
|
+ * backs the input {@link ByteBuf}. Reads an unsigned byte packet header and
|
|
+ * then decodes the packet accordingly.
|
|
+ */
|
|
+public class PacketDecoder extends ReplayingDecoder<ReadState> {
|
|
+
|
|
+ private DataInputStream input;
|
|
+ private Packet packet;
|
|
+
|
|
+ public PacketDecoder() {
|
|
+ super(ReadState.HEADER);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public Packet decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
|
+ if (input == null) {
|
|
+ input = new DataInputStream(new ByteBufInputStream(in));
|
|
+ }
|
|
+
|
|
+ switch (state()) {
|
|
+ case HEADER:
|
|
+ short packetId = in.readUnsignedByte();
|
|
+ packet = Packet.a(MinecraftServer.getServer().getLogger(), packetId);
|
|
+ if (packet == null) {
|
|
+ throw new IOException("Bad packet id " + packetId);
|
|
+ }
|
|
+ checkpoint(ReadState.DATA);
|
|
+ case DATA:
|
|
+ try {
|
|
+ packet.a(input);
|
|
+ } catch (EOFException ex) {
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ checkpoint(ReadState.HEADER);
|
|
+ Packet ret = packet;
|
|
+ packet = null;
|
|
+
|
|
+ return ret;
|
|
+ default:
|
|
+ throw new IllegalStateException();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
|
+ super.freeInboundBuffer(ctx);
|
|
+ input = null;
|
|
+ packet = null;
|
|
+ }
|
|
+}
|
|
diff --git a/src/main/java/org/spigotmc/netty/PacketEncoder.java b/src/main/java/org/spigotmc/netty/PacketEncoder.java
|
|
new file mode 100644
|
|
index 0000000..c8832d6
|
|
--- /dev/null
|
|
+++ b/src/main/java/org/spigotmc/netty/PacketEncoder.java
|
|
@@ -0,0 +1,50 @@
|
|
+package org.spigotmc.netty;
|
|
+
|
|
+import io.netty.buffer.ByteBuf;
|
|
+import io.netty.buffer.ByteBufOutputStream;
|
|
+import io.netty.channel.ChannelHandlerContext;
|
|
+import io.netty.handler.codec.MessageToByteEncoder;
|
|
+import java.io.DataOutputStream;
|
|
+import net.minecraft.server.Packet;
|
|
+
|
|
+/**
|
|
+ * Netty encoder which takes a packet and encodes it, and adds a byte packet id
|
|
+ * header.
|
|
+ */
|
|
+public class PacketEncoder extends MessageToByteEncoder<Packet> {
|
|
+
|
|
+ private ByteBuf outBuf;
|
|
+ private DataOutputStream dataOut;
|
|
+ private final NettyNetworkManager networkManager;
|
|
+
|
|
+ public PacketEncoder(NettyNetworkManager networkManager) {
|
|
+ this.networkManager = networkManager;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception {
|
|
+ if (outBuf == null) {
|
|
+ outBuf = ctx.alloc().directBuffer();
|
|
+ }
|
|
+ if (dataOut == null) {
|
|
+ dataOut = new DataOutputStream(new ByteBufOutputStream(outBuf));
|
|
+ }
|
|
+
|
|
+ out.writeByte(msg.n());
|
|
+ msg.a(dataOut);
|
|
+
|
|
+ networkManager.addWrittenBytes(outBuf.readableBytes());
|
|
+ out.writeBytes(outBuf);
|
|
+ out.discardSomeReadBytes();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
|
+ super.freeOutboundBuffer(ctx);
|
|
+ if (outBuf != null) {
|
|
+ outBuf.release();
|
|
+ outBuf = null;
|
|
+ }
|
|
+ dataOut = null;
|
|
+ }
|
|
+}
|
|
diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java
|
|
new file mode 100644
|
|
index 0000000..8e3b932
|
|
--- /dev/null
|
|
+++ b/src/main/java/org/spigotmc/netty/PacketListener.java
|
|
@@ -0,0 +1,100 @@
|
|
+package org.spigotmc.netty;
|
|
+
|
|
+import com.google.common.base.Preconditions;
|
|
+import java.util.Arrays;
|
|
+import java.util.HashMap;
|
|
+import java.util.Map;
|
|
+import java.util.logging.Level;
|
|
+import net.minecraft.server.Connection;
|
|
+import net.minecraft.server.INetworkManager;
|
|
+import net.minecraft.server.Packet;
|
|
+import org.bukkit.Bukkit;
|
|
+import org.bukkit.plugin.Plugin;
|
|
+
|
|
+/**
|
|
+ * This class is used for plugins that wish to register to listen to incoming
|
|
+ * and outgoing packets. To use this class, simply create a new instance,
|
|
+ * override the methods you wish to use, and call
|
|
+ * {@link #register(org.spigotmc.netty.PacketListener, org.bukkit.plugin.Plugin)}.
|
|
+ */
|
|
+public class PacketListener {
|
|
+
|
|
+ /**
|
|
+ * A mapping of all registered listeners and their owning plugins.
|
|
+ */
|
|
+ private static final Map<PacketListener, Plugin> listeners = new HashMap<PacketListener, Plugin>();
|
|
+ /**
|
|
+ * A baked list of all listeners, for efficiency sake.
|
|
+ */
|
|
+ private static PacketListener[] baked = new PacketListener[0];
|
|
+
|
|
+ /**
|
|
+ * Used to register a handler for receiving notifications of packet
|
|
+ * activity.
|
|
+ *
|
|
+ * @param listener the listener to register
|
|
+ * @param plugin the plugin owning this listener
|
|
+ */
|
|
+ public static synchronized void register(PacketListener listener, Plugin plugin) {
|
|
+ Preconditions.checkNotNull(listener, "listener");
|
|
+ Preconditions.checkNotNull(plugin, "plugin");
|
|
+ Preconditions.checkState(!listeners.containsKey(listener), "listener already registered");
|
|
+
|
|
+ int size = listeners.size();
|
|
+ Preconditions.checkState(baked.length == size);
|
|
+ listeners.put(listener, plugin);
|
|
+ baked = Arrays.copyOf(baked, size + 1);
|
|
+ baked[size] = listener;
|
|
+ }
|
|
+
|
|
+ static Packet callReceived(INetworkManager networkManager, Connection connection, Packet packet) {
|
|
+ for (PacketListener listener : baked) {
|
|
+ try {
|
|
+ packet = listener.packetReceived(networkManager, connection, packet);
|
|
+ } catch (Throwable t) {
|
|
+ Bukkit.getServer().getLogger().log(Level.SEVERE, "Error whilst firing receive hook for packet", t);
|
|
+ }
|
|
+ }
|
|
+ return packet;
|
|
+ }
|
|
+
|
|
+ static Packet callQueued(INetworkManager networkManager, Connection connection, Packet packet) {
|
|
+ for (PacketListener listener : baked) {
|
|
+ try {
|
|
+ packet = listener.packetQueued(networkManager, connection, packet);
|
|
+ } catch (Throwable t) {
|
|
+ Bukkit.getServer().getLogger().log(Level.SEVERE, "Error whilst firing queued hook for packet", t);
|
|
+ }
|
|
+ }
|
|
+ return packet;
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Called when a packet has been received and is about to be handled by the
|
|
+ * current {@link Connection}. The returned packet will be the packet passed
|
|
+ * on for handling, or in the case of null being returned, not handled at
|
|
+ * all.
|
|
+ *
|
|
+ * @param networkManager the NetworkManager receiving the packet
|
|
+ * @param connection the connection which will handle the packet
|
|
+ * @param packet the received packet
|
|
+ * @return the packet to be handled, or null to cancel
|
|
+ */
|
|
+ public Packet packetReceived(INetworkManager networkManager, Connection connection, Packet packet) {
|
|
+ return packet;
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Called when a packet is queued to be sent. The returned packet will be
|
|
+ * the packet sent. In the case of null being returned, the packet will not
|
|
+ * be sent.
|
|
+ *
|
|
+ * @param networkManager the NetworkManager which will send the packet
|
|
+ * @param connection the connection which queued the packet
|
|
+ * @param packet the queue packet
|
|
+ * @return the packet to be sent, or null if the packet will not be sent.
|
|
+ */
|
|
+ public Packet packetQueued(INetworkManager networkManager, Connection connection, Packet packet) {
|
|
+ return packet;
|
|
+ }
|
|
+}
|
|
diff --git a/src/main/java/org/spigotmc/netty/ReadState.java b/src/main/java/org/spigotmc/netty/ReadState.java
|
|
new file mode 100644
|
|
index 0000000..5dc3754
|
|
--- /dev/null
|
|
+++ b/src/main/java/org/spigotmc/netty/ReadState.java
|
|
@@ -0,0 +1,16 @@
|
|
+package org.spigotmc.netty;
|
|
+
|
|
+/**
|
|
+ * Stores the state of the packet currently being read.
|
|
+ */
|
|
+public enum ReadState {
|
|
+
|
|
+ /**
|
|
+ * Indicates the byte representing the ID has been read.
|
|
+ */
|
|
+ HEADER,
|
|
+ /**
|
|
+ * Shows the packet body is being read.
|
|
+ */
|
|
+ DATA;
|
|
+}
|
|
diff --git a/src/main/resources/configurations/bukkit.yml b/src/main/resources/configurations/bukkit.yml
|
|
index 7a6c12f..1a57fdd 100644
|
|
--- a/src/main/resources/configurations/bukkit.yml
|
|
+++ b/src/main/resources/configurations/bukkit.yml
|
|
@@ -13,6 +13,9 @@
|
|
# Bug tracker: http://leaky.bukkit.org/
|
|
|
|
|
|
+#listeners:
|
|
+# - address: 127.0.0.1
|
|
+# port: 25577
|
|
settings:
|
|
allow-end: true
|
|
warn-on-overload: true
|
|
--
|
|
1.8.2.1
|
|
|