From 30f9955e2a9b88365d14dddabc0ce1d4ce856b14 Mon Sep 17 00:00:00 2001 From: Spottedleaf Date: Sun, 15 Sep 2019 21:02:13 -0700 Subject: [PATCH] Fix race conditions in flush allowing for previously scheduled tasks to execute later than the flush call (#2548) --- ...10-Asynchronous-chunk-IO-and-loading.patch | 127 ++++++++++-------- 1 file changed, 71 insertions(+), 56 deletions(-) diff --git a/Spigot-Server-Patches/0410-Asynchronous-chunk-IO-and-loading.patch b/Spigot-Server-Patches/0410-Asynchronous-chunk-IO-and-loading.patch index 8e5b3ac137..c10c288768 100644 --- a/Spigot-Server-Patches/0410-Asynchronous-chunk-IO-and-loading.patch +++ b/Spigot-Server-Patches/0410-Asynchronous-chunk-IO-and-loading.patch @@ -1,4 +1,4 @@ -From 78fb1395fde968993c5cc8d1f76ed4989fde3b7f Mon Sep 17 00:00:00 2001 +From b0905f4b72eaba949fcedd2c1fdbbb90d24aced4 Mon Sep 17 00:00:00 2001 From: Spottedleaf Date: Sat, 13 Jul 2019 09:23:10 -0700 Subject: [PATCH] Asynchronous chunk IO and loading @@ -1053,10 +1053,10 @@ index 000000000..4f10a8311 +} diff --git a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java new file mode 100644 -index 000000000..c3ca3c4a1 +index 000000000..78bd238f4 --- /dev/null +++ b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java -@@ -0,0 +1,258 @@ +@@ -0,0 +1,276 @@ +package com.destroystokyo.paper.io; + +import java.util.concurrent.ConcurrentLinkedQueue; @@ -1160,6 +1160,24 @@ index 000000000..c3ca3c4a1 + } + + /** ++ * Returns whether this queue may have tasks queued. ++ *

++ * This operation is not atomic, but is MT-Safe. ++ *

++ * @return {@code true} if tasks may be queued, {@code false} otherwise ++ */ ++ public boolean hasTasks() { ++ for (int i = 0; i < TOTAL_PRIORITIES; ++i) { ++ final ConcurrentLinkedQueue queue = this.queues[i]; ++ ++ if (queue.peek() != null) { ++ return true; ++ } ++ } ++ return false; ++ } ++ ++ /** + * Prevent further additions to this queue. Attempts to add after this call has completed (potentially during) will + * result in {@link IllegalStateException} being thrown. + *

@@ -1317,10 +1335,10 @@ index 000000000..c3ca3c4a1 +} diff --git a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java new file mode 100644 -index 000000000..f127ef236 +index 000000000..ee906b594 --- /dev/null +++ b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java -@@ -0,0 +1,244 @@ +@@ -0,0 +1,241 @@ +package com.destroystokyo.paper.io; + +import net.minecraft.server.MinecraftServer; @@ -1342,11 +1360,7 @@ index 000000000..f127ef236 + protected final AtomicBoolean parked = new AtomicBoolean(); + + protected volatile ConcurrentLinkedQueue flushQueue = new ConcurrentLinkedQueue<>(); -+ -+ // this is required to synchronize LockSupport#park() -+ // LockSupport explicitly states that it will only follow ordering with respect to volatile access -+ // see flush() for more details -+ protected volatile long flushCounter; ++ protected volatile long flushCycles; + + public QueueExecutorThread(final PrioritizedTaskQueue queue) { + this(queue, (int)(1.e6)); // 1.0ms @@ -1392,20 +1406,14 @@ index 000000000..f127ef236 + } + + this.parked.set(true); ++ + // We need to parse here to avoid a race condition where a thread queues a task before we set parked to true + // (i.e it will not notify us) -+ -+ // it also resolves race condition where we've overriden a concurrent thread's flush call which set parked to false -+ // the important ordering: (volatile guarantees we cannot re-order the below events) -+ // us: parked -> true, parse tasks -> writeCounter + 1 -> drain flush queue -+ // them: read write counter -> add to flush queue -> write parked to false -> park loop -+ -+ // if we overwrite their set parked to false call then they're in the park loop or about to be, and we're about to -+ // drain the flush queue + if (this.pollTasks(true)) { + this.parked.set(false); + continue; + } ++ + if (this.handleClose()) { + return; + } @@ -1452,19 +1460,22 @@ index 000000000..f127ef236 + } + + protected void handleFlushThreads(final boolean shutdown) { -+ final ConcurrentLinkedQueue flushQueue = this.flushQueue; // Note: this can be a plain read ++ Thread parking; ++ ConcurrentLinkedQueue flushQueue = this.flushQueue; ++ do { ++ ++flushCycles; // may be plain read opaque write ++ while ((parking = flushQueue.poll()) != null) { ++ LockSupport.unpark(parking); ++ } ++ } while (this.pollTasks(false)); ++ + if (shutdown) { -+ this.flushQueue = null; // Note: this can be a release write -+ } ++ this.flushQueue = null; + -+ Thread current; -+ -+ while ((current = flushQueue.poll()) != null) { -+ this.pollTasks(false); -+ // increment flush counter so threads will wake up after being unparked() -+ //noinspection NonAtomicOperationOnVolatileField -+ ++this.flushCounter; // may be plain read plain write if we order before poll() (also would need to re-order pollTasks) -+ LockSupport.unpark(current); ++ // defend against a race condition where a flush thread double-checks right before we set to null ++ while ((parking = flushQueue.poll()) != null) { ++ LockSupport.unpark(parking); ++ } + } + } + @@ -1485,7 +1496,6 @@ index 000000000..f127ef236 + this.notifyTasks(); + } + -+ + /** + * Waits until this thread's queue is empty. + * @@ -1501,42 +1511,47 @@ index 000000000..f127ef236 + + // order is important + -+ long flushCounter = this.flushCounter; ++ int successes = 0; ++ long lastCycle = -1L; + -+ ConcurrentLinkedQueue flushQueue = this.flushQueue; ++ do { ++ final ConcurrentLinkedQueue flushQueue = this.flushQueue; ++ if (flushQueue == null) { ++ return; ++ } + -+ // it's important to read the flush queue after the flush counter to ensure that if we proceed from here -+ // we have a flush counter that would be different from the final flush counter if the queue executor shuts down -+ // the double read of the flush queue is not enough to account for this since -+ if (flushQueue == null) { -+ return; // queue executor has received shutdown and emptied queue -+ } ++ flushQueue.add(currentThread); + -+ flushQueue.add(currentThread); ++ // double check flush queue ++ if (this.flushQueue == null) { ++ return; ++ } + -+ // re-check null flush queue, we need to guarantee the executor is not shutting down before parking ++ final long currentCycle = this.flushCycles; // may be opaque read + -+ if (this.flushQueue == null) { -+ // cannot guarantee state of flush queue now, the executor is done though -+ return; -+ } ++ if (currentCycle == lastCycle) { ++ Thread.yield(); ++ continue; ++ } + -+ // force a response from the IO thread, we're not sure of its state currently -+ this.parked.set(false); -+ LockSupport.unpark(this); ++ // force response ++ this.parked.set(false); ++ LockSupport.unpark(this); + -+ // Note: see the run() function for handling of a race condition where the queue executor overwrites our parked write ++ LockSupport.park("flushing queue executor thread"); + -+ boolean interrupted = false; // preserve interrupted status ++ // returns whether there are tasks queued, does not return whether there are tasks executing ++ // this is why we cycle twice twice through flush (we know a pollTask call is made after a flush cycle) ++ // we really only need to guarantee that the tasks this thread has queued has gone through, and can leave ++ // tasks queued concurrently that are unsychronized with this thread as undefined behavior ++ if (this.queue.hasTasks()) { ++ successes = 0; ++ } else { ++ ++successes; ++ } + -+ while (this.flushCounter == flushCounter) { -+ interrupted |= Thread.interrupted(); -+ LockSupport.park(); -+ } ++ } while (successes != 2); + -+ if (interrupted) { -+ Thread.currentThread().interrupt(); -+ } + } + + /**