Mirror von
https://github.com/PaperMC/Velocity.git
synchronisiert 2024-11-16 21:10:30 +01:00
Merge pull request #404 from Cybermaxke/dev/event-improvements
Some more improvements to event tasks.
Dieser Commit ist enthalten in:
Commit
8543971a94
@ -16,6 +16,7 @@ import com.velocitypowered.api.event.Subscribe;
|
||||
import com.velocitypowered.api.plugin.PluginContainer;
|
||||
import com.velocitypowered.api.plugin.PluginManager;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.lang.invoke.VarHandle;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.util.ArrayList;
|
||||
@ -32,7 +33,6 @@ import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Predicate;
|
||||
@ -415,6 +415,112 @@ public class VelocityEventManager implements EventManager {
|
||||
}
|
||||
}
|
||||
|
||||
private static final int TASK_STATE_DEFAULT = 0;
|
||||
private static final int TASK_STATE_EXECUTING = 1;
|
||||
private static final int TASK_STATE_CONTINUE_IMMEDIATELY = 2;
|
||||
|
||||
private static final VarHandle CONTINUATION_TASK_RESUMED;
|
||||
private static final VarHandle CONTINUATION_TASK_STATE;
|
||||
|
||||
static {
|
||||
try {
|
||||
CONTINUATION_TASK_RESUMED = MethodHandles.lookup()
|
||||
.findVarHandle(ContinuationTask.class, "resumed", boolean.class);
|
||||
CONTINUATION_TASK_STATE = MethodHandles.lookup()
|
||||
.findVarHandle(ContinuationTask.class, "state", int.class);
|
||||
} catch (final ReflectiveOperationException e) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
final class ContinuationTask<E> implements Continuation, Runnable {
|
||||
|
||||
private final EventTask.WithContinuation task;
|
||||
private final int index;
|
||||
private final HandlerRegistration[] registrations;
|
||||
private final @Nullable CompletableFuture<E> future;
|
||||
private final boolean currentlyAsync;
|
||||
private final E event;
|
||||
|
||||
private volatile int state = TASK_STATE_DEFAULT;
|
||||
private volatile boolean resumed = false;
|
||||
|
||||
private ContinuationTask(
|
||||
final EventTask.WithContinuation task,
|
||||
final HandlerRegistration[] registrations,
|
||||
final @Nullable CompletableFuture<E> future,
|
||||
final E event,
|
||||
final int index,
|
||||
final boolean currentlyAsync) {
|
||||
this.task = task;
|
||||
this.registrations = registrations;
|
||||
this.future = future;
|
||||
this.event = event;
|
||||
this.index = index;
|
||||
this.currentlyAsync = currentlyAsync;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (execute()) {
|
||||
fire(future, event, index + 1, currentlyAsync, registrations);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the task and returns whether the next one should be executed
|
||||
* immediately after this one without scheduling.
|
||||
*/
|
||||
boolean execute() {
|
||||
state = TASK_STATE_EXECUTING;
|
||||
try {
|
||||
task.run(this);
|
||||
} catch (final Throwable t) {
|
||||
// validateOnlyOnce false here so don't get an exception if the
|
||||
// continuation was resumed before
|
||||
resume(t, false);
|
||||
}
|
||||
return !CONTINUATION_TASK_STATE.compareAndSet(
|
||||
this, TASK_STATE_EXECUTING, TASK_STATE_DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resume() {
|
||||
resume(null, true);
|
||||
}
|
||||
|
||||
void resume(final @Nullable Throwable exception, final boolean validateOnlyOnce) {
|
||||
final boolean changed = CONTINUATION_TASK_RESUMED.compareAndSet(this, false, true);
|
||||
// Only allow the continuation to be resumed once
|
||||
if (!changed && validateOnlyOnce) {
|
||||
throw new IllegalStateException("The continuation can only be resumed once.");
|
||||
}
|
||||
final HandlerRegistration registration = registrations[index];
|
||||
if (exception != null) {
|
||||
logHandlerException(registration, exception);
|
||||
}
|
||||
if (!changed) {
|
||||
return;
|
||||
}
|
||||
if (index + 1 == registrations.length) {
|
||||
// Optimization: don't schedule a task just to complete the future
|
||||
if (future != null) {
|
||||
future.complete(event);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (!CONTINUATION_TASK_STATE.compareAndSet(
|
||||
this, TASK_STATE_EXECUTING, TASK_STATE_CONTINUE_IMMEDIATELY)) {
|
||||
asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeWithException(final Throwable exception) {
|
||||
resume(requireNonNull(exception, "exception"), true);
|
||||
}
|
||||
}
|
||||
|
||||
private <E> void fire(final @Nullable CompletableFuture<E> future, final E event,
|
||||
final int offset, final boolean currentlyAsync, final HandlerRegistration[] registrations) {
|
||||
for (int i = offset; i < registrations.length; i++) {
|
||||
@ -427,49 +533,14 @@ public class VelocityEventManager implements EventManager {
|
||||
if (eventTask instanceof EventTask.WithContinuation) {
|
||||
final EventTask.WithContinuation withContinuation =
|
||||
(EventTask.WithContinuation) eventTask;
|
||||
final int index = i;
|
||||
final Continuation continuation = new Continuation() {
|
||||
private final AtomicBoolean resumed = new AtomicBoolean();
|
||||
|
||||
@Override
|
||||
public void resume() {
|
||||
resume(null);
|
||||
}
|
||||
|
||||
void resume(final @Nullable Throwable exception) {
|
||||
// Only allow the continuation to be resumed once
|
||||
if (!resumed.compareAndSet(false, true)) {
|
||||
throw new IllegalStateException("The continuation can only be resumed once.");
|
||||
}
|
||||
if (exception != null) {
|
||||
logHandlerException(registration, exception);
|
||||
}
|
||||
if (index + 1 == registrations.length) {
|
||||
// Optimization: don't schedule a task just to complete the future
|
||||
if (future != null) {
|
||||
future.complete(event);
|
||||
}
|
||||
return;
|
||||
}
|
||||
asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeWithException(final Throwable exception) {
|
||||
resume(requireNonNull(exception, "exception"));
|
||||
}
|
||||
};
|
||||
final Runnable task = () -> {
|
||||
try {
|
||||
withContinuation.run(continuation);
|
||||
} catch (final Throwable t) {
|
||||
continuation.resumeWithException(t);
|
||||
}
|
||||
};
|
||||
final ContinuationTask<E> continuationTask = new ContinuationTask<>(withContinuation,
|
||||
registrations, future, event, i, currentlyAsync);
|
||||
if (currentlyAsync || !eventTask.requiresAsync()) {
|
||||
task.run();
|
||||
if (continuationTask.execute()) {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
asyncExecutor.execute(task);
|
||||
asyncExecutor.execute(continuationTask);
|
||||
}
|
||||
// fire will continue in another thread once the async task is
|
||||
// executed and the continuation is resumed
|
||||
|
@ -223,4 +223,38 @@ public class EventTest {
|
||||
threadC = Thread.currentThread();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testResumeContinuationImmediately() throws Exception {
|
||||
final ResumeContinuationImmediatelyListener listener = new ResumeContinuationImmediatelyListener();
|
||||
handleMethodListener(listener);
|
||||
assertSyncThread(listener.threadA);
|
||||
assertSyncThread(listener.threadB);
|
||||
assertSyncThread(listener.threadC);
|
||||
assertEquals(2, listener.result);
|
||||
}
|
||||
|
||||
static final class ResumeContinuationImmediatelyListener {
|
||||
|
||||
Thread threadA;
|
||||
Thread threadB;
|
||||
Thread threadC;
|
||||
int result;
|
||||
|
||||
@Subscribe(order = PostOrder.EARLY)
|
||||
EventTask continuation(TestEvent event) {
|
||||
threadA = Thread.currentThread();
|
||||
return EventTask.withContinuation(continuation -> {
|
||||
threadB = Thread.currentThread();
|
||||
result++;
|
||||
continuation.resume();
|
||||
});
|
||||
}
|
||||
|
||||
@Subscribe(order = PostOrder.LATE)
|
||||
void afterContinuation(TestEvent event) {
|
||||
threadC = Thread.currentThread();
|
||||
result++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Laden…
In neuem Issue referenzieren
Einen Benutzer sperren