3
0
Mirror von https://github.com/PaperMC/Velocity.git synchronisiert 2024-11-16 21:10:30 +01:00

Merge remote-tracking branch 'origin/dev/2.0.0' into dev/2.0.0

Dieser Commit ist enthalten in:
Andrew Steinborn 2021-02-21 18:10:10 -05:00
Commit 28f12e2d7f
2 geänderte Dateien mit 147 neuen und 42 gelöschten Zeilen

Datei anzeigen

@ -16,6 +16,7 @@ import com.velocitypowered.api.event.Subscribe;
import com.velocitypowered.api.plugin.PluginContainer;
import com.velocitypowered.api.plugin.PluginManager;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
@ -32,7 +33,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
@ -415,6 +415,112 @@ public class VelocityEventManager implements EventManager {
}
}
private static final int TASK_STATE_DEFAULT = 0;
private static final int TASK_STATE_EXECUTING = 1;
private static final int TASK_STATE_CONTINUE_IMMEDIATELY = 2;
private static final VarHandle CONTINUATION_TASK_RESUMED;
private static final VarHandle CONTINUATION_TASK_STATE;
static {
try {
CONTINUATION_TASK_RESUMED = MethodHandles.lookup()
.findVarHandle(ContinuationTask.class, "resumed", boolean.class);
CONTINUATION_TASK_STATE = MethodHandles.lookup()
.findVarHandle(ContinuationTask.class, "state", int.class);
} catch (final ReflectiveOperationException e) {
throw new IllegalStateException();
}
}
final class ContinuationTask<E> implements Continuation, Runnable {
private final EventTask.WithContinuation task;
private final int index;
private final HandlerRegistration[] registrations;
private final @Nullable CompletableFuture<E> future;
private final boolean currentlyAsync;
private final E event;
private volatile int state = TASK_STATE_DEFAULT;
private volatile boolean resumed = false;
private ContinuationTask(
final EventTask.WithContinuation task,
final HandlerRegistration[] registrations,
final @Nullable CompletableFuture<E> future,
final E event,
final int index,
final boolean currentlyAsync) {
this.task = task;
this.registrations = registrations;
this.future = future;
this.event = event;
this.index = index;
this.currentlyAsync = currentlyAsync;
}
@Override
public void run() {
if (execute()) {
fire(future, event, index + 1, currentlyAsync, registrations);
}
}
/**
* Executes the task and returns whether the next one should be executed
* immediately after this one without scheduling.
*/
boolean execute() {
state = TASK_STATE_EXECUTING;
try {
task.run(this);
} catch (final Throwable t) {
// validateOnlyOnce false here so don't get an exception if the
// continuation was resumed before
resume(t, false);
}
return !CONTINUATION_TASK_STATE.compareAndSet(
this, TASK_STATE_EXECUTING, TASK_STATE_DEFAULT);
}
@Override
public void resume() {
resume(null, true);
}
void resume(final @Nullable Throwable exception, final boolean validateOnlyOnce) {
final boolean changed = CONTINUATION_TASK_RESUMED.compareAndSet(this, false, true);
// Only allow the continuation to be resumed once
if (!changed && validateOnlyOnce) {
throw new IllegalStateException("The continuation can only be resumed once.");
}
final HandlerRegistration registration = registrations[index];
if (exception != null) {
logHandlerException(registration, exception);
}
if (!changed) {
return;
}
if (index + 1 == registrations.length) {
// Optimization: don't schedule a task just to complete the future
if (future != null) {
future.complete(event);
}
return;
}
if (!CONTINUATION_TASK_STATE.compareAndSet(
this, TASK_STATE_EXECUTING, TASK_STATE_CONTINUE_IMMEDIATELY)) {
asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations));
}
}
@Override
public void resumeWithException(final Throwable exception) {
resume(requireNonNull(exception, "exception"), true);
}
}
private <E> void fire(final @Nullable CompletableFuture<E> future, final E event,
final int offset, final boolean currentlyAsync, final HandlerRegistration[] registrations) {
for (int i = offset; i < registrations.length; i++) {
@ -427,49 +533,14 @@ public class VelocityEventManager implements EventManager {
if (eventTask instanceof EventTask.WithContinuation) {
final EventTask.WithContinuation withContinuation =
(EventTask.WithContinuation) eventTask;
final int index = i;
final Continuation continuation = new Continuation() {
private final AtomicBoolean resumed = new AtomicBoolean();
@Override
public void resume() {
resume(null);
}
void resume(final @Nullable Throwable exception) {
// Only allow the continuation to be resumed once
if (!resumed.compareAndSet(false, true)) {
throw new IllegalStateException("The continuation can only be resumed once.");
}
if (exception != null) {
logHandlerException(registration, exception);
}
if (index + 1 == registrations.length) {
// Optimization: don't schedule a task just to complete the future
if (future != null) {
future.complete(event);
}
return;
}
asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations));
}
@Override
public void resumeWithException(final Throwable exception) {
resume(requireNonNull(exception, "exception"));
}
};
final Runnable task = () -> {
try {
withContinuation.run(continuation);
} catch (final Throwable t) {
continuation.resumeWithException(t);
}
};
final ContinuationTask<E> continuationTask = new ContinuationTask<>(withContinuation,
registrations, future, event, i, currentlyAsync);
if (currentlyAsync || !eventTask.requiresAsync()) {
task.run();
if (continuationTask.execute()) {
continue;
}
} else {
asyncExecutor.execute(task);
asyncExecutor.execute(continuationTask);
}
// fire will continue in another thread once the async task is
// executed and the continuation is resumed

Datei anzeigen

@ -223,4 +223,38 @@ public class EventTest {
threadC = Thread.currentThread();
}
}
@Test
void testResumeContinuationImmediately() throws Exception {
final ResumeContinuationImmediatelyListener listener = new ResumeContinuationImmediatelyListener();
handleMethodListener(listener);
assertSyncThread(listener.threadA);
assertSyncThread(listener.threadB);
assertSyncThread(listener.threadC);
assertEquals(2, listener.result);
}
static final class ResumeContinuationImmediatelyListener {
Thread threadA;
Thread threadB;
Thread threadC;
int result;
@Subscribe(order = PostOrder.EARLY)
EventTask continuation(TestEvent event) {
threadA = Thread.currentThread();
return EventTask.withContinuation(continuation -> {
threadB = Thread.currentThread();
result++;
continuation.resume();
});
}
@Subscribe(order = PostOrder.LATE)
void afterContinuation(TestEvent event) {
threadC = Thread.currentThread();
result++;
}
}
}