diff --git a/paper-server/src/main/java/org/bukkit/craftbukkit/ChunkCompressionThread.java b/paper-server/src/main/java/org/bukkit/craftbukkit/ChunkCompressionThread.java new file mode 100644 index 0000000000..7490002f6d --- /dev/null +++ b/paper-server/src/main/java/org/bukkit/craftbukkit/ChunkCompressionThread.java @@ -0,0 +1,135 @@ +package org.bukkit.craftbukkit; + +import java.util.HashMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.zip.Deflater; + +import net.minecraft.server.EntityPlayer; +import net.minecraft.server.Packet; +import net.minecraft.server.Packet51MapChunk; + +public final class ChunkCompressionThread implements Runnable { + + private static final ChunkCompressionThread instance = new ChunkCompressionThread(); + private static boolean isRunning = false; + + private final int QUEUE_CAPACITY = 1024 * 10; + private final HashMap queueSizePerPlayer = new HashMap(); + private final BlockingQueue packetQueue = new LinkedBlockingQueue(QUEUE_CAPACITY); + + private final int CHUNK_SIZE = 16 * 128 * 16 * 5 / 2; + private final int REDUCED_DEFLATE_THRESHOLD = CHUNK_SIZE / 4; + private final int DEFLATE_LEVEL_CHUNKS = 6; + private final int DEFLATE_LEVEL_PARTS = 1; + + private final Deflater deflater = new Deflater(); + private byte[] deflateBuffer = new byte[CHUNK_SIZE + 100]; + + public static void startThread() { + if (!isRunning) { + isRunning = true; + new Thread(instance).start(); + } + } + + public void run() { + while (true) { + try { + handleQueuedPacket(packetQueue.take()); + } catch (InterruptedException ie) { + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + private void handleQueuedPacket(QueuedPacket queuedPacket) { + addToPlayerQueueSize(queuedPacket.player, -1); + // Compress the packet if necessary. + if (queuedPacket.compress) { + handleMapChunk(queuedPacket); + } + sendToNetworkQueue(queuedPacket); + } + + private void handleMapChunk(QueuedPacket queuedPacket) { + Packet51MapChunk packet = (Packet51MapChunk) queuedPacket.packet; + + // If 'packet.g' is set then this packet has already been compressed. + if (packet.g != null) { + return; + } + + int dataSize = packet.rawData.length; + if (deflateBuffer.length < dataSize + 100) { + deflateBuffer = new byte[dataSize + 100]; + } + + deflater.reset(); + deflater.setLevel(dataSize < REDUCED_DEFLATE_THRESHOLD ? DEFLATE_LEVEL_PARTS : DEFLATE_LEVEL_CHUNKS); + deflater.setInput(packet.rawData); + deflater.finish(); + int size = deflater.deflate(deflateBuffer); + if (size == 0) { + size = deflater.deflate(deflateBuffer); + } + + // copy compressed data to packet + packet.g = new byte[size]; + packet.h = size; + System.arraycopy(deflateBuffer, 0, packet.g, 0, size); + } + + private void sendToNetworkQueue(QueuedPacket queuedPacket) { + queuedPacket.player.netServerHandler.networkManager.queue(queuedPacket.packet); + } + + public static void sendPacket(EntityPlayer player, Packet packet) { + if (packet instanceof Packet51MapChunk) { + // MapChunk Packets need compressing. + instance.addQueuedPacket(new QueuedPacket(player, packet, true)); + } else { + // Other Packets don't. + instance.addQueuedPacket(new QueuedPacket(player, packet, false)); + } + } + + private void addToPlayerQueueSize(EntityPlayer player, int amount) { + synchronized (queueSizePerPlayer) { + Integer count = queueSizePerPlayer.get(player); + queueSizePerPlayer.put(player, (count == null ? 0 : count) + amount); + } + } + + public static int getPlayerQueueSize(EntityPlayer player) { + synchronized (instance.queueSizePerPlayer) { + Integer count = instance.queueSizePerPlayer.get(player); + return count == null ? 0 : count; + } + } + + private void addQueuedPacket(QueuedPacket task) { + addToPlayerQueueSize(task.player, +1); + + while (true) { + try { + packetQueue.put(task); + return; + } catch (InterruptedException e) { + } + } + } + + private static class QueuedPacket { + final EntityPlayer player; + final Packet packet; + final boolean compress; + + QueuedPacket(EntityPlayer player, Packet packet, boolean compress) { + this.player = player; + this.packet = packet; + this.compress = compress; + } + } +} \ No newline at end of file diff --git a/paper-server/src/main/java/org/bukkit/craftbukkit/CraftServer.java b/paper-server/src/main/java/org/bukkit/craftbukkit/CraftServer.java index 692fdd6d11..c4a87b8079 100644 --- a/paper-server/src/main/java/org/bukkit/craftbukkit/CraftServer.java +++ b/paper-server/src/main/java/org/bukkit/craftbukkit/CraftServer.java @@ -96,6 +96,8 @@ public final class CraftServer implements Server { loadConfig(); loadPlugins(); enablePlugins(PluginLoadOrder.STARTUP); + + ChunkCompressionThread.startThread(); } private void loadConfig() {