diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java index 073a52eee..33c9192ae 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java @@ -58,6 +58,7 @@ import com.velocitypowered.proxy.protocol.packet.TabCompleteRequest; import com.velocitypowered.proxy.protocol.packet.TabCompleteResponse; import com.velocitypowered.proxy.protocol.packet.TabCompleteResponse.Offer; import com.velocitypowered.proxy.protocol.packet.chat.ChatBuilder; +import com.velocitypowered.proxy.protocol.packet.chat.ChatQueue; import com.velocitypowered.proxy.protocol.packet.chat.LegacyChat; import com.velocitypowered.proxy.protocol.packet.chat.PlayerChat; import com.velocitypowered.proxy.protocol.packet.chat.PlayerCommand; @@ -139,10 +140,10 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { } private void processCommandMessage(String message, @Nullable SignedChatCommand signedCommand, - MinecraftPacket original) { - server.getCommandManager().callCommandEvent(player, message) + MinecraftPacket original, Instant passedTimestamp) { + this.player.getChatQueue().queuePacket(server.getCommandManager().callCommandEvent(player, message) .thenComposeAsync(event -> processCommandExecuteResult(message, - event.getResult(), signedCommand)) + event.getResult(), signedCommand, passedTimestamp)) .whenComplete((ignored, throwable) -> { if (server.getConfiguration().isLogCommandExecutions()) { logger.info("{} -> executed command /{}", player, message); @@ -154,7 +155,7 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { player.sendMessage(Component.translatable("velocity.command.generic-error", NamedTextColor.RED)); return null; - }); + }), passedTimestamp); } private void processPlayerChat(String message, @Nullable SignedChatMessage signedMessage, @@ -163,48 +164,61 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { if (smc == null) { return; } - PlayerChatEvent event = new PlayerChatEvent(player, message); - server.getEventManager().fire(event) - .thenAcceptAsync(pme -> { + + if (signedMessage == null) { + PlayerChatEvent event = new PlayerChatEvent(player, message); + callChat(original, event, null).thenAccept(smc::write); + } else { + Instant messageTimestamp = signedMessage.getExpiryTemporal(); + PlayerChatEvent event = new PlayerChatEvent(player, message); + this.player.getChatQueue().queuePacket(callChat(original, event, signedMessage), messageTimestamp); + } + } + + private CompletableFuture callChat(MinecraftPacket original, PlayerChatEvent event, + @Nullable SignedChatMessage signedMessage) { + return server.getEventManager().fire(event) + .thenApply(pme -> { PlayerChatEvent.ChatResult chatResult = pme.getResult(); if (chatResult.isAllowed()) { Optional eventMsg = pme.getResult().getMessage(); if (eventMsg.isPresent()) { String messageNew = eventMsg.get(); if (player.getIdentifiedKey() != null) { - if (!messageNew.equals(signedMessage.getMessage())) { + if (signedMessage != null && !messageNew.equals(signedMessage.getMessage())) { if (player.getIdentifiedKey().getKeyRevision().compareTo(IdentifiedKey.Revision.LINKED_V2) >= 0) { // Bad, very bad. logger.fatal("A plugin tried to change a signed chat message. " - + "This is no longer possible in 1.19.1 and newer. " - + "Disconnecting player " + player.getUsername()); + + "This is no longer possible in 1.19.1 and newer. " + + "Disconnecting player " + player.getUsername()); player.disconnect(Component.text("A proxy plugin caused an illegal protocol state. " - + "Contact your network administrator.")); + + "Contact your network administrator.")); } else { logger.warn("A plugin changed a signed chat message. The server may not accept it."); - smc.write(ChatBuilder.builder(player.getProtocolVersion()) - .message(messageNew).toServer()); + return ChatBuilder.builder(player.getProtocolVersion()) + .message(messageNew).toServer(); } } else { - smc.write(original); + return original; } } else { - smc.write(ChatBuilder.builder(player.getProtocolVersion()) - .message(messageNew).toServer()); + return ChatBuilder.builder(player.getProtocolVersion()) + .message(messageNew).toServer(); } } else { - smc.write(original); + return original; } } else { if (player.getIdentifiedKey().getKeyRevision().compareTo(IdentifiedKey.Revision.LINKED_V2) >= 0) { logger.fatal("A plugin tried to cancel a signed chat message." - + " This is no longer possible in 1.19.1 and newer. " - + "Disconnecting player " + player.getUsername()); + + " This is no longer possible in 1.19.1 and newer. " + + "Disconnecting player " + player.getUsername()); player.disconnect(Component.text("A proxy plugin caused an illegal protocol state. " - + "Contact your network administrator.")); + + "Contact your network administrator.")); } } - }, smc.eventLoop()) + return null; + }) .exceptionally((ex) -> { logger.error("Exception while handling player chat for {}", player, ex); return null; @@ -260,12 +274,12 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { if (!packet.isUnsigned()) { SignedChatCommand signedCommand = packet.signedContainer(player.getIdentifiedKey(), player.getUniqueId(), false); if (signedCommand != null) { - processCommandMessage(packet.getCommand(), signedCommand, packet); + processCommandMessage(packet.getCommand(), signedCommand, packet, packet.getTimestamp()); return true; } } - processCommandMessage(packet.getCommand(), null, packet); + processCommandMessage(packet.getCommand(), null, packet, packet.getTimestamp()); return true; } @@ -303,7 +317,7 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { } if (msg.startsWith("/")) { - processCommandMessage(msg.substring(1), null, packet); + processCommandMessage(msg.substring(1), null, packet, Instant.now()); } else { processPlayerChat(msg, null, packet); } @@ -723,77 +737,72 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { } - private CompletableFuture processCommandExecuteResult(String originalCommand, - CommandResult result, - @Nullable SignedChatCommand signedCommand) { + private CompletableFuture processCommandExecuteResult(String originalCommand, + CommandResult result, + @Nullable SignedChatCommand signedCommand, + Instant passedTimestamp) { IdentifiedKey playerKey = player.getIdentifiedKey(); if (result == CommandResult.denied() && playerKey != null) { if (signedCommand != null && playerKey.getKeyRevision() .compareTo(IdentifiedKey.Revision.LINKED_V2) >= 0) { logger.fatal("A plugin tried to deny a command with signable component(s). " - + "This is not supported. " - + "Disconnecting player " + player.getUsername()); + + "This is not supported. " + + "Disconnecting player " + player.getUsername()); player.disconnect(Component.text("A proxy plugin caused an illegal protocol state. " - + "Contact your network administrator.")); + + "Contact your network administrator.")); } return CompletableFuture.completedFuture(null); } - MinecraftConnection smc; - try { - smc = player.ensureAndGetCurrentServer().ensureConnected(); - } catch (Exception ex) { - player.disconnect(Component.translatable("velocity.error.player-connection-error", - NamedTextColor.RED)); - return CompletableFuture.completedFuture(null); - } - String commandToRun = result.getCommand().orElse(originalCommand); if (result.isForwardToServer()) { ChatBuilder write = ChatBuilder .builder(player.getProtocolVersion()) + .timestamp(passedTimestamp) .asPlayer(player); if (signedCommand != null && commandToRun.equals(signedCommand.getBaseCommand())) { write.message(signedCommand); } else { if (signedCommand != null && playerKey != null && playerKey.getKeyRevision() - .compareTo(IdentifiedKey.Revision.LINKED_V2) >= 0) { + .compareTo(IdentifiedKey.Revision.LINKED_V2) >= 0) { logger.fatal("A plugin tried to change a command with signed component(s). " - + "This is not supported. " - + "Disconnecting player " + player.getUsername()); + + "This is not supported. " + + "Disconnecting player " + player.getUsername()); player.disconnect(Component.text("A proxy plugin caused an illegal protocol state. " - + "Contact your network administrator.")); + + "Contact your network administrator.")); return CompletableFuture.completedFuture(null); } write.message("/" + commandToRun); } - return CompletableFuture.runAsync(() -> smc.write(write.toServer()), smc.eventLoop()); + return CompletableFuture.completedFuture(write.toServer()); } else { return server.getCommandManager().executeImmediatelyAsync(player, commandToRun) - .thenAcceptAsync(hasRun -> { + .thenApply(hasRun -> { if (!hasRun) { ChatBuilder write = ChatBuilder .builder(player.getProtocolVersion()) + .timestamp(passedTimestamp) .asPlayer(player); if (signedCommand != null && commandToRun.equals(signedCommand.getBaseCommand())) { write.message(signedCommand); } else { if (signedCommand != null && playerKey != null && playerKey.getKeyRevision() - .compareTo(IdentifiedKey.Revision.LINKED_V2) >= 0) { + .compareTo(IdentifiedKey.Revision.LINKED_V2) >= 0) { logger.fatal("A plugin tried to change a command with signed component(s). " - + "This is not supported. " - + "Disconnecting player " + player.getUsername()); + + "This is not supported. " + + "Disconnecting player " + player.getUsername()); player.disconnect(Component.text("A proxy plugin caused an illegal protocol state. " - + "Contact your network administrator.")); - return; + + "Contact your network administrator.")); + return null; } write.message("/" + commandToRun); } - smc.write(write.toServer()); + return write.toServer(); } - }, smc.eventLoop()); + return null; + }); } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java index 0558bb466..665063cf6 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java @@ -66,6 +66,7 @@ import com.velocitypowered.proxy.protocol.packet.KeepAlive; import com.velocitypowered.proxy.protocol.packet.PluginMessage; import com.velocitypowered.proxy.protocol.packet.ResourcePackRequest; import com.velocitypowered.proxy.protocol.packet.chat.ChatBuilder; +import com.velocitypowered.proxy.protocol.packet.chat.ChatQueue; import com.velocitypowered.proxy.protocol.packet.chat.LegacyChat; import com.velocitypowered.proxy.protocol.packet.title.GenericTitlePacket; import com.velocitypowered.proxy.server.VelocityRegisteredServer; @@ -155,19 +156,20 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, private @Nullable ResourcePackInfo pendingResourcePack; private @Nullable ResourcePackInfo appliedResourcePack; private final @NotNull Pointers pointers = Player.super.pointers().toBuilder() - .withDynamic(Identity.UUID, this::getUniqueId) - .withDynamic(Identity.NAME, this::getUsername) - .withDynamic(Identity.DISPLAY_NAME, () -> Component.text(this.getUsername())) - .withDynamic(Identity.LOCALE, this::getEffectiveLocale) - .withStatic(PermissionChecker.POINTER, getPermissionChecker()) - .withStatic(FacetPointers.TYPE, Type.PLAYER) - .build(); + .withDynamic(Identity.UUID, this::getUniqueId) + .withDynamic(Identity.NAME, this::getUsername) + .withDynamic(Identity.DISPLAY_NAME, () -> Component.text(this.getUsername())) + .withDynamic(Identity.LOCALE, this::getEffectiveLocale) + .withStatic(PermissionChecker.POINTER, getPermissionChecker()) + .withStatic(FacetPointers.TYPE, Type.PLAYER) + .build(); private @Nullable String clientBrand; private @Nullable Locale effectiveLocale; private @Nullable IdentifiedKey playerKey; + private ChatQueue chatQueue; ConnectedPlayer(VelocityServer server, GameProfile profile, MinecraftConnection connection, - @Nullable InetSocketAddress virtualHost, boolean onlineMode, @Nullable IdentifiedKey playerKey) { + @Nullable InetSocketAddress virtualHost, boolean onlineMode, @Nullable IdentifiedKey playerKey) { this.server = server; this.profile = profile; this.connection = connection; @@ -183,6 +185,11 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, this.tabList = new VelocityTabListLegacy(this, server); } this.playerKey = playerKey; + this.chatQueue = new ChatQueue(this); + } + + ChatQueue getChatQueue() { + return chatQueue; } @Override @@ -220,6 +227,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, /** * Makes sure the player is connected to a server and returns the server they are connected to. + * * @return the server the player is connected to */ public VelocityServerConnection ensureAndGetCurrentServer() { @@ -320,21 +328,21 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, Component translated = translateMessage(message); connection.write(ChatBuilder.builder(this.getProtocolVersion()) - .component(translated).forIdentity(identity).toClient()); + .component(translated).forIdentity(identity).toClient()); } @Override public void sendMessage(@NonNull Identity identity, @NonNull Component message, - @NonNull MessageType type) { + @NonNull MessageType type) { Preconditions.checkNotNull(message, "message"); Preconditions.checkNotNull(type, "type"); Component translated = translateMessage(message); connection.write(ChatBuilder.builder(this.getProtocolVersion()) - .component(translated).forIdentity(identity) - .setType(type == MessageType.CHAT ? ChatBuilder.ChatType.CHAT : ChatBuilder.ChatType.SYSTEM) - .toClient()); + .component(translated).forIdentity(identity) + .setType(type == MessageType.CHAT ? ChatBuilder.ChatType.CHAT : ChatBuilder.ChatType.SYSTEM) + .toClient()); } @Override @@ -345,7 +353,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, if (playerVersion.compareTo(ProtocolVersion.MINECRAFT_1_11) >= 0) { // Use the title packet instead. GenericTitlePacket pkt = GenericTitlePacket.constructTitlePacket( - GenericTitlePacket.ActionType.SET_ACTION_BAR, playerVersion); + GenericTitlePacket.ActionType.SET_ACTION_BAR, playerVersion); pkt.setComponent(ProtocolUtils.getJsonChatSerializer(playerVersion) .serialize(translated)); connection.write(pkt); @@ -465,7 +473,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, public void clearTitle() { if (this.getProtocolVersion().compareTo(ProtocolVersion.MINECRAFT_1_8) >= 0) { connection.write(GenericTitlePacket.constructTitlePacket( - GenericTitlePacket.ActionType.HIDE, this.getProtocolVersion())); + GenericTitlePacket.ActionType.HIDE, this.getProtocolVersion())); } } @@ -473,7 +481,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, public void resetTitle() { if (this.getProtocolVersion().compareTo(ProtocolVersion.MINECRAFT_1_8) >= 0) { connection.write(GenericTitlePacket.constructTitlePacket( - GenericTitlePacket.ActionType.RESET, this.getProtocolVersion())); + GenericTitlePacket.ActionType.RESET, this.getProtocolVersion())); } } @@ -497,7 +505,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, } private ConnectionRequestBuilder createConnectionRequest(RegisteredServer server, - @Nullable VelocityServerConnection previousConnection) { + @Nullable VelocityServerConnection previousConnection) { return new ConnectionRequestBuilderImpl(server, previousConnection); } @@ -532,7 +540,8 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, /** * Disconnects the player from the proxy. - * @param reason the reason for disconnecting the player + * + * @param reason the reason for disconnecting the player * @param duringLogin whether the disconnect happened during login */ public void disconnect0(Component reason, boolean duringLogin) { @@ -559,12 +568,13 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, /** * Handles unexpected disconnects. - * @param server the server we disconnected from + * + * @param server the server we disconnected from * @param throwable the exception - * @param safe whether or not we can safely reconnect to a new server + * @param safe whether or not we can safely reconnect to a new server */ public void handleConnectionException(RegisteredServer server, Throwable throwable, - boolean safe) { + boolean safe) { if (!isActive()) { // If the connection is no longer active, it makes no sense to try and recover it. return; @@ -597,12 +607,13 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, /** * Handles unexpected disconnects. - * @param server the server we disconnected from + * + * @param server the server we disconnected from * @param disconnect the disconnect packet - * @param safe whether or not we can safely reconnect to a new server + * @param safe whether or not we can safely reconnect to a new server */ public void handleConnectionException(RegisteredServer server, Disconnect disconnect, - boolean safe) { + boolean safe) { if (!isActive()) { // If the connection is no longer active, it makes no sense to try and recover it. return; @@ -628,7 +639,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, } private void handleConnectionException(RegisteredServer rs, - @Nullable Component kickReason, Component friendlyReason, boolean safe) { + @Nullable Component kickReason, Component friendlyReason, boolean safe) { if (!isActive()) { // If the connection is no longer active, it makes no sense to try and recover it. return; @@ -661,7 +672,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, } private void handleKickEvent(KickedFromServerEvent originalEvent, Component friendlyReason, - boolean kickedFromCurrent) { + boolean kickedFromCurrent) { server.getEventManager().fire(originalEvent) .thenAcceptAsync(event -> { // There can't be any connection in flight now. @@ -696,7 +707,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, // Impossible/nonsensical cases case ALREADY_CONNECTED: case CONNECTION_IN_PROGRESS: - // Fatal case + // Fatal case case CONNECTION_CANCELLED: Component fallbackMsg = res.getMessageComponent(); if (fallbackMsg == null) { @@ -753,7 +764,6 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, * server. * * @param current the "current" server that the player is on, useful as an override - * * @return the next server to try */ private Optional getNextServerToTry(@Nullable RegisteredServer current) { @@ -897,8 +907,16 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, Preconditions.checkArgument(input.length() <= LegacyChat.MAX_SERVERBOUND_MESSAGE_LENGTH, "input cannot be greater than " + LegacyChat.MAX_SERVERBOUND_MESSAGE_LENGTH + " characters in length"); - ensureBackendConnection().write(ChatBuilder.builder(getProtocolVersion()) - .asPlayer(this).message(input).toServer()); + if (getProtocolVersion().compareTo(ProtocolVersion.MINECRAFT_1_19) >= 0) { + this.chatQueue.hijack(ChatBuilder.builder(getProtocolVersion()).asPlayer(this).message(input), + (instant, item) -> { + item.timestamp(instant); + return item.toServer(); + }); + } else { + ensureBackendConnection().write(ChatBuilder.builder(getProtocolVersion()) + .asPlayer(this).message(input).toServer()); + } } @Override @@ -943,7 +961,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, while (!outstandingResourcePacks.isEmpty()) { queued = outstandingResourcePacks.peek(); if (queued.getShouldForce() && getProtocolVersion() - .compareTo(ProtocolVersion.MINECRAFT_1_17) >= 0) { + .compareTo(ProtocolVersion.MINECRAFT_1_17) >= 0) { break; } onResourcePackResponse(PlayerResourcePackStatusEvent.Status.DECLINED); @@ -985,19 +1003,19 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, public boolean onResourcePackResponse(PlayerResourcePackStatusEvent.Status status) { final boolean peek = status == PlayerResourcePackStatusEvent.Status.ACCEPTED; final ResourcePackInfo queued = peek - ? outstandingResourcePacks.peek() : outstandingResourcePacks.poll(); + ? outstandingResourcePacks.peek() : outstandingResourcePacks.poll(); server.getEventManager().fire(new PlayerResourcePackStatusEvent(this, status, queued)) - .thenAcceptAsync(event -> { - if (event.getStatus() == PlayerResourcePackStatusEvent.Status.DECLINED - && event.getPackInfo() != null && event.getPackInfo().getShouldForce() - && (!event.isOverwriteKick() || event.getPlayer() - .getProtocolVersion().compareTo(ProtocolVersion.MINECRAFT_1_17) >= 0) - ) { - event.getPlayer().disconnect(Component - .translatable("multiplayer.requiredTexturePrompt.disconnect")); - } - }); + .thenAcceptAsync(event -> { + if (event.getStatus() == PlayerResourcePackStatusEvent.Status.DECLINED + && event.getPackInfo() != null && event.getPackInfo().getShouldForce() + && (!event.isOverwriteKick() || event.getPlayer() + .getProtocolVersion().compareTo(ProtocolVersion.MINECRAFT_1_17) >= 0) + ) { + event.getPlayer().disconnect(Component + .translatable("multiplayer.requiredTexturePrompt.disconnect")); + } + }); switch (status) { case ACCEPTED: @@ -1023,7 +1041,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, } return queued != null - && queued.getOriginalOrigin() != ResourcePackInfo.Origin.DOWNSTREAM_SERVER; + && queued.getOriginalOrigin() != ResourcePackInfo.Origin.DOWNSTREAM_SERVER; } /** @@ -1061,6 +1079,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, /** * Return all the plugin message channels "known" to the client. + * * @return the channels */ public Collection getKnownChannels() { @@ -1085,7 +1104,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, private final @Nullable VelocityRegisteredServer previousServer; ConnectionRequestBuilderImpl(RegisteredServer toConnect, - @Nullable VelocityServerConnection previousConnection) { + @Nullable VelocityServerConnection previousConnection) { this.toConnect = Preconditions.checkNotNull(toConnect, "info"); this.previousServer = previousConnection == null ? null : previousConnection.getServer(); } @@ -1103,7 +1122,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, return Optional.of(ConnectionRequestBuilder.Status.CONNECTION_IN_PROGRESS); } if (connectedServer != null - && connectedServer.getServer().getServerInfo().equals(server.getServerInfo())) { + && connectedServer.getServer().getServerInfo().equals(server.getServerInfo())) { return Optional.of(ALREADY_CONNECTED); } return Optional.empty(); diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatBuilder.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatBuilder.java index cd6d0ff55..c71ec32f9 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatBuilder.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatBuilder.java @@ -31,6 +31,7 @@ import net.kyori.adventure.identity.Identity; import net.kyori.adventure.text.Component; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; +import org.jetbrains.annotations.NotNull; public class ChatBuilder { @@ -43,11 +44,13 @@ public class ChatBuilder { private @Nullable Player sender; private @Nullable Identity senderIdentity; + private @NotNull Instant timestamp; private ChatType type = ChatType.CHAT; private ChatBuilder(ProtocolVersion version) { this.version = version; + this.timestamp = Instant.now(); } public static ChatBuilder builder(ProtocolVersion version) { @@ -115,6 +118,11 @@ public class ChatBuilder { return this; } + public ChatBuilder timestamp(Instant timestamp) { + this.timestamp = timestamp; + return this; + } + public ChatBuilder asServer() { this.sender = null; return this; @@ -153,7 +161,7 @@ public class ChatBuilder { } else { // Well crap if (message.startsWith("/")) { - return new PlayerCommand(message.substring(1), ImmutableList.of(), Instant.now()); + return new PlayerCommand(message.substring(1), ImmutableList.of(), timestamp); } else { // This will produce an error on the server, but needs to be here. return new PlayerChat(message); diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatQueue.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatQueue.java new file mode 100644 index 000000000..fc4cc00a7 --- /dev/null +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatQueue.java @@ -0,0 +1,157 @@ +/* + * Copyright (C) 2018 Velocity Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.velocitypowered.proxy.protocol.packet.chat; + +import com.velocitypowered.proxy.connection.MinecraftConnection; +import com.velocitypowered.proxy.connection.client.ConnectedPlayer; +import com.velocitypowered.proxy.protocol.MinecraftPacket; +import java.time.Instant; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; + +/** + * A precisely ordered queue which allows for outside entries into the ordered queue through piggybacking timestamps. + */ +public class ChatQueue { + private final Object internalLock; + private final ConnectedPlayer player; + private CompletableFuture packetFuture; + + /** + * Instantiates a {@link ChatQueue} for a specific {@link ConnectedPlayer}. + * + * @param player the {@link ConnectedPlayer} to maintain the queue for. + */ + public ChatQueue(ConnectedPlayer player) { + this.player = player; + this.packetFuture = CompletableFuture.completedFuture(new WrappedPacket(Instant.EPOCH, null)); + this.internalLock = new Object(); + } + + /** + * Queues a packet sent from the player - all packets must wait until this processes to send their packets. + *
+ * This maintains order on the server-level for the client insertions of commands and messages. All entries are locked + * through an internal object lock. + * + * @param nextPacket the {@link CompletableFuture} which will provide the next-processed packet. + * @param timestamp the {@link Instant} timestamp of this packet so we can allow piggybacking. + */ + public void queuePacket(CompletableFuture nextPacket, Instant timestamp) { + synchronized (internalLock) { // wait for the lock to resolve - we don't want to drop packets + MinecraftConnection smc = player.ensureAndGetCurrentServer().ensureConnected(); + + CompletableFuture nextInLine = WrappedPacket.wrap(timestamp, nextPacket); + awaitChat(smc, this.packetFuture, nextInLine); // we await chat, binding `this.packetFuture` -> `nextInLine` + this.packetFuture = nextInLine; + } + } + + /** + * Hijacks the latest sent packet's timestamp to provide an in-order packet without polling the physical, or prior + * packets sent through the stream. + * + * @param packet the {@link MinecraftPacket} to send. + * @param instantMapper the {@link InstantPacketMapper} which maps the prior timestamp and current packet to a new + * packet. + * @param the type of base to expect when mapping the packet. + * @param the type of packet for instantMapper type-checking. + */ + public void hijack(K packet, InstantPacketMapper instantMapper) { + synchronized (internalLock) { + CompletableFuture trueFuture = CompletableFuture.completedFuture(packet); + MinecraftConnection smc = player.ensureAndGetCurrentServer().ensureConnected(); + + this.packetFuture = hijackCurrentPacket(smc, this.packetFuture, trueFuture, instantMapper); + } + } + + private static BiConsumer writePacket(MinecraftConnection connection) { + return (wrappedPacket, throwable) -> { + if (wrappedPacket != null && !connection.isClosed()) { + wrappedPacket.write(connection); + } + }; + } + + private static void awaitChat( + MinecraftConnection connection, + CompletableFuture binder, + CompletableFuture future + ) { + // the binder will run -> then the future will get the `write packet` caller + binder.whenComplete((ignored1, ignored2) -> future.whenComplete(writePacket(connection))); + } + + private static CompletableFuture hijackCurrentPacket( + MinecraftConnection connection, + CompletableFuture binder, + CompletableFuture future, + InstantPacketMapper packetMapper + ) { + CompletableFuture awaitedFuture = new CompletableFuture<>(); + // the binder will complete -> then the future will get the `write packet` caller + binder.whenComplete((previous, ignored) -> { + // map the new packet into a better "designed" packet with the hijacked packet's timestamp + WrappedPacket.wrap(previous.timestamp, future.thenApply(item -> packetMapper.map(previous.timestamp, item))) + .whenCompleteAsync(writePacket(connection), connection.eventLoop()) + .whenComplete((packet, throwable) -> awaitedFuture.complete(throwable != null ? null : packet)); + }); + return awaitedFuture; + } + + /** + * Provides an {@link Instant} based timestamp mapper from an existing object to create a packet. + * + * @param The base object type to map. + * @param The resulting packet type. + */ + public interface InstantPacketMapper { + /** + * Maps a value into a packet with it and a timestamp. + * + * @param nextInstant the {@link Instant} timestamp to use for tracking. + * @param currentObject the current item to map to the packet. + * @return The resulting packet from the mapping. + */ + V map(Instant nextInstant, K currentObject); + } + + private static class WrappedPacket { + private final Instant timestamp; + private final MinecraftPacket packet; + + private WrappedPacket(Instant timestamp, MinecraftPacket packet) { + this.timestamp = timestamp; + this.packet = packet; + } + + public void write(MinecraftConnection connection) { + if (packet != null) { + connection.write(packet); + } + } + + private static CompletableFuture wrap(Instant timestamp, + CompletableFuture nextPacket) { + return nextPacket + .thenApply(pkt -> new WrappedPacket(timestamp, pkt)) + .exceptionally(ignored -> new WrappedPacket(timestamp, null)); + } + } +}