Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8319447: Improve performance of delayed task handling #23702

Open
wants to merge 49 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
cb1aedf
In-progress snapshot
DougLea Jan 6, 2025
5c4ca21
Better conform to STPE
DougLea Jan 7, 2025
bb0e6a6
Refactorings
DougLea Jan 9, 2025
f683d7f
Use pendingRemval queue
DougLea Jan 9, 2025
3801ba0
Reduce unparks
DougLea Jan 10, 2025
d630b40
Better pending queues
DougLea Jan 16, 2025
0dd4ab7
Merge branch 'openjdk:master' into JDK-8319447
DougLea Jan 16, 2025
7e04af5
Conform to default STPE policies
DougLea Jan 19, 2025
7288e32
Comment out racy test
DougLea Jan 19, 2025
b14e31c
Merge branch 'openjdk:master' into JDK-8319447
DougLea Jan 19, 2025
cb202b6
Reduce memory contention
DougLea Jan 21, 2025
1f0a5cf
Use nanoTimeOrigin
DougLea Jan 21, 2025
97a2920
Reduce nanoTime usage; extend tck tests
DougLea Jan 23, 2025
f9aa135
Simplify scheduler state tracking
DougLea Jan 23, 2025
798fe64
Refactor delay scheduler pool submissions
DougLea Jan 24, 2025
d083e91
improve removal cost balance
DougLea Jan 25, 2025
1a7f77c
Ensure negative nanotime offset
DougLea Jan 26, 2025
f832335
Merge branch 'openjdk:master' into JDK-8319447
DougLea Feb 2, 2025
3da4fd7
Solidify design; add documentation
DougLea Feb 2, 2025
49b1699
Separate out DelayScheduler.java
DougLea Feb 3, 2025
229da14
Rework FJP-DS connections
DougLea Feb 4, 2025
9fad8d4
Deal with commonPool parallelism zero; use in other juc classes; remo…
DougLea Feb 7, 2025
2e60dc9
Refactor schedule methods
DougLea Feb 8, 2025
a0db427
Isolate screening
DougLea Feb 8, 2025
d0f4af1
Support STPE policy methods
DougLea Feb 9, 2025
a6290ab
Reduce memory accesses
DougLea Feb 9, 2025
c839299
Simplify policy methods; improve layout
DougLea Feb 10, 2025
f1394c4
Rename DelayedTask to ScheduledForkJoinTask; misc other improvements
DougLea Feb 12, 2025
0e13955
Better accommodate CompletableFuture; use 4-ary heap; add javadocs; o…
DougLea Feb 15, 2025
14a7a6f
Reduce garbage retention; use trailing padding; add tests
DougLea Feb 16, 2025
bd58f41
Merge branch 'openjdk:master' into JDK-8319447
DougLea Feb 16, 2025
93aac79
Merge remote-tracking branch 'refs/remotes/origin/JDK-8319447' into J…
DougLea Feb 16, 2025
753d0e0
Add optional SubmitWithTimeout action
DougLea Feb 17, 2025
53516e9
Misc minor improvements and renamings for clarity
DougLea Feb 19, 2025
16815cc
Address feedback
DougLea Feb 21, 2025
84eaab0
Address review comments; ensure new internal methods can't clash with…
DougLea Feb 22, 2025
c9bc41a
Standardize parameter checking
DougLea Feb 23, 2025
b40513f
Address review comments; reactivation tweak
DougLea Feb 28, 2025
0c5d22a
Reduce volatile reads
DougLea Mar 1, 2025
5c0355b
Associate probes with carriers if Virtual (no doc updates yet)
DougLea Mar 8, 2025
f670910
Merge branch 'openjdk:master' into JDK-8319447
DougLea Mar 8, 2025
6fe1a3b
Disambiguate caller-runs vs Interruptible
DougLea Mar 9, 2025
9cc670b
Use SharedSecrets for ThreadLocalRandomProbe; other tweaks
DougLea Mar 11, 2025
172a235
Reword javadoc
DougLea Mar 13, 2025
9b51b7a
Use TC_MASK in accord with https://bugs.openjdk.org/browse/JDK-833001…
DougLea Mar 14, 2025
24422e4
Match indent of naster changes
DougLea Mar 22, 2025
b552c22
Merge branch 'openjdk:master' into JDK-8319447
DougLea Mar 22, 2025
9cf0a75
Address review comments
DougLea Mar 25, 2025
4aabe6b
Merge branch 'openjdk:master' into JDK-8319447
DougLea Mar 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Rework FJP-DS connections
DougLea committed Feb 4, 2025
commit 229da14815fa011c9903e5b5fc184ed51d01dcd4
Original file line number Diff line number Diff line change
@@ -54,8 +54,10 @@ final class DelayScheduler extends Thread {
*
* To reduce memory contention, the heap is maintained solely via
* local variables in method loop() (forcing noticeable code
* sprawl), recording only the heap array to allow method
* canShutDown to conservatively check emptiness.
* sprawl), recording only the current heap size when blocked to
* allow method canShutDown to conservatively check emptiness, and
* to support an approximate reporting of current size for
* monitoring.
*
* The pending queue uses a design similar to ForkJoinTask.Aux
* queues: Incoming requests prepend (Treiber-stack-style) to the
@@ -104,7 +106,7 @@ final class DelayScheduler extends Thread {

private static final int INITIAL_HEAP_CAPACITY = 1 << 6;
private final ForkJoinPool pool; // read only once
private DelayedTask<?>[] heap; // written only when (re)allocated
int restingSize; // written only before parking
private volatile int active; // 0: inactive, -1: stopped, +1: running
@jdk.internal.vm.annotation.Contended()
private volatile DelayedTask<?> pending;
@@ -164,24 +166,28 @@ final void pend(DelayedTask<?> task) {
* Returns true if (momentarily) inactive and heap is empty
*/
final boolean canShutDown() {
DelayedTask<?>[] h;
return (active <= 0 &&
((h = heap) == null || h.length <= 0 || h[0] == null) &&
active <= 0);
return (active <= 0 && restingSize <= 0);
}

/**
* Setup and run scheduling loop
* Returns an approximate number of elements in heap
*/
final int approximateSize() {
return (active < 0) ? 0 : restingSize;
}

/**
* Sets up and runs scheduling loop
*/
public final void run() {
ForkJoinPool p;
ThreadLocalRandom.localInit();
if ((p = pool) != null) {
try {
loop(p);
} finally {
restingSize = 0;
active = -1;
ForkJoinPool.canTerminate(p);
ForkJoinPool.poolCanTerminate(p);
}
}
}
@@ -195,9 +201,9 @@ public final void run() {
* else park until next trigger time, or indefinitely if none
*/
private void loop(ForkJoinPool p) {
DelayedTask<?>[] h = new DelayedTask<?>[INITIAL_HEAP_CAPACITY];
heap = h;
p.onDelaySchedulerStart();
active = 1;
DelayedTask<?>[] h = new DelayedTask<?>[INITIAL_HEAP_CAPACITY];
boolean purgedPeriodic = false;
for (int n = 0;;) { // n is heap size
DelayedTask<?> t;
@@ -236,7 +242,7 @@ else if (t.status >= 0) {
}
t.heapIndex = k;
h[k] = t;
if (n >= cap && (nh = growHeap(h)) != null)
if (n >= cap && (nh = growHeap(h, cap)) != null)
cap = (h = nh).length;
}
}
@@ -261,13 +267,14 @@ else if (t.status >= 0) {
break;
}
f.heapIndex = -1;
if (stat >= 0 && p != null)
if (stat >= 0)
p.executeReadyDelayedTask(f);
}
} while ((n = replace(h, 0, n)) > 0);
}

if (pending == null) {
restingSize = n;
Thread.interrupted(); // clear before park
if (active == 0)
U.park(false, parkTime);
@@ -281,20 +288,16 @@ else if (t.status >= 0) {
* Tries to reallocate the heap array, returning existing
* array on failure.
*/
private DelayedTask<?>[] growHeap(DelayedTask<?>[] h) {
int cap, newCap;
if (h != null && (cap = h.length) < (newCap = cap << 1)) {
DelayedTask<?>[] a = null;
private DelayedTask<?>[] growHeap(DelayedTask<?>[] h, int cap) {
int newCap = cap << 1;
DelayedTask<?>[] nh = h;
if (h != null && h.length == cap && cap < newCap) {
try {
a = Arrays.copyOf(h, newCap);
nh = Arrays.copyOf(h, newCap);
} catch (Error | RuntimeException ex) {
}
if (a != null && a.length > cap) {
heap = h = a;
U.storeFence();
}
}
return h;
return nh;
}

/**
@@ -361,7 +364,7 @@ private int tryStop(ForkJoinPool p, DelayedTask<?>[] h,
}
}
}
if (n > 0 || !ForkJoinPool.canTerminate(p))
if (n > 0 || !ForkJoinPool.poolCanTerminate(p))
return n;
}
for (int i = 0; i < n; ++i) {
@@ -378,25 +381,6 @@ private int tryStop(ForkJoinPool p, DelayedTask<?>[] h,
return -1;
}

/**
* Returns an approximate count by finding the highest used
* heap array slot. This is very racy and not fast, but
* useful enough for monitoring purposes.
*/
final int approximateSize() {
DelayedTask<?>[] h;
int size = 0;
if (active >= 0 && (h = heap) != null) {
for (int i = h.length - 1; i >= 0; --i) {
if (h[i] != null) {
size = i + 1;
break;
}
}
}
return size;
}

/**
* Task class for DelayScheduler operations
*/
77 changes: 52 additions & 25 deletions src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
Original file line number Diff line number Diff line change
@@ -902,19 +902,21 @@ public class ForkJoinPool extends AbstractExecutorService
*
* This class supports ScheduledExecutorService methods by
* creating and starting a DelayScheduler on first use of these
* methods. The scheduler operates independently in its own
* thread, relaying tasks to the pool to execute as they become
* ready (see method executeReadyDelayedTask). The only other
* interactions with the delayScheduler are to control shutdown
* and maintain shutdown-related policies in methods quiescent()
* and tryTerminate(). In particular, to conform to policies,
* shutdown-related processing must deal with cases in which tasks
* are submitted before shutdown, but not ready until afterwards,
* in which case they must bypass some screening to be allowed to
* run. Conversely, the DelayScheduler interacts with the pool
* only to check runState status (via mehods poolIsStopping and
* poolIsShutdown) and complete termination (via canTerminate)
* that invoke corresponding private pool implementations.
* methods (via startDelayScheduler, with callback
* onDelaySchedulerStart). The scheduler operates independently in
* its own thread, relaying tasks to the pool to execute as they
* become ready (see method executeReadyDelayedTask). The only
* other interactions with the delayScheduler are to control
* shutdown and maintain shutdown-related policies in methods
* quiescent() and tryTerminate(). In particular, to conform to
* policies, shutdown-related processing must deal with cases in
* which tasks are submitted before shutdown, but not ready until
* afterwards, in which case they must bypass some screening to be
* allowed to run. Conversely, the DelayScheduler interacts with
* the pool only to check runState status (via mehods
* poolIsStopping and poolIsShutdown) and complete termination
* (via poolCanTerminate) that invoke corresponding private pool
* implementations.
*
* Memory placement
* ================
@@ -1720,17 +1722,6 @@ private long spinLockRunState() { // spin/sleep
}
}

// status methods used by other classes in this package
static boolean poolIsStopping(ForkJoinPool p) {
return p != null && (p.runState & STOP) != 0L;
}
static boolean poolIsShutdown(ForkJoinPool p) {
return p != null && (p.runState & SHUTDOWN) != 0L;
}
static boolean canTerminate(ForkJoinPool p) { // true if terminated
return p != null && (p.tryTerminate(false, false) & STOP) != 0L;
}

// Creating, registering, and deregistering workers

/**
@@ -3415,6 +3406,17 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks,

// Support for delayed tasks

// status methods used by Delayscheduler and other classes in this package
static boolean poolIsStopping(ForkJoinPool p) {
return p != null && (p.runState & STOP) != 0L;
}
static boolean poolIsShutdown(ForkJoinPool p) {
return p != null && (p.runState & SHUTDOWN) != 0L;
}
static boolean poolCanTerminate(ForkJoinPool p) { // true if terminated
return p != null && (p.tryTerminate(false, false) & STOP) != 0L;
}

/**
* Creates and starts DelayScheduler unless pool is in shutdown mode
*/
@@ -3443,6 +3445,15 @@ private DelayScheduler startDelayScheduler() {
return ds;
}

/**
* Callback upon starting DelayScheduler
*/
final void onDelaySchedulerStart() {
WorkQueue q; // set up default submission queue
if ((q = submissionQueue(0, false)) != null)
q.unlockPhase();
}

/**
* Arranges execution of a ready task from DelayScheduler
*/
@@ -3753,6 +3764,19 @@ public int getQueuedSubmissionCount() {
return count;
}

/**
* Returns an estimate of the number of delayed (including
* periodic) tasks scheduled in this pool that are not yet ready
* to submit for execution. The returned value is innacurate when
* delayed tasks are being processed.
*
* @return an estimate of the number of delayed tasks
*/
public int getDelayedTaskCount() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DougLea It would seem more consistent to have this return a long like getQueuedTaskCount (even if that amount isn't currently possible to represent in the heap. Thoughts, @AlanBateman?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguably better, but not sure it is worth regenerating diffs for? (An int was used for the same reason as in FJT.getQueueSize -- they need to be valid array bounds. Which might someday allow long, but if so many things would change.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I think it's worth it. @AlanBateman, what do you think?

DelayScheduler ds;
return ((ds = delayScheduler) == null ? 0 : ds.approximateSize());
}

/**
* Returns {@code true} if there are any tasks submitted to this
* pool that have not yet begun executing.
@@ -3816,6 +3840,7 @@ protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
*/
public String toString() {
// Use a single pass through queues to collect counts
DelayScheduler ds;
long e = runState;
long st = stealCount;
long qt = 0L, ss = 0L; int rc = 0;
@@ -3835,7 +3860,8 @@ public String toString() {
}
}
}

String delayed = ((ds = delayScheduler) == null ? "" :
", delayed = " + ds.approximateSize());
int pc = parallelism;
long c = ctl;
int tc = (short)(c >>> TC_SHIFT);
@@ -3855,6 +3881,7 @@ public String toString() {
", steals = " + st +
", tasks = " + qt +
", submissions = " + ss +
delayed +
"]";
}