geforkt von Mirrors/Velocity
Refactor ChatQueue to track and expose general chat state
Dieser Commit ist enthalten in:
Ursprung
aa4e8780bd
Commit
6b03b28a61
@ -83,6 +83,7 @@ import com.velocitypowered.proxy.protocol.packet.chat.ChatType;
|
|||||||
import com.velocitypowered.proxy.protocol.packet.chat.ComponentHolder;
|
import com.velocitypowered.proxy.protocol.packet.chat.ComponentHolder;
|
||||||
import com.velocitypowered.proxy.protocol.packet.chat.PlayerChatCompletionPacket;
|
import com.velocitypowered.proxy.protocol.packet.chat.PlayerChatCompletionPacket;
|
||||||
import com.velocitypowered.proxy.protocol.packet.chat.builder.ChatBuilderFactory;
|
import com.velocitypowered.proxy.protocol.packet.chat.builder.ChatBuilderFactory;
|
||||||
|
import com.velocitypowered.proxy.protocol.packet.chat.builder.ChatBuilderV2;
|
||||||
import com.velocitypowered.proxy.protocol.packet.chat.legacy.LegacyChatPacket;
|
import com.velocitypowered.proxy.protocol.packet.chat.legacy.LegacyChatPacket;
|
||||||
import com.velocitypowered.proxy.protocol.packet.config.ClientboundServerLinksPacket;
|
import com.velocitypowered.proxy.protocol.packet.config.ClientboundServerLinksPacket;
|
||||||
import com.velocitypowered.proxy.protocol.packet.config.StartUpdatePacket;
|
import com.velocitypowered.proxy.protocol.packet.config.StartUpdatePacket;
|
||||||
@ -1108,11 +1109,11 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player,
|
|||||||
"input cannot be greater than " + LegacyChatPacket.MAX_SERVERBOUND_MESSAGE_LENGTH
|
"input cannot be greater than " + LegacyChatPacket.MAX_SERVERBOUND_MESSAGE_LENGTH
|
||||||
+ " characters in length");
|
+ " characters in length");
|
||||||
if (getProtocolVersion().noLessThan(ProtocolVersion.MINECRAFT_1_19)) {
|
if (getProtocolVersion().noLessThan(ProtocolVersion.MINECRAFT_1_19)) {
|
||||||
this.chatQueue.hijack(getChatBuilderFactory().builder().asPlayer(this).message(input),
|
ChatBuilderV2 message = getChatBuilderFactory().builder().asPlayer(this).message(input);
|
||||||
(instant, item) -> {
|
this.chatQueue.queuePacket(chatState -> {
|
||||||
item.setTimestamp(instant);
|
message.setTimestamp(chatState.lastTimestamp);
|
||||||
return item.toServer();
|
return message.toServer();
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
ensureBackendConnection().write(getChatBuilderFactory().builder()
|
ensureBackendConnection().write(getChatBuilderFactory().builder()
|
||||||
.asPlayer(this).message(input).toServer());
|
.asPlayer(this).message(input).toServer());
|
||||||
|
@ -32,9 +32,10 @@ import java.util.function.Function;
|
|||||||
*/
|
*/
|
||||||
public class ChatQueue {
|
public class ChatQueue {
|
||||||
|
|
||||||
private final Object internalLock;
|
private final Object internalLock = new Object();
|
||||||
private final ConnectedPlayer player;
|
private final ConnectedPlayer player;
|
||||||
private CompletableFuture<WrappedPacket> packetFuture;
|
private final ChatState chatState = new ChatState();
|
||||||
|
private CompletableFuture<Void> head = CompletableFuture.completedFuture(null);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Instantiates a {@link ChatQueue} for a specific {@link ConnectedPlayer}.
|
* Instantiates a {@link ChatQueue} for a specific {@link ConnectedPlayer}.
|
||||||
@ -43,8 +44,19 @@ public class ChatQueue {
|
|||||||
*/
|
*/
|
||||||
public ChatQueue(ConnectedPlayer player) {
|
public ChatQueue(ConnectedPlayer player) {
|
||||||
this.player = player;
|
this.player = player;
|
||||||
this.packetFuture = CompletableFuture.completedFuture(new WrappedPacket(Instant.EPOCH, null));
|
}
|
||||||
this.internalLock = new Object();
|
|
||||||
|
private void queueTask(Task task) {
|
||||||
|
synchronized (internalLock) {
|
||||||
|
MinecraftConnection smc = player.ensureAndGetCurrentServer().ensureConnected();
|
||||||
|
head = head.thenCompose(v -> {
|
||||||
|
try {
|
||||||
|
return task.update(chatState, smc).exceptionally(ignored -> null);
|
||||||
|
} catch (Throwable ignored) {
|
||||||
|
return CompletableFuture.completedFuture(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -53,120 +65,54 @@ public class ChatQueue {
|
|||||||
* and messages. All entries are locked through an internal object lock.
|
* and messages. All entries are locked through an internal object lock.
|
||||||
*
|
*
|
||||||
* @param nextPacket the {@link CompletableFuture} which will provide the next-processed packet.
|
* @param nextPacket the {@link CompletableFuture} which will provide the next-processed packet.
|
||||||
* @param timestamp the {@link Instant} timestamp of this packet so we can allow piggybacking.
|
* @param timestamp the new {@link Instant} timestamp of this packet to update the internal chat state.
|
||||||
*/
|
*/
|
||||||
public void queuePacket(CompletableFuture<MinecraftPacket> nextPacket, Instant timestamp) {
|
public void queuePacket(CompletableFuture<MinecraftPacket> nextPacket, @Nullable Instant timestamp) {
|
||||||
synchronized (internalLock) { // wait for the lock to resolve - we don't want to drop packets
|
queueTask((chatState, smc) -> {
|
||||||
MinecraftConnection smc = player.ensureAndGetCurrentServer().ensureConnected();
|
chatState.update(timestamp);
|
||||||
|
return nextPacket.thenCompose(packet -> writePacket(packet, smc));
|
||||||
CompletableFuture<WrappedPacket> nextInLine = WrappedPacket.wrap(timestamp, nextPacket);
|
});
|
||||||
this.packetFuture = awaitChat(smc, this.packetFuture,
|
|
||||||
nextInLine); // we await chat, binding `this.packetFuture` -> `nextInLine`
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hijacks the latest sent packet's timestamp to provide an in-order packet without polling the
|
* Hijacks the latest sent packet's chat state to provide an in-order packet without polling the
|
||||||
* physical, or prior packets sent through the stream.
|
* physical, or prior packets sent through the stream.
|
||||||
*
|
*
|
||||||
* @param packet the {@link MinecraftPacket} to send.
|
* @param packetFunction a function that maps the prior {@link ChatState} into a new packet.
|
||||||
* @param instantMapper the {@link InstantPacketMapper} which maps the prior timestamp and current
|
* @param <T> the type of packet to send.
|
||||||
* packet to a new packet.
|
|
||||||
* @param <K> the type of base to expect when mapping the packet.
|
|
||||||
* @param <V> the type of packet for instantMapper type-checking.
|
|
||||||
*/
|
*/
|
||||||
public <K, V extends MinecraftPacket> void hijack(K packet,
|
public <T extends MinecraftPacket> void queuePacket(Function<ChatState, T> packetFunction) {
|
||||||
InstantPacketMapper<K, V> instantMapper) {
|
queueTask((chatState, smc) -> {
|
||||||
synchronized (internalLock) {
|
T packet = packetFunction.apply(chatState);
|
||||||
CompletableFuture<K> trueFuture = CompletableFuture.completedFuture(packet);
|
return writePacket(packet, smc);
|
||||||
MinecraftConnection smc = player.ensureAndGetCurrentServer().ensureConnected();
|
});
|
||||||
|
|
||||||
this.packetFuture = hijackCurrentPacket(smc, this.packetFuture, trueFuture, instantMapper);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Function<WrappedPacket, WrappedPacket> writePacket(MinecraftConnection connection) {
|
private static <T extends MinecraftPacket> CompletableFuture<Void> writePacket(T packet, MinecraftConnection smc) {
|
||||||
return wrappedPacket -> {
|
return CompletableFuture.runAsync(() -> {
|
||||||
if (!connection.isClosed()) {
|
if (!smc.isClosed()) {
|
||||||
ChannelFuture future = wrappedPacket.write(connection);
|
ChannelFuture future = smc.write(packet);
|
||||||
if (future != null) {
|
if (future != null) {
|
||||||
future.awaitUninterruptibly();
|
future.awaitUninterruptibly();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}, smc.eventLoop());
|
||||||
return wrappedPacket;
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T extends MinecraftPacket> CompletableFuture<WrappedPacket> awaitChat(
|
private interface Task {
|
||||||
MinecraftConnection connection,
|
CompletableFuture<Void> update(ChatState chatState, MinecraftConnection smc);
|
||||||
CompletableFuture<WrappedPacket> binder,
|
|
||||||
CompletableFuture<WrappedPacket> future
|
|
||||||
) {
|
|
||||||
// the binder will run -> then the future will get the `write packet` caller
|
|
||||||
return binder.thenCompose(ignored -> future.thenApply(writePacket(connection)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <K, V extends MinecraftPacket> CompletableFuture<WrappedPacket> hijackCurrentPacket(
|
public static class ChatState {
|
||||||
MinecraftConnection connection,
|
public volatile Instant lastTimestamp = Instant.EPOCH;
|
||||||
CompletableFuture<WrappedPacket> binder,
|
|
||||||
CompletableFuture<K> future,
|
|
||||||
InstantPacketMapper<K, V> packetMapper
|
|
||||||
) {
|
|
||||||
CompletableFuture<WrappedPacket> awaitedFuture = new CompletableFuture<>();
|
|
||||||
// the binder will complete -> then the future will get the `write packet` caller
|
|
||||||
binder.whenComplete((previous, ignored) -> {
|
|
||||||
// map the new packet into a better "designed" packet with the hijacked packet's timestamp
|
|
||||||
WrappedPacket.wrap(previous.timestamp,
|
|
||||||
future.thenApply(item -> packetMapper.map(previous.timestamp, item)))
|
|
||||||
.thenApplyAsync(writePacket(connection), connection.eventLoop())
|
|
||||||
.whenComplete(
|
|
||||||
(packet, throwable) -> awaitedFuture.complete(throwable != null ? null : packet));
|
|
||||||
});
|
|
||||||
return awaitedFuture;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
private ChatState() {
|
||||||
* Provides an {@link Instant} based timestamp mapper from an existing object to create a packet.
|
|
||||||
*
|
|
||||||
* @param <K> The base object type to map.
|
|
||||||
* @param <V> The resulting packet type.
|
|
||||||
*/
|
|
||||||
public interface InstantPacketMapper<K, V extends MinecraftPacket> {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Maps a value into a packet with it and a timestamp.
|
|
||||||
*
|
|
||||||
* @param nextInstant the {@link Instant} timestamp to use for tracking.
|
|
||||||
* @param currentObject the current item to map to the packet.
|
|
||||||
* @return The resulting packet from the mapping.
|
|
||||||
*/
|
|
||||||
V map(Instant nextInstant, K currentObject);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class WrappedPacket {
|
|
||||||
|
|
||||||
private final Instant timestamp;
|
|
||||||
private final MinecraftPacket packet;
|
|
||||||
|
|
||||||
private WrappedPacket(Instant timestamp, MinecraftPacket packet) {
|
|
||||||
this.timestamp = timestamp;
|
|
||||||
this.packet = packet;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
public void update(@Nullable Instant timestamp) {
|
||||||
public ChannelFuture write(MinecraftConnection connection) {
|
if (timestamp != null) {
|
||||||
if (packet != null) {
|
this.lastTimestamp = timestamp;
|
||||||
return connection.write(packet);
|
|
||||||
}
|
}
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static CompletableFuture<WrappedPacket> wrap(Instant timestamp,
|
|
||||||
CompletableFuture<MinecraftPacket> nextPacket) {
|
|
||||||
return nextPacket
|
|
||||||
.thenApply(pkt -> new WrappedPacket(timestamp, pkt))
|
|
||||||
.exceptionally(ignored -> new WrappedPacket(timestamp, null));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Laden…
In neuem Issue referenzieren
Einen Benutzer sperren