diff options
author | Heather Miller <heather.miller@epfl.ch> | 2012-04-13 18:47:04 +0200 |
---|---|---|
committer | Heather Miller <heather.miller@epfl.ch> | 2012-04-13 18:47:04 +0200 |
commit | 412885a92e4621af33bbb2c265627f5854e30ba1 (patch) | |
tree | d4e9c35905b5fafb1fdf241eb43ddc935060ad45 | |
parent | 225d205f83ceb7fc6f0af005f0085bf7ab493b38 (diff) | |
download | scala-412885a92e4621af33bbb2c265627f5854e30ba1.tar.gz scala-412885a92e4621af33bbb2c265627f5854e30ba1.tar.bz2 scala-412885a92e4621af33bbb2c265627f5854e30ba1.zip |
Adds most recent iteration of the ForkJoinPool, and updates it to now run on JDK 1.6
-rw-r--r-- | src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java | 414 | ||||
-rw-r--r-- | src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java | 195 |
2 files changed, 321 insertions, 288 deletions
diff --git a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java index e9389e9acb..4b5b3382f5 100644 --- a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java +++ b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java @@ -1,5 +1,4 @@ /* - * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/publicdomain/zero/1.0/ @@ -12,22 +11,18 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Random; -//import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; -//import java.util.concurrent.RunnableFuture; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; -interface RunnableFuture<T> extends Runnable { - //TR placeholder for java.util.concurrent.RunnableFuture -} - /** * An {@link ExecutorService} for running {@link ForkJoinTask}s. * A {@code ForkJoinPool} provides the entry point for submissions @@ -127,7 +122,7 @@ interface RunnableFuture<T> extends Runnable { * @since 1.7 * @author Doug Lea */ -public class ForkJoinPool /*extends AbstractExecutorService*/ { +public class ForkJoinPool extends AbstractExecutorService { /* * Implementation Overview @@ -634,7 +629,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { final ForkJoinPool pool; // the containing pool (may be null) final ForkJoinWorkerThread owner; // owning thread or null if shared volatile Thread parker; // == owner during call to park; else null - ForkJoinTask<?> currentJoin; // task being joined in awaitJoin + volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin ForkJoinTask<?> currentSteal; // current non-local task being executed // Heuristic padding to ameliorate unfortunate memory placements Object p00, p01, p02, p03, p04, p05, p06, p07; @@ -726,12 +721,11 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { * version of this method because it is never needed.) */ final ForkJoinTask<?> pop() { - ForkJoinTask<?> t; int m; - ForkJoinTask<?>[] a = array; - if (a != null && (m = a.length - 1) >= 0) { + ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m; + if ((a = array) != null && (m = a.length - 1) >= 0) { for (int s; (s = top - 1) - base >= 0;) { - int j = ((m & s) << ASHIFT) + ABASE; - if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) == null) + long j = ((m & s) << ASHIFT) + ABASE; + if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null) break; if (U.compareAndSwapObject(a, j, t, null)) { top = s; @@ -835,54 +829,6 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { } /** - * If present, removes from queue and executes the given task, or - * any other cancelled task. Returns (true) immediately on any CAS - * or consistency check failure so caller can retry. - * - * @return false if no progress can be made - */ - final boolean tryRemoveAndExec(ForkJoinTask<?> task) { - boolean removed = false, empty = true, progress = true; - ForkJoinTask<?>[] a; int m, s, b, n; - if ((a = array) != null && (m = a.length - 1) >= 0 && - (n = (s = top) - (b = base)) > 0) { - for (ForkJoinTask<?> t;;) { // traverse from s to b - int j = ((--s & m) << ASHIFT) + ABASE; - t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); - if (t == null) // inconsistent length - break; - else if (t == task) { - if (s + 1 == top) { // pop - if (!U.compareAndSwapObject(a, j, task, null)) - break; - top = s; - removed = true; - } - else if (base == b) // replace with proxy - removed = U.compareAndSwapObject(a, j, task, - new EmptyTask()); - break; - } - else if (t.status >= 0) - empty = false; - else if (s + 1 == top) { // pop and throw away - if (U.compareAndSwapObject(a, j, t, null)) - top = s; - break; - } - if (--n == 0) { - if (!empty && base == b) - progress = false; - break; - } - } - } - if (removed) - task.doExec(); - return progress; - } - - /** * Initializes or doubles the capacity of array. Call either * by owner or with lock held -- it is OK for base, but not * top, to move while resizings are in progress. @@ -944,69 +890,98 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { // Execution methods /** - * Removes and runs tasks until empty, using local mode - * ordering. Normally called only after checking for apparent - * non-emptiness. + * Pops and runs tasks until empty. */ - final void runLocalTasks() { - // hoist checks from repeated pop/poll - ForkJoinTask<?>[] a; int m; - if ((a = array) != null && (m = a.length - 1) >= 0) { - if (mode == 0) { - for (int s; (s = top - 1) - base >= 0;) { - int j = ((m & s) << ASHIFT) + ABASE; - ForkJoinTask<?> t = - (ForkJoinTask<?>)U.getObjectVolatile(a, j); - if (t != null) { - if (U.compareAndSwapObject(a, j, t, null)) { - top = s; - t.doExec(); - } - } - else - break; - } + private void popAndExecAll() { + // A bit faster than repeated pop calls + ForkJoinTask<?>[] a; int m, s; long j; ForkJoinTask<?> t; + while ((a = array) != null && (m = a.length - 1) >= 0 && + (s = top - 1) - base >= 0 && + (t = ((ForkJoinTask<?>) + U.getObject(a, j = ((m & s) << ASHIFT) + ABASE))) + != null) { + if (U.compareAndSwapObject(a, j, t, null)) { + top = s; + t.doExec(); } - else { - for (int b; (b = base) - top < 0;) { - int j = ((m & b) << ASHIFT) + ABASE; - ForkJoinTask<?> t = - (ForkJoinTask<?>)U.getObjectVolatile(a, j); - if (t != null) { - if (base == b && - U.compareAndSwapObject(a, j, t, null)) { - base = b + 1; - t.doExec(); - } - } else if (base == b) { - if (b + 1 == top) + } + } + + /** + * Polls and runs tasks until empty. + */ + private void pollAndExecAll() { + for (ForkJoinTask<?> t; (t = poll()) != null;) + t.doExec(); + } + + /** + * If present, removes from queue and executes the given task, or + * any other cancelled task. Returns (true) immediately on any CAS + * or consistency check failure so caller can retry. + * + * @return 0 if no progress can be made, else positive + * (this unusual convention simplifies use with tryHelpStealer.) + */ + final int tryRemoveAndExec(ForkJoinTask<?> task) { + int stat = 1; + boolean removed = false, empty = true; + ForkJoinTask<?>[] a; int m, s, b, n; + if ((a = array) != null && (m = a.length - 1) >= 0 && + (n = (s = top) - (b = base)) > 0) { + for (ForkJoinTask<?> t;;) { // traverse from s to b + int j = ((--s & m) << ASHIFT) + ABASE; + t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); + if (t == null) // inconsistent length + break; + else if (t == task) { + if (s + 1 == top) { // pop + if (!U.compareAndSwapObject(a, j, task, null)) break; - Thread.yield(); // wait for lagging update + top = s; + removed = true; } + else if (base == b) // replace with proxy + removed = U.compareAndSwapObject(a, j, task, + new EmptyTask()); + break; + } + else if (t.status >= 0) + empty = false; + else if (s + 1 == top) { // pop and throw away + if (U.compareAndSwapObject(a, j, t, null)) + top = s; + break; + } + if (--n == 0) { + if (!empty && base == b) + stat = 0; + break; } } } + if (removed) + task.doExec(); + return stat; } /** * Executes a top-level task and any local tasks remaining * after execution. - * - * @return true unless terminating */ - final boolean runTask(ForkJoinTask<?> t) { - boolean alive = true; + final void runTask(ForkJoinTask<?> t) { if (t != null) { currentSteal = t; t.doExec(); - if (top != base) // conservative guard - runLocalTasks(); + if (top != base) { // process remaining local tasks + if (mode == 0) + popAndExecAll(); + else + pollAndExecAll(); + } ++nsteals; currentSteal = null; } - else if (runState < 0) // terminating - alive = false; - return alive; } /** @@ -1072,7 +1047,6 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { ASHIFT = 31 - Integer.numberOfLeadingZeros(s); } } - /** * Per-thread records for threads that submit to pools. Currently * holds only pseudo-random seed / index that is used to choose @@ -1165,7 +1139,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { * traversal parameters at the expense of sometimes blocking when * we could be helping. */ - private static final int MAX_HELP = 32; + private static final int MAX_HELP = 64; /** * Secondary time-based bound (in nanosecs) for helping attempts @@ -1175,7 +1149,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { * value should roughly approximate the time required to create * and/or activate a worker thread. */ - private static final long COMPENSATION_DELAY = 100L * 1000L; // 0.1 millisec + private static final long COMPENSATION_DELAY = 1L << 18; // ~0.25 millisec /** * Increment for seed generators. See class ThreadLocal for @@ -1326,22 +1300,28 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { * * @param w the worker's queue */ + final void registerWorker(WorkQueue w) { Mutex lock = this.lock; lock.lock(); try { WorkQueue[] ws = workQueues; if (w != null && ws != null) { // skip on shutdown/failure - int rs, n; - while ((n = ws.length) < // ensure can hold total - (parallelism + (short)(ctl >>> TC_SHIFT) << 1)) - workQueues = ws = Arrays.copyOf(ws, n << 1); - int m = n - 1; + int rs, n = ws.length, m = n - 1; int s = nextSeed += SEED_INCREMENT; // rarely-colliding sequence w.seed = (s == 0) ? 1 : s; // ensure non-zero seed int r = (s << 1) | 1; // use odd-numbered indices - while (ws[r &= m] != null) // step by approx half size - r += ((n >>> 1) & SQMASK) + 2; + if (ws[r &= m] != null) { // collision + int probes = 0; // step by approx half size + int step = (n <= 4) ? 2 : ((n >>> 1) & SQMASK) + 2; + while (ws[r = (r + step) & m] != null) { + if (++probes >= n) { + workQueues = ws = Arrays.copyOf(ws, n <<= 1); + m = n - 1; + probes = 0; + } + } + } w.eventCount = w.poolIndex = r; // establish before recording ws[r] = w; // also update seq runState = ((rs = runState) & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN); @@ -1488,7 +1468,6 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { } } - // Scanning for tasks /** @@ -1496,7 +1475,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { */ final void runWorker(WorkQueue w) { w.growArray(false); // initialize queue array in this thread - do {} while (w.runTask(scan(w))); + do { w.runTask(scan(w)); } while (w.runState >= 0); } /** @@ -1559,8 +1538,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { q.base = b + 1; // specialization of pollAt return t; } - else if ((t != null || b + 1 != q.top) && - (ec < 0 || j <= m)) { + else if (ec < 0 || j <= m) { rs = 0; // mark scan as imcomplete break; // caller can retry after release } @@ -1568,6 +1546,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { if (--j < 0) break; } + long c = ctl; int e = (int)c, a = (int)(c >> AC_SHIFT), nr, ns; if (e < 0) // decode ctl on empty scan w.runState = -1; // pool is terminating @@ -1635,7 +1614,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { */ private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) { if (w.eventCount < 0 && !tryTerminate(false, false) && - (int)prevCtl != 0 && ctl == currentCtl) { + (int)prevCtl != 0 && !hasQueuedSubmissions() && ctl == currentCtl) { Thread wt = Thread.currentThread(); Thread.yield(); // yield before block while (ctl == currentCtl) { @@ -1670,70 +1649,79 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { * leaves hints in workers to speed up subsequent calls. The * implementation is very branchy to cope with potential * inconsistencies or loops encountering chains that are stale, - * unknown, or so long that they are likely cyclic. All of these - * cases are dealt with by just retrying by caller. + * unknown, or so long that they are likely cyclic. * * @param joiner the joining worker * @param task the task to join - * @return true if found or ran a task (and so is immediately retryable) - */ - private boolean tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) { - WorkQueue[] ws; - int m, depth = MAX_HELP; // remaining chain depth - boolean progress = false; - if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && - task.status >= 0) { - ForkJoinTask<?> subtask = task; // current target - outer: for (WorkQueue j = joiner;;) { - WorkQueue stealer = null; // find stealer of subtask - WorkQueue v = ws[j.stealHint & m]; // try hint - if (v != null && v.currentSteal == subtask) - stealer = v; - else { // scan - for (int i = 1; i <= m; i += 2) { - if ((v = ws[i]) != null && v.currentSteal == subtask && - v != joiner) { - stealer = v; - j.stealHint = i; // save hint - break; - } + * @return 0 if no progress can be made, negative if task + * known complete, else positive + */ + private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) { + int stat = 0, steps = 0; // bound to avoid cycles + if (joiner != null && task != null) { // hoist null checks + restart: for (;;) { + ForkJoinTask<?> subtask = task; // current target + for (WorkQueue j = joiner, v;;) { // v is stealer of subtask + WorkQueue[] ws; int m, s, h; + if ((s = task.status) < 0) { + stat = s; + break restart; } - if (stealer == null) - break; - } - - for (WorkQueue q = stealer;;) { // try to help stealer - ForkJoinTask[] a; ForkJoinTask<?> t; int b; - if (task.status < 0) - break outer; - if ((b = q.base) - q.top < 0 && (a = q.array) != null) { - progress = true; - int i = (((a.length - 1) & b) << ASHIFT) + ABASE; - t = (ForkJoinTask<?>)U.getObjectVolatile(a, i); - if (subtask.status < 0) // must recheck before taking - break outer; - if (t != null && - q.base == b && - U.compareAndSwapObject(a, i, t, null)) { - q.base = b + 1; - joiner.runSubtask(t); + if ((ws = workQueues) == null || (m = ws.length - 1) <= 0) + break restart; // shutting down + if ((v = ws[h = (j.stealHint | 1) & m]) == null || + v.currentSteal != subtask) { + for (int origin = h;;) { // find stealer + if (((h = (h + 2) & m) & 15) == 1 && + (subtask.status < 0 || j.currentJoin != subtask)) + continue restart; // occasional staleness check + if ((v = ws[h]) != null && + v.currentSteal == subtask) { + j.stealHint = h; // save hint + break; + } + if (h == origin) + break restart; // cannot find stealer } - else if (q.base == b) - break outer; // possibly stalled } - else { // descend - ForkJoinTask<?> next = stealer.currentJoin; - if (--depth <= 0 || subtask.status < 0 || - next == null || next == subtask) - break outer; // stale, dead-end, or cyclic - subtask = next; - j = stealer; - break; + for (;;) { // help stealer or descend to its stealer + ForkJoinTask[] a; int b; + if (subtask.status < 0) // surround probes with + continue restart; // consistency checks + if ((b = v.base) - v.top < 0 && (a = v.array) != null) { + int i = (((a.length - 1) & b) << ASHIFT) + ABASE; + ForkJoinTask<?> t = + (ForkJoinTask<?>)U.getObjectVolatile(a, i); + if (subtask.status < 0 || j.currentJoin != subtask || + v.currentSteal != subtask) + continue restart; // stale + stat = 1; // apparent progress + if (t != null && v.base == b && + U.compareAndSwapObject(a, i, t, null)) { + v.base = b + 1; // help stealer + joiner.runSubtask(t); + } + else if (v.base == b && ++steps == MAX_HELP) + break restart; // v apparently stalled + } + else { // empty -- try to descend + ForkJoinTask<?> next = v.currentJoin; + if (subtask.status < 0 || j.currentJoin != subtask || + v.currentSteal != subtask) + continue restart; // stale + else if (next == null || ++steps == MAX_HELP) + break restart; // dead-end or maybe cyclic + else { + subtask = next; + j = v; + break; + } + } } } } } - return progress; + return stat; } /** @@ -1833,44 +1821,50 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { * @return task status on exit */ final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) { - ForkJoinTask<?> prevJoin = joiner.currentJoin; - joiner.currentJoin = task; - long startTime = 0L; - for (int k = 0, s; ; ++k) { - if ((joiner.isEmpty() ? // try to help - !tryHelpStealer(joiner, task) : - !joiner.tryRemoveAndExec(task))) { - if (k == 0) { - startTime = System.nanoTime(); - tryPollForAndExec(joiner, task); // check uncommon case - } - else if ((k & (MAX_HELP - 1)) == 0 && - System.nanoTime() - startTime >= COMPENSATION_DELAY && - tryCompensate(task, null)) { - if (task.trySetSignal() && task.status >= 0) { - synchronized (task) { - if (task.status >= 0) { - try { // see ForkJoinTask - task.wait(); // for explanation - } catch (InterruptedException ie) { + int s; + if ((s = task.status) >= 0) { + ForkJoinTask<?> prevJoin = joiner.currentJoin; + joiner.currentJoin = task; + long startTime = 0L; + for (int k = 0;;) { + if ((s = (joiner.isEmpty() ? // try to help + tryHelpStealer(joiner, task) : + joiner.tryRemoveAndExec(task))) == 0 && + (s = task.status) >= 0) { + if (k == 0) { + startTime = System.nanoTime(); + tryPollForAndExec(joiner, task); // check uncommon case + } + else if ((k & (MAX_HELP - 1)) == 0 && + System.nanoTime() - startTime >= + COMPENSATION_DELAY && + tryCompensate(task, null)) { + if (task.trySetSignal()) { + synchronized (task) { + if (task.status >= 0) { + try { // see ForkJoinTask + task.wait(); // for explanation + } catch (InterruptedException ie) { + } } + else + task.notifyAll(); } - else - task.notifyAll(); } + long c; // re-activate + do {} while (!U.compareAndSwapLong + (this, CTL, c = ctl, c + AC_UNIT)); } - long c; // re-activate - do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, c + AC_UNIT)); } + if (s < 0 || (s = task.status) < 0) { + joiner.currentJoin = prevJoin; + break; + } + else if ((k++ & (MAX_HELP - 1)) == MAX_HELP >>> 1) + Thread.yield(); // for politeness } - if ((s = task.status) < 0) { - joiner.currentJoin = prevJoin; - return s; - } - else if ((k & (MAX_HELP - 1)) == MAX_HELP >>> 1) - Thread.yield(); // for politeness } + return s; } /** @@ -1887,7 +1881,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { while ((s = task.status) >= 0 && (joiner.isEmpty() ? tryHelpStealer(joiner, task) : - joiner.tryRemoveAndExec(task))) + joiner.tryRemoveAndExec(task)) != 0) ; return s; } @@ -1919,6 +1913,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { } } + /** * Runs tasks until {@code isQuiescent()}. We piggyback on * active count ctl maintenance, but rather than blocking @@ -1927,8 +1922,9 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { */ final void helpQuiescePool(WorkQueue w) { for (boolean active = true;;) { - if (w.base - w.top < 0) - w.runLocalTasks(); // exhaust local queue + ForkJoinTask<?> localTask; // exhaust local queue + while ((localTask = w.nextLocalTask()) != null) + localTask.doExec(); WorkQueue q = findNonEmptyStealQueue(w); if (q != null) { ForkJoinTask<?> t; int b; diff --git a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java index 344f6887a6..2ba146c4da 100644 --- a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java +++ b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java @@ -16,7 +16,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; -//import java.util.concurrent.RunnableFuture; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; @@ -115,18 +115,19 @@ import java.lang.reflect.Constructor; * <p>The ForkJoinTask class is not usually directly subclassed. * Instead, you subclass one of the abstract classes that support a * particular style of fork/join processing, typically {@link - * RecursiveAction} for computations that do not return results, or - * {@link RecursiveTask} for those that do. Normally, a concrete - * ForkJoinTask subclass declares fields comprising its parameters, - * established in a constructor, and then defines a {@code compute} - * method that somehow uses the control methods supplied by this base - * class. While these methods have {@code public} access (to allow - * instances of different task subclasses to call each other's - * methods), some of them may only be called from within other - * ForkJoinTasks (as may be determined using method {@link - * #inForkJoinPool}). Attempts to invoke them in other contexts - * result in exceptions or errors, possibly including - * {@code ClassCastException}. + * RecursiveAction} for most computations that do not return results, + * {@link RecursiveTask} for those that do, and {@link + * CountedCompleter} for those in which completed actions trigger + * other actions. Normally, a concrete ForkJoinTask subclass declares + * fields comprising its parameters, established in a constructor, and + * then defines a {@code compute} method that somehow uses the control + * methods supplied by this base class. While these methods have + * {@code public} access (to allow instances of different task + * subclasses to call each other's methods), some of them may only be + * called from within other ForkJoinTasks (as may be determined using + * method {@link #inForkJoinPool}). Attempts to invoke them in other + * contexts result in exceptions or errors, possibly including {@code + * ClassCastException}. * * <p>Method {@link #join} and its variants are appropriate for use * only when completion dependencies are acyclic; that is, the @@ -137,17 +138,17 @@ import java.lang.reflect.Constructor; * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that * may be of use in constructing custom subclasses for problems that * are not statically structured as DAGs. To support such usages a - * ForkJoinTask may be atomically <em>marked</em> using {@link - * #markForkJoinTask} and checked for marking using {@link - * #isMarkedForkJoinTask}. The ForkJoinTask implementation does not - * use these {@code protected} methods or marks for any purpose, but + * ForkJoinTask may be atomically <em>tagged</em> with a {@code + * short} value using {@link #setForkJoinTaskTag} or {@link + * #compareAndSetForkJoinTaskTag} and checked using {@link + * #getForkJoinTaskTag}. The ForkJoinTask implementation does not + * use these {@code protected} methods or tags for any purpose, but * they may be of use in the construction of specialized subclasses. * For example, parallel graph traversals can use the supplied methods * to avoid revisiting nodes/tasks that have already been processed. - * Also, completion based designs can use them to record that one - * subtask has completed. (Method names for marking are bulky in part - * to encourage definition of methods that reflect their usage - * patterns.) + * Also, completion based designs can use them to record that subtasks + * have completed. (Method names for tagging are bulky in part to + * encourage definition of methods that reflect their usage patterns.) * * <p>Most base support methods are {@code final}, to prevent * overriding of implementations that are intrinsically tied to the @@ -213,6 +214,10 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * thin-lock techniques, so use some odd coding idioms that tend * to avoid them, mainly by arranging that every synchronized * block performs a wait, notifyAll or both. + * + * These control bits occupy only (some of) the upper half (16 + * bits) of status field. The lower bits are used for user-defined + * tags. */ /** The run status of this task */ @@ -221,13 +226,12 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { static final int NORMAL = 0xf0000000; // must be negative static final int CANCELLED = 0xc0000000; // must be < NORMAL static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED - static final int SIGNAL = 0x00000001; - static final int MARKED = 0x00000002; + static final int SIGNAL = 0x00010000; // must be >= 1 << 16 + static final int SMASK = 0x0000ffff; // short bits for tags /** * Marks completion and wakes up threads waiting to join this - * task. A specialization for NORMAL completion is in method - * doExec. + * task. * * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL * @return completion status on exit @@ -237,7 +241,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { if ((s = status) < 0) return s; if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { - if ((s & SIGNAL) != 0) + if ((s >>> 16) != 0) synchronized (this) { notifyAll(); } return completion; } @@ -259,26 +263,22 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { } catch (Throwable rex) { return setExceptionalCompletion(rex); } - while ((s = status) >= 0 && completed) { - if (U.compareAndSwapInt(this, STATUS, s, s | NORMAL)) { - if ((s & SIGNAL) != 0) - synchronized (this) { notifyAll(); } - return NORMAL; - } - } + if (completed) + s = setCompletion(NORMAL); } return s; } /** - * Tries to set SIGNAL status. Used by ForkJoinPool. Other - * variants are directly incorporated into externalAwaitDone etc. + * Tries to set SIGNAL status unless already completed. Used by + * ForkJoinPool. Other variants are directly incorporated into + * externalAwaitDone etc. * * @return true if successful */ final boolean trySetSignal() { - int s; - return U.compareAndSwapInt(this, STATUS, s = status, s | SIGNAL); + int s = status; + return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL); } /** @@ -328,7 +328,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { return s; } - /** * Implementation for join, get, quietlyJoin. Directly handles * only cases of already-completed, external wait, and @@ -417,25 +416,39 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * @return status on exit */ private int setExceptionalCompletion(Throwable ex) { - int h = System.identityHashCode(this); - final ReentrantLock lock = exceptionTableLock; - lock.lock(); - try { - expungeStaleExceptions(); - ExceptionNode[] t = exceptionTable; - int i = h & (t.length - 1); - for (ExceptionNode e = t[i]; ; e = e.next) { - if (e == null) { - t[i] = new ExceptionNode(this, ex, t[i]); - break; + int s; + if ((s = status) >= 0) { + int h = System.identityHashCode(this); + final ReentrantLock lock = exceptionTableLock; + lock.lock(); + try { + expungeStaleExceptions(); + ExceptionNode[] t = exceptionTable; + int i = h & (t.length - 1); + for (ExceptionNode e = t[i]; ; e = e.next) { + if (e == null) { + t[i] = new ExceptionNode(this, ex, t[i]); + break; + } + if (e.get() == this) // already present + break; } - if (e.get() == this) // already present - break; + } finally { + lock.unlock(); } - } finally { - lock.unlock(); + s = setCompletion(EXCEPTIONAL); } - return setCompletion(EXCEPTIONAL); + ForkJoinTask<?> p = internalGetCompleter(); // propagate + if (p != null && p.status >= 0) + p.setExceptionalCompletion(ex); + return s; + } + + /** + * Exception propagation support for tasks with completers. + */ + ForkJoinTask<?> internalGetCompleter() { + return null; } /** @@ -517,7 +530,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { Throwable ex; if (e == null || (ex = e.ex) == null) return null; - if (e.thrower != Thread.currentThread().getId()) { + if (false && e.thrower != Thread.currentThread().getId()) { Class<? extends Throwable> ec = ex.getClass(); try { Constructor<?> noArgCtor = null; @@ -907,6 +920,18 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { } /** + * Completes this task normally without setting a value. The most + * recent value established by {@link #setRawResult} (or {@code + * null} by default) will be returned as the result of subsequent + * invocations of {@code join} and related operations. + * + * @since 1.8 + */ + public final void quietlyComplete() { + setCompletion(NORMAL); + } + + /** * Waits if necessary for the computation to complete, and then * retrieves its result. * @@ -1225,15 +1250,18 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { protected abstract void setRawResult(V value); /** - * Immediately performs the base action of this task. This method - * is designed to support extensions, and should not in general be - * called otherwise. The return value controls whether this task - * is considered to be done normally. It may return false in + * Immediately performs the base action of this task and returns + * true if, upon return from this method, this task is guaranteed + * to have completed normally. This method may return false + * otherwise, to indicate that this task is not necessarily + * complete (or is not known to be complete), for example in * asynchronous actions that require explicit invocations of - * {@link #complete} to become joinable. It may also throw an - * (unchecked) exception to indicate abnormal exit. + * completion methods. This method may also throw an (unchecked) + * exception to indicate abnormal exit. This method is designed to + * support extensions, and should not in general be called + * otherwise. * - * @return {@code true} if completed normally + * @return {@code true} if this task is known to have completed normally */ protected abstract boolean exec(); @@ -1302,44 +1330,53 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { return wt.pool.nextTaskFor(wt.workQueue); } - // Mark-bit operations + // tag operations /** - * Returns true if this task is marked. + * Returns the tag for this task. * - * @return true if this task is marked + * @return the tag for this task * @since 1.8 */ - public final boolean isMarkedForkJoinTask() { - return (status & MARKED) != 0; + public final short getForkJoinTaskTag() { + return (short)status; } /** - * Atomically sets the mark on this task. + * Atomically sets the tag value for this task. * - * @return true if this task was previously unmarked + * @param tag the tag value + * @return the previous value of the tag * @since 1.8 */ - public final boolean markForkJoinTask() { + public final short setForkJoinTaskTag(short tag) { for (int s;;) { - if (((s = status) & MARKED) != 0) - return false; - if (U.compareAndSwapInt(this, STATUS, s, s | MARKED)) - return true; + if (U.compareAndSwapInt(this, STATUS, s = status, + (s & ~SMASK) | (tag & SMASK))) + return (short)s; } } /** - * Atomically clears the mark on this task. + * Atomically conditionally sets the tag value for this task. + * Among other applications, tags can be used as visit markers + * in tasks operating on graphs, as in methods that check: {@code + * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))} + * before processing, otherwise exiting because the node has + * already been visited. * - * @return true if this task was previously marked + * @param e the expected tag value + * @param tag the new tag value + * @return true if successful; i.e., the current value was + * equal to e and is now tag. * @since 1.8 */ - public final boolean unmarkForkJoinTask() { + public final boolean compareAndSetForkJoinTaskTag(short e, short tag) { for (int s;;) { - if (((s = status) & MARKED) == 0) + if ((short)(s = status) != e) return false; - if (U.compareAndSwapInt(this, STATUS, s, s & ~MARKED)) + if (U.compareAndSwapInt(this, STATUS, s, + (s & ~SMASK) | (tag & SMASK))) return true; } } |