diff --git a/.gitignore b/.gitignore index 74b81462f..1cf6c5d69 100644 --- a/.gitignore +++ b/.gitignore @@ -89,3 +89,5 @@ plugins/ ### Natives stuff ### native/mbedtls native/zlib-ng +native/zlib-cf +native/libdeflate diff --git a/native/compile-linux.sh b/native/compile-linux.sh index 4b759df20..db7227d06 100755 --- a/native/compile-linux.sh +++ b/native/compile-linux.sh @@ -1,21 +1,22 @@ #!/bin/bash -if [ ! -d zlib-ng ]; then - echo "Cloning zlib-ng..." - git clone https://github.com/zlib-ng/zlib-ng.git +if [ ! -d libdeflate ]; then + echo "Cloning libdeflate..." + git clone https://github.com/ebiggers/libdeflate.git fi -echo "Compiling zlib-ng..." -cd zlib-ng -CFLAGS="-fPIC -O3" ./configure --zlib-compat --static -make clean && make +echo "Compiling libdeflate..." +cd libdeflate || exit +make cd .. # Modify as you need. MBEDTLS_ROOT=mbedtls -CFLAGS="-O3 -I$JAVA_HOME/include/ -I$JAVA_HOME/include/linux/ -fPIC -shared" -gcc $CFLAGS -Izlib-ng src/main/c/jni_util.c src/main/c/jni_zlib_deflate.c src/main/c/jni_zlib_inflate.c \ - src/main/c/jni_zlib_common.c zlib-ng/libz.a -o src/main/resources/linux_x64/velocity-compress.so -gcc $CFLAGS -I $MBEDTLS_ROOT/include -shared $MBEDTLS_ROOT/library/aes.c $MBEDTLS_ROOT/library/aesni.c \ +CFLAGS="-O3 -I$JAVA_HOME/include/ -I$JAVA_HOME/include/linux/ -fPIC -shared -Wl,-z,noexecstack" +ARCH=$(uname -m) +mkdir -p src/main/resources/linux_$ARCH +gcc $CFLAGS -Ilibdeflate src/main/c/jni_util.c src/main/c/jni_zlib_deflate.c src/main/c/jni_zlib_inflate.c \ + libdeflate/libdeflate.a -o src/main/resources/linux_$ARCH/velocity-compress.so +gcc $CFLAGS -I $MBEDTLS_ROOT/include -shared $MBEDTLS_ROOT/library/aes.c $MBEDTLS_ROOT/library/aesni.c \ $MBEDTLS_ROOT/library/platform.c $MBEDTLS_ROOT/library/platform_util.c src/main/c/jni_util.c src/main/c/jni_cipher.c \ - -o src/main/resources/linux_x64/velocity-cipher.so \ No newline at end of file + -o src/main/resources/linux_$ARCH/velocity-cipher.so \ No newline at end of file diff --git a/native/compile-osx.sh b/native/compile-osx.sh deleted file mode 100755 index 0c0f2c2c5..000000000 --- a/native/compile-osx.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/bash - -if [ ! -d zlib-ng ]; then - echo "Cloning zlib-ng..." - git clone https://github.com/zlib-ng/zlib-ng.git -fi - -echo "Compiling zlib-ng..." -cd zlib-ng -CFLAGS="-fPIC -O3" ./configure --zlib-compat --static -make clean && make -cd .. - -# Modify as you need. -MBEDTLS_ROOT=mbedtls -export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home -CFLAGS="-O3 -I$JAVA_HOME/include/ -I$JAVA_HOME/include/darwin/ -fPIC -shared" - -clang $CFLAGS -Izlib-ng src/main/c/jni_util.c src/main/c/jni_zlib_deflate.c src/main/c/jni_zlib_inflate.c \ - src/main/c/jni_zlib_common.c zlib-ng/libz.a -o src/main/resources/macosx/velocity-compress.dylib -clang $CFLAGS -I $MBEDTLS_ROOT/include -shared $MBEDTLS_ROOT/library/aes.c $MBEDTLS_ROOT/library/aesni.c \ - $MBEDTLS_ROOT/library/platform.c $MBEDTLS_ROOT/library/platform_util.c src/main/c/jni_util.c src/main/c/jni_cipher.c \ - -o src/main/resources/macosx/velocity-cipher.dylib \ No newline at end of file diff --git a/native/src/main/c/jni_zlib_common.c b/native/src/main/c/jni_zlib_common.c deleted file mode 100644 index fa5a0cbab..000000000 --- a/native/src/main/c/jni_zlib_common.c +++ /dev/null @@ -1,29 +0,0 @@ -#include -#include -#include -#include -#include "jni_util.h" - -void JNICALL -check_zlib_free(JNIEnv *env, z_stream *stream, bool deflate) -{ - int ret = deflate ? deflateEnd(stream) : inflateEnd(stream); - const char *msg = stream->msg; - free((void*) stream); - - switch (ret) { - case Z_OK: - break; - case Z_STREAM_ERROR: - if (msg == NULL) { - msg = "stream state inconsistent"; - } - // fall-through - case Z_DATA_ERROR: - if (msg == NULL) { - msg = "data was discarded"; - } - throwException(env, "java/lang/IllegalArgumentException", msg); - break; - } -} \ No newline at end of file diff --git a/native/src/main/c/jni_zlib_common.h b/native/src/main/c/jni_zlib_common.h deleted file mode 100644 index ff4bc9a80..000000000 --- a/native/src/main/c/jni_zlib_common.h +++ /dev/null @@ -1,6 +0,0 @@ -#include -#include -#include - -void JNICALL -check_zlib_free(JNIEnv *env, z_stream *stream, bool deflate); \ No newline at end of file diff --git a/native/src/main/c/jni_zlib_deflate.c b/native/src/main/c/jni_zlib_deflate.c index 6e05f7cae..7a84a5b04 100644 --- a/native/src/main/c/jni_zlib_deflate.c +++ b/native/src/main/c/jni_zlib_deflate.c @@ -2,56 +2,21 @@ #include #include #include -#include +#include #include "jni_util.h" -#include "jni_zlib_common.h" - -static jfieldID finishedID; -static jfieldID consumedID; - -JNIEXPORT void JNICALL -Java_com_velocitypowered_natives_compression_NativeZlibDeflate_initIDs(JNIEnv *env, jclass cls) -{ - finishedID = (*env)->GetFieldID(env, cls, "finished", "Z"); - consumedID = (*env)->GetFieldID(env, cls, "consumed", "I"); -} JNIEXPORT jlong JNICALL Java_com_velocitypowered_natives_compression_NativeZlibDeflate_init(JNIEnv *env, jobject obj, jint level) { - z_stream* stream = calloc(1, sizeof(z_stream)); - - if (stream == 0) { + struct libdeflate_compressor *compressor = libdeflate_alloc_compressor(level); + if (compressor == NULL) { // Out of memory! - throwException(env, "java/lang/OutOfMemoryError", "zlib allocate stream"); - return 0; - } - - int ret = deflateInit(stream, level); - if (ret == Z_OK) { - return (jlong) stream; - } else { - const char *zlib_msg = stream->msg; - free(stream); - switch (ret) { - case Z_MEM_ERROR: - throwException(env, "java/lang/OutOfMemoryError", "zlib init"); - break; - case Z_STREAM_ERROR: { - // Thanks Ken and Ritchie! - char message[32]; - snprintf(message, 32, "invalid level %d", level); - throwException(env, "java/lang/IllegalArgumentException", message); - break; - } - default: - throwException(env, "java/util/zip/DataFormatException", zlib_msg); - break; - } + throwException(env, "java/lang/OutOfMemoryError", "libdeflate allocate compressor"); return 0; } + return (jlong) compressor; } JNIEXPORT void JNICALL @@ -59,11 +24,10 @@ Java_com_velocitypowered_natives_compression_NativeZlibDeflate_free(JNIEnv *env, jobject obj, jlong ctx) { - z_stream* stream = (z_stream*) ctx; - check_zlib_free(env, stream, true); + libdeflate_free_compressor((struct libdeflate_compressor *) ctx); } -JNIEXPORT int JNICALL +JNIEXPORT jboolean JNICALL Java_com_velocitypowered_natives_compression_NativeZlibDeflate_process(JNIEnv *env, jobject obj, jlong ctx, @@ -73,38 +37,8 @@ Java_com_velocitypowered_natives_compression_NativeZlibDeflate_process(JNIEnv *e jint destinationLength, jboolean finish) { - z_stream* stream = (z_stream*) ctx; - stream->next_in = (Bytef *) sourceAddress; - stream->next_out = (Bytef *) destinationAddress; - stream->avail_in = sourceLength; - stream->avail_out = destinationLength; - - int res = deflate(stream, finish ? Z_FINISH : Z_NO_FLUSH); - switch (res) { - case Z_STREAM_END: - // The stream has ended. - (*env)->SetBooleanField(env, obj, finishedID, JNI_TRUE); - // fall-through - case Z_OK: - // Not yet completed, but progress has been made. Tell Java how many bytes we've processed. - (*env)->SetIntField(env, obj, consumedID, sourceLength - stream->avail_in); - return destinationLength - stream->avail_out; - case Z_BUF_ERROR: - // This is not fatal. Just say we need more data. Usually this applies to the next_out buffer, - // which NativeVelocityCompressor will notice and will expand the buffer. - return 0; - default: - throwException(env, "java/util/zip/DataFormatException", stream->msg); - return 0; - } -} - -JNIEXPORT void JNICALL -Java_com_velocitypowered_natives_compression_NativeZlibDeflate_reset(JNIEnv *env, - jobject obj, - jlong ctx) -{ - z_stream* stream = (z_stream*) ctx; - int ret = deflateReset(stream); - assert(ret == Z_OK); + struct libdeflate_compressor *compressor = (struct libdeflate_compressor *) ctx; + size_t produced = libdeflate_zlib_compress(compressor, (void *) sourceAddress, sourceLength, + (void *) destinationAddress, destinationLength); + return (jlong) produced; } \ No newline at end of file diff --git a/native/src/main/c/jni_zlib_inflate.c b/native/src/main/c/jni_zlib_inflate.c index 64804e160..9a81f0db8 100644 --- a/native/src/main/c/jni_zlib_inflate.c +++ b/native/src/main/c/jni_zlib_inflate.c @@ -2,50 +2,21 @@ #include #include #include -#include +#include #include "jni_util.h" -#include "jni_zlib_common.h" - -static jfieldID finishedID; -static jfieldID consumedID; - -JNIEXPORT void JNICALL -Java_com_velocitypowered_natives_compression_NativeZlibInflate_initIDs(JNIEnv *env, jclass cls) -{ - finishedID = (*env)->GetFieldID(env, cls, "finished", "Z"); - consumedID = (*env)->GetFieldID(env, cls, "consumed", "I"); -} JNIEXPORT jlong JNICALL Java_com_velocitypowered_natives_compression_NativeZlibInflate_init(JNIEnv *env, jobject obj) { - z_stream* stream = calloc(1, sizeof(z_stream)); - - if (stream == 0) { + struct libdeflate_decompressor *decompress = libdeflate_alloc_decompressor(); + if (decompress == NULL) { // Out of memory! - throwException(env, "java/lang/OutOfMemoryError", "zlib allocate stream"); + throwException(env, "java/lang/OutOfMemoryError", "libdeflate allocate decompressor"); return 0; } - int ret = inflateInit(stream); - if (ret == Z_OK) { - return (jlong) stream; - } else { - const char *zlib_msg = stream->msg; - free(stream); - switch (ret) { - case Z_MEM_ERROR: - throwException(env, "java/lang/OutOfMemoryError", "zlib init"); - return 0; - case Z_STREAM_ERROR: - throwException(env, "java/lang/IllegalArgumentException", "stream clobbered?"); - return 0; - default: - throwException(env, "java/util/zip/DataFormatException", zlib_msg); - return 0; - } - } + return (jlong) decompress; } JNIEXPORT void JNICALL @@ -53,51 +24,34 @@ Java_com_velocitypowered_natives_compression_NativeZlibInflate_free(JNIEnv *env, jobject obj, jlong ctx) { - z_stream* stream = (z_stream*) ctx; - check_zlib_free(env, stream, false); + libdeflate_free_decompressor((struct libdeflate_decompressor *) ctx); } -JNIEXPORT int JNICALL +JNIEXPORT jboolean JNICALL Java_com_velocitypowered_natives_compression_NativeZlibInflate_process(JNIEnv *env, jobject obj, jlong ctx, jlong sourceAddress, jint sourceLength, jlong destinationAddress, - jint destinationLength) + jint destinationLength, + jlong maximumSize) { - z_stream* stream = (z_stream*) ctx; - stream->next_in = (Bytef *) sourceAddress; - stream->next_out = (Bytef *) destinationAddress; - stream->avail_in = sourceLength; - stream->avail_out = destinationLength; + struct libdeflate_decompressor *decompress = (struct libdeflate_decompressor *) ctx; + enum libdeflate_result result = libdeflate_zlib_decompress(decompress, (void *) sourceAddress, + sourceLength, (void *) destinationAddress, destinationLength, NULL); - int res = inflate(stream, Z_PARTIAL_FLUSH); - switch (res) { - case Z_STREAM_END: - // The stream has ended - (*env)->SetBooleanField(env, obj, finishedID, JNI_TRUE); - // fall-through - case Z_OK: - // Not yet completed, but progress has been made. Tell Java how many bytes we've processed. - (*env)->SetIntField(env, obj, consumedID, sourceLength - stream->avail_in); - return destinationLength - stream->avail_out; - case Z_BUF_ERROR: - // This is not fatal. Just say we need more data. Usually this applies to the next_out buffer, - // which NativeVelocityCompressor will notice and will expand the buffer. - return 0; - default: - throwException(env, "java/util/zip/DataFormatException", stream->msg); - return 0; + switch (result) { + case LIBDEFLATE_SUCCESS: + // We are happy + return JNI_TRUE; + case LIBDEFLATE_BAD_DATA: + throwException(env, "java/util/zip/DataFormatException", "inflate data is bad"); + return JNI_FALSE; + case LIBDEFLATE_SHORT_OUTPUT: + case LIBDEFLATE_INSUFFICIENT_SPACE: + // These cases are the same for us. We expect the full uncompressed size to be known. + throwException(env, "java/util/zip/DataFormatException", "uncompressed size is inaccurate"); + return JNI_FALSE; } -} - -JNIEXPORT void JNICALL -Java_com_velocitypowered_natives_compression_NativeZlibInflate_reset(JNIEnv *env, - jobject obj, - jlong ctx) -{ - z_stream* stream = (z_stream*) ctx; - int ret = inflateReset(stream); - assert(ret == Z_OK); } \ No newline at end of file diff --git a/native/src/main/java/com/velocitypowered/natives/compression/Java11VelocityCompressor.java b/native/src/main/java/com/velocitypowered/natives/compression/Java11VelocityCompressor.java index 999d12caa..bc6206242 100644 --- a/native/src/main/java/com/velocitypowered/natives/compression/Java11VelocityCompressor.java +++ b/native/src/main/java/com/velocitypowered/natives/compression/Java11VelocityCompressor.java @@ -54,7 +54,8 @@ public class Java11VelocityCompressor implements VelocityCompressor { } @Override - public void inflate(ByteBuf source, ByteBuf destination, int max) throws DataFormatException { + public void inflate(ByteBuf source, ByteBuf destination, int uncompressedSize) + throws DataFormatException { ensureNotDisposed(); // We (probably) can't nicely deal with >=1 buffer nicely, so let's scream loudly. @@ -67,7 +68,7 @@ public class Java11VelocityCompressor implements VelocityCompressor { while (!inflater.finished() && inflater.getBytesRead() < source.readableBytes()) { if (!destination.isWritable()) { - ensureMaxSize(destination, max); + ensureMaxSize(destination, uncompressedSize); destination.ensureWritable(ZLIB_BUFFER_SIZE); } diff --git a/native/src/main/java/com/velocitypowered/natives/compression/JavaVelocityCompressor.java b/native/src/main/java/com/velocitypowered/natives/compression/JavaVelocityCompressor.java index 6293dd770..f2e557d1c 100644 --- a/native/src/main/java/com/velocitypowered/natives/compression/JavaVelocityCompressor.java +++ b/native/src/main/java/com/velocitypowered/natives/compression/JavaVelocityCompressor.java @@ -25,20 +25,21 @@ public class JavaVelocityCompressor implements VelocityCompressor { } @Override - public void inflate(ByteBuf source, ByteBuf destination, int max) throws DataFormatException { + public void inflate(ByteBuf source, ByteBuf destination, int uncompressedSize) + throws DataFormatException { ensureNotDisposed(); final int available = source.readableBytes(); this.setInflaterInput(source); if (destination.hasArray()) { - this.inflateDestinationIsHeap(destination, available, max); + this.inflateDestinationIsHeap(destination, available, uncompressedSize); } else { if (buf.length == 0) { buf = new byte[ZLIB_BUFFER_SIZE]; } while (!inflater.finished() && inflater.getBytesRead() < available) { - ensureMaxSize(destination, max); + ensureMaxSize(destination, uncompressedSize); int read = inflater.inflate(buf); destination.writeBytes(buf, 0, read); } diff --git a/native/src/main/java/com/velocitypowered/natives/compression/LibdeflateVelocityCompressor.java b/native/src/main/java/com/velocitypowered/natives/compression/LibdeflateVelocityCompressor.java new file mode 100644 index 000000000..73e9ff7fc --- /dev/null +++ b/native/src/main/java/com/velocitypowered/natives/compression/LibdeflateVelocityCompressor.java @@ -0,0 +1,87 @@ +package com.velocitypowered.natives.compression; + +import com.google.common.base.Preconditions; +import com.velocitypowered.natives.util.BufferPreference; +import io.netty.buffer.ByteBuf; +import java.util.zip.DataFormatException; + +public class LibdeflateVelocityCompressor implements VelocityCompressor { + + public static final VelocityCompressorFactory FACTORY = LibdeflateVelocityCompressor::new; + + private final NativeZlibInflate inflate = new NativeZlibInflate(); + private final long inflateCtx; + private final NativeZlibDeflate deflate = new NativeZlibDeflate(); + private final long deflateCtx; + private boolean disposed = false; + + private LibdeflateVelocityCompressor(int level) { + int correctedLevel = level == -1 ? 6 : level; + if (correctedLevel > 12 || correctedLevel < 1) { + throw new IllegalArgumentException("Invalid compression level " + level); + } + + this.inflateCtx = inflate.init(); + this.deflateCtx = deflate.init(level == -1 ? 6 : level); + } + + @Override + public void inflate(ByteBuf source, ByteBuf destination, int uncompressedSize) + throws DataFormatException { + ensureNotDisposed(); + source.memoryAddress(); + destination.memoryAddress(); + + // libdeflate recommends we work with a known uncompressed size - so we work strictly within + // those parameters. If the uncompressed size doesn't match the compressed size, then we will + // throw an exception from native code. + destination.ensureWritable(uncompressedSize); + + long sourceAddress = source.memoryAddress() + source.readerIndex(); + long destinationAddress = destination.memoryAddress() + destination.writerIndex(); + + inflate.process(inflateCtx, sourceAddress, source.readableBytes(), destinationAddress, + uncompressedSize); + destination.writerIndex(destination.writerIndex() + uncompressedSize); + } + + @Override + public void deflate(ByteBuf source, ByteBuf destination) throws DataFormatException { + ensureNotDisposed(); + source.memoryAddress(); + destination.memoryAddress(); + + while (true) { + long sourceAddress = source.memoryAddress() + source.readerIndex(); + long destinationAddress = destination.memoryAddress() + destination.writerIndex(); + + int produced = deflate.process(deflateCtx, sourceAddress, source.readableBytes(), + destinationAddress, destination.writableBytes()); + if (produced > 0) { + destination.writerIndex(destination.writerIndex() + produced); + return; + } + + // Insufficient room - enlarge the buffer. + destination.capacity(destination.capacity() * 2); + } + } + + private void ensureNotDisposed() { + Preconditions.checkState(!disposed, "Object already disposed"); + } + + @Override + public void dispose() { + if (!disposed) { + inflate.free(inflateCtx); + deflate.free(deflateCtx); + } + disposed = true; + } + + @Override + public BufferPreference preferredBufferType() { + return BufferPreference.DIRECT_REQUIRED; + } +} diff --git a/native/src/main/java/com/velocitypowered/natives/compression/NativeVelocityCompressor.java b/native/src/main/java/com/velocitypowered/natives/compression/NativeVelocityCompressor.java deleted file mode 100644 index b932579ae..000000000 --- a/native/src/main/java/com/velocitypowered/natives/compression/NativeVelocityCompressor.java +++ /dev/null @@ -1,89 +0,0 @@ -package com.velocitypowered.natives.compression; - -import static com.velocitypowered.natives.compression.CompressorUtils.ZLIB_BUFFER_SIZE; -import static com.velocitypowered.natives.compression.CompressorUtils.ensureMaxSize; - -import com.google.common.base.Preconditions; -import com.velocitypowered.natives.util.BufferPreference; -import io.netty.buffer.ByteBuf; -import java.util.zip.DataFormatException; - -public class NativeVelocityCompressor implements VelocityCompressor { - - public static final VelocityCompressorFactory FACTORY = NativeVelocityCompressor::new; - - private final NativeZlibInflate inflate = new NativeZlibInflate(); - private final long inflateCtx; - private final NativeZlibDeflate deflate = new NativeZlibDeflate(); - private final long deflateCtx; - private boolean disposed = false; - - private NativeVelocityCompressor(int level) { - this.inflateCtx = inflate.init(); - this.deflateCtx = deflate.init(level); - } - - @Override - public void inflate(ByteBuf source, ByteBuf destination, int max) throws DataFormatException { - ensureNotDisposed(); - source.memoryAddress(); - destination.memoryAddress(); - - while (!inflate.finished && source.isReadable()) { - if (!destination.isWritable()) { - ensureMaxSize(destination, max); - destination.ensureWritable(ZLIB_BUFFER_SIZE); - } - int produced = inflate.process(inflateCtx, source.memoryAddress() + source.readerIndex(), - source.readableBytes(), destination.memoryAddress() + destination.writerIndex(), - destination.writableBytes()); - source.readerIndex(source.readerIndex() + inflate.consumed); - destination.writerIndex(destination.writerIndex() + produced); - } - - inflate.reset(inflateCtx); - inflate.consumed = 0; - inflate.finished = false; - } - - @Override - public void deflate(ByteBuf source, ByteBuf destination) throws DataFormatException { - ensureNotDisposed(); - source.memoryAddress(); - destination.memoryAddress(); - - while (!deflate.finished) { - if (!destination.isWritable()) { - destination.ensureWritable(ZLIB_BUFFER_SIZE); - } - int produced = deflate.process(deflateCtx, source.memoryAddress() + source.readerIndex(), - source.readableBytes(), - destination.memoryAddress() + destination.writerIndex(), destination.writableBytes(), - true); - source.readerIndex(source.readerIndex() + deflate.consumed); - destination.writerIndex(destination.writerIndex() + produced); - } - - deflate.reset(deflateCtx); - deflate.consumed = 0; - deflate.finished = false; - } - - private void ensureNotDisposed() { - Preconditions.checkState(!disposed, "Object already disposed"); - } - - @Override - public void dispose() { - if (!disposed) { - inflate.free(inflateCtx); - deflate.free(deflateCtx); - } - disposed = true; - } - - @Override - public BufferPreference preferredBufferType() { - return BufferPreference.DIRECT_REQUIRED; - } -} diff --git a/native/src/main/java/com/velocitypowered/natives/compression/NativeZlibDeflate.java b/native/src/main/java/com/velocitypowered/natives/compression/NativeZlibDeflate.java index 1114cab49..eb89412cb 100644 --- a/native/src/main/java/com/velocitypowered/natives/compression/NativeZlibDeflate.java +++ b/native/src/main/java/com/velocitypowered/natives/compression/NativeZlibDeflate.java @@ -5,21 +5,10 @@ package com.velocitypowered.natives.compression; */ class NativeZlibDeflate { - boolean finished; - int consumed; - native long init(int level); native long free(long ctx); native int process(long ctx, long sourceAddress, int sourceLength, long destinationAddress, - int destinationLength, boolean finish); - - native void reset(long ctx); - - static { - initIDs(); - } - - private static native void initIDs(); + int destinationLength); } diff --git a/native/src/main/java/com/velocitypowered/natives/compression/NativeZlibInflate.java b/native/src/main/java/com/velocitypowered/natives/compression/NativeZlibInflate.java index 7eabd9a05..fc6e9787f 100644 --- a/native/src/main/java/com/velocitypowered/natives/compression/NativeZlibInflate.java +++ b/native/src/main/java/com/velocitypowered/natives/compression/NativeZlibInflate.java @@ -1,25 +1,16 @@ package com.velocitypowered.natives.compression; +import java.util.zip.DataFormatException; + /** * Represents a native interface for zlib's inflate functions. */ class NativeZlibInflate { - boolean finished; - int consumed; - native long init(); native long free(long ctx); - native int process(long ctx, long sourceAddress, int sourceLength, long destinationAddress, - int destinationLength); - - native void reset(long ctx); - - static { - initIDs(); - } - - private static native void initIDs(); + native boolean process(long ctx, long sourceAddress, int sourceLength, long destinationAddress, + int destinationLength) throws DataFormatException; } diff --git a/native/src/main/java/com/velocitypowered/natives/compression/VelocityCompressor.java b/native/src/main/java/com/velocitypowered/natives/compression/VelocityCompressor.java index 839b64569..09cad102c 100644 --- a/native/src/main/java/com/velocitypowered/natives/compression/VelocityCompressor.java +++ b/native/src/main/java/com/velocitypowered/natives/compression/VelocityCompressor.java @@ -6,10 +6,12 @@ import io.netty.buffer.ByteBuf; import java.util.zip.DataFormatException; /** - * Provides an interface to inflate and deflate {@link ByteBuf}s using zlib. + * Provides an interface to inflate and deflate {@link ByteBuf}s using zlib or a compatible + * implementation. */ public interface VelocityCompressor extends Disposable, Native { - void inflate(ByteBuf source, ByteBuf destination, int max) throws DataFormatException; + void inflate(ByteBuf source, ByteBuf destination, int uncompressedSize) + throws DataFormatException; void deflate(ByteBuf source, ByteBuf destination) throws DataFormatException; } diff --git a/native/src/main/java/com/velocitypowered/natives/util/NativeConstraints.java b/native/src/main/java/com/velocitypowered/natives/util/NativeConstraints.java index 6e1bd1c3f..5a9dcb35d 100644 --- a/native/src/main/java/com/velocitypowered/natives/util/NativeConstraints.java +++ b/native/src/main/java/com/velocitypowered/natives/util/NativeConstraints.java @@ -7,6 +7,7 @@ import java.util.function.BooleanSupplier; public class NativeConstraints { private static final boolean NATIVES_ENABLED = !Boolean.getBoolean("velocity.natives-disabled"); private static final boolean IS_AMD64; + private static final boolean IS_AARCH64; private static final boolean CAN_GET_MEMORYADDRESS; static { @@ -21,20 +22,21 @@ public class NativeConstraints { // HotSpot on Intel macOS prefers x86_64, but OpenJ9 on macOS and HotSpot/OpenJ9 elsewhere // give amd64. IS_AMD64 = osArch.equals("amd64") || osArch.equals("x86_64"); + IS_AARCH64 = osArch.equals("aarch64"); } - static final BooleanSupplier MACOS = () -> { - return NATIVES_ENABLED - && CAN_GET_MEMORYADDRESS - && System.getProperty("os.name", "").equalsIgnoreCase("Mac OS X") + static final BooleanSupplier NATIVE_BASE = () -> NATIVES_ENABLED && CAN_GET_MEMORYADDRESS; + + static final BooleanSupplier LINUX_X86_64 = () -> { + return NATIVE_BASE.getAsBoolean() + && System.getProperty("os.name", "").equalsIgnoreCase("Linux") && IS_AMD64; }; - static final BooleanSupplier LINUX = () -> { - return NATIVES_ENABLED - && CAN_GET_MEMORYADDRESS + static final BooleanSupplier LINUX_AARCH64 = () -> { + return NATIVE_BASE.getAsBoolean() && System.getProperty("os.name", "").equalsIgnoreCase("Linux") - && IS_AMD64; + && IS_AARCH64; }; static final BooleanSupplier JAVA_11 = () -> { diff --git a/native/src/main/java/com/velocitypowered/natives/util/Natives.java b/native/src/main/java/com/velocitypowered/natives/util/Natives.java index e63c87c20..120ec1ee7 100644 --- a/native/src/main/java/com/velocitypowered/natives/util/Natives.java +++ b/native/src/main/java/com/velocitypowered/natives/util/Natives.java @@ -4,7 +4,7 @@ import com.google.common.collect.ImmutableList; import com.velocitypowered.natives.NativeSetupException; import com.velocitypowered.natives.compression.Java11VelocityCompressor; import com.velocitypowered.natives.compression.JavaVelocityCompressor; -import com.velocitypowered.natives.compression.NativeVelocityCompressor; +import com.velocitypowered.natives.compression.LibdeflateVelocityCompressor; import com.velocitypowered.natives.compression.VelocityCompressorFactory; import com.velocitypowered.natives.encryption.JavaVelocityCipher; import com.velocitypowered.natives.encryption.NativeVelocityCipher; @@ -62,12 +62,14 @@ public class Natives { public static final NativeCodeLoader compress = new NativeCodeLoader<>( ImmutableList.of( - new NativeCodeLoader.Variant<>(NativeConstraints.MACOS, - copyAndLoadNative("/macosx/velocity-compress.dylib"), "native (macOS)", - NativeVelocityCompressor.FACTORY), - new NativeCodeLoader.Variant<>(NativeConstraints.LINUX, - copyAndLoadNative("/linux_x64/velocity-compress.so"), "native (Linux amd64)", - NativeVelocityCompressor.FACTORY), + new NativeCodeLoader.Variant<>(NativeConstraints.LINUX_X86_64, + copyAndLoadNative("/linux_x86_64/velocity-compress.so"), + "libdeflate (Linux x86_64)", + LibdeflateVelocityCompressor.FACTORY), + new NativeCodeLoader.Variant<>(NativeConstraints.LINUX_AARCH64, + copyAndLoadNative("/linux_aarch64/velocity-compress.so"), + "libdeflate (Linux aarch64)", + LibdeflateVelocityCompressor.FACTORY), new NativeCodeLoader.Variant<>(NativeConstraints.JAVA_11, () -> { }, "Java 11", () -> Java11VelocityCompressor.FACTORY), new NativeCodeLoader.Variant<>(NativeCodeLoader.ALWAYS, () -> { @@ -77,12 +79,12 @@ public class Natives { public static final NativeCodeLoader cipher = new NativeCodeLoader<>( ImmutableList.of( - new NativeCodeLoader.Variant<>(NativeConstraints.MACOS, - copyAndLoadNative("/macosx/velocity-cipher.dylib"), "mbed TLS (macOS)", - NativeVelocityCipher.FACTORY), - new NativeCodeLoader.Variant<>(NativeConstraints.LINUX, - copyAndLoadNative("/linux_x64/velocity-cipher.so"), "mbed TLS (Linux amd64)", - NativeVelocityCipher.FACTORY), + new NativeCodeLoader.Variant<>(NativeConstraints.LINUX_X86_64, + copyAndLoadNative("/linux_x86_64/velocity-cipher.so"), + "mbed TLS (Linux x86_64)", NativeVelocityCipher.FACTORY), + new NativeCodeLoader.Variant<>(NativeConstraints.LINUX_AARCH64, + copyAndLoadNative("/linux_aarch64/velocity-cipher.so"), + "mbed TLS (Linux aarch64)", NativeVelocityCipher.FACTORY), new NativeCodeLoader.Variant<>(NativeCodeLoader.ALWAYS, () -> { }, "Java", JavaVelocityCipher.FACTORY) ) diff --git a/native/src/main/resources/linux_aarch64/velocity-compress.so b/native/src/main/resources/linux_aarch64/velocity-compress.so new file mode 100755 index 000000000..100a41c99 Binary files /dev/null and b/native/src/main/resources/linux_aarch64/velocity-compress.so differ diff --git a/native/src/main/resources/linux_x64/velocity-compress.so b/native/src/main/resources/linux_x64/velocity-compress.so deleted file mode 100755 index d4ec11d70..000000000 Binary files a/native/src/main/resources/linux_x64/velocity-compress.so and /dev/null differ diff --git a/native/src/main/resources/linux_x86_64/velocity-compress.so b/native/src/main/resources/linux_x86_64/velocity-compress.so new file mode 100755 index 000000000..d72a66bfd Binary files /dev/null and b/native/src/main/resources/linux_x86_64/velocity-compress.so differ diff --git a/native/src/main/resources/macosx/velocity-compress.dylib b/native/src/main/resources/macosx/velocity-compress.dylib deleted file mode 100755 index 3624162a6..000000000 Binary files a/native/src/main/resources/macosx/velocity-compress.dylib and /dev/null differ diff --git a/native/src/test/java/com/velocitypowered/natives/compression/VelocityCompressorTest.java b/native/src/test/java/com/velocitypowered/natives/compression/VelocityCompressorTest.java index 182423e98..2bc657b0f 100644 --- a/native/src/test/java/com/velocitypowered/natives/compression/VelocityCompressorTest.java +++ b/native/src/test/java/com/velocitypowered/natives/compression/VelocityCompressorTest.java @@ -33,13 +33,13 @@ class VelocityCompressorTest { } @Test - @EnabledOnOs({MAC, LINUX}) + @EnabledOnOs({LINUX}) void sanityCheckNative() { assertThrows(IllegalArgumentException.class, () -> Natives.compress.get().create(-42)); } @Test - @EnabledOnOs({MAC, LINUX}) + @EnabledOnOs({LINUX}) void nativeIntegrityCheck() throws DataFormatException { VelocityCompressor compressor = Natives.compress.get().create(Deflater.DEFAULT_COMPRESSION); if (compressor.preferredBufferType() != BufferPreference.DIRECT_REQUIRED) { @@ -86,10 +86,11 @@ class VelocityCompressorTest { ByteBuf decompressed = bufSupplier.get(); source.writeBytes(TEST_DATA); + int uncompressedData = source.readableBytes(); try { compressor.deflate(source, dest); - compressor.inflate(dest, decompressed, Integer.MAX_VALUE); + compressor.inflate(dest, decompressed, uncompressedData); source.readerIndex(0); assertTrue(ByteBufUtil.equals(source, decompressed)); } finally { diff --git a/proxy/build.gradle b/proxy/build.gradle index d6fdbb539..266883969 100644 --- a/proxy/build.gradle +++ b/proxy/build.gradle @@ -48,7 +48,7 @@ dependencies { compile "io.netty:netty-handler:${nettyVersion}" compile "io.netty:netty-transport-native-epoll:${nettyVersion}" compile "io.netty:netty-transport-native-epoll:${nettyVersion}:linux-x86_64" - compile "io.netty:netty-transport-native-kqueue:${nettyVersion}:osx-x86_64" + compile "io.netty:netty-transport-native-epoll:${nettyVersion}:linux-aarch64" compile "io.netty:netty-resolver-dns:${nettyVersion}" compile "org.apache.logging.log4j:log4j-api:${log4jVersion}" diff --git a/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java b/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java index 036956882..21b08e3aa 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java @@ -120,12 +120,12 @@ public class VelocityServer implements ProxyServer { } public KeyPair getServerKeyPair() { - return ensureInitialized(serverKeyPair); + return serverKeyPair; } @Override public VelocityConfiguration getConfiguration() { - return ensureInitialized(this.configuration); + return this.configuration; } @Override @@ -229,7 +229,6 @@ public class VelocityServer implements ProxyServer { Metrics.VelocityMetrics.startMetrics(this, configuration.getMetrics()); } - @RequiresNonNull({"pluginManager", "eventManager"}) private void loadPlugins() { logger.info("Loading plugins..."); @@ -443,18 +442,11 @@ public class VelocityServer implements ProxyServer { } public AsyncHttpClient getAsyncHttpClient() { - return ensureInitialized(cm).getHttpClient(); + return cm.getHttpClient(); } public Ratelimiter getIpAttemptLimiter() { - return ensureInitialized(ipAttemptLimiter); - } - - private static T ensureInitialized(T o) { - if (o == null) { - throw new IllegalStateException("The proxy isn't fully initialized."); - } - return o; + return ipAttemptLimiter; } /** diff --git a/proxy/src/main/java/com/velocitypowered/proxy/config/VelocityConfiguration.java b/proxy/src/main/java/com/velocitypowered/proxy/config/VelocityConfiguration.java index 3522d1523..921556676 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/config/VelocityConfiguration.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/config/VelocityConfiguration.java @@ -465,7 +465,7 @@ public class VelocityConfiguration extends AnnotatedConfig implements ProxyConfi Advanced advanced = new Advanced(toml.getTable("advanced")); Query query = new Query(toml.getTable("query")); Metrics metrics = new Metrics(toml.getTable("metrics")); - byte[] forwardingSecret = toml.getString("forwarding-secret", "5up3r53cr3t") + byte[] forwardingSecret = toml.getString("forwarding-secret", generateRandomString(12)) .getBytes(StandardCharsets.UTF_8); String forwardingModeName = toml.getString("player-info-forwarding-mode", "MODERN") diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/MinecraftConnection.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/MinecraftConnection.java index d3214787c..2765676e0 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/MinecraftConnection.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/MinecraftConnection.java @@ -16,7 +16,6 @@ import com.velocitypowered.natives.encryption.VelocityCipher; import com.velocitypowered.natives.encryption.VelocityCipherFactory; import com.velocitypowered.natives.util.Natives; import com.velocitypowered.proxy.VelocityServer; -import com.velocitypowered.proxy.network.netty.DiscardHandler; import com.velocitypowered.proxy.protocol.MinecraftPacket; import com.velocitypowered.proxy.protocol.StateRegistry; import com.velocitypowered.proxy.protocol.netty.MinecraftCipherDecoder; @@ -125,6 +124,13 @@ public class MinecraftConnection extends ChannelInboundHandlerAdapter { } } + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + if (sessionHandler != null) { + sessionHandler.readCompleted(); + } + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (ctx.channel().isActive()) { @@ -145,7 +151,6 @@ public class MinecraftConnection extends ChannelInboundHandlerAdapter { } } - installDiscardHandler(ctx); ctx.close(); } } @@ -161,18 +166,6 @@ public class MinecraftConnection extends ChannelInboundHandlerAdapter { Preconditions.checkState(this.channel.eventLoop().inEventLoop(), "Not in event loop"); } - private void installDiscardHandler(ChannelHandlerContext ctx) { - if (ctx.pipeline().get("discard") == null) { - ctx.pipeline().addBefore(MINECRAFT_DECODER, "discard", DiscardHandler.HANDLER); - } - } - - private void installDiscardHandler() { - if (channel.pipeline().get("discard") == null) { - channel.pipeline().addBefore(MINECRAFT_DECODER, "discard", DiscardHandler.HANDLER); - } - } - public EventLoop eventLoop() { return channel.eventLoop(); } @@ -212,9 +205,10 @@ public class MinecraftConnection extends ChannelInboundHandlerAdapter { */ public void closeWith(Object msg) { if (channel.isActive()) { - knownDisconnect = true; - installDiscardHandler(); - channel.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE); + channel.eventLoop().execute(() -> { + knownDisconnect = true; + channel.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE); + }); } } @@ -223,7 +217,6 @@ public class MinecraftConnection extends ChannelInboundHandlerAdapter { */ public void close() { if (channel.isActive()) { - installDiscardHandler(); channel.close(); } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/MinecraftSessionHandler.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/MinecraftSessionHandler.java index 2789e17f9..297e5f884 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/MinecraftSessionHandler.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/MinecraftSessionHandler.java @@ -70,6 +70,10 @@ public interface MinecraftSessionHandler { } + default void readCompleted() { + + } + default boolean handle(AvailableCommands commands) { return false; } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/BackendPlaySessionHandler.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/BackendPlaySessionHandler.java index 7bde50a23..5c43eb020 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/BackendPlaySessionHandler.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/backend/BackendPlaySessionHandler.java @@ -188,12 +188,17 @@ public class BackendPlaySessionHandler implements MinecraftSessionHandler { if (packet instanceof PluginMessage) { ((PluginMessage) packet).retain(); } - playerConnection.write(packet); + playerConnection.delayedWrite(packet); } @Override public void handleUnknown(ByteBuf buf) { - playerConnection.write(buf.retain()); + playerConnection.delayedWrite(buf.retain()); + } + + @Override + public void readCompleted() { + playerConnection.flush(); } @Override diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java index d33ad1ed7..e2b83e1da 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java @@ -170,59 +170,45 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { public boolean handle(PluginMessage packet) { VelocityServerConnection serverConn = player.getConnectedServer(); MinecraftConnection backendConn = serverConn != null ? serverConn.getConnection() : null; - if (serverConn != null && backendConn != null) { - if (backendConn.getState() != StateRegistry.PLAY) { - logger.warn("A plugin message was received while the backend server was not " - + "ready. Channel: {}. Packet discarded.", packet.getChannel()); - } else if (PluginMessageUtil.isRegister(packet)) { - player.getKnownChannels().addAll(PluginMessageUtil.getChannels(packet)); - backendConn.write(packet.retain()); - } else if (PluginMessageUtil.isUnregister(packet)) { - player.getKnownChannels().removeAll(PluginMessageUtil.getChannels(packet)); - backendConn.write(packet.retain()); - } else if (PluginMessageUtil.isMcBrand(packet)) { - backendConn.write(PluginMessageUtil - .rewriteMinecraftBrand(packet, server.getVersion(), player.getProtocolVersion())); - } else { - if (serverConn.getPhase() == BackendConnectionPhases.IN_TRANSITION) { - // We must bypass the currently-connected server when forwarding Forge packets. - VelocityServerConnection inFlight = player.getConnectionInFlight(); - if (inFlight != null) { - player.getPhase().handle(player, packet, inFlight); - } - return true; - } - - if (!player.getPhase().handle(player, packet, serverConn)) { - if (!player.getPhase().consideredComplete() || !serverConn.getPhase() - .consideredComplete()) { - // The client is trying to send messages too early. This is primarily caused by mods, - // but further aggravated by Velocity. To work around these issues, we will queue any - // non-FML handshake messages to be sent once the FML handshake has completed or the - // JoinGame packet has been received by the proxy, whichever comes first. - // - // We also need to make sure to retain these packets so they can be flushed - // appropriately. - loginPluginMessages.add(packet.retain()); - } else { - ChannelIdentifier id = server.getChannelRegistrar().getFromId(packet.getChannel()); - if (id == null) { - backendConn.write(packet.retain()); - } else { - byte[] copy = ByteBufUtil.getBytes(packet.content()); - PluginMessageEvent event = new PluginMessageEvent(player, serverConn, id, - ByteBufUtil.getBytes(packet.content())); - server.getEventManager().fire(event).thenAcceptAsync(pme -> { - PluginMessage message = new PluginMessage(packet.getChannel(), - Unpooled.wrappedBuffer(copy)); - backendConn.write(message); - }, backendConn.eventLoop()); - } - } - } - } + if (serverConn == null || backendConn == null) { + return true; } + if (backendConn.getState() != StateRegistry.PLAY) { + logger.warn("A plugin message was received while the backend server was not " + + "ready. Channel: {}. Packet discarded.", packet.getChannel()); + return true; + } + + if (this.tryHandleVanillaPluginMessageChannel(packet, backendConn)) { + return true; + } + + if (serverConn.getPhase() == BackendConnectionPhases.IN_TRANSITION) { + // We must bypass the currently-connected server when forwarding Forge packets. + VelocityServerConnection inFlight = player.getConnectionInFlight(); + if (inFlight != null) { + if (player.getPhase().handle(player, packet, inFlight)) { + return true; + } + } + } else if (!this.tryHandleForgeMessage(packet, serverConn)) { + return true; + } + + ChannelIdentifier id = server.getChannelRegistrar().getFromId(packet.getChannel()); + if (id == null) { + backendConn.write(packet.retain()); + } else { + byte[] copy = ByteBufUtil.getBytes(packet.content()); + PluginMessageEvent event = new PluginMessageEvent(player, serverConn, id, + ByteBufUtil.getBytes(packet.content())); + server.getEventManager().fire(event).thenAcceptAsync(pme -> { + PluginMessage message = new PluginMessage(packet.getChannel(), + Unpooled.wrappedBuffer(copy)); + backendConn.write(message); + }, backendConn.eventLoop()); + } return true; } @@ -277,9 +263,15 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { @Override public void writabilityChanged() { + boolean writable = player.getConnection().getChannel().isWritable(); + + if (!writable) { + // We might have packets queued for the server, so flush them now to free up memory. + player.getConnection().flush(); + } + VelocityServerConnection serverConn = player.getConnectedServer(); if (serverConn != null) { - boolean writable = player.getConnection().getChannel().isWritable(); MinecraftConnection smc = serverConn.getConnection(); if (smc != null) { smc.setAutoReading(writable); @@ -287,6 +279,44 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { } } + private boolean tryHandleVanillaPluginMessageChannel(PluginMessage packet, + MinecraftConnection backendConn) { + if (PluginMessageUtil.isRegister(packet)) { + player.getKnownChannels().addAll(PluginMessageUtil.getChannels(packet)); + backendConn.write(packet.retain()); + return true; + } else if (PluginMessageUtil.isUnregister(packet)) { + player.getKnownChannels().removeAll(PluginMessageUtil.getChannels(packet)); + backendConn.write(packet.retain()); + return true; + } else if (PluginMessageUtil.isMcBrand(packet)) { + backendConn.write(PluginMessageUtil.rewriteMinecraftBrand(packet, server.getVersion(), + player.getProtocolVersion())); + return true; + } + return false; + } + + private boolean tryHandleForgeMessage(PluginMessage packet, VelocityServerConnection serverConn) { + if (player.getPhase().handle(player, packet, serverConn)) { + return true; + } + + if (!player.getPhase().consideredComplete() || !serverConn.getPhase().consideredComplete()) { + // The client is trying to send messages too early. This is primarily caused by mods, + // but further aggravated by Velocity. To work around these issues, we will queue any + // non-FML handshake messages to be sent once the FML handshake has completed or the + // JoinGame packet has been received by the proxy, whichever comes first. + // + // We also need to make sure to retain these packets so they can be flushed + // appropriately. + loginPluginMessages.add(packet.retain()); + return true; + } else { + return false; + } + } + /** * Handles the {@code JoinGame} packet. This function is responsible for handling the client-side * switching servers in Velocity. diff --git a/proxy/src/main/java/com/velocitypowered/proxy/network/ConnectionManager.java b/proxy/src/main/java/com/velocitypowered/proxy/network/ConnectionManager.java index 0f89109df..24fe82440 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/network/ConnectionManager.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/network/ConnectionManager.java @@ -6,7 +6,7 @@ import static org.asynchttpclient.Dsl.config; import com.google.common.base.Preconditions; import com.velocitypowered.natives.util.Natives; import com.velocitypowered.proxy.VelocityServer; -import com.velocitypowered.proxy.network.netty.DnsAddressResolverGroupNameResolverAdapter; +import com.velocitypowered.proxy.network.netty.SeparatePoolInetNameResolver; import com.velocitypowered.proxy.protocol.netty.GS4QueryHandler; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; @@ -18,6 +18,7 @@ import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.epoll.EpollChannelOption; import io.netty.resolver.dns.DnsAddressResolverGroup; import io.netty.resolver.dns.DnsNameResolverBuilder; +import io.netty.util.concurrent.GlobalEventExecutor; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; @@ -48,7 +49,7 @@ public final class ConnectionManager { @SuppressWarnings("WeakerAccess") public final BackendChannelInitializerHolder backendChannelInitializer; - private final DnsAddressResolverGroup resolverGroup; + private final SeparatePoolInetNameResolver resolver; private final AsyncHttpClient httpClient; /** @@ -65,21 +66,16 @@ public final class ConnectionManager { new ServerChannelInitializer(this.server)); this.backendChannelInitializer = new BackendChannelInitializerHolder( new BackendChannelInitializer(this.server)); - this.resolverGroup = new DnsAddressResolverGroup(new DnsNameResolverBuilder() - .channelType(this.transportType.datagramChannelClass) - .negativeTtl(15) - .ndots(1)); + this.resolver = new SeparatePoolInetNameResolver(GlobalEventExecutor.INSTANCE); this.httpClient = asyncHttpClient(config() .setEventLoopGroup(this.workerGroup) .setUserAgent(server.getVersion().getName() + "/" + server.getVersion().getVersion()) .addRequestFilter(new RequestFilter() { @Override - public FilterContext filter(FilterContext ctx) throws FilterException { + public FilterContext filter(FilterContext ctx) { return new FilterContextBuilder<>(ctx) .request(new RequestBuilder(ctx.getRequest()) - .setNameResolver( - new DnsAddressResolverGroupNameResolverAdapter(resolverGroup, workerGroup) - ) + .setNameResolver(resolver) .build()) .build(); } @@ -162,7 +158,7 @@ public final class ConnectionManager { .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.server.getConfiguration().getConnectTimeout()) .group(group == null ? this.workerGroup : group) - .resolver(this.resolverGroup); + .resolver(this.resolver.asGroup()); if (transportType == TransportType.EPOLL && server.getConfiguration().useTcpFastOpen()) { bootstrap.option(EpollChannelOption.TCP_FASTOPEN_CONNECT, true); } @@ -194,6 +190,8 @@ public final class ConnectionManager { Thread.currentThread().interrupt(); } } + + this.resolver.shutdown(); } public EventLoopGroup getBossGroup() { diff --git a/proxy/src/main/java/com/velocitypowered/proxy/network/TransportType.java b/proxy/src/main/java/com/velocitypowered/proxy/network/TransportType.java index 0edff72c0..b5829f375 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/network/TransportType.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/network/TransportType.java @@ -7,11 +7,6 @@ import io.netty.channel.epoll.EpollDatagramChannel; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.epoll.EpollSocketChannel; -import io.netty.channel.kqueue.KQueue; -import io.netty.channel.kqueue.KQueueDatagramChannel; -import io.netty.channel.kqueue.KQueueEventLoopGroup; -import io.netty.channel.kqueue.KQueueServerSocketChannel; -import io.netty.channel.kqueue.KQueueSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.ServerSocketChannel; @@ -27,11 +22,7 @@ enum TransportType { (name, type) -> new NioEventLoopGroup(0, createThreadFactory(name, type))), EPOLL("epoll", EpollServerSocketChannel.class, EpollSocketChannel.class, EpollDatagramChannel.class, - (name, type) -> new EpollEventLoopGroup(0, createThreadFactory(name, type))), - KQUEUE("Kqueue", KQueueServerSocketChannel.class, KQueueSocketChannel.class, - KQueueDatagramChannel.class, - (name, type) -> new KQueueEventLoopGroup(0, createThreadFactory(name, type))); - + (name, type) -> new EpollEventLoopGroup(0, createThreadFactory(name, type))); final String name; final Class serverSocketChannelClass; @@ -71,8 +62,6 @@ enum TransportType { if (Epoll.isAvailable()) { return EPOLL; - } else if (KQueue.isAvailable()) { - return KQUEUE; } else { return NIO; } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/network/netty/DiscardHandler.java b/proxy/src/main/java/com/velocitypowered/proxy/network/netty/DiscardHandler.java deleted file mode 100644 index 80b34056e..000000000 --- a/proxy/src/main/java/com/velocitypowered/proxy/network/netty/DiscardHandler.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.velocitypowered.proxy.network.netty; - -import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.util.ReferenceCountUtil; - -@Sharable -public class DiscardHandler extends ChannelInboundHandlerAdapter { - - public static final DiscardHandler HANDLER = new DiscardHandler(); - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ReferenceCountUtil.release(msg); - } -} diff --git a/proxy/src/main/java/com/velocitypowered/proxy/network/netty/DnsAddressResolverGroupNameResolverAdapter.java b/proxy/src/main/java/com/velocitypowered/proxy/network/netty/DnsAddressResolverGroupNameResolverAdapter.java deleted file mode 100644 index 169b60a49..000000000 --- a/proxy/src/main/java/com/velocitypowered/proxy/network/netty/DnsAddressResolverGroupNameResolverAdapter.java +++ /dev/null @@ -1,68 +0,0 @@ -package com.velocitypowered.proxy.network.netty; - -import io.netty.channel.EventLoopGroup; -import io.netty.resolver.InetNameResolver; -import io.netty.resolver.dns.DnsAddressResolverGroup; -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.ImmediateEventExecutor; -import io.netty.util.concurrent.Promise; -import io.netty.util.internal.ThreadExecutorMap; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; - -public class DnsAddressResolverGroupNameResolverAdapter extends InetNameResolver { - - private final DnsAddressResolverGroup resolverGroup; - private final EventLoopGroup group; - - /** - * Creates a DnsAddressResolverGroupNameResolverAdapter. - * @param resolverGroup the resolver group to use - * @param group the event loop group - */ - public DnsAddressResolverGroupNameResolverAdapter( - DnsAddressResolverGroup resolverGroup, EventLoopGroup group) { - super(ImmediateEventExecutor.INSTANCE); - this.resolverGroup = resolverGroup; - this.group = group; - } - - @Override - protected void doResolve(String inetHost, Promise promise) throws Exception { - EventExecutor executor = this.findExecutor(); - resolverGroup.getResolver(executor).resolve(InetSocketAddress.createUnresolved(inetHost, 17)) - .addListener((FutureListener) future -> { - if (future.isSuccess()) { - promise.trySuccess(future.getNow().getAddress()); - } else { - promise.tryFailure(future.cause()); - } - }); - } - - @Override - protected void doResolveAll(String inetHost, Promise> promise) - throws Exception { - EventExecutor executor = this.findExecutor(); - resolverGroup.getResolver(executor).resolveAll(InetSocketAddress.createUnresolved(inetHost, 17)) - .addListener((FutureListener>) future -> { - if (future.isSuccess()) { - List addresses = new ArrayList<>(future.getNow().size()); - for (InetSocketAddress address : future.getNow()) { - addresses.add(address.getAddress()); - } - promise.trySuccess(addresses); - } else { - promise.tryFailure(future.cause()); - } - }); - } - - private EventExecutor findExecutor() { - EventExecutor current = ThreadExecutorMap.currentExecutor(); - return current == null ? group.next() : current; - } -} diff --git a/proxy/src/main/java/com/velocitypowered/proxy/network/netty/SeparatePoolInetNameResolver.java b/proxy/src/main/java/com/velocitypowered/proxy/network/netty/SeparatePoolInetNameResolver.java new file mode 100644 index 000000000..8fef724af --- /dev/null +++ b/proxy/src/main/java/com/velocitypowered/proxy/network/netty/SeparatePoolInetNameResolver.java @@ -0,0 +1,79 @@ +package com.velocitypowered.proxy.network.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.resolver.AddressResolver; +import io.netty.resolver.AddressResolverGroup; +import io.netty.resolver.DefaultNameResolver; +import io.netty.resolver.InetNameResolver; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; + +public final class SeparatePoolInetNameResolver extends InetNameResolver { + + private final ExecutorService resolveExecutor; + private final InetNameResolver delegate; + private AddressResolverGroup resolverGroup; + + /** + * Creates a new instance of {@code SeparatePoolInetNameResolver}. + * + * @param executor the {@link EventExecutor} which is used to notify the listeners of the {@link + * Future} returned by {@link #resolve(String)} + */ + public SeparatePoolInetNameResolver(EventExecutor executor) { + super(executor); + this.resolveExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setNameFormat("Velocity DNS Resolver") + .setDaemon(true) + .build()); + this.delegate = new DefaultNameResolver(executor); + } + + @Override + protected void doResolve(String inetHost, Promise promise) throws Exception { + try { + resolveExecutor.execute(() -> this.delegate.resolve(inetHost, promise)); + } catch (RejectedExecutionException e) { + promise.setFailure(e); + } + } + + @Override + protected void doResolveAll(String inetHost, Promise> promise) + throws Exception { + try { + resolveExecutor.execute(() -> this.delegate.resolveAll(inetHost, promise)); + } catch (RejectedExecutionException e) { + promise.setFailure(e); + } + } + + public void shutdown() { + this.resolveExecutor.shutdown(); + } + + /** + * Returns a view of this resolver as a AddressResolverGroup. + * + * @return a view of this resolver as a AddressResolverGroup + */ + public AddressResolverGroup asGroup() { + if (this.resolverGroup == null) { + this.resolverGroup = new AddressResolverGroup() { + @Override + protected AddressResolver newResolver(EventExecutor executor) { + return asAddressResolver(); + } + }; + } + return this.resolverGroup; + } +} diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftCipherDecoder.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftCipherDecoder.java index ad02191e7..3eca73b4b 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftCipherDecoder.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftCipherDecoder.java @@ -22,7 +22,6 @@ public class MinecraftCipherDecoder extends MessageToMessageDecoder { try { cipher.process(compatible); out.add(compatible); - in.skipBytes(in.readableBytes()); } catch (Exception e) { compatible.release(); // compatible will never be used if we throw an exception throw e; diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftCompressDecoder.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftCompressDecoder.java index ebf7f927e..5a0e0462d 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftCompressDecoder.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftCompressDecoder.java @@ -13,9 +13,13 @@ import java.util.List; public class MinecraftCompressDecoder extends MessageToMessageDecoder { - private static final int SOFT_MAXIMUM_UNCOMPRESSED_SIZE = 2 * 1024 * 1024; // 2MiB + private static final int VANILLA_MAXIMUM_UNCOMPRESSED_SIZE = 2 * 1024 * 1024; // 2MiB private static final int HARD_MAXIMUM_UNCOMPRESSED_SIZE = 16 * 1024 * 1024; // 16MiB + private static final int UNCOMPRESSED_CAP = + Boolean.getBoolean("velocity.increased-compression-cap") + ? HARD_MAXIMUM_UNCOMPRESSED_SIZE : VANILLA_MAXIMUM_UNCOMPRESSED_SIZE; + private final int threshold; private final VelocityCompressor compressor; @@ -28,21 +32,21 @@ public class MinecraftCompressDecoder extends MessageToMessageDecoder { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { int claimedUncompressedSize = ProtocolUtils.readVarInt(in); if (claimedUncompressedSize == 0) { - // Strip the now-useless uncompressed size, this message is already uncompressed. + // This message is not compressed. out.add(in.retainedSlice()); - in.skipBytes(in.readableBytes()); return; } checkFrame(claimedUncompressedSize >= threshold, "Uncompressed size %s is less than" + " threshold %s", claimedUncompressedSize, threshold); - int allowedMax = Math.min(claimedUncompressedSize, HARD_MAXIMUM_UNCOMPRESSED_SIZE); - int initialCapacity = Math.min(claimedUncompressedSize, SOFT_MAXIMUM_UNCOMPRESSED_SIZE); + checkFrame(claimedUncompressedSize <= UNCOMPRESSED_CAP, + "Uncompressed size %s exceeds hard threshold of %s", claimedUncompressedSize, + UNCOMPRESSED_CAP); ByteBuf compatibleIn = ensureCompatible(ctx.alloc(), compressor, in); - ByteBuf uncompressed = preferredBuffer(ctx.alloc(), compressor, initialCapacity); + ByteBuf uncompressed = preferredBuffer(ctx.alloc(), compressor, claimedUncompressedSize); try { - compressor.inflate(compatibleIn, uncompressed, allowedMax); + compressor.inflate(compatibleIn, uncompressed, claimedUncompressedSize); out.add(uncompressed); } catch (Exception e) { uncompressed.release(); diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftCompressEncoder.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftCompressEncoder.java index 0c9105e92..c36bfd8a3 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftCompressEncoder.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftCompressEncoder.java @@ -38,8 +38,11 @@ public class MinecraftCompressEncoder extends MessageToByteEncoder { @Override protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) throws Exception { - int initialBufferSize = msg.readableBytes() <= threshold ? msg.readableBytes() + 1 : - msg.readableBytes() / 3; + // Follow the advice of https://github.com/ebiggers/libdeflate/blob/master/libdeflate.h#L103 + // here for compression. The maximum buffer size if the data compresses well (which is almost + // always the case) is one less the input buffer. + int offset = msg.readableBytes() < threshold ? 1 : -1; + int initialBufferSize = msg.readableBytes() + offset; return MoreByteBufUtils.preferredBuffer(ctx.alloc(), compressor, initialBufferSize); } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftVarintFrameDecoder.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftVarintFrameDecoder.java index dd1fa5aa1..6bc3229cd 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftVarintFrameDecoder.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/netty/MinecraftVarintFrameDecoder.java @@ -1,43 +1,88 @@ package com.velocitypowered.proxy.protocol.netty; -import com.velocitypowered.proxy.protocol.ProtocolUtils; +import com.velocitypowered.proxy.util.except.QuietException; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; -import io.netty.handler.codec.CorruptedFrameException; +import io.netty.util.ByteProcessor; import java.util.List; public class MinecraftVarintFrameDecoder extends ByteToMessageDecoder { + private static final QuietException BAD_LENGTH_CACHED = new QuietException("Bad packet length"); + private static final QuietException VARINT_BIG_CACHED = new QuietException("VarInt too big"); + private final VarintByteDecoder reader = new VarintByteDecoder(); + @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - if (!in.isReadable()) { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { + if (!ctx.channel().isActive()) { + in.skipBytes(in.readableBytes()); return; } - int origReaderIndex = in.readerIndex(); - for (int i = 0; i < 3; i++) { - if (!in.isReadable()) { - in.readerIndex(origReaderIndex); + while (in.isReadable()) { + reader.reset(); + + int varintEnd = in.forEachByte(reader); + if (varintEnd == -1) { + // We tried to go beyond the end of the buffer. This is probably a good sign that the + // buffer was too short to hold a proper varint. return; } - byte read = in.readByte(); - if (read >= 0) { - // Make sure reader index of length buffer is returned to the beginning - in.readerIndex(origReaderIndex); - int packetLength = ProtocolUtils.readVarInt(in); - - if (in.readableBytes() >= packetLength) { - out.add(in.readRetainedSlice(packetLength)); + if (reader.result == DecodeResult.SUCCESS) { + if (reader.readVarint < 0) { + throw BAD_LENGTH_CACHED; + } else if (reader.readVarint == 0) { + // skip over the empty packet and ignore it + in.readerIndex(varintEnd + 1); } else { - in.readerIndex(origReaderIndex); + int minimumRead = reader.bytesRead + reader.readVarint; + if (in.isReadable(minimumRead)) { + out.add(in.retainedSlice(varintEnd + 1, reader.readVarint)); + in.skipBytes(minimumRead); + } else { + return; + } } - - return; + } else if (reader.result == DecodeResult.TOO_BIG) { + throw VARINT_BIG_CACHED; + } else if (reader.result == DecodeResult.TOO_SHORT) { + // No-op: we couldn't get a useful result. + break; } } + } - throw new CorruptedFrameException("VarInt too big"); + private static class VarintByteDecoder implements ByteProcessor { + private int readVarint; + private int bytesRead; + private DecodeResult result = DecodeResult.TOO_SHORT; + + @Override + public boolean process(byte k) { + readVarint |= (k & 0x7F) << bytesRead++ * 7; + if (bytesRead > 3) { + result = DecodeResult.TOO_BIG; + return false; + } + if ((k & 0x80) != 128) { + result = DecodeResult.SUCCESS; + return false; + } + return true; + } + + void reset() { + readVarint = 0; + bytesRead = 0; + result = DecodeResult.TOO_SHORT; + } + } + + private enum DecodeResult { + SUCCESS, + TOO_SHORT, + TOO_BIG } }