diff --git a/build.gradle b/build.gradle index b999bc923..3341ce3ad 100644 --- a/build.gradle +++ b/build.gradle @@ -24,7 +24,7 @@ allprojects { junitVersion = '5.3.0-M1' slf4jVersion = '1.7.25' log4jVersion = '2.11.2' - nettyVersion = '4.1.35.Final' + nettyVersion = '4.1.37.Final' guavaVersion = '25.1-jre' checkerFrameworkVersion = '2.7.0' configurateVersion = '3.6' diff --git a/native/src/main/java/com/velocitypowered/natives/encryption/JavaVelocityCipher.java b/native/src/main/java/com/velocitypowered/natives/encryption/JavaVelocityCipher.java index c7a14815e..c8c15686a 100644 --- a/native/src/main/java/com/velocitypowered/natives/encryption/JavaVelocityCipher.java +++ b/native/src/main/java/com/velocitypowered/natives/encryption/JavaVelocityCipher.java @@ -4,6 +4,7 @@ import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; +import io.netty.util.concurrent.FastThreadLocal; import java.security.GeneralSecurityException; import javax.crypto.Cipher; import javax.crypto.SecretKey; @@ -24,8 +25,12 @@ public class JavaVelocityCipher implements VelocityCipher { } }; private static final int INITIAL_BUFFER_SIZE = 1024 * 8; - private static final ThreadLocal inBufLocal = ThreadLocal.withInitial( - () -> new byte[INITIAL_BUFFER_SIZE]); + private static final FastThreadLocal inBufLocal = new FastThreadLocal() { + @Override + protected byte[] initialValue() { + return new byte[INITIAL_BUFFER_SIZE]; + } + }; private final Cipher cipher; private boolean disposed = false; diff --git a/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java b/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java index 6e2765202..095fd5e86 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java @@ -59,9 +59,14 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntFunction; import java.util.stream.Collectors; import net.kyori.text.Component; import net.kyori.text.TextComponent; @@ -379,14 +384,35 @@ public class VelocityServer implements ProxyServer { Runnable shutdownProcess = () -> { logger.info("Shutting down the proxy..."); - for (ConnectedPlayer player : ImmutableList.copyOf(connectionsByUuid.values())) { + // Shutdown the connection manager, this should be + // done first to refuse new connections + cm.shutdown(); + + ImmutableList players = ImmutableList.copyOf(connectionsByUuid.values()); + for (ConnectedPlayer player : players) { player.disconnect(TextComponent.of("Proxy shutting down.")); } - this.cm.shutdown(); - try { - if (!eventManager.shutdown() || !scheduler.shutdown()) { + boolean timedOut = false; + + try { + // Wait for the connections finish tearing down, this + // makes sure that all the disconnect events are being fired + + CompletableFuture playersTeardownFuture = CompletableFuture.allOf(players.stream() + .map(ConnectedPlayer::getTeardownFuture) + .toArray((IntFunction[]>) CompletableFuture[]::new)); + + playersTeardownFuture.get(10, TimeUnit.SECONDS); + } catch (TimeoutException | ExecutionException e) { + timedOut = true; + } + + timedOut = !eventManager.shutdown() || timedOut; + timedOut = !scheduler.shutdown() || timedOut; + + if (timedOut) { logger.error("Your plugins took over 10 seconds to shut down."); } } catch (InterruptedException e) { diff --git a/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java b/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java index 5e2718755..0195645ea 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java @@ -16,7 +16,7 @@ import java.util.Set; public class VelocityCommandManager implements CommandManager { - private final Map commands = new HashMap<>(); + private final Map commands = new HashMap<>(); @Override @Deprecated @@ -30,12 +30,14 @@ public class VelocityCommandManager implements CommandManager { Preconditions.checkNotNull(alias, "alias"); Preconditions.checkNotNull(otherAliases, "otherAliases"); Preconditions.checkNotNull(command, "executor"); - this.commands.put(alias.toLowerCase(Locale.ENGLISH), command); + + RawCommand rawCmd = RegularCommandWrapper.wrap(command); + this.commands.put(alias.toLowerCase(Locale.ENGLISH), rawCmd); for (int i = 0, length = otherAliases.length; i < length; i++) { final String alias1 = otherAliases[i]; Preconditions.checkNotNull(alias1, "alias at index %s", i + 1); - this.commands.put(alias1.toLowerCase(Locale.ENGLISH), command); + this.commands.put(alias1.toLowerCase(Locale.ENGLISH), rawCmd); } } @@ -50,34 +52,23 @@ public class VelocityCommandManager implements CommandManager { Preconditions.checkNotNull(source, "invoker"); Preconditions.checkNotNull(cmdLine, "cmdLine"); - String[] split = cmdLine.split(" ", -1); - if (split.length == 0) { - return false; + String alias = cmdLine; + String args = ""; + int firstSpace = cmdLine.indexOf(' '); + if (firstSpace != -1) { + alias = cmdLine.substring(0, firstSpace); + args = cmdLine.substring(firstSpace).trim(); } - - String alias = split[0]; - Command command = commands.get(alias.toLowerCase(Locale.ENGLISH)); + RawCommand command = commands.get(alias.toLowerCase(Locale.ENGLISH)); if (command == null) { return false; } - @SuppressWarnings("nullness") - String[] actualArgs = Arrays.copyOfRange(split, 1, split.length); try { - if (command instanceof RawCommand) { - RawCommand rc = (RawCommand) command; - int firstSpace = cmdLine.indexOf(' '); - String line = firstSpace == -1 ? "" : cmdLine.substring(firstSpace + 1); - if (!rc.hasPermission(source, line)) { - return false; - } - rc.execute(source, line); - } else { - if (!command.hasPermission(source, actualArgs)) { - return false; - } - command.execute(source, actualArgs); + if (!command.hasPermission(source, args)) { + return false; } + command.execute(source, args); return true; } catch (Exception e) { throw new RuntimeException("Unable to invoke command " + cmdLine + " for " + source, e); @@ -102,18 +93,12 @@ public class VelocityCommandManager implements CommandManager { Preconditions.checkNotNull(source, "source"); Preconditions.checkNotNull(cmdLine, "cmdLine"); - String[] split = cmdLine.split(" ", -1); - if (split.length == 0) { - // No command available. - return ImmutableList.of(); - } - - String alias = split[0]; - if (split.length == 1) { + int firstSpace = cmdLine.indexOf(' '); + if (firstSpace == -1) { // Offer to fill in commands. ImmutableList.Builder availableCommands = ImmutableList.builder(); - for (Map.Entry entry : commands.entrySet()) { - if (entry.getKey().regionMatches(true, 0, alias, 0, alias.length()) + for (Map.Entry entry : commands.entrySet()) { + if (entry.getKey().regionMatches(true, 0, cmdLine, 0, cmdLine.length()) && entry.getValue().hasPermission(source, new String[0])) { availableCommands.add("/" + entry.getKey()); } @@ -121,32 +106,22 @@ public class VelocityCommandManager implements CommandManager { return availableCommands.build(); } - Command command = commands.get(alias.toLowerCase(Locale.ENGLISH)); + String alias = cmdLine.substring(0, firstSpace); + String args = cmdLine.substring(firstSpace).trim(); + RawCommand command = commands.get(alias.toLowerCase(Locale.ENGLISH)); if (command == null) { // No such command, so we can't offer any tab complete suggestions. return ImmutableList.of(); } - @SuppressWarnings("nullness") - String[] actualArgs = Arrays.copyOfRange(split, 1, split.length); try { - if (command instanceof RawCommand) { - RawCommand rc = (RawCommand) command; - int firstSpace = cmdLine.indexOf(' '); - String line = firstSpace == -1 ? "" : cmdLine.substring(firstSpace + 1); - if (!rc.hasPermission(source, line)) { - return ImmutableList.of(); - } - return ImmutableList.copyOf(rc.suggest(source, line)); - } else { - if (!command.hasPermission(source, actualArgs)) { - return ImmutableList.of(); - } - return ImmutableList.copyOf(command.suggest(source, actualArgs)); + if (!command.hasPermission(source, args)) { + return ImmutableList.of(); } + return ImmutableList.copyOf(command.suggest(source, args)); } catch (Exception e) { throw new RuntimeException( - "Unable to invoke suggestions for command " + alias + " for " + source, e); + "Unable to invoke suggestions for command " + cmdLine + " for " + source, e); } } @@ -160,33 +135,61 @@ public class VelocityCommandManager implements CommandManager { Preconditions.checkNotNull(source, "source"); Preconditions.checkNotNull(cmdLine, "cmdLine"); - String[] split = cmdLine.split(" ", -1); - if (split.length == 0) { - // No command available. - return false; + String alias = cmdLine; + String args = ""; + int firstSpace = cmdLine.indexOf(' '); + if (firstSpace != -1) { + alias = cmdLine.substring(0, firstSpace); + args = cmdLine.substring(firstSpace).trim(); } - - String alias = split[0]; - Command command = commands.get(alias.toLowerCase(Locale.ENGLISH)); + RawCommand command = commands.get(alias.toLowerCase(Locale.ENGLISH)); if (command == null) { - // No such command. return false; } - @SuppressWarnings("nullness") - String[] actualArgs = Arrays.copyOfRange(split, 1, split.length); try { - if (command instanceof RawCommand) { - RawCommand rc = (RawCommand) command; - int firstSpace = cmdLine.indexOf(' '); - String line = firstSpace == -1 ? "" : cmdLine.substring(firstSpace + 1); - return rc.hasPermission(source, line); - } else { - return command.hasPermission(source, actualArgs); - } + return command.hasPermission(source, args); } catch (Exception e) { throw new RuntimeException( "Unable to invoke suggestions for command " + alias + " for " + source, e); } } + + private static class RegularCommandWrapper implements RawCommand { + + private final Command delegate; + + private RegularCommandWrapper(Command delegate) { + this.delegate = delegate; + } + + private static String[] split(String line) { + if (line.isEmpty()) { + return new String[0]; + } + return line.split(" ", -1); + } + + @Override + public void execute(CommandSource source, String commandLine) { + delegate.execute(source, split(commandLine)); + } + + @Override + public List suggest(CommandSource source, String currentLine) { + return delegate.suggest(source, split(currentLine)); + } + + @Override + public boolean hasPermission(CommandSource source, String commandLine) { + return delegate.hasPermission(source, split(commandLine)); + } + + static RawCommand wrap(Command command) { + if (command instanceof RawCommand) { + return (RawCommand) command; + } + return new RegularCommandWrapper(command); + } + } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/MinecraftConnection.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/MinecraftConnection.java index beefa6b05..f4c56ae74 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/MinecraftConnection.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/MinecraftConnection.java @@ -131,8 +131,8 @@ public class MinecraftConnection extends ChannelInboundHandlerAdapter { try { sessionHandler.exception(cause); } catch (Exception ex) { - logger.error("{}: exception handling exception", (association != null ? association : - channel.remoteAddress()), cause); + logger.error("{}: exception handling exception in {}", + (association != null ? association : channel.remoteAddress()), sessionHandler, cause); } } @@ -140,7 +140,7 @@ public class MinecraftConnection extends ChannelInboundHandlerAdapter { if (cause instanceof ReadTimeoutException) { logger.error("{}: read timed out", association); } else { - logger.error("{}: exception encountered", association, cause); + logger.error("{}: exception encountered in {}", association, sessionHandler, cause); } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/BackendPlaySessionHandler.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/BackendPlaySessionHandler.java index 2f5d0f106..fcfa53fb5 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/BackendPlaySessionHandler.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/BackendPlaySessionHandler.java @@ -22,6 +22,8 @@ import com.velocitypowered.proxy.protocol.packet.PluginMessage; import com.velocitypowered.proxy.protocol.packet.TabCompleteResponse; import com.velocitypowered.proxy.protocol.util.PluginMessageUtil; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; public class BackendPlaySessionHandler implements MinecraftSessionHandler { @@ -116,12 +118,15 @@ public class BackendPlaySessionHandler implements MinecraftSessionHandler { return false; } + byte[] copy = ByteBufUtil.getBytes(packet.content()); PluginMessageEvent event = new PluginMessageEvent(serverConn, serverConn.getPlayer(), id, - packet.getData()); + copy); server.getEventManager().fire(event) .thenAcceptAsync(pme -> { if (pme.getResult().isAllowed() && !playerConnection.isClosed()) { - playerConnection.write(packet); + PluginMessage copied = new PluginMessage(packet.getChannel(), + Unpooled.wrappedBuffer(copy)); + playerConnection.write(copied); } }, playerConnection.eventLoop()); return true; @@ -160,6 +165,9 @@ public class BackendPlaySessionHandler implements MinecraftSessionHandler { @Override public void handleGeneric(MinecraftPacket packet) { + if (packet instanceof PluginMessage) { + ((PluginMessage) packet).retain(); + } playerConnection.write(packet); } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/TransitionSessionHandler.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/TransitionSessionHandler.java index 6f32e7ad1..b8d7ed75f 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/TransitionSessionHandler.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/TransitionSessionHandler.java @@ -167,7 +167,7 @@ public class TransitionSessionHandler implements MinecraftSessionHandler { return true; } - serverConn.getPlayer().getMinecraftConnection().write(packet); + serverConn.getPlayer().getMinecraftConnection().write(packet.retain()); return true; } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/VelocityServerConnection.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/VelocityServerConnection.java index a8b97c028..bb4142904 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/VelocityServerConnection.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/VelocityServerConnection.java @@ -33,6 +33,7 @@ import com.velocitypowered.proxy.protocol.packet.Handshake; import com.velocitypowered.proxy.protocol.packet.PluginMessage; import com.velocitypowered.proxy.protocol.packet.ServerLogin; import com.velocitypowered.proxy.server.VelocityRegisteredServer; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; @@ -212,9 +213,7 @@ public class VelocityServerConnection implements MinecraftConnectionAssociation, MinecraftConnection mc = ensureConnected(); - PluginMessage message = new PluginMessage(); - message.setChannel(identifier.getId()); - message.setData(data); + PluginMessage message = new PluginMessage(identifier.getId(), Unpooled.wrappedBuffer(data)); mc.write(message); return true; } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java index 3d2e3a9f7..03530ef9b 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java @@ -30,6 +30,9 @@ import com.velocitypowered.proxy.protocol.packet.TabCompleteResponse.Offer; import com.velocitypowered.proxy.protocol.packet.TitlePacket; import com.velocitypowered.proxy.protocol.util.PluginMessageUtil; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -79,6 +82,13 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { } } + @Override + public void deactivated() { + for (PluginMessage message : loginPluginMessages) { + ReferenceCountUtil.release(message); + } + } + @Override public boolean handle(KeepAlive packet) { VelocityServerConnection serverConnection = player.getConnectedServer(); @@ -212,10 +222,10 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { + "ready. Channel: {}. Packet discarded.", packet.getChannel()); } else if (PluginMessageUtil.isRegister(packet)) { player.getKnownChannels().addAll(PluginMessageUtil.getChannels(packet)); - backendConn.write(packet); + backendConn.write(packet.retain()); } else if (PluginMessageUtil.isUnregister(packet)) { player.getKnownChannels().removeAll(PluginMessageUtil.getChannels(packet)); - backendConn.write(packet); + backendConn.write(packet.retain()); } else if (PluginMessageUtil.isMcBrand(packet)) { backendConn.write(PluginMessageUtil .rewriteMinecraftBrand(packet, server.getVersion(), player.getProtocolVersion())); @@ -236,16 +246,23 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { // but further aggravated by Velocity. To work around these issues, we will queue any // non-FML handshake messages to be sent once the FML handshake has completed or the // JoinGame packet has been received by the proxy, whichever comes first. - loginPluginMessages.add(packet); + // + // We also need to make sure to retain these packets so they can be flushed + // appropriately. + loginPluginMessages.add(packet.retain()); } else { ChannelIdentifier id = server.getChannelRegistrar().getFromId(packet.getChannel()); if (id == null) { - backendConn.write(packet); + backendConn.write(packet.retain()); } else { + byte[] copy = ByteBufUtil.getBytes(packet.content()); PluginMessageEvent event = new PluginMessageEvent(player, serverConn, id, - packet.getData()); - server.getEventManager().fire(event).thenAcceptAsync(pme -> backendConn.write(packet), - backendConn.eventLoop()); + ByteBufUtil.getBytes(packet.content())); + server.getEventManager().fire(event).thenAcceptAsync(pme -> { + PluginMessage message = new PluginMessage(packet.getChannel(), + Unpooled.wrappedBuffer(copy)); + backendConn.write(message); + }, backendConn.eventLoop()); } } } @@ -272,6 +289,9 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { MinecraftConnection smc = serverConnection.getConnection(); if (smc != null && serverConnection.getPhase().consideredComplete()) { + if (packet instanceof PluginMessage) { + ((PluginMessage) packet).retain(); + } smc.write(packet); } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java index 878e2fc40..9ad15bf6c 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java @@ -50,6 +50,7 @@ import com.velocitypowered.proxy.tablist.VelocityTabListLegacy; import com.velocitypowered.proxy.util.VelocityMessages; import com.velocitypowered.proxy.util.collect.CappedSet; import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; import java.net.InetSocketAddress; import java.util.Collection; import java.util.Collections; @@ -58,6 +59,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import net.kyori.text.Component; import net.kyori.text.TextComponent; @@ -97,6 +99,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player { private final VelocityServer server; private ClientConnectionPhase connectionPhase; private final Collection knownChannels; + private final CompletableFuture teardownFuture = new CompletableFuture<>(); private @MonotonicNonNull List serversToTry = null; @@ -409,18 +412,14 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player { } if (connectedServer == null) { - // The player isn't yet connected to a server. Note that we need to do this in a future run - // of the event loop due to an issue with the Netty kqueue transport. - minecraftConnection.eventLoop().execute(() -> { - Optional nextServer = getNextServerToTry(rs); - if (nextServer.isPresent()) { - // There can't be any connection in flight now. - resetInFlightConnection(); - createConnectionRequest(nextServer.get()).fireAndForget(); - } else { - disconnect(friendlyReason); - } - }); + Optional nextServer = getNextServerToTry(rs); + if (nextServer.isPresent()) { + // There can't be any connection in flight now. + resetInFlightConnection(); + createConnectionRequest(nextServer.get()).fireAndForget(); + } else { + disconnect(friendlyReason); + } } else { boolean kickedFromCurrent = connectedServer.getServer().equals(rs); ServerKickResult result; @@ -562,7 +561,12 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player { connectedServer.disconnect(); } server.unregisterConnection(this); - server.getEventManager().fireAndForget(new DisconnectEvent(this)); + server.getEventManager().fire(new DisconnectEvent(this)) + .thenRun(() -> this.teardownFuture.complete(null)); + } + + public CompletableFuture getTeardownFuture() { + return teardownFuture; } @Override @@ -579,9 +583,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player { public boolean sendPluginMessage(ChannelIdentifier identifier, byte[] data) { Preconditions.checkNotNull(identifier, "identifier"); Preconditions.checkNotNull(data, "data"); - PluginMessage message = new PluginMessage(); - message.setChannel(identifier.getId()); - message.setData(data); + PluginMessage message = new PluginMessage(identifier.getId(), Unpooled.wrappedBuffer(data)); minecraftConnection.write(message); return true; } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/InitialConnectSessionHandler.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/InitialConnectSessionHandler.java index 59c2e9255..1d1485ef8 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/InitialConnectSessionHandler.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/InitialConnectSessionHandler.java @@ -26,7 +26,7 @@ public class InitialConnectSessionHandler implements MinecraftSessionHandler { } else if (PluginMessageUtil.isUnregister(packet)) { player.getKnownChannels().removeAll(PluginMessageUtil.getChannels(packet)); } - serverConn.ensureConnected().write(packet); + serverConn.ensureConnected().write(packet.retain()); } return true; } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/forge/legacy/LegacyForgeHandshakeBackendPhase.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/forge/legacy/LegacyForgeHandshakeBackendPhase.java index 060525d5e..08884b0ea 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/forge/legacy/LegacyForgeHandshakeBackendPhase.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/forge/legacy/LegacyForgeHandshakeBackendPhase.java @@ -114,7 +114,7 @@ public enum LegacyForgeHandshakeBackendPhase implements BackendConnectionPhase { serverConnection.setConnectionPhase(newPhase); // Write the packet to the player, we don't need it now. - player.getMinecraftConnection().write(message); + player.getMinecraftConnection().write(message.retain()); return true; } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/forge/legacy/LegacyForgeHandshakeClientPhase.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/forge/legacy/LegacyForgeHandshakeClientPhase.java index fd4577181..f75dc04b7 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/forge/legacy/LegacyForgeHandshakeClientPhase.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/forge/legacy/LegacyForgeHandshakeClientPhase.java @@ -212,7 +212,7 @@ public enum LegacyForgeHandshakeClientPhase implements ClientConnectionPhase { PluginMessage message, MinecraftConnection backendConn) { // Send the packet on to the server. - backendConn.write(message); + backendConn.write(message.retain()); // We handled the packet. No need to continue processing. return true; diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/forge/legacy/LegacyForgeUtil.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/forge/legacy/LegacyForgeUtil.java index ca01f86d7..8d2c9d410 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/forge/legacy/LegacyForgeUtil.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/forge/legacy/LegacyForgeUtil.java @@ -27,9 +27,8 @@ class LegacyForgeUtil { */ static byte getHandshakePacketDiscriminator(PluginMessage message) { Preconditions.checkArgument(message.getChannel().equals(FORGE_LEGACY_HANDSHAKE_CHANNEL)); - byte[] data = message.getData(); - Preconditions.checkArgument(data.length >= 1); - return data[0]; + Preconditions.checkArgument(message.content().isReadable()); + return message.content().getByte(0); } /** @@ -44,7 +43,7 @@ class LegacyForgeUtil { .checkArgument(message.getChannel().equals(FORGE_LEGACY_HANDSHAKE_CHANNEL), "message is not a FML HS plugin message"); - ByteBuf byteBuf = Unpooled.wrappedBuffer(message.getData()); + ByteBuf byteBuf = message.content().retainedSlice(); try { byte discriminator = byteBuf.readByte(); @@ -75,7 +74,7 @@ class LegacyForgeUtil { static PluginMessage resetPacket() { PluginMessage msg = new PluginMessage(); msg.setChannel(FORGE_LEGACY_HANDSHAKE_CHANNEL); - msg.setData(FORGE_LEGACY_HANDSHAKE_RESET_DATA.clone()); + msg.replace(Unpooled.wrappedBuffer(FORGE_LEGACY_HANDSHAKE_RESET_DATA.clone())); return msg; } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/network/TransportType.java b/proxy/src/main/java/com/velocitypowered/proxy/network/TransportType.java index beb42d92f..e382a4856 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/network/TransportType.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/network/TransportType.java @@ -1,6 +1,6 @@ package com.velocitypowered.proxy.network; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.velocitypowered.proxy.util.concurrent.VelocityNettyThreadFactory; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollDatagramChannel; @@ -60,10 +60,7 @@ enum TransportType { } private static ThreadFactory createThreadFactory(final String name, final Type type) { - return new ThreadFactoryBuilder() - .setNameFormat("Netty " + name + ' ' + type.toString() + " #%d") - .setDaemon(true) - .build(); + return new VelocityNettyThreadFactory("Netty " + name + ' ' + type.toString() + " #%d"); } public static TransportType bestType() { diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/ProtocolUtils.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/ProtocolUtils.java index ab6dbc75d..77b628e4f 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/ProtocolUtils.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/ProtocolUtils.java @@ -225,6 +225,24 @@ public enum ProtocolUtils { return ret; } + /** + * Reads a retained {@link ByteBuf} slice of the specified {@code buf} with the 1.7 style length. + * + * @param buf the buffer to read from + * @return the retained slice + */ + public static ByteBuf readRetainedByteBufSlice17(ByteBuf buf) { + // Read in a 2 or 3 byte number that represents the length of the packet. (3 byte "shorts" for + // Forge only) + // No vanilla packet should give a 3 byte packet + int len = readExtendedForgeShort(buf); + + Preconditions.checkArgument(len <= (FORGE_MAX_ARRAY_LENGTH), + "Cannot receive array longer than %s (got %s bytes)", FORGE_MAX_ARRAY_LENGTH, len); + + return buf.readRetainedSlice(len); + } + /** * Writes an byte array for legacy version 1.7 to the specified {@code buf} * @@ -250,6 +268,31 @@ public enum ProtocolUtils { buf.writeBytes(b); } + /** + * Writes an {@link ByteBuf} for legacy version 1.7 to the specified {@code buf} + * + * @param b array + * @param buf buf + * @param allowExtended forge + */ + public static void writeByteBuf17(ByteBuf b, ByteBuf buf, boolean allowExtended) { + if (allowExtended) { + Preconditions + .checkArgument(b.readableBytes() <= (FORGE_MAX_ARRAY_LENGTH), + "Cannot send array longer than %s (got %s bytes)", FORGE_MAX_ARRAY_LENGTH, + b.readableBytes()); + } else { + Preconditions.checkArgument(b.readableBytes() <= Short.MAX_VALUE, + "Cannot send array longer than Short.MAX_VALUE (got %s bytes)", b.readableBytes()); + } + // Write a 2 or 3 byte number that represents the length of the packet. (3 byte "shorts" for + // Forge only) + // No vanilla packet should give a 3 byte packet, this method will still retain vanilla + // behaviour. + writeExtendedForgeShort(buf, b.readableBytes()); + buf.writeBytes(b); + } + /** * Reads a Minecraft-style extended short from the specified {@code buf}. * @@ -284,6 +327,31 @@ public enum ProtocolUtils { } } + /** + * Reads a non length-prefixed string from the {@code buf}. We need this for the legacy 1.7 + * version, being inconsistent when sending the brand. + * + * @param buf the buffer to read from + * @return the decoded string + */ + public static String readStringWithoutLength(ByteBuf buf) { + int length = buf.readableBytes(); + int cap = DEFAULT_MAX_STRING_SIZE; + checkArgument(length >= 0, "Got a negative-length string (%s)", length); + // `cap` is interpreted as a UTF-8 character length. To cover the full Unicode plane, we must + // consider the length of a UTF-8 character, which can be up to a 4 bytes. We do an initial + // sanity check and then check again to make sure our optimistic guess was good. + checkArgument(length <= cap * 4, "Bad string size (got %s, maximum is %s)", length, cap); + checkState(buf.isReadable(length), + "Trying to read a string that is too long (wanted %s, only have %s)", length, + buf.readableBytes()); + String str = buf.toString(buf.readerIndex(), length, StandardCharsets.UTF_8); + buf.skipBytes(length); + checkState(str.length() <= cap, "Got a too-long string (got %s, max %s)", + str.length(), cap); + return str; + } + public enum Direction { SERVERBOUND, CLIENTBOUND; diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/LoginPluginMessage.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/LoginPluginMessage.java index 26fb8cb70..a3dae879d 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/LoginPluginMessage.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/LoginPluginMessage.java @@ -4,24 +4,24 @@ import com.velocitypowered.api.network.ProtocolVersion; import com.velocitypowered.proxy.connection.MinecraftSessionHandler; import com.velocitypowered.proxy.protocol.MinecraftPacket; import com.velocitypowered.proxy.protocol.ProtocolUtils; +import com.velocitypowered.proxy.protocol.util.DeferredByteBufHolder; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.checkerframework.checker.nullness.qual.Nullable; -public class LoginPluginMessage implements MinecraftPacket { +public class LoginPluginMessage extends DeferredByteBufHolder implements MinecraftPacket { private int id; private @Nullable String channel; - private ByteBuf data = Unpooled.EMPTY_BUFFER; public LoginPluginMessage() { - + super(null); } public LoginPluginMessage(int id, @Nullable String channel, ByteBuf data) { + super(data); this.id = id; this.channel = channel; - this.data = data; } public int getId() { @@ -35,16 +35,12 @@ public class LoginPluginMessage implements MinecraftPacket { return channel; } - public ByteBuf getData() { - return data; - } - @Override public String toString() { return "LoginPluginMessage{" + "id=" + id + ", channel='" + channel + '\'' - + ", data=" + data + + ", data=" + super.toString() + '}'; } @@ -53,9 +49,9 @@ public class LoginPluginMessage implements MinecraftPacket { this.id = ProtocolUtils.readVarInt(buf); this.channel = ProtocolUtils.readString(buf); if (buf.isReadable()) { - this.data = buf.readSlice(buf.readableBytes()); + this.replace(buf.readSlice(buf.readableBytes())); } else { - this.data = Unpooled.EMPTY_BUFFER; + this.replace(Unpooled.EMPTY_BUFFER); } } @@ -66,7 +62,7 @@ public class LoginPluginMessage implements MinecraftPacket { throw new IllegalStateException("Channel is not specified!"); } ProtocolUtils.writeString(buf, channel); - buf.writeBytes(data); + buf.writeBytes(content()); } @Override diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/PluginMessage.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/PluginMessage.java index 487c1e2bc..5b9b3f136 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/PluginMessage.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/PluginMessage.java @@ -1,19 +1,29 @@ package com.velocitypowered.proxy.protocol.packet; -import static com.velocitypowered.proxy.connection.VelocityConstants.EMPTY_BYTE_ARRAY; import static com.velocitypowered.proxy.protocol.util.PluginMessageUtil.transformLegacyToModernChannel; import com.velocitypowered.api.network.ProtocolVersion; import com.velocitypowered.proxy.connection.MinecraftSessionHandler; import com.velocitypowered.proxy.protocol.MinecraftPacket; import com.velocitypowered.proxy.protocol.ProtocolUtils; +import com.velocitypowered.proxy.protocol.util.DeferredByteBufHolder; import io.netty.buffer.ByteBuf; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; -public class PluginMessage implements MinecraftPacket { +public class PluginMessage extends DeferredByteBufHolder implements MinecraftPacket { private @Nullable String channel; - private byte[] data = EMPTY_BYTE_ARRAY; + + public PluginMessage() { + super(null); + } + + public PluginMessage(String channel, + @MonotonicNonNull ByteBuf backing) { + super(backing); + this.channel = channel; + } public String getChannel() { if (channel == null) { @@ -26,19 +36,11 @@ public class PluginMessage implements MinecraftPacket { this.channel = channel; } - public byte[] getData() { - return data; - } - - public void setData(byte[] data) { - this.data = data; - } - @Override public String toString() { return "PluginMessage{" + "channel='" + channel + '\'' - + ", data=" + + ", data=" + super.toString() + '}'; } @@ -49,11 +51,11 @@ public class PluginMessage implements MinecraftPacket { this.channel = transformLegacyToModernChannel(this.channel); } if (version.compareTo(ProtocolVersion.MINECRAFT_1_8) >= 0) { - this.data = new byte[buf.readableBytes()]; - buf.readBytes(data); + this.replace(buf.readRetainedSlice(buf.readableBytes())); } else { - data = ProtocolUtils.readByteArray17(buf); + this.replace(ProtocolUtils.readRetainedByteBufSlice17(buf)); } + } @Override @@ -67,14 +69,55 @@ public class PluginMessage implements MinecraftPacket { ProtocolUtils.writeString(buf, this.channel); } if (version.compareTo(ProtocolVersion.MINECRAFT_1_8) >= 0) { - buf.writeBytes(data); + buf.writeBytes(content()); } else { - ProtocolUtils.writeByteArray17(data, buf, true); // True for Forge support + ProtocolUtils.writeByteBuf17(content(), buf, true); // True for Forge support } + } @Override public boolean handle(MinecraftSessionHandler handler) { return handler.handle(this); } + + @Override + public PluginMessage copy() { + return (PluginMessage) super.copy(); + } + + @Override + public PluginMessage duplicate() { + return (PluginMessage) super.duplicate(); + } + + @Override + public PluginMessage retainedDuplicate() { + return (PluginMessage) super.retainedDuplicate(); + } + + @Override + public PluginMessage replace(ByteBuf content) { + return (PluginMessage) super.replace(content); + } + + @Override + public PluginMessage retain() { + return (PluginMessage) super.retain(); + } + + @Override + public PluginMessage retain(int increment) { + return (PluginMessage) super.retain(increment); + } + + @Override + public PluginMessage touch() { + return (PluginMessage) super.touch(); + } + + @Override + public PluginMessage touch(Object hint) { + return (PluginMessage) super.touch(hint); + } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/util/DeferredByteBufHolder.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/util/DeferredByteBufHolder.java new file mode 100644 index 000000000..6a35a4e5e --- /dev/null +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/util/DeferredByteBufHolder.java @@ -0,0 +1,136 @@ +package com.velocitypowered.proxy.protocol.util; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; +import io.netty.util.IllegalReferenceCountException; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; + +/** + * A special-purpose implementation of {@code ByteBufHolder} that can defer accepting its buffer. + * This is required because Velocity packets are, for better or worse, mutable. + */ +public class DeferredByteBufHolder implements ByteBufHolder { + + @MonotonicNonNull + private ByteBuf backing; + + public DeferredByteBufHolder( + @MonotonicNonNull ByteBuf backing) { + this.backing = backing; + } + + @Override + public ByteBuf content() { + if (backing == null) { + throw new IllegalStateException("Trying to obtain contents of holder with a null buffer"); + } + if (backing.refCnt() <= 0) { + throw new IllegalReferenceCountException(backing.refCnt()); + } + return backing; + } + + @Override + public ByteBufHolder copy() { + if (backing == null) { + throw new IllegalStateException("Trying to obtain contents of holder with a null buffer"); + } + return new DeferredByteBufHolder(backing.copy()); + } + + @Override + public ByteBufHolder duplicate() { + if (backing == null) { + throw new IllegalStateException("Trying to obtain contents of holder with a null buffer"); + } + return new DeferredByteBufHolder(backing.duplicate()); + } + + @Override + public ByteBufHolder retainedDuplicate() { + if (backing == null) { + throw new IllegalStateException("Trying to obtain contents of holder with a null buffer"); + } + return new DeferredByteBufHolder(backing.retainedDuplicate()); + } + + @Override + public ByteBufHolder replace(ByteBuf content) { + if (content == null) { + throw new NullPointerException("content"); + } + this.backing = content; + return this; + } + + @Override + public int refCnt() { + if (backing == null) { + throw new IllegalStateException("Trying to obtain contents of holder with a null buffer"); + } + return backing.refCnt(); + } + + @Override + public ByteBufHolder retain() { + if (backing == null) { + throw new IllegalStateException("Trying to obtain contents of holder with a null buffer"); + } + backing.retain(); + return this; + } + + @Override + public ByteBufHolder retain(int increment) { + if (backing == null) { + throw new IllegalStateException("Trying to obtain contents of holder with a null buffer"); + } + backing.retain(increment); + return this; + } + + @Override + public ByteBufHolder touch() { + if (backing == null) { + throw new IllegalStateException("Trying to obtain contents of holder with a null buffer"); + } + backing.touch(); + return this; + } + + @Override + public ByteBufHolder touch(Object hint) { + if (backing == null) { + throw new IllegalStateException("Trying to obtain contents of holder with a null buffer"); + } + backing.touch(hint); + return this; + } + + @Override + public boolean release() { + if (backing == null) { + throw new IllegalStateException("Trying to obtain contents of holder with a null buffer"); + } + return backing.release(); + } + + @Override + public boolean release(int decrement) { + if (backing == null) { + throw new IllegalStateException("Trying to obtain contents of holder with a null buffer"); + } + return backing.release(decrement); + } + + @Override + public String toString() { + String str = "DeferredByteBufHolder["; + if (backing == null) { + str += "null"; + } else { + str += backing.toString(); + } + return str + "]"; + } +} diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/util/PluginMessageUtil.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/util/PluginMessageUtil.java index d58ee95a7..8dea2e01d 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/util/PluginMessageUtil.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/util/PluginMessageUtil.java @@ -93,12 +93,12 @@ public class PluginMessageUtil { checkNotNull(message, "message"); checkArgument(isRegister(message) || isUnregister(message), "Unknown channel type %s", message.getChannel()); - if (message.getData().length == 0) { + if (!message.content().isReadable()) { // If we try to split this, we will get an one-element array with the empty string, which // has caused issues with 1.13+ compatibility. Just return an empty list. return ImmutableList.of(); } - String channels = new String(message.getData(), StandardCharsets.UTF_8); + String channels = message.content().toString(StandardCharsets.UTF_8); return ImmutableList.copyOf(channels.split("\0")); } @@ -114,10 +114,9 @@ public class PluginMessageUtil { Preconditions.checkArgument(channels.size() > 0, "no channels specified"); String channelName = protocolVersion.compareTo(ProtocolVersion.MINECRAFT_1_13) >= 0 ? REGISTER_CHANNEL : REGISTER_CHANNEL_LEGACY; - PluginMessage message = new PluginMessage(); - message.setChannel(channelName); - message.setData(String.join("\0", channels).getBytes(StandardCharsets.UTF_8)); - return message; + ByteBuf contents = Unpooled.buffer(); + contents.writeCharSequence(String.join("\0", channels), StandardCharsets.UTF_8); + return new PluginMessage(channelName, contents); } /** @@ -134,27 +133,17 @@ public class PluginMessageUtil { String toAppend = " (" + version.getName() + ")"; - PluginMessage newMsg = new PluginMessage(); - newMsg.setChannel(message.getChannel()); + ByteBuf rewrittenBuf = Unpooled.buffer(); - byte[] rewrittenData; if (protocolVersion.compareTo(ProtocolVersion.MINECRAFT_1_8) >= 0) { - ByteBuf rewrittenBuf = Unpooled.buffer(); - try { - String currentBrand = ProtocolUtils.readString(Unpooled.wrappedBuffer(message.getData())); - ProtocolUtils.writeString(rewrittenBuf, currentBrand + toAppend); - rewrittenData = new byte[rewrittenBuf.readableBytes()]; - rewrittenBuf.readBytes(rewrittenData); - } finally { - rewrittenBuf.release(); - } + String currentBrand = ProtocolUtils.readString(message.content().slice()); + ProtocolUtils.writeString(rewrittenBuf, currentBrand + toAppend); } else { - String currentBrand = new String(message.getData(), StandardCharsets.UTF_8); - rewrittenData = (currentBrand + toAppend).getBytes(); + String currentBrand = ProtocolUtils.readStringWithoutLength(message.content().slice()); + rewrittenBuf.writeBytes((currentBrand + toAppend).getBytes()); } - newMsg.setData(rewrittenData); - return newMsg; + return new PluginMessage(message.getChannel(), rewrittenBuf); } private static final Pattern INVALID_IDENTIFIER_REGEX = Pattern.compile("[^a-z0-9\\-_]*"); @@ -192,5 +181,4 @@ public class PluginMessageUtil { return "legacy:" + INVALID_IDENTIFIER_REGEX.matcher(lower).replaceAll(""); } } - } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/util/concurrent/VelocityNettyThreadFactory.java b/proxy/src/main/java/com/velocitypowered/proxy/util/concurrent/VelocityNettyThreadFactory.java new file mode 100644 index 000000000..87fb83524 --- /dev/null +++ b/proxy/src/main/java/com/velocitypowered/proxy/util/concurrent/VelocityNettyThreadFactory.java @@ -0,0 +1,23 @@ +package com.velocitypowered.proxy.util.concurrent; + +import static com.google.common.base.Preconditions.checkNotNull; + +import io.netty.util.concurrent.FastThreadLocalThread; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class VelocityNettyThreadFactory implements ThreadFactory { + + private final AtomicInteger threadNumber = new AtomicInteger(); + private final String nameFormat; + + public VelocityNettyThreadFactory(String nameFormat) { + this.nameFormat = checkNotNull(nameFormat, "nameFormat"); + } + + @Override + public Thread newThread(Runnable r) { + String name = String.format(nameFormat, threadNumber.incrementAndGet()); + return new FastThreadLocalThread(r, name); + } +}