diff --git a/CraftBukkit-Patches/0105-Implement-Threaded-Bulk-Chunk-Compression-and-Cachin.patch b/CraftBukkit-Patches/0105-Implement-Threaded-Bulk-Chunk-Compression-and-Cachin.patch index 4054bd8d3d..53af35726f 100644 --- a/CraftBukkit-Patches/0105-Implement-Threaded-Bulk-Chunk-Compression-and-Cachin.patch +++ b/CraftBukkit-Patches/0105-Implement-Threaded-Bulk-Chunk-Compression-and-Cachin.patch @@ -1,21 +1,25 @@ -From 0db589c05b3eb0d213df3e7640818b77935c2e02 Mon Sep 17 00:00:00 2001 +From d56db82cd2978a74349685a4a392da310f9822df Mon Sep 17 00:00:00 2001 From: md_5 Date: Tue, 28 Jan 2014 20:32:07 +1100 Subject: [PATCH] Implement Threaded Bulk Chunk Compression and Caching diff --git a/src/main/java/net/minecraft/server/EntityPlayer.java b/src/main/java/net/minecraft/server/EntityPlayer.java -index 9b853a9..a4c8843 100644 +index 9b853a9..295b80d 100644 --- a/src/main/java/net/minecraft/server/EntityPlayer.java +++ b/src/main/java/net/minecraft/server/EntityPlayer.java -@@ -228,6 +228,7 @@ public class EntityPlayer extends EntityHuman implements ICrafting { +@@ -228,7 +228,10 @@ public class EntityPlayer extends EntityHuman implements ICrafting { } if (!arraylist.isEmpty()) { -+ java.util.Collections.sort( arraylist, new PlayerChunkMap.ChunkComparator( this ) ); // Spigot - sort a final time before sending - this.playerConnection.sendPacket(new PacketPlayOutMapChunkBulk(arraylist)); +- this.playerConnection.sendPacket(new PacketPlayOutMapChunkBulk(arraylist)); ++ // Spigot Start ++ java.util.Collections.sort( arraylist, new PlayerChunkMap.ChunkComparator( this ) ); ++ org.spigotmc.ChunkCompressor.sendAndCompress( this, new PacketPlayOutMapChunkBulk( arraylist ) ); ++ // Spigot End Iterator iterator2 = arraylist1.iterator(); + while (iterator2.hasNext()) { diff --git a/src/main/java/net/minecraft/server/PacketPlayOutMapChunkBulk.java b/src/main/java/net/minecraft/server/PacketPlayOutMapChunkBulk.java index fc92026..484d727 100644 --- a/src/main/java/net/minecraft/server/PacketPlayOutMapChunkBulk.java @@ -66,142 +70,90 @@ index ae4ca63..962e902 100644 private int x; private int z; -diff --git a/src/main/java/net/minecraft/server/ServerConnectionChannel.java b/src/main/java/net/minecraft/server/ServerConnectionChannel.java -index fb95be4..a382235 100644 ---- a/src/main/java/net/minecraft/server/ServerConnectionChannel.java -+++ b/src/main/java/net/minecraft/server/ServerConnectionChannel.java -@@ -1,14 +1,25 @@ - package net.minecraft.server; - -+import com.google.common.util.concurrent.ThreadFactoryBuilder; // Spigot - import net.minecraft.util.io.netty.channel.Channel; - import net.minecraft.util.io.netty.channel.ChannelException; - import net.minecraft.util.io.netty.channel.ChannelInitializer; - import net.minecraft.util.io.netty.channel.ChannelOption; - import net.minecraft.util.io.netty.handler.timeout.ReadTimeoutHandler; -+// Spigot Start -+import net.minecraft.util.io.netty.util.concurrent.DefaultEventExecutorGroup; -+import net.minecraft.util.io.netty.util.concurrent.EventExecutorGroup; -+import org.spigotmc.ChunkCompressor; -+import org.spigotmc.SpigotConfig; -+// Spigot End - - class ServerConnectionChannel extends ChannelInitializer { - - final ServerConnection a; -+ // Spigot Start -+ private static final EventExecutorGroup threadPool = new DefaultEventExecutorGroup( SpigotConfig.compressionThreads, new ThreadFactoryBuilder().setNameFormat( "Chunk Compressor #%d" ).setDaemon( true ).build() ); -+ private static final ChunkCompressor chunkCompressor = new ChunkCompressor(); -+ // Spigot End - - ServerConnectionChannel(ServerConnection serverconnection) { - this.a = serverconnection; -@@ -28,6 +39,7 @@ class ServerConnectionChannel extends ChannelInitializer { - } - - channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyPingHandler(this.a)).addLast("splitter", new PacketSplitter()).addLast("decoder", new PacketDecoder()).addLast("prepender", new PacketPrepender()).addLast("encoder", new PacketEncoder()); -+ channel.pipeline().addLast( threadPool, "compressor", chunkCompressor ); // Spigot - NetworkManager networkmanager = new NetworkManager(false); - - ServerConnection.a(this.a).add(networkmanager); diff --git a/src/main/java/org/spigotmc/ChunkCompressor.java b/src/main/java/org/spigotmc/ChunkCompressor.java new file mode 100644 -index 0000000..481211b +index 0000000..d43e528 --- /dev/null +++ b/src/main/java/org/spigotmc/ChunkCompressor.java -@@ -0,0 +1,94 @@ +@@ -0,0 +1,80 @@ +package org.spigotmc; + ++import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.logging.Level; +import java.util.zip.CRC32; ++import net.minecraft.server.EntityPlayer; +import net.minecraft.server.PacketPlayOutMapChunkBulk; -+import net.minecraft.util.io.netty.channel.ChannelHandler; -+import net.minecraft.util.io.netty.channel.ChannelHandlerContext; -+import net.minecraft.util.io.netty.channel.ChannelOutboundHandlerAdapter; -+import net.minecraft.util.io.netty.channel.ChannelPromise; ++import net.minecraft.util.io.netty.util.concurrent.DefaultEventExecutorGroup; ++import net.minecraft.util.io.netty.util.concurrent.EventExecutorGroup; +import org.bukkit.Bukkit; + -+@ChannelHandler.Sharable -+public class ChunkCompressor extends ChannelOutboundHandlerAdapter ++public class ChunkCompressor +{ + -+ private final LinkedHashMap cache = new LinkedHashMap( 16, 0.75f, true ); // Defaults, order by access -+ private volatile int cacheSize; ++ private static final EventExecutorGroup threadPool = new DefaultEventExecutorGroup( SpigotConfig.compressionThreads, new ThreadFactoryBuilder().setNameFormat( "Chunk Compressor #%d" ).setDaemon( true ).build() ); ++ private static final LinkedHashMap cache = new LinkedHashMap( 16, 0.75f, true ); // Defaults, order by access ++ private static volatile int cacheSize; + -+ @Override -+ public synchronized void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception ++ public static void sendAndCompress(final EntityPlayer player, final PacketPlayOutMapChunkBulk chunk) + { -+ try ++ threadPool.submit( new Runnable() + { -+ long start = System.currentTimeMillis(); -+ boolean cached = false; -+ if ( msg instanceof PacketPlayOutMapChunkBulk ) ++ ++ @Override ++ public void run() + { -+ PacketPlayOutMapChunkBulk chunk = (PacketPlayOutMapChunkBulk) msg; -+ // Well shit, orebfuscator is at it again -+ if ( chunk.inflatedBuffers == null ) ++ try + { -+ Bukkit.getServer().getLogger().warning( "chunk.inflatedBuffers is null. Are you running Oreobfuscator? If so, please note that it is unsupported and will disable async compression" ); -+ super.write( ctx, msg, promise ); -+ } -+ -+ // Here we assign a hash to the chunk based on its contents. CRC32 is fast and sufficient for use here. -+ CRC32 crc = new CRC32(); -+ for ( byte[] c : chunk.inflatedBuffers ) -+ { -+ crc.update( c ); -+ } -+ long hash = crc.getValue(); -+ -+ byte[] deflated = cache.get( hash ); -+ if ( deflated != null ) -+ { -+ chunk.buffer = deflated; -+ chunk.size = deflated.length; -+ cached = true; -+ } else -+ { -+ chunk.compress(); // Compress the chunk -+ byte[] buffer = Arrays.copyOf( chunk.buffer, chunk.size ); // Resize the array to correct sizing -+ -+ Iterator iter = cache.values().iterator(); // Grab a single iterator reference -+ // Whilst this next entry is too big for us, and we have stuff to remove -+ while ( cacheSize + buffer.length > org.spigotmc.SpigotConfig.chunkCacheBytes && iter.hasNext() ) ++ long start = System.currentTimeMillis(); ++ boolean cached = false; ++ // Well shit, orebfuscator is at it again ++ if ( chunk.inflatedBuffers == null ) + { -+ cacheSize -= iter.next().length; // Update size table -+ iter.remove(); // Remove it alltogether ++ Bukkit.getServer().getLogger().warning( "chunk.inflatedBuffers is null. Are you running Oreobfuscator? If so, please note that it is unsupported and will disable async compression" ); ++ player.playerConnection.sendPacket( chunk ); ++ return; + } -+ cacheSize += buffer.length; // Update size table -+ cache.put( hash, buffer ); // Pop the new one in the cache ++ ++ // Here we assign a hash to the chunk based on its contents. CRC32 is fast and sufficient for use here. ++ CRC32 crc = new CRC32(); ++ for ( byte[] c : chunk.inflatedBuffers ) ++ { ++ crc.update( c ); ++ } ++ long hash = crc.getValue(); ++ ++ byte[] deflated = cache.get( hash ); ++ if ( deflated != null ) ++ { ++ chunk.buffer = deflated; ++ chunk.size = deflated.length; ++ cached = true; ++ } else ++ { ++ chunk.compress(); // Compress the chunk ++ byte[] buffer = Arrays.copyOf( chunk.buffer, chunk.size ); // Resize the array to correct sizing ++ ++ Iterator iter = cache.values().iterator(); // Grab a single iterator reference ++ // Whilst this next entry is too big for us, and we have stuff to remove ++ while ( cacheSize + buffer.length > org.spigotmc.SpigotConfig.chunkCacheBytes && iter.hasNext() ) ++ { ++ cacheSize -= iter.next().length; // Update size table ++ iter.remove(); // Remove it alltogether ++ } ++ cacheSize += buffer.length; // Update size table ++ cache.put( hash, buffer ); // Pop the new one in the cache ++ } ++ // System.out.println( "Time: " + ( System.currentTimeMillis() - start ) + " " + cached + " " + cacheSize ); ++ player.playerConnection.sendPacket( chunk ); ++ } catch ( Throwable t ) ++ { ++ Bukkit.getServer().getLogger().log( Level.WARNING, "Error compressing or caching chunk", t ); + } -+ // System.out.println( "Time: " + ( System.currentTimeMillis() - start ) + " " + cached + " " + cacheSize ); + } -+ } catch ( Throwable t ) -+ { -+ Bukkit.getServer().getLogger().log( Level.WARNING, "Error compressing or caching chunk", t ); -+ } -+ -+ super.write( ctx, msg, promise ); -+ } -+ -+ @Override -+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception -+ { -+ // In short, there isn't actually anything wrong with the async chunk compressor, it just accidentally caused console logging of errors which were previously ignored.\ -+ // This commit restores that behaviour -+ -+ // You may be asking yourself why we are completely ignoring any errors which come this far down the pipeline. -+ // The answer is quite simple: -+ // Mojang did it -+ // The default Mojang pipeline doesn't have any ChannelOutboundHandlerAdapter or similar instances, and thus nothing to handle exceptionCaught -+ // So when a channel.write() or channel.flush() fails, the error message is actually just passed straight to the future provided. -+ // It is then subsequently discarded, the channel closed, and no one except the user was any the wiser it actually happened! -+ // Unfortunately for us, the default exceptionCaught in this class sends a blaring warning to the server admins indicating that it couldn't send a packet to a disconnected user! -+ // We don't care about these warnings, if we did something wrong to disconnect the user, it is already logged in the proper location, as are broken sockets -+ // tl;dr no need to blare warnings on each write to a broken socket ++ } ); + } +} diff --git a/src/main/java/org/spigotmc/SpigotConfig.java b/src/main/java/org/spigotmc/SpigotConfig.java