13
0
geforkt von Mirrors/Velocity

Fix chat race condition (#1042)

Dieser Commit ist enthalten in:
EpicPlayerA10 2024-01-11 12:44:16 +01:00 committet von GitHub
Ursprung 02c4d61fc6
Commit 08c03aaea2
Es konnte kein GPG-Schlüssel zu dieser Signatur gefunden werden
GPG-Schlüssel-ID: 4AEE18F83AFDEB23
2 geänderte Dateien mit 27 neuen und 14 gelöschten Zeilen

Datei anzeigen

@ -51,6 +51,7 @@ import com.velocitypowered.proxy.protocol.netty.PlayPacketQueueHandler;
import com.velocitypowered.proxy.util.except.QuietDecoderException; import com.velocitypowered.proxy.util.except.QuietDecoderException;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -224,12 +225,16 @@ public class MinecraftConnection extends ChannelInboundHandlerAdapter {
* Writes and immediately flushes a message to the connection. * Writes and immediately flushes a message to the connection.
* *
* @param msg the message to write * @param msg the message to write
*
* @return A {@link ChannelFuture} that will complete when packet is successfully sent
*/ */
public void write(Object msg) { @Nullable
public ChannelFuture write(Object msg) {
if (channel.isActive()) { if (channel.isActive()) {
channel.writeAndFlush(msg, channel.voidPromise()); return channel.writeAndFlush(msg, channel.newPromise());
} else { } else {
ReferenceCountUtil.release(msg); ReferenceCountUtil.release(msg);
return null;
} }
} }

Datei anzeigen

@ -20,9 +20,11 @@ package com.velocitypowered.proxy.protocol.packet.chat;
import com.velocitypowered.proxy.connection.MinecraftConnection; import com.velocitypowered.proxy.connection.MinecraftConnection;
import com.velocitypowered.proxy.connection.client.ConnectedPlayer; import com.velocitypowered.proxy.connection.client.ConnectedPlayer;
import com.velocitypowered.proxy.protocol.MinecraftPacket; import com.velocitypowered.proxy.protocol.MinecraftPacket;
import io.netty.channel.ChannelFuture;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.time.Instant; import java.time.Instant;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer; import java.util.function.Function;
/** /**
* A precisely ordered queue which allows for outside entries into the ordered queue through * A precisely ordered queue which allows for outside entries into the ordered queue through
@ -58,9 +60,8 @@ public class ChatQueue {
MinecraftConnection smc = player.ensureAndGetCurrentServer().ensureConnected(); MinecraftConnection smc = player.ensureAndGetCurrentServer().ensureConnected();
CompletableFuture<WrappedPacket> nextInLine = WrappedPacket.wrap(timestamp, nextPacket); CompletableFuture<WrappedPacket> nextInLine = WrappedPacket.wrap(timestamp, nextPacket);
awaitChat(smc, this.packetFuture, this.packetFuture = awaitChat(smc, this.packetFuture,
nextInLine); // we await chat, binding `this.packetFuture` -> `nextInLine` nextInLine); // we await chat, binding `this.packetFuture` -> `nextInLine`
this.packetFuture = nextInLine;
} }
} }
@ -84,21 +85,26 @@ public class ChatQueue {
} }
} }
private static BiConsumer<WrappedPacket, Throwable> writePacket(MinecraftConnection connection) { private static Function<WrappedPacket, WrappedPacket> writePacket(MinecraftConnection connection) {
return (wrappedPacket, throwable) -> { return wrappedPacket -> {
if (wrappedPacket != null && !connection.isClosed()) { if (!connection.isClosed()) {
wrappedPacket.write(connection); ChannelFuture future = wrappedPacket.write(connection);
if (future != null) {
future.awaitUninterruptibly();
}
} }
return wrappedPacket;
}; };
} }
private static <T extends MinecraftPacket> void awaitChat( private static <T extends MinecraftPacket> CompletableFuture<WrappedPacket> awaitChat(
MinecraftConnection connection, MinecraftConnection connection,
CompletableFuture<WrappedPacket> binder, CompletableFuture<WrappedPacket> binder,
CompletableFuture<WrappedPacket> future CompletableFuture<WrappedPacket> future
) { ) {
// the binder will run -> then the future will get the `write packet` caller // the binder will run -> then the future will get the `write packet` caller
binder.whenComplete((ignored1, ignored2) -> future.whenComplete(writePacket(connection))); return binder.thenCompose(ignored -> future.thenApply(writePacket(connection)));
} }
private static <K, V extends MinecraftPacket> CompletableFuture<WrappedPacket> hijackCurrentPacket( private static <K, V extends MinecraftPacket> CompletableFuture<WrappedPacket> hijackCurrentPacket(
@ -113,7 +119,7 @@ public class ChatQueue {
// map the new packet into a better "designed" packet with the hijacked packet's timestamp // map the new packet into a better "designed" packet with the hijacked packet's timestamp
WrappedPacket.wrap(previous.timestamp, WrappedPacket.wrap(previous.timestamp,
future.thenApply(item -> packetMapper.map(previous.timestamp, item))) future.thenApply(item -> packetMapper.map(previous.timestamp, item)))
.whenCompleteAsync(writePacket(connection), connection.eventLoop()) .thenApplyAsync(writePacket(connection), connection.eventLoop())
.whenComplete( .whenComplete(
(packet, throwable) -> awaitedFuture.complete(throwable != null ? null : packet)); (packet, throwable) -> awaitedFuture.complete(throwable != null ? null : packet));
}); });
@ -148,10 +154,12 @@ public class ChatQueue {
this.packet = packet; this.packet = packet;
} }
public void write(MinecraftConnection connection) { @Nullable
public ChannelFuture write(MinecraftConnection connection) {
if (packet != null) { if (packet != null) {
connection.write(packet); return connection.write(packet);
} }
return null;
} }
private static CompletableFuture<WrappedPacket> wrap(Instant timestamp, private static CompletableFuture<WrappedPacket> wrap(Instant timestamp,