From 6b03b28a61d24b770bdf04c9eaf9424409f7b378 Mon Sep 17 00:00:00 2001 From: Gegy Date: Mon, 27 Nov 2023 18:26:59 +0100 Subject: [PATCH] Refactor ChatQueue to track and expose general chat state --- .../connection/client/ConnectedPlayer.java | 11 +- .../proxy/protocol/packet/chat/ChatQueue.java | 140 ++++++------------ 2 files changed, 49 insertions(+), 102 deletions(-) diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java index 8522dfce7..6beeb9cf8 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java @@ -83,6 +83,7 @@ import com.velocitypowered.proxy.protocol.packet.chat.ChatType; import com.velocitypowered.proxy.protocol.packet.chat.ComponentHolder; import com.velocitypowered.proxy.protocol.packet.chat.PlayerChatCompletionPacket; import com.velocitypowered.proxy.protocol.packet.chat.builder.ChatBuilderFactory; +import com.velocitypowered.proxy.protocol.packet.chat.builder.ChatBuilderV2; import com.velocitypowered.proxy.protocol.packet.chat.legacy.LegacyChatPacket; import com.velocitypowered.proxy.protocol.packet.config.ClientboundServerLinksPacket; import com.velocitypowered.proxy.protocol.packet.config.StartUpdatePacket; @@ -1108,11 +1109,11 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, "input cannot be greater than " + LegacyChatPacket.MAX_SERVERBOUND_MESSAGE_LENGTH + " characters in length"); if (getProtocolVersion().noLessThan(ProtocolVersion.MINECRAFT_1_19)) { - this.chatQueue.hijack(getChatBuilderFactory().builder().asPlayer(this).message(input), - (instant, item) -> { - item.setTimestamp(instant); - return item.toServer(); - }); + ChatBuilderV2 message = getChatBuilderFactory().builder().asPlayer(this).message(input); + this.chatQueue.queuePacket(chatState -> { + message.setTimestamp(chatState.lastTimestamp); + return message.toServer(); + }); } else { ensureBackendConnection().write(getChatBuilderFactory().builder() .asPlayer(this).message(input).toServer()); diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatQueue.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatQueue.java index 2ddcff17d..07480f54f 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatQueue.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatQueue.java @@ -32,9 +32,10 @@ import java.util.function.Function; */ public class ChatQueue { - private final Object internalLock; + private final Object internalLock = new Object(); private final ConnectedPlayer player; - private CompletableFuture packetFuture; + private final ChatState chatState = new ChatState(); + private CompletableFuture head = CompletableFuture.completedFuture(null); /** * Instantiates a {@link ChatQueue} for a specific {@link ConnectedPlayer}. @@ -43,8 +44,19 @@ public class ChatQueue { */ public ChatQueue(ConnectedPlayer player) { this.player = player; - this.packetFuture = CompletableFuture.completedFuture(new WrappedPacket(Instant.EPOCH, null)); - this.internalLock = new Object(); + } + + private void queueTask(Task task) { + synchronized (internalLock) { + MinecraftConnection smc = player.ensureAndGetCurrentServer().ensureConnected(); + head = head.thenCompose(v -> { + try { + return task.update(chatState, smc).exceptionally(ignored -> null); + } catch (Throwable ignored) { + return CompletableFuture.completedFuture(null); + } + }); + } } /** @@ -53,120 +65,54 @@ public class ChatQueue { * and messages. All entries are locked through an internal object lock. * * @param nextPacket the {@link CompletableFuture} which will provide the next-processed packet. - * @param timestamp the {@link Instant} timestamp of this packet so we can allow piggybacking. + * @param timestamp the new {@link Instant} timestamp of this packet to update the internal chat state. */ - public void queuePacket(CompletableFuture nextPacket, Instant timestamp) { - synchronized (internalLock) { // wait for the lock to resolve - we don't want to drop packets - MinecraftConnection smc = player.ensureAndGetCurrentServer().ensureConnected(); - - CompletableFuture nextInLine = WrappedPacket.wrap(timestamp, nextPacket); - this.packetFuture = awaitChat(smc, this.packetFuture, - nextInLine); // we await chat, binding `this.packetFuture` -> `nextInLine` - } + public void queuePacket(CompletableFuture nextPacket, @Nullable Instant timestamp) { + queueTask((chatState, smc) -> { + chatState.update(timestamp); + return nextPacket.thenCompose(packet -> writePacket(packet, smc)); + }); } /** - * Hijacks the latest sent packet's timestamp to provide an in-order packet without polling the + * Hijacks the latest sent packet's chat state to provide an in-order packet without polling the * physical, or prior packets sent through the stream. * - * @param packet the {@link MinecraftPacket} to send. - * @param instantMapper the {@link InstantPacketMapper} which maps the prior timestamp and current - * packet to a new packet. - * @param the type of base to expect when mapping the packet. - * @param the type of packet for instantMapper type-checking. + * @param packetFunction a function that maps the prior {@link ChatState} into a new packet. + * @param the type of packet to send. */ - public void hijack(K packet, - InstantPacketMapper instantMapper) { - synchronized (internalLock) { - CompletableFuture trueFuture = CompletableFuture.completedFuture(packet); - MinecraftConnection smc = player.ensureAndGetCurrentServer().ensureConnected(); - - this.packetFuture = hijackCurrentPacket(smc, this.packetFuture, trueFuture, instantMapper); - } + public void queuePacket(Function packetFunction) { + queueTask((chatState, smc) -> { + T packet = packetFunction.apply(chatState); + return writePacket(packet, smc); + }); } - private static Function writePacket(MinecraftConnection connection) { - return wrappedPacket -> { - if (!connection.isClosed()) { - ChannelFuture future = wrappedPacket.write(connection); + private static CompletableFuture writePacket(T packet, MinecraftConnection smc) { + return CompletableFuture.runAsync(() -> { + if (!smc.isClosed()) { + ChannelFuture future = smc.write(packet); if (future != null) { future.awaitUninterruptibly(); } } - - return wrappedPacket; - }; + }, smc.eventLoop()); } - private static CompletableFuture awaitChat( - MinecraftConnection connection, - CompletableFuture binder, - CompletableFuture future - ) { - // the binder will run -> then the future will get the `write packet` caller - return binder.thenCompose(ignored -> future.thenApply(writePacket(connection))); + private interface Task { + CompletableFuture update(ChatState chatState, MinecraftConnection smc); } - private static CompletableFuture hijackCurrentPacket( - MinecraftConnection connection, - CompletableFuture binder, - CompletableFuture future, - InstantPacketMapper packetMapper - ) { - CompletableFuture awaitedFuture = new CompletableFuture<>(); - // the binder will complete -> then the future will get the `write packet` caller - binder.whenComplete((previous, ignored) -> { - // map the new packet into a better "designed" packet with the hijacked packet's timestamp - WrappedPacket.wrap(previous.timestamp, - future.thenApply(item -> packetMapper.map(previous.timestamp, item))) - .thenApplyAsync(writePacket(connection), connection.eventLoop()) - .whenComplete( - (packet, throwable) -> awaitedFuture.complete(throwable != null ? null : packet)); - }); - return awaitedFuture; - } + public static class ChatState { + public volatile Instant lastTimestamp = Instant.EPOCH; - /** - * Provides an {@link Instant} based timestamp mapper from an existing object to create a packet. - * - * @param The base object type to map. - * @param The resulting packet type. - */ - public interface InstantPacketMapper { - - /** - * Maps a value into a packet with it and a timestamp. - * - * @param nextInstant the {@link Instant} timestamp to use for tracking. - * @param currentObject the current item to map to the packet. - * @return The resulting packet from the mapping. - */ - V map(Instant nextInstant, K currentObject); - } - - private static class WrappedPacket { - - private final Instant timestamp; - private final MinecraftPacket packet; - - private WrappedPacket(Instant timestamp, MinecraftPacket packet) { - this.timestamp = timestamp; - this.packet = packet; + private ChatState() { } - @Nullable - public ChannelFuture write(MinecraftConnection connection) { - if (packet != null) { - return connection.write(packet); + public void update(@Nullable Instant timestamp) { + if (timestamp != null) { + this.lastTimestamp = timestamp; } - return null; - } - - private static CompletableFuture wrap(Instant timestamp, - CompletableFuture nextPacket) { - return nextPacket - .thenApply(pkt -> new WrappedPacket(timestamp, pkt)) - .exceptionally(ignored -> new WrappedPacket(timestamp, null)); } } }