From 8f58b1d2721288efdce41d73bfe8d800d7b9d09e Mon Sep 17 00:00:00 2001 From: Spigot Date: Sun, 21 Apr 2013 08:53:15 +1000 Subject: [PATCH] Readd previous Netty tweaks, the memory leak has been fixed By: md_5 --- ...ies-in-chunks-being-processed-for-th.patch | 41 +++- CraftBukkit-Patches/0021-Netty.patch | 212 ++++++++---------- 2 files changed, 130 insertions(+), 123 deletions(-) diff --git a/CraftBukkit-Patches/0016-Only-count-entities-in-chunks-being-processed-for-th.patch b/CraftBukkit-Patches/0016-Only-count-entities-in-chunks-being-processed-for-th.patch index 0945ee1531..a744ccdb7e 100644 --- a/CraftBukkit-Patches/0016-Only-count-entities-in-chunks-being-processed-for-th.patch +++ b/CraftBukkit-Patches/0016-Only-count-entities-in-chunks-being-processed-for-th.patch @@ -1,4 +1,4 @@ -From a6f63db4333363a12ea2905bc4a2093abde6839d Mon Sep 17 00:00:00 2001 +From 832f647429af64b9c70b0647c5a2217e8aa85812 Mon Sep 17 00:00:00 2001 From: Aikar Date: Tue, 29 Jan 2013 13:25:53 -0500 Subject: [PATCH] Only count entities in chunks being processed for the spawn @@ -6,7 +6,7 @@ Subject: [PATCH] Only count entities in chunks being processed for the spawn diff --git a/src/main/java/net/minecraft/server/SpawnerCreature.java b/src/main/java/net/minecraft/server/SpawnerCreature.java -index b3e2818..21fbf7d 100644 +index b3e2818..6362a37 100644 --- a/src/main/java/net/minecraft/server/SpawnerCreature.java +++ b/src/main/java/net/minecraft/server/SpawnerCreature.java @@ -16,6 +16,7 @@ public final class SpawnerCreature { @@ -17,7 +17,34 @@ index b3e2818..21fbf7d 100644 protected static ChunkPosition getRandomPosition(World world, int i, int j) { Chunk chunk = world.getChunkAt(i, j); -@@ -34,13 +35,24 @@ public final class SpawnerCreature { +@@ -26,6 +27,26 @@ public final class SpawnerCreature { + return new ChunkPosition(k, i1, l); + } + ++ // Spigot start - get entity count only from chunks being processed in b ++ public static final int getEntityCount(WorldServer server, Class oClass) { ++ int i = 0; ++ for (Long coord : b.keySet()) { ++ int x = LongHash.msw(coord); ++ int z = LongHash.lsw(coord); ++ if (!server.chunkProviderServer.unloadQueue.contains(x,z) && server.isChunkLoaded(x, z)) { ++ for (List entitySlice : server.getChunkAt(x, z).entitySlices) { ++ for (Entity entity : entitySlice) { ++ if (oClass.isAssignableFrom(entity.getClass())) { ++ ++i; ++ } ++ } ++ } ++ } ++ } ++ return i; ++ } ++ // Spigot end ++ + public static final int spawnEntities(WorldServer worldserver, boolean flag, boolean flag1, boolean flag2) { + if (!flag && !flag1) { + return 0; +@@ -34,13 +55,24 @@ public final class SpawnerCreature { int i; int j; @@ -43,7 +70,7 @@ index b3e2818..21fbf7d 100644 for (int l = -b0; l <= b0; ++l) { for (int i1 = -b0; i1 <= b0; ++i1) { -@@ -88,13 +100,15 @@ public final class SpawnerCreature { +@@ -88,13 +120,15 @@ public final class SpawnerCreature { if (limit == 0) { continue; } @@ -51,7 +78,7 @@ index b3e2818..21fbf7d 100644 // CraftBukkit end - if ((!enumcreaturetype.d() || flag1) && (enumcreaturetype.d() || flag) && (!enumcreaturetype.e() || flag2) && worldserver.a(enumcreaturetype.a()) <= limit * b.size() / 256) { // CraftBukkit - use per-world limits -+ if ((!enumcreaturetype.d() || flag1) && (enumcreaturetype.d() || flag) && (!enumcreaturetype.e() || flag2) && (mobcnt = worldserver.a(enumcreaturetype.a())) <= limit * b.size() / 256) { // CraftBukkit - use per-world limits and use all loaded chunks ++ if ((!enumcreaturetype.d() || flag1) && (enumcreaturetype.d() || flag) && (!enumcreaturetype.e() || flag2) && (mobcnt = getEntityCount(worldserver, enumcreaturetype.a())) <= limit * b.size() / 256) { // CraftBukkit - use per-world limits and use all loaded chunks Iterator iterator = b.keySet().iterator(); + int moblimit = (limit * b.size() / 256) - mobcnt + 1; // CraftBukkit - up to 1 more than limit @@ -61,7 +88,7 @@ index b3e2818..21fbf7d 100644 // CraftBukkit start long key = ((Long) iterator.next()).longValue(); -@@ -158,6 +172,12 @@ public final class SpawnerCreature { +@@ -158,6 +192,12 @@ public final class SpawnerCreature { a(entityliving, worldserver, f, f1, f2); worldserver.addEntity(entityliving, SpawnReason.NATURAL); // CraftBukkit end @@ -75,5 +102,5 @@ index b3e2818..21fbf7d 100644 continue label110; } -- -1.7.11.msysgit.0 +1.8.2.1 diff --git a/CraftBukkit-Patches/0021-Netty.patch b/CraftBukkit-Patches/0021-Netty.patch index 7723ae830f..29f4188305 100644 --- a/CraftBukkit-Patches/0021-Netty.patch +++ b/CraftBukkit-Patches/0021-Netty.patch @@ -1,4 +1,4 @@ -From efe9a9aecb849b6886372c7d9445cd79dd706687 Mon Sep 17 00:00:00 2001 +From 92c3a39d341cb9aa166c5fb00756cd18ff4908a6 Mon Sep 17 00:00:00 2001 From: md_5 Date: Fri, 19 Apr 2013 17:44:39 +1000 Subject: [PATCH] Netty @@ -417,10 +417,10 @@ index 0000000..c8ea80a +} diff --git a/src/main/java/org/spigotmc/netty/CipherCodec.java b/src/main/java/org/spigotmc/netty/CipherCodec.java new file mode 100644 -index 0000000..2dbbf6c +index 0000000..5e3a5f9 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherCodec.java -@@ -0,0 +1,67 @@ +@@ -0,0 +1,59 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; @@ -428,7 +428,6 @@ index 0000000..2dbbf6c +import io.netty.handler.codec.ByteToByteCodec; +import javax.crypto.Cipher; +import javax.crypto.ShortBufferException; -+import net.minecraft.server.Packet252KeyResponse; + +/** + * This class is a complete solution for encrypting and decoding bytes in a @@ -439,7 +438,6 @@ index 0000000..2dbbf6c + + private Cipher encrypt; + private Cipher decrypt; -+ private Packet252KeyResponse responsePacket; + private ThreadLocal heapInLocal = new EmptyByteThreadLocal(); + private ThreadLocal heapOutLocal = new EmptyByteThreadLocal(); + @@ -451,15 +449,9 @@ index 0000000..2dbbf6c + } + } + -+ public CipherCodec(Cipher encrypt, Cipher decrypt, Packet252KeyResponse responsePacket) { ++ public CipherCodec(Cipher encrypt, Cipher decrypt) { + this.encrypt = encrypt; + this.decrypt = decrypt; -+ this.responsePacket = responsePacket; -+ } -+ -+ @Override -+ public void beforeAdd(ChannelHandlerContext ctx) throws Exception { -+ ctx.channel().write(responsePacket); + } + + @Override @@ -490,27 +482,37 @@ index 0000000..2dbbf6c +} diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java new file mode 100644 -index 0000000..0e1b1fd +index 0000000..bf5d731 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java -@@ -0,0 +1,253 @@ +@@ -0,0 +1,308 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; ++import io.netty.buffer.ByteBuf; ++import io.netty.buffer.ByteBufOutputStream; ++import io.netty.buffer.IllegalBufferAccessException; +import io.netty.channel.Channel; ++import io.netty.channel.ChannelFuture; ++import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.socket.SocketChannel; ++import io.netty.util.concurrent.ScheduledFuture; ++import java.io.DataOutputStream; ++import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.security.PrivateKey; -+import java.util.AbstractList; ++import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; ++import java.util.concurrent.TimeUnit; ++import java.util.concurrent.locks.ReentrantLock; +import javax.crypto.Cipher; +import javax.crypto.SecretKey; +import net.minecraft.server.Connection; @@ -535,22 +537,10 @@ index 0000000..0e1b1fd + private static final MultiplexingServerConnection serverConnection = (MultiplexingServerConnection) server.ae(); + /*========================================================================*/ + private final Queue syncPackets = new ConcurrentLinkedQueue(); -+ private final List highPriorityQueue = new AbstractList() { -+ @Override -+ public void add(int index, Packet element) { -+ // NOP -+ } -+ -+ @Override -+ public Packet get(int index) { -+ throw new UnsupportedOperationException(); -+ } -+ -+ @Override -+ public int size() { -+ return 0; -+ } -+ }; ++ private final List highPriorityQueue = new ArrayList(); ++ private final ReentrantLock writeLock = new ReentrantLock(); ++ private Runnable packetDispatcher; ++ private ScheduledFuture scheduledTask; + private volatile boolean connected; + private Channel channel; + private SocketAddress address; @@ -569,6 +559,7 @@ index 0000000..0e1b1fd + // Check the throttle + if (serverConnection.throttle(((InetSocketAddress) channel.remoteAddress()).getAddress())) { + channel.close(); ++ return; + } + // Then the socket adaptor + socketAdaptor = NettySocketAdaptor.adapt((SocketChannel) channel); @@ -577,10 +568,62 @@ index 0000000..0e1b1fd + // Finally register the connection + connected = true; + serverConnection.register((PendingConnection) connection); ++ // And register our send dispatcher ++ packetDispatcher = new Runnable() { ++ public void run() { ++ // Ensure exclusive access to the queue ++ if (!writeLock.isHeldByCurrentThread()) { ++ writeLock.lock(); ++ } ++ try { ++ if (highPriorityQueue.size() == 0) { ++ return; ++ } ++ // Try and get a bearing on the size ++ int estimatedSize = 0; ++ for (Packet packet : highPriorityQueue) { ++ estimatedSize += packet.a(); ++ } ++ // Allocate a buffer ++ final ByteBuf buf = channel.alloc().directBuffer(estimatedSize); ++ // And an outputstream to this buffer ++ DataOutputStream out = new DataOutputStream(new ByteBufOutputStream(buf)); ++ // Loop through all packets ++ for (Packet packet : highPriorityQueue) { ++ // Write packet ID ++ buf.writeByte(packet.n()); ++ try { ++ // Write actual packet ++ packet.a(out); ++ } catch (IOException ex) { ++ // Catch exception in case it ever happens (should never) ++ a("disconnect.genericReason", new Object[]{"Exception writing packet: " + ex}); ++ } ++ } ++ // Clear existing packets so we can unlock our lock ++ highPriorityQueue.clear(); ++ // Send the whole buffer down ++ writtenBytes += buf.readableBytes(); ++ channel.write(buf).addListener(new ChannelFutureListener() { ++ public void operationComplete(ChannelFuture future) throws Exception { ++ buf.release(); ++ if (buf.refCnt() != 0) { ++ throw new IllegalBufferAccessException("Buffer not freed!"); ++ } ++ } ++ }); ++ } finally { ++ writeLock.unlock(); ++ } ++ } ++ }; ++ scheduledTask = ctx.executor().scheduleWithFixedDelay(packetDispatcher, 20, 20, TimeUnit.MILLISECONDS); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { ++ // Cleanup the timer task ++ scheduledTask.cancel(false); + a("disconnect.endOfStream", new Object[0]); + } + @@ -642,18 +685,24 @@ index 0000000..0e1b1fd + packet = PacketListener.callQueued(this, connection, packet); + // If handler indicates packet send + if (packet != null) { -+ highPriorityQueue.add(packet); -+ -+ // If needed, check and prepare encryption phase -+ // We don't send the packet here as it is sent just before the cipher handler has been added to ensure we can safeguard from any race conditions -+ // Which are caused by the slow first initialization of the cipher SPI -+ if (packet instanceof Packet252KeyResponse) { -+ Cipher encrypt = NettyServerConnection.getCipher(Cipher.ENCRYPT_MODE, secret); -+ Cipher decrypt = NettyServerConnection.getCipher(Cipher.DECRYPT_MODE, secret); -+ CipherCodec codec = new CipherCodec(encrypt, decrypt, (Packet252KeyResponse) packet); -+ channel.pipeline().addBefore("decoder", "cipher", codec); -+ } else { -+ channel.write(packet); ++ // Aquire lock ++ writeLock.lock(); ++ try { ++ highPriorityQueue.add(packet); ++ // If needed, check and prepare encryption phase ++ if (packet instanceof Packet252KeyResponse) { ++ Cipher encrypt = NettyServerConnection.getCipher(Cipher.ENCRYPT_MODE, secret); ++ Cipher decrypt = NettyServerConnection.getCipher(Cipher.DECRYPT_MODE, secret); ++ CipherCodec codec = new CipherCodec(encrypt, decrypt); ++ // Flush send queue ++ packetDispatcher.run(); ++ channel.pipeline().addBefore("decoder", "cipher", codec); ++ } ++ } finally { ++ // If we still have a lock, we need to get ri ++ if (writeLock.isHeldByCurrentThread()) { ++ writeLock.unlock(); ++ } + } + } + } @@ -709,6 +758,8 @@ index 0000000..0e1b1fd + public void d() { + if (connected) { + connected = false; ++ // Send all pending packets ++ packetDispatcher.run(); + channel.close(); + } + } @@ -742,17 +793,13 @@ index 0000000..0e1b1fd + public long getWrittenBytes() { + return writtenBytes; + } -+ -+ public void addWrittenBytes(int written) { -+ writtenBytes += written; -+ } +} diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java new file mode 100644 -index 0000000..e5d24f7 +index 0000000..9ad9c52 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java -@@ -0,0 +1,90 @@ +@@ -0,0 +1,79 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -768,17 +815,10 @@ index 0000000..e5d24f7 +import java.net.InetAddress; +import java.security.GeneralSecurityException; +import java.security.Key; -+import java.util.ArrayList; -+import java.util.Collections; -+import java.util.HashMap; -+import java.util.List; -+import java.util.logging.Level; +import javax.crypto.Cipher; +import javax.crypto.spec.IvParameterSpec; +import net.minecraft.server.MinecraftServer; -+import net.minecraft.server.PendingConnection; +import net.minecraft.server.ServerConnection; -+import org.bukkit.Bukkit; + +/** + * This is the NettyServerConnection class. It implements @@ -790,8 +830,6 @@ index 0000000..e5d24f7 + + private final ChannelFuture socket; + -+ -+ + public NettyServerConnection(MinecraftServer ms, InetAddress host, int port) { + super(ms); + int threads = Integer.getInteger("org.spigotmc.netty.threads", 3); @@ -808,14 +846,12 @@ index 0000000..e5d24f7 + ch.pipeline() + .addLast("timer", new ReadTimeoutHandler(30)) + .addLast("decoder", new PacketDecoder()) -+ .addLast("encoder", new PacketEncoder(networkManager)) + .addLast("manager", networkManager); + } + }).group(new NioEventLoopGroup(threads, new ThreadFactoryBuilder().setNameFormat("Netty IO Thread - %1$d").build())).localAddress(host, port).bind(); + MinecraftServer.getServer().getLogger().info("Using Netty NIO with " + threads + " threads for network connections."); + } + -+ + /** + * Shutdown. This method is called when the server is shutting down and the + * server socket and all clients should be terminated with no further @@ -1167,62 +1203,6 @@ index 0000000..65074d2 + packet = null; + } +} -diff --git a/src/main/java/org/spigotmc/netty/PacketEncoder.java b/src/main/java/org/spigotmc/netty/PacketEncoder.java -new file mode 100644 -index 0000000..c8832d6 ---- /dev/null -+++ b/src/main/java/org/spigotmc/netty/PacketEncoder.java -@@ -0,0 +1,50 @@ -+package org.spigotmc.netty; -+ -+import io.netty.buffer.ByteBuf; -+import io.netty.buffer.ByteBufOutputStream; -+import io.netty.channel.ChannelHandlerContext; -+import io.netty.handler.codec.MessageToByteEncoder; -+import java.io.DataOutputStream; -+import net.minecraft.server.Packet; -+ -+/** -+ * Netty encoder which takes a packet and encodes it, and adds a byte packet id -+ * header. -+ */ -+public class PacketEncoder extends MessageToByteEncoder { -+ -+ private ByteBuf outBuf; -+ private DataOutputStream dataOut; -+ private final NettyNetworkManager networkManager; -+ -+ public PacketEncoder(NettyNetworkManager networkManager) { -+ this.networkManager = networkManager; -+ } -+ -+ @Override -+ public void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception { -+ if (outBuf == null) { -+ outBuf = ctx.alloc().directBuffer(); -+ } -+ if (dataOut == null) { -+ dataOut = new DataOutputStream(new ByteBufOutputStream(outBuf)); -+ } -+ -+ out.writeByte(msg.n()); -+ msg.a(dataOut); -+ -+ networkManager.addWrittenBytes(outBuf.readableBytes()); -+ out.writeBytes(outBuf); -+ out.discardSomeReadBytes(); -+ } -+ -+ @Override -+ public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception { -+ super.freeOutboundBuffer(ctx); -+ if (outBuf != null) { -+ outBuf.release(); -+ outBuf = null; -+ } -+ dataOut = null; -+ } -+} diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java new file mode 100644 index 0000000..8e3b932