diff --git a/api/src/main/java/com/velocitypowered/api/event/EventTask.java b/api/src/main/java/com/velocitypowered/api/event/EventTask.java index 37b3b22b9..312a703d4 100644 --- a/api/src/main/java/com/velocitypowered/api/event/EventTask.java +++ b/api/src/main/java/com/velocitypowered/api/event/EventTask.java @@ -75,7 +75,7 @@ public interface EventTask { } /** - * Creates an continuation based {@link EventTask} from the given {@link Consumer}. The task isn't + * Creates a continuation based {@link EventTask} from the given {@link Consumer}. The task isn't * guaranteed to be executed asynchronously ({@link #requiresAsync()} always returns * {@code false}). * @@ -99,8 +99,9 @@ public interface EventTask { } /** - * Creates an continuation based {@link EventTask} for the given {@link CompletableFuture}. The - * continuation will be notified once the given future is completed. + * Creates a continuation based {@link EventTask} for the given {@link CompletableFuture}. The + * continuation is resumed upon completion of the given {@code future}, whether it is completed + * successfully or not. * * @param future The task to wait for * @return The event task diff --git a/proxy/src/test/java/com/velocitypowered/proxy/event/EventTaskTest.java b/proxy/src/test/java/com/velocitypowered/proxy/event/EventTaskTest.java new file mode 100644 index 000000000..d6bfb8718 --- /dev/null +++ b/proxy/src/test/java/com/velocitypowered/proxy/event/EventTaskTest.java @@ -0,0 +1,118 @@ +package com.velocitypowered.proxy.event; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.velocitypowered.api.event.Continuation; +import com.velocitypowered.api.event.EventTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Consumer; +import org.junit.jupiter.api.Test; + +public class EventTaskTest { + + @Test + public void testResumeWhenCompleteNormal() { + WitnessContinuation continuation = new WitnessContinuation(); + CompletableFuture completed = CompletableFuture.completedFuture(null); + EventTask.resumeWhenComplete(completed).execute(continuation); + assertTrue(continuation.completedSuccessfully(), "Completed future did not " + + "complete successfully"); + } + + @Test + public void testResumeWhenCompleteException() { + WitnessContinuation continuation = new WitnessContinuation(); + CompletableFuture failed = CompletableFuture.failedFuture(new Throwable()); + EventTask.resumeWhenComplete(failed).execute(continuation); + assertTrue(continuation.completedWithError(), "Failed future completed successfully"); + } + + @Test + public void testResumeWhenCompleteFromOtherThread() throws InterruptedException { + WitnessContinuation continuation = new WitnessContinuation(); + CountDownLatch latch = new CountDownLatch(1); + continuation.onComplete = (ignored) -> latch.countDown(); + CompletableFuture async = CompletableFuture.supplyAsync(() -> null); + EventTask.resumeWhenComplete(async).execute(continuation); + latch.await(); + + assertTrue(continuation.completedSuccessfully(), "Completed future did not " + + "complete successfully"); + } + + @Test + public void testResumeWhenFailFromOtherThread() throws InterruptedException { + WitnessContinuation continuation = new WitnessContinuation(); + CountDownLatch latch = new CountDownLatch(1); + continuation.onComplete = (ignored) -> latch.countDown(); + CompletableFuture async = CompletableFuture.supplyAsync(() -> { + throw new RuntimeException(); + }); + EventTask.resumeWhenComplete(async).execute(continuation); + latch.await(); + + assertTrue(continuation.completedWithError(), "Failed future completed successfully"); + } + + @Test + public void testResumeWhenFailFromOtherThreadComplexChain() throws InterruptedException { + WitnessContinuation continuation = new WitnessContinuation(); + CountDownLatch latch = new CountDownLatch(1); + continuation.onComplete = (ignored) -> latch.countDown(); + CompletableFuture async = CompletableFuture.supplyAsync(() -> null) + .thenAccept((v) -> { + throw new RuntimeException(); + }) + .thenCompose((v) -> CompletableFuture.completedFuture(null)); + EventTask.resumeWhenComplete(async).execute(continuation); + latch.await(); + + assertTrue(continuation.completedWithError(), "Failed future completed successfully"); + } + + /** + * An extremely simplified implementation of {@link Continuation} for verifying the completion + * of an operation. + */ + private static class WitnessContinuation implements Continuation { + + private static final AtomicIntegerFieldUpdater STATUS_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(WitnessContinuation.class, "status"); + + private static final int UNCOMPLETED = 0; + private static final int COMPLETED = 1; + private static final int COMPLETED_WITH_EXCEPTION = 2; + + private volatile int status = UNCOMPLETED; + private Consumer onComplete; + + @Override + public void resume() { + if (!STATUS_UPDATER.compareAndSet(this, UNCOMPLETED, COMPLETED)) { + throw new IllegalStateException("Continuation is already completed"); + } + + this.onComplete.accept(null); + } + + @Override + public void resumeWithException(Throwable exception) { + if (!STATUS_UPDATER.compareAndSet(this, UNCOMPLETED, COMPLETED_WITH_EXCEPTION)) { + throw new IllegalStateException("Continuation is already completed"); + } + + this.onComplete.accept(exception); + } + + public boolean completedSuccessfully() { + return STATUS_UPDATER.get(this) == COMPLETED; + } + + public boolean completedWithError() { + return STATUS_UPDATER.get(this) == COMPLETED_WITH_EXCEPTION; + } + } + +}