From 6c3a3e0ba21f506e0fb47381166b6d6f541a6966 Mon Sep 17 00:00:00 2001 From: "Kristian S. Stangeland" Date: Fri, 6 Dec 2013 01:21:35 +0100 Subject: [PATCH] Intercept server packets when they're scheduled in the main thread. Note that some server packets, such as LOGIN and STATUS, will still be run asynchronously. --- .../protocol/events/PacketEvent.java | 15 ++ .../injector/netty/ChannelInjector.java | 92 +++++--- .../protocol/injector/netty/ChannelProxy.java | 159 +++++++------ .../injector/netty/EventLoopProxy.java | 210 ++++++++++++++++++ .../injector/netty/NettyProtocolInjector.java | 4 + 5 files changed, 392 insertions(+), 88 deletions(-) create mode 100644 ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/EventLoopProxy.java diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/events/PacketEvent.java b/ProtocolLib/src/main/java/com/comphenix/protocol/events/PacketEvent.java index 86aa1035..69c2e96d 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/events/PacketEvent.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/events/PacketEvent.java @@ -22,6 +22,8 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.ref.WeakReference; import java.util.EventObject; + +import org.bukkit.Bukkit; import org.bukkit.entity.Player; import org.bukkit.event.Cancellable; @@ -137,6 +139,19 @@ public class PacketEvent extends EventObject implements Cancellable { return new PacketEvent(event, marker); } + /** + * Determine if we are executing the packet event in an asynchronous thread. + *

+ * If so, you must synchronize all calls to the Bukkit API. + *

+ * Generally, most server packets are executed on the main thread, whereas client packets + * are all executed asynchronously. + * @return TRUE if we are, FALSE otherwise. + */ + public boolean isAsync() { + return !Bukkit.isPrimaryThread(); + } + /** * Retrieves the packet that will be sent to the player. * @return Packet to send to the player. diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java index 09c44c51..9d6c7260 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java @@ -118,6 +118,9 @@ class ChannelInjector extends ByteToMessageDecoder { private ConcurrentMap packetMarker = new MapMaker().weakKeys().makeMap(); private ConcurrentMap markerEvent = new MapMaker().weakKeys().makeMap(); + // Packets we have processed before + private Set processedPackets = Collections.newSetFromMap(new MapMaker().weakKeys().makeMap()); + // Packets to ignore private Set ignoredPackets = Collections.newSetFromMap(new MapMaker().weakKeys().makeMap()); @@ -235,32 +238,11 @@ class ChannelInjector extends ByteToMessageDecoder { ENCODE_BUFFER = FuzzyReflection.getMethodAccessor(vanillaEncoder.getClass(), "encode", ChannelHandlerContext.class, Object.class, ByteBuf.class); + // Intercept sent packets protocolEncoder = new MessageToByteEncoder() { @Override protected void encode(ChannelHandlerContext ctx, Object packet, ByteBuf output) throws Exception { - try { - NetworkMarker marker = getMarker(output); - PacketEvent event = markerEvent.remove(marker); - - if (event != null && NetworkMarker.hasOutputHandlers(marker)) { - ByteBuf packetBuffer = ctx.alloc().buffer(); - ENCODE_BUFFER.invoke(vanillaEncoder, ctx, packet, packetBuffer); - byte[] data = getBytes(packetBuffer); - - for (PacketOutputHandler handler : marker.getOutputHandlers()) { - handler.handle(event, data); - } - // Write the result - output.writeBytes(data); - return; - } - } catch (Exception e) { - channelListener.getReporter().reportDetailed(this, - Report.newBuilder(REPORT_CANNOT_INTERCEPT_SERVER_PACKET).callerParam(packet).error(e).build()); - } finally { - // Attempt to handle the packet nevertheless - ENCODE_BUFFER.invoke(vanillaEncoder, ctx, packet, output); - } + ChannelInjector.this.encode(ctx, packet, output); } public void exceptionCaught(ChannelHandlerContext channelhandlercontext, Throwable throwable) { @@ -273,10 +255,16 @@ class ChannelInjector extends ByteToMessageDecoder { originalChannel.pipeline().addAfter("encoder", "protocol_lib_encoder", protocolEncoder); // Intercept all write methods - channelField.setValue(new ChannelProxy(originalChannel) { + channelField.setValue(new ChannelProxy(originalChannel, MinecraftReflection.getPacketClass()) { @Override - protected Object onMessageWritten(Object message) { - return channelListener.onPacketSending(ChannelInjector.this, message, packetMarker.get(message)); + protected Object onMessageScheduled(Object message) { + Object result = processSending(message); + + // We have now processed this packet once already + if (result != null) { + processedPackets.add(result); + } + return result; } }); @@ -285,6 +273,15 @@ class ChannelInjector extends ByteToMessageDecoder { } } + /** + * Process a given message on the packet listeners. + * @param message - the message/packet. + * @return The resulting message/packet. + */ + private Object processSending(Object message) { + return channelListener.onPacketSending(ChannelInjector.this, message, packetMarker.get(message)); + } + /** * This method patches the encoder so that it skips already created packets. * @param encoder - the encoder to patch. @@ -316,6 +313,49 @@ class ChannelInjector extends ByteToMessageDecoder { } } + /** + * Encode a packet to a byte buffer, taking over for the standard Minecraft encoder. + * @param ctx - the current context. + * @param packet - the packet to encode to a byte array. + * @param output - the output byte array. + * @throws Exception If anything went wrong. + */ + protected void encode(ChannelHandlerContext ctx, Object packet, ByteBuf output) throws Exception { + try { + NetworkMarker marker = getMarker(packet); + PacketEvent event = markerEvent.remove(marker); + + // Try again, in case this packet was sent directly in the event loop + if (event == null && !processedPackets.remove(packet)) { + packet = processSending(packet); + marker = getMarker(packet); + event = markerEvent.remove(marker); + } + + // Process output handler + if (packet != null && event != null && NetworkMarker.hasOutputHandlers(marker)) { + ByteBuf packetBuffer = ctx.alloc().buffer(); + ENCODE_BUFFER.invoke(vanillaEncoder, ctx, packet, packetBuffer); + byte[] data = getBytes(packetBuffer); + + for (PacketOutputHandler handler : marker.getOutputHandlers()) { + handler.handle(event, data); + } + // Write the result + output.writeBytes(data); + return; + } + } catch (Exception e) { + channelListener.getReporter().reportDetailed(this, + Report.newBuilder(REPORT_CANNOT_INTERCEPT_SERVER_PACKET).callerParam(packet).error(e).build()); + } finally { + // Attempt to handle the packet nevertheless + if (packet != null) { + ENCODE_BUFFER.invoke(vanillaEncoder, ctx, packet, output); + } + } + } + @Override protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuffer, List packets) throws Exception { try { diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelProxy.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelProxy.java index e2244e55..892a1c04 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelProxy.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelProxy.java @@ -1,8 +1,13 @@ package com.comphenix.protocol.injector.netty; -import java.lang.reflect.Constructor; import java.net.SocketAddress; +import java.util.Map; +import java.util.concurrent.Callable; +import com.comphenix.protocol.reflect.FuzzyReflection; +import com.comphenix.protocol.reflect.FuzzyReflection.FieldAccessor; + +import net.minecraft.util.com.google.common.collect.Maps; import net.minecraft.util.io.netty.buffer.ByteBufAllocator; import net.minecraft.util.io.netty.channel.Channel; import net.minecraft.util.io.netty.channel.ChannelConfig; @@ -14,48 +19,35 @@ import net.minecraft.util.io.netty.channel.ChannelPromise; import net.minecraft.util.io.netty.channel.EventLoop; import net.minecraft.util.io.netty.util.Attribute; import net.minecraft.util.io.netty.util.AttributeKey; -import net.minecraft.util.io.netty.util.concurrent.EventExecutor; abstract class ChannelProxy implements Channel { - private static Constructor FUTURE_CONSTRUCTOR; + // Mark that a certain object does not contain a message field + private static final FieldAccessor MARK_NO_MESSAGE = new FieldAccessor() { + public void set(Object instance, Object value) { } + public Object get(Object instance) { return null; } + }; + + // Looking up packets in inner classes + private static Map, FieldAccessor> MESSAGE_LOOKUP = Maps.newConcurrentMap(); // The underlying channel private Channel delegate; - - public ChannelProxy(Channel delegate) { - this.delegate = delegate; - } - - /** - * Invoked when a packet is being transmitted. - * @param message - the packet to transmit. - * @return The object to transmit. - */ - protected abstract Object onMessageWritten(Object message); + private Class messageClass; - /** - * The future we return when packets are being cancelled. - * @return A succeeded future. - */ - protected ChannelFuture getSucceededFuture() { - try { - if (FUTURE_CONSTRUCTOR == null) { - @SuppressWarnings("unchecked") - Class succededFuture = - (Class) ChannelProxy.class.getClassLoader(). - loadClass("net.minecraft.util.io.netty.channel.SucceededChannelFuture"); - - FUTURE_CONSTRUCTOR = succededFuture.getDeclaredConstructor(Channel.class, EventExecutor.class); - FUTURE_CONSTRUCTOR.setAccessible(true); - } - return FUTURE_CONSTRUCTOR.newInstance(this, null); - - } catch (ClassNotFoundException e) { - throw new RuntimeException("Cannot get succeeded future.", e); - } catch (Exception e) { - throw new RuntimeException("Cannot construct completed future.", e); - } + // Event loop proxy + private transient EventLoopProxy loopProxy; + + public ChannelProxy(Channel delegate, Class messageClass) { + this.delegate = delegate; + this.messageClass = messageClass; } + + /** + * Invoked when a packet is scheduled for transmission in the event loop. + * @param message - the packet to schedule. + * @return The object to transmit, or NULL to cancel. + */ + protected abstract Object onMessageScheduled(Object message); public Attribute attr(AttributeKey paramAttributeKey) { return delegate.attr(paramAttributeKey); @@ -82,7 +74,66 @@ abstract class ChannelProxy implements Channel { } public EventLoop eventLoop() { - return delegate.eventLoop(); + if (loopProxy == null) { + loopProxy = new EventLoopProxy() { + @Override + protected EventLoop getDelegate() { + return delegate.eventLoop(); + } + + @Override + protected Runnable schedulingRunnable(Runnable runnable) { + FieldAccessor accessor = getMessageAccessor(runnable); + + if (accessor != null) { + Object packet = onMessageScheduled(accessor.get(runnable)); + + if (packet != null) + accessor.set(runnable, packet); + else + getEmptyRunnable(); + } + return runnable; + } + + @Override + protected Callable schedulingCallable(Callable callable) { + FieldAccessor accessor = getMessageAccessor(callable); + + if (accessor != null) { + Object packet = onMessageScheduled(accessor.get(callable)); + + if (packet != null) + accessor.set(callable, packet); + else + getEmptyCallable(); + } + return callable; + } + }; + } + return loopProxy; + } + + /** + * Retrieve a way to access the packet field of an object. + * @param value - the object. + * @return The packet field accessor, or NULL if not found. + */ + private FieldAccessor getMessageAccessor(Object value) { + Class clazz = value.getClass(); + FieldAccessor accessor = MESSAGE_LOOKUP.get(clazz); + + if (accessor == null) { + try { + accessor = FuzzyReflection.getFieldAccessor(clazz, messageClass, true); + } catch (IllegalArgumentException e) { + accessor = MARK_NO_MESSAGE; + } + // Save the result + MESSAGE_LOOKUP.put(clazz, accessor); + } + return accessor != MARK_NO_MESSAGE ? accessor : null; } public ChannelFuture connect(SocketAddress paramSocketAddress1, @@ -198,37 +249,21 @@ abstract class ChannelProxy implements Channel { public ChannelFuture deregister(ChannelPromise paramChannelPromise) { return delegate.deregister(paramChannelPromise); } - - public ChannelFuture write(Object message) { - Object result = onMessageWritten(message); - - if (result != null) - return delegate.write(result); - return getSucceededFuture(); + + public ChannelFuture write(Object paramObject) { + return delegate.write(paramObject); } - public ChannelFuture write(Object message, ChannelPromise paramChannelPromise) { - Object result = onMessageWritten(message); - - if (result != null) - return delegate.write(message, paramChannelPromise); - return getSucceededFuture(); + public ChannelFuture write(Object paramObject, ChannelPromise paramChannelPromise) { + return delegate.write(paramObject, paramChannelPromise); } - public ChannelFuture writeAndFlush(Object message, ChannelPromise paramChannelPromise) { - Object result = onMessageWritten(message); - - if (result != null) - return delegate.writeAndFlush(message, paramChannelPromise); - return getSucceededFuture(); + public ChannelFuture writeAndFlush(Object paramObject, ChannelPromise paramChannelPromise) { + return delegate.writeAndFlush(paramObject, paramChannelPromise); } - public ChannelFuture writeAndFlush(Object message) { - Object result = onMessageWritten(message); - - if (result != null) - return delegate.writeAndFlush(message); - return getSucceededFuture(); + public ChannelFuture writeAndFlush(Object paramObject) { + return delegate.writeAndFlush(paramObject); } public int compareTo(Channel o) { diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/EventLoopProxy.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/EventLoopProxy.java new file mode 100644 index 00000000..627a6c09 --- /dev/null +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/EventLoopProxy.java @@ -0,0 +1,210 @@ +package com.comphenix.protocol.injector.netty; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import net.minecraft.util.io.netty.channel.Channel; +import net.minecraft.util.io.netty.channel.ChannelFuture; +import net.minecraft.util.io.netty.channel.ChannelPromise; +import net.minecraft.util.io.netty.channel.EventLoop; +import net.minecraft.util.io.netty.channel.EventLoopGroup; +import net.minecraft.util.io.netty.util.concurrent.EventExecutor; +import net.minecraft.util.io.netty.util.concurrent.ProgressivePromise; +import net.minecraft.util.io.netty.util.concurrent.Promise; +import net.minecraft.util.io.netty.util.concurrent.ScheduledFuture; + +/** + * An event loop proxy. + * @author Kristian. + */ +abstract class EventLoopProxy implements EventLoop { + private static final Runnable EMPTY_RUNNABLE = new Runnable() { + public void run() { + // Do nothing + } + }; + private static final Callable EMPTY_CALLABLE = new Callable() { + public Object call() throws Exception { + return null; + }; + }; + + /** + * Retrieve the underlying event loop. + * @return The event loop. + */ + protected abstract EventLoop getDelegate(); + + /** + * Retrieve a callable that does nothing but return NULL. + * @return The empty callable. + */ + @SuppressWarnings("unchecked") + public static Callable getEmptyCallable() { + return (Callable) EMPTY_CALLABLE; + } + + /** + * Retrieve a runnable that does nothing. + * @return A NO-OP runnable. + */ + public static Runnable getEmptyRunnable() { + return EMPTY_RUNNABLE; + } + + /** + * Invoked when a runnable is being scheduled. + * @param runnable - the runnable that is scheduling. + * @return The runnable to schedule instead. Cannot be NULL. + */ + protected abstract Runnable schedulingRunnable(Runnable runnable); + + /** + * Invoked when a callable is being scheduled. + * @param runnable - the callable that is scheduling. + * @return The callable to schedule instead. Cannot be NULL. + */ + protected abstract Callable schedulingCallable(Callable callable); + + public void execute(Runnable command) { + getDelegate().execute(schedulingRunnable(command)); + } + + public net.minecraft.util.io.netty.util.concurrent.Future submit(Callable action) { + return getDelegate().submit(schedulingCallable(action)); + } + + public net.minecraft.util.io.netty.util.concurrent.Future submit(Runnable action, T arg1) { + return getDelegate().submit(schedulingRunnable(action), arg1); + } + + public net.minecraft.util.io.netty.util.concurrent.Future submit(Runnable action) { + return getDelegate().submit(schedulingRunnable(action)); + } + + public ScheduledFuture schedule(Callable action, long arg1, TimeUnit arg2) { + return getDelegate().schedule(schedulingCallable(action), arg1, arg2); + } + + public ScheduledFuture schedule(Runnable action, long arg1, TimeUnit arg2) { + return getDelegate().schedule(schedulingRunnable(action), arg1, arg2); + } + + public ScheduledFuture scheduleAtFixedRate(Runnable action, long arg1, long arg2, TimeUnit arg3) { + return getDelegate().scheduleAtFixedRate(schedulingRunnable(action), arg1, arg2, arg3); + } + + public ScheduledFuture scheduleWithFixedDelay(Runnable action, long arg1, long arg2, TimeUnit arg3) { + return getDelegate().scheduleWithFixedDelay(schedulingRunnable(action), arg1, arg2, arg3); + } + + // Boiler plate: + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return getDelegate().awaitTermination(timeout, unit); + } + + public boolean inEventLoop() { + return getDelegate().inEventLoop(); + } + + public boolean inEventLoop(Thread arg0) { + return getDelegate().inEventLoop(arg0); + } + + public boolean isShutdown() { + return getDelegate().isShutdown(); + } + + public boolean isTerminated() { + return getDelegate().isTerminated(); + } + + public List> invokeAll(Collection> tasks) + throws InterruptedException { + return getDelegate().invokeAll(tasks); + } + + public List> invokeAll(Collection> tasks, long timeout, + TimeUnit unit) throws InterruptedException { + return getDelegate().invokeAll(tasks, timeout, unit); + } + + public T invokeAny(Collection> tasks) throws InterruptedException, + ExecutionException { + return getDelegate().invokeAny(tasks); + } + + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return getDelegate().invokeAny(tasks, timeout, unit); + } + + public boolean isShuttingDown() { + return getDelegate().isShuttingDown(); + } + + public Iterator iterator() { + return getDelegate().iterator(); + } + + public net.minecraft.util.io.netty.util.concurrent.Future newFailedFuture(Throwable arg0) { + return getDelegate().newFailedFuture(arg0); + } + + @Override + public EventLoop next() { + return getDelegate().next(); + } + + public ProgressivePromise newProgressivePromise() { + return getDelegate().newProgressivePromise(); + } + + public Promise newPromise() { + return getDelegate().newPromise(); + } + + public net.minecraft.util.io.netty.util.concurrent.Future newSucceededFuture(V arg0) { + return getDelegate().newSucceededFuture(arg0); + } + + public EventLoopGroup parent() { + return getDelegate().parent(); + } + + public ChannelFuture register(Channel arg0, ChannelPromise arg1) { + return getDelegate().register(arg0, arg1); + } + + public ChannelFuture register(Channel arg0) { + return getDelegate().register(arg0); + } + + public net.minecraft.util.io.netty.util.concurrent.Future shutdownGracefully() { + return getDelegate().shutdownGracefully(); + } + + public net.minecraft.util.io.netty.util.concurrent.Future shutdownGracefully(long arg0, long arg1, TimeUnit arg2) { + return getDelegate().shutdownGracefully(arg0, arg1, arg2); + } + + public net.minecraft.util.io.netty.util.concurrent.Future terminationFuture() { + return getDelegate().terminationFuture(); + } + + @Deprecated + public void shutdown() { + getDelegate().shutdown(); + } + + @Deprecated + public List shutdownNow() { + return getDelegate().shutdownNow(); + } +} diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/NettyProtocolInjector.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/NettyProtocolInjector.java index a7f18d45..d8274e21 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/NettyProtocolInjector.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/NettyProtocolInjector.java @@ -175,6 +175,10 @@ public class NettyProtocolInjector implements ChannelListener { public Object onPacketSending(ChannelInjector injector, Object packet, NetworkMarker marker) { Class clazz = packet.getClass(); + if (!Bukkit.isPrimaryThread()) { + System.out.println("FUCK ME: " + packet); + } + if (queuedFilters.contains(clazz)) { // Check for ignored packets if (injector.unignorePacket(packet)) {