From ca2bc3ecc55bc144bc3b402f083e46c216b6e3f7 Mon Sep 17 00:00:00 2001 From: Dan Mulloy Date: Sat, 15 Nov 2014 14:56:57 -0500 Subject: [PATCH] Attempt to fix memory leaks with the ChannelInjector Addresses https://github.com/aadnk/ProtocolLib/issues/70 --- .../injector/netty/ChannelInjector.java | 231 +++++++++--------- 1 file changed, 119 insertions(+), 112 deletions(-) diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java index bb9124d9..a6f78277 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java @@ -61,16 +61,16 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { public static final ReportType REPORT_CANNOT_INTERCEPT_CLIENT_PACKET = new ReportType("Unable to intercept a read client packet."); public static final ReportType REPORT_CANNOT_EXECUTE_IN_CHANNEL_THREAD = new ReportType("Cannot execute code in channel thread."); public static final ReportType REPORT_CANNOT_FIND_GET_VERSION = new ReportType("Cannot find getVersion() in NetworkMananger"); - + /** * Indicates that a packet has bypassed packet listeners. */ private static final PacketEvent BYPASSED_PACKET = new PacketEvent(ChannelInjector.class); - + // The login packet private static Class PACKET_LOGIN_CLIENT = null; private static FieldAccessor LOGIN_GAME_PROFILE = null; - + // Saved accessors private static MethodAccessor DECODE_BUFFER; private static MethodAccessor ENCODE_BUFFER; @@ -78,17 +78,17 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { // For retrieving the protocol private static FieldAccessor PROTOCOL_ACCESSOR; - + // The factory that created this injector private InjectionFactory factory; - + // The player, or temporary player private Player player; private Player updated; - + // The player connection private Object playerConnection; - + // The current network manager and channel private final Object networkManager; private final Channel originalChannel; @@ -96,32 +96,33 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { // Known network markers private ConcurrentMap packetMarker = new MapMaker().weakKeys().makeMap(); - + /** * Indicate that this packet has been processed by event listeners. *

* This must never be set outside the channel pipeline's thread. */ private PacketEvent currentEvent; - + /** * A packet event that should be processed by the write method. */ private PacketEvent finalEvent; - + /** * A flag set by the main thread to indiciate that a packet should not be processed. */ private final ThreadLocal scheduleProcessPackets = new ThreadLocal() { + @Override protected Boolean initialValue() { return true; }; }; - + // Other handlers private ByteToMessageDecoder vanillaDecoder; private MessageToByteEncoder vanillaEncoder; - + // Our extra handlers private MessageToByteEncoder protocolEncoder; private ChannelInboundHandler finishHandler; @@ -129,14 +130,14 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { // The channel listener private ChannelListener channelListener; - + // Processing network markers private NetworkProcessor processor; - + // Closed private boolean injected; private boolean closed; - + /** * Construct a new channel injector. * @param player - the current player, or temporary player. @@ -152,14 +153,14 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { this.channelListener = Preconditions.checkNotNull(channelListener, "channelListener cannot be NULL"); this.factory = Preconditions.checkNotNull(factory, "factory cannot be NULL"); this.processor = new NetworkProcessor(ProtocolLibrary.getErrorReporter()); - + // Get the channel field this.channelField = new VolatileField( FuzzyReflection.fromObject(networkManager, true). - getFieldByType("channel", Channel.class), + getFieldByType("channel", Channel.class), networkManager, true); } - + /** * Get the version of the current protocol. * @return The version. @@ -168,7 +169,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { public int getProtocolVersion() { return MinecraftProtocolVersion.getCurrentVersion(); } - + @Override @SuppressWarnings("unchecked") public boolean inject() { @@ -179,8 +180,8 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { return false; if (!originalChannel.isActive()) return false; - - // Main thread? We should synchronize with the channel thread, otherwise we might see a + + // Main thread? We should synchronize with the channel thread, otherwise we might see a // pipeline with only some of the handlers removed if (Bukkit.isPrimaryThread()) { // Just like in the close() method, we'll avoid blocking the main thread @@ -192,43 +193,43 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { }); return false; // We don't know } - + // Don't inject the same channel twice if (findChannelHandler(originalChannel, ChannelInjector.class) != null) { return false; } - + // Get the vanilla decoder, so we don't have to replicate the work vanillaDecoder = (ByteToMessageDecoder) originalChannel.pipeline().get("decoder"); vanillaEncoder = (MessageToByteEncoder) originalChannel.pipeline().get("encoder"); - + if (vanillaDecoder == null) throw new IllegalArgumentException("Unable to find vanilla decoder in " + originalChannel.pipeline() ); if (vanillaEncoder == null) throw new IllegalArgumentException("Unable to find vanilla encoder in " + originalChannel.pipeline() ); patchEncoder(vanillaEncoder); - + if (DECODE_BUFFER == null) - DECODE_BUFFER = Accessors.getMethodAccessor(vanillaDecoder.getClass(), + DECODE_BUFFER = Accessors.getMethodAccessor(vanillaDecoder.getClass(), "decode", ChannelHandlerContext.class, ByteBuf.class, List.class); if (ENCODE_BUFFER == null) ENCODE_BUFFER = Accessors.getMethodAccessor(vanillaEncoder.getClass(), "encode", ChannelHandlerContext.class, Object.class, ByteBuf.class); - + // Intercept sent packets protocolEncoder = new MessageToByteEncoder() { @Override protected void encode(ChannelHandlerContext ctx, Object packet, ByteBuf output) throws Exception { ChannelInjector.this.encode(ctx, packet, output); } - + @Override public void write(ChannelHandlerContext ctx, Object packet, ChannelPromise promise) throws Exception { super.write(ctx, packet, promise); ChannelInjector.this.finalWrite(ctx, packet, promise); } }; - + // Intercept recieved packets finishHandler = new ChannelInboundHandlerAdapter() { @Override @@ -238,27 +239,27 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { ChannelInjector.this.finishRead(ctx, msg); } }; - - // Insert our handlers - note that we effectively replace the vanilla encoder/decoder + + // Insert our handlers - note that we effectively replace the vanilla encoder/decoder originalChannel.pipeline().addBefore("decoder", "protocol_lib_decoder", this); originalChannel.pipeline().addBefore("protocol_lib_decoder", "protocol_lib_finish", finishHandler); originalChannel.pipeline().addAfter("encoder", "protocol_lib_encoder", protocolEncoder); - + // Intercept all write methods channelField.setValue(new ChannelProxy(originalChannel, MinecraftReflection.getPacketClass()) { @Override protected Callable onMessageScheduled(final Callable callable, FieldAccessor packetAccessor) { final PacketEvent event = handleScheduled(callable, packetAccessor); - + // Handle cancelled events if (event != null && event.isCancelled()) return null; - + return new Callable() { @Override public T call() throws Exception { T result = null; - + // This field must only be updated in the pipeline thread currentEvent = event; result = callable.call(); @@ -267,11 +268,11 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { } }; } - + @Override protected Runnable onMessageScheduled(final Runnable runnable, FieldAccessor packetAccessor) { final PacketEvent event = handleScheduled(runnable, packetAccessor); - + // Handle cancelled events if (event != null && event.isCancelled()) return null; @@ -285,15 +286,15 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { } }; } - + protected PacketEvent handleScheduled(Object instance, FieldAccessor accessor) { // Let the filters handle this packet Object original = accessor.get(instance); - + // See if we've been instructed not to process packets if (!scheduleProcessPackets.get()) { NetworkMarker marker = getMarker(original); - + if (marker != null) { PacketEvent result = new PacketEvent(ChannelInjector.class); result.setNetworkMarker(marker); @@ -306,7 +307,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { if (event != null && !event.isCancelled()) { Object changed = event.getPacket().getHandle(); - + // Change packet to be scheduled if (original != changed) accessor.set(instance, changed); @@ -314,7 +315,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { return event != null ? event : BYPASSED_PACKET; } }); - + injected = true; return true; } @@ -328,7 +329,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { private PacketEvent processSending(Object message) { return channelListener.onPacketSending(ChannelInjector.this, message, getMarker(message)); } - + /** * This method patches the encoder so that it skips already created packets. * @param encoder - the encoder to patch. @@ -339,17 +340,17 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { } ENCODER_TYPE_MATCHER.set(encoder, TypeParameterMatcher.get(MinecraftReflection.getPacketClass())); } - + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (channelListener.isDebug()) cause.printStackTrace(); super.exceptionCaught(ctx, cause); } - + /** * Encode a packet to a byte buffer, taking over for the standard Minecraft encoder. - * @param ctx - the current context. + * @param ctx - the current context. * @param packet - the packet to encode to a byte array. * @param output - the output byte array. * @throws Exception If anything went wrong. @@ -357,26 +358,26 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { protected void encode(ChannelHandlerContext ctx, Object packet, ByteBuf output) throws Exception { NetworkMarker marker = null; PacketEvent event = currentEvent; - + try { // Skip every kind of non-filtered packet if (!scheduleProcessPackets.get()) { return; } - + // This packet has not been seen by the main thread if (event == null) { Class clazz = packet.getClass(); - + // Schedule the transmission on the main thread instead if (channelListener.hasMainThreadListener(clazz)) { // Delay the packet scheduleMainThread(packet); packet = null; - + } else { event = processSending(packet); - + // Handle the output if (event != null) { packet = !event.isCancelled() ? event.getPacket().getHandle() : null; @@ -385,9 +386,9 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { } if (event != null) { // Retrieve marker without accidentally constructing it - marker = NetworkMarker.getNetworkMarker(event); + marker = NetworkMarker.getNetworkMarker(event); } - + // Process output handler if (packet != null && event != null && NetworkMarker.hasOutputHandlers(marker)) { ByteBuf packetBuffer = ctx.alloc().buffer(); @@ -395,17 +396,17 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { // Let each handler prepare the actual output byte[] data = processor.processOutput(event, marker, getBytes(packetBuffer)); - + // Write the result output.writeBytes(data); packet = null; - + // Sent listeners? finalEvent = event; return; } } catch (Exception e) { - channelListener.getReporter().reportDetailed(this, + channelListener.getReporter().reportDetailed(this, Report.newBuilder(REPORT_CANNOT_INTERCEPT_SERVER_PACKET).callerParam(packet).error(e).build()); } finally { // Attempt to handle the packet nevertheless @@ -424,12 +425,12 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { */ protected void finalWrite(ChannelHandlerContext ctx, Object packet, ChannelPromise promise) { PacketEvent event = finalEvent; - + if (event != null) { // Necessary to prevent infinite loops finalEvent = null; currentEvent = null; - + processor.invokePostEvent(event, NetworkMarker.getNetworkMarker(event)); } } @@ -443,30 +444,30 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { } }); } - + @Override protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuffer, List packets) throws Exception { byteBuffer.markReaderIndex(); DECODE_BUFFER.invoke(vanillaDecoder, ctx, byteBuffer, packets); - - try { + + try { // Reset queue finishQueue.clear(); - + for (ListIterator it = packets.listIterator(); it.hasNext(); ) { Object input = it.next(); Class packetClass = input.getClass(); NetworkMarker marker = null; - + // Special case! handleLogin(packetClass, input); - + if (channelListener.includeBuffer(packetClass)) { byteBuffer.resetReaderIndex(); marker = new NettyNetworkMarker(ConnectionSide.CLIENT_SIDE, getBytes(byteBuffer)); } PacketEvent output = channelListener.onPacketReceiving(this, input, marker); - + // Handle packet changes if (output != null) { if (output.isCancelled()) { @@ -475,16 +476,16 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { } else if (output.getPacket().getHandle() != input) { it.set(output.getPacket().getHandle()); } - + finishQueue.addLast(output); } } } catch (Exception e) { - channelListener.getReporter().reportDetailed(this, + channelListener.getReporter().reportDetailed(this, Report.newBuilder(REPORT_CANNOT_INTERCEPT_CLIENT_PACKET).callerParam(byteBuffer).error(e).build()); } } - + /** * Invoked after our decoder. * @param ctx - current context. @@ -493,16 +494,16 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { protected void finishRead(ChannelHandlerContext ctx, Object msg) { // Assume same order PacketEvent event = finishQueue.pollFirst(); - + if (event != null) { NetworkMarker marker = NetworkMarker.getNetworkMarker(event); - + if (marker != null) { processor.invokePostEvent(event, marker); } } } - + /** * Invoked when we may need to handle the login packet. * @param packetClass - the packet class. @@ -511,7 +512,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { protected void handleLogin(Class packetClass, Object packet) { Class loginClass = PACKET_LOGIN_CLIENT; FieldAccessor loginClient = LOGIN_GAME_PROFILE; - + // Initialize packet class and login if (loginClass == null) { loginClass = PacketType.Login.Client.START.getPacketClass(); @@ -521,26 +522,26 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { loginClient = Accessors.getFieldAccessor(PACKET_LOGIN_CLIENT, GameProfile.class, true); LOGIN_GAME_PROFILE = loginClient; } - + // See if we are dealing with the login packet if (loginClass.equals(packetClass)) { GameProfile profile = (GameProfile) loginClient.get(packet); - + // Save the channel injector factory.cacheInjector(profile.getName(), this); } } - + @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); - + // See NetworkManager.channelActive(ChannelHandlerContext) for why if (channelField != null) { channelField.refreshValue(); } } - + /** * Retrieve every byte in the given byte buffer. * @param buffer - the buffer. @@ -548,11 +549,11 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { */ private byte[] getBytes(ByteBuf buffer){ byte[] data = new byte[buffer.readableBytes()]; - + buffer.readBytes(data); return data; } - + /** * Disconnect the current player. * @param message - the disconnect message, if possible. @@ -575,7 +576,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { @Override public void sendServerPacket(Object packet, NetworkMarker marker, boolean filtered) { saveMarker(packet, marker); - + try { scheduleProcessPackets.set(filtered); invokeSendPacket(packet); @@ -583,7 +584,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { scheduleProcessPackets.set(true); } } - + /** * Invoke the sendPacket method in Minecraft. * @param packet - the packet to send. @@ -600,11 +601,11 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { throw new RuntimeException("Unable to send server packet " + packet, e); } } - + @Override public void recieveClientPacket(final Object packet) { // TODO: Ensure the packet listeners are executed in the channel thread. - + // Execute this in the channel thread Runnable action = new Runnable() { @Override @@ -617,7 +618,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { } } }; - + // Execute in the worker thread if (originalChannel.eventLoop().inEventLoop()) { action.run(); @@ -625,7 +626,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { originalChannel.eventLoop().execute(action); } } - + @Override public Protocol getCurrentProtocol() { if (PROTOCOL_ACCESSOR == null) { @@ -634,7 +635,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { } return Protocol.fromVanilla((Enum) PROTOCOL_ACCESSOR.get(networkManager)); } - + /** * Retrieve the player connection of the current player. * @return The player connection. @@ -645,45 +646,47 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { } return playerConnection; } - + @Override public NetworkMarker getMarker(Object packet) { return packetMarker.get(packet); } - + @Override public void saveMarker(Object packet, NetworkMarker marker) { if (marker != null) { packetMarker.put(packet, marker); } } - + @Override public Player getPlayer() { return player; } - + /** * Set the player instance. * @param player - current instance. */ + @Override public void setPlayer(Player player) { this.player = player; } - + /** * Set the updated player instance. * @param updated - updated instance. */ + @Override public void setUpdatedPlayer(Player updated) { this.updated = updated; } - + @Override public boolean isInjected() { return injected; } - + /** * Determine if this channel has been closed and cleaned up. * @return TRUE if it has, FALSE otherwise. @@ -692,29 +695,29 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { public boolean isClosed() { return closed; } - + @Override public void close() { if (!closed) { closed = true; - + if (injected) { channelField.revertValue(); - - // Calling remove() in the main thread will block the main thread, which may lead + + // Calling remove() in the main thread will block the main thread, which may lead // to a deadlock: // http://pastebin.com/L3SBVKzp - // + // // ProtocolLib executes this close() method through a PlayerQuitEvent in the main thread, - // which has implicitly aquired a lock on SimplePluginManager (see SimplePluginManager.callEvent(Event)). - // Unfortunately, the remove() method will schedule the removal on one of the Netty worker threads if + // which has implicitly aquired a lock on SimplePluginManager (see SimplePluginManager.callEvent(Event)). + // Unfortunately, the remove() method will schedule the removal on one of the Netty worker threads if // it's called from a different thread, blocking until the removal has been confirmed. - // - // This is bad enough (Rule #1: Don't block the main thread), but the real trouble starts if the same - // worker thread happens to be handling a server ping connection when this removal task is scheduled. - // In that case, it may attempt to invoke an asynchronous ServerPingEvent (see PacketStatusListener) - // using SimplePluginManager.callEvent(). But, since this has already been locked by the main thread, - // we end up with a deadlock. The main thread is waiting for the worker thread to process the task, and + // + // This is bad enough (Rule #1: Don't block the main thread), but the real trouble starts if the same + // worker thread happens to be handling a server ping connection when this removal task is scheduled. + // In that case, it may attempt to invoke an asynchronous ServerPingEvent (see PacketStatusListener) + // using SimplePluginManager.callEvent(). But, since this has already been locked by the main thread, + // we end up with a deadlock. The main thread is waiting for the worker thread to process the task, and // the worker thread is waiting for the main thread to finish executing PlayerQuitEvent. // // TLDR: Concurrenty is hard. @@ -730,10 +733,14 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { } } }); - + // Clear cache factory.invalidate(player); } + + // dmulloy2 - attempt to fix memory leakage + this.player = null; + this.updated = null; } } @@ -750,13 +757,13 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { try { command.run(); } catch (Exception e) { - ProtocolLibrary.getErrorReporter().reportDetailed(ChannelInjector.this, + ProtocolLibrary.getErrorReporter().reportDetailed(ChannelInjector.this, Report.newBuilder(REPORT_CANNOT_EXECUTE_IN_CHANNEL_THREAD).error(e).build()); } } }); } - + /** * Find the first channel handler that is assignable to a given type. * @param channel - the channel. @@ -771,14 +778,14 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { } return null; } - + /** * Represents a socket injector that foreards to the current channel injector. * @author Kristian */ static class ChannelSocketInjector implements SocketInjector { private final ChannelInjector injector; - + public ChannelSocketInjector(ChannelInjector injector) { this.injector = Preconditions.checkNotNull(injector, "injector cannot be NULL"); } @@ -822,7 +829,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { public void setUpdatedPlayer(Player updatedPlayer) { injector.player = updatedPlayer; } - + public ChannelInjector getChannelInjector() { return injector; }