summaryrefslogtreecommitdiff
path: root/src/forkjoin
diff options
context:
space:
mode:
authorHeather Miller <heather.miller@epfl.ch>2012-04-13 18:47:04 +0200
committerHeather Miller <heather.miller@epfl.ch>2012-04-13 18:47:04 +0200
commit412885a92e4621af33bbb2c265627f5854e30ba1 (patch)
treed4e9c35905b5fafb1fdf241eb43ddc935060ad45 /src/forkjoin
parent225d205f83ceb7fc6f0af005f0085bf7ab493b38 (diff)
downloadscala-412885a92e4621af33bbb2c265627f5854e30ba1.tar.gz
scala-412885a92e4621af33bbb2c265627f5854e30ba1.tar.bz2
scala-412885a92e4621af33bbb2c265627f5854e30ba1.zip
Adds most recent iteration of the ForkJoinPool, and updates it to now run on JDK 1.6
Diffstat (limited to 'src/forkjoin')
-rw-r--r--src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java414
-rw-r--r--src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java195
2 files changed, 321 insertions, 288 deletions
diff --git a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java
index e9389e9acb..4b5b3382f5 100644
--- a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java
+++ b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java
@@ -1,5 +1,4 @@
/*
-
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
@@ -12,22 +11,18 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
-//import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
-//import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
-interface RunnableFuture<T> extends Runnable {
- //TR placeholder for java.util.concurrent.RunnableFuture
-}
-
/**
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
* A {@code ForkJoinPool} provides the entry point for submissions
@@ -127,7 +122,7 @@ interface RunnableFuture<T> extends Runnable {
* @since 1.7
* @author Doug Lea
*/
-public class ForkJoinPool /*extends AbstractExecutorService*/ {
+public class ForkJoinPool extends AbstractExecutorService {
/*
* Implementation Overview
@@ -634,7 +629,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared
volatile Thread parker; // == owner during call to park; else null
- ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
+ volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
ForkJoinTask<?> currentSteal; // current non-local task being executed
// Heuristic padding to ameliorate unfortunate memory placements
Object p00, p01, p02, p03, p04, p05, p06, p07;
@@ -726,12 +721,11 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
* version of this method because it is never needed.)
*/
final ForkJoinTask<?> pop() {
- ForkJoinTask<?> t; int m;
- ForkJoinTask<?>[] a = array;
- if (a != null && (m = a.length - 1) >= 0) {
+ ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
+ if ((a = array) != null && (m = a.length - 1) >= 0) {
for (int s; (s = top - 1) - base >= 0;) {
- int j = ((m & s) << ASHIFT) + ABASE;
- if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) == null)
+ long j = ((m & s) << ASHIFT) + ABASE;
+ if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
break;
if (U.compareAndSwapObject(a, j, t, null)) {
top = s;
@@ -835,54 +829,6 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
}
/**
- * If present, removes from queue and executes the given task, or
- * any other cancelled task. Returns (true) immediately on any CAS
- * or consistency check failure so caller can retry.
- *
- * @return false if no progress can be made
- */
- final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
- boolean removed = false, empty = true, progress = true;
- ForkJoinTask<?>[] a; int m, s, b, n;
- if ((a = array) != null && (m = a.length - 1) >= 0 &&
- (n = (s = top) - (b = base)) > 0) {
- for (ForkJoinTask<?> t;;) { // traverse from s to b
- int j = ((--s & m) << ASHIFT) + ABASE;
- t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
- if (t == null) // inconsistent length
- break;
- else if (t == task) {
- if (s + 1 == top) { // pop
- if (!U.compareAndSwapObject(a, j, task, null))
- break;
- top = s;
- removed = true;
- }
- else if (base == b) // replace with proxy
- removed = U.compareAndSwapObject(a, j, task,
- new EmptyTask());
- break;
- }
- else if (t.status >= 0)
- empty = false;
- else if (s + 1 == top) { // pop and throw away
- if (U.compareAndSwapObject(a, j, t, null))
- top = s;
- break;
- }
- if (--n == 0) {
- if (!empty && base == b)
- progress = false;
- break;
- }
- }
- }
- if (removed)
- task.doExec();
- return progress;
- }
-
- /**
* Initializes or doubles the capacity of array. Call either
* by owner or with lock held -- it is OK for base, but not
* top, to move while resizings are in progress.
@@ -944,69 +890,98 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
// Execution methods
/**
- * Removes and runs tasks until empty, using local mode
- * ordering. Normally called only after checking for apparent
- * non-emptiness.
+ * Pops and runs tasks until empty.
*/
- final void runLocalTasks() {
- // hoist checks from repeated pop/poll
- ForkJoinTask<?>[] a; int m;
- if ((a = array) != null && (m = a.length - 1) >= 0) {
- if (mode == 0) {
- for (int s; (s = top - 1) - base >= 0;) {
- int j = ((m & s) << ASHIFT) + ABASE;
- ForkJoinTask<?> t =
- (ForkJoinTask<?>)U.getObjectVolatile(a, j);
- if (t != null) {
- if (U.compareAndSwapObject(a, j, t, null)) {
- top = s;
- t.doExec();
- }
- }
- else
- break;
- }
+ private void popAndExecAll() {
+ // A bit faster than repeated pop calls
+ ForkJoinTask<?>[] a; int m, s; long j; ForkJoinTask<?> t;
+ while ((a = array) != null && (m = a.length - 1) >= 0 &&
+ (s = top - 1) - base >= 0 &&
+ (t = ((ForkJoinTask<?>)
+ U.getObject(a, j = ((m & s) << ASHIFT) + ABASE)))
+ != null) {
+ if (U.compareAndSwapObject(a, j, t, null)) {
+ top = s;
+ t.doExec();
}
- else {
- for (int b; (b = base) - top < 0;) {
- int j = ((m & b) << ASHIFT) + ABASE;
- ForkJoinTask<?> t =
- (ForkJoinTask<?>)U.getObjectVolatile(a, j);
- if (t != null) {
- if (base == b &&
- U.compareAndSwapObject(a, j, t, null)) {
- base = b + 1;
- t.doExec();
- }
- } else if (base == b) {
- if (b + 1 == top)
+ }
+ }
+
+ /**
+ * Polls and runs tasks until empty.
+ */
+ private void pollAndExecAll() {
+ for (ForkJoinTask<?> t; (t = poll()) != null;)
+ t.doExec();
+ }
+
+ /**
+ * If present, removes from queue and executes the given task, or
+ * any other cancelled task. Returns (true) immediately on any CAS
+ * or consistency check failure so caller can retry.
+ *
+ * @return 0 if no progress can be made, else positive
+ * (this unusual convention simplifies use with tryHelpStealer.)
+ */
+ final int tryRemoveAndExec(ForkJoinTask<?> task) {
+ int stat = 1;
+ boolean removed = false, empty = true;
+ ForkJoinTask<?>[] a; int m, s, b, n;
+ if ((a = array) != null && (m = a.length - 1) >= 0 &&
+ (n = (s = top) - (b = base)) > 0) {
+ for (ForkJoinTask<?> t;;) { // traverse from s to b
+ int j = ((--s & m) << ASHIFT) + ABASE;
+ t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
+ if (t == null) // inconsistent length
+ break;
+ else if (t == task) {
+ if (s + 1 == top) { // pop
+ if (!U.compareAndSwapObject(a, j, task, null))
break;
- Thread.yield(); // wait for lagging update
+ top = s;
+ removed = true;
}
+ else if (base == b) // replace with proxy
+ removed = U.compareAndSwapObject(a, j, task,
+ new EmptyTask());
+ break;
+ }
+ else if (t.status >= 0)
+ empty = false;
+ else if (s + 1 == top) { // pop and throw away
+ if (U.compareAndSwapObject(a, j, t, null))
+ top = s;
+ break;
+ }
+ if (--n == 0) {
+ if (!empty && base == b)
+ stat = 0;
+ break;
}
}
}
+ if (removed)
+ task.doExec();
+ return stat;
}
/**
* Executes a top-level task and any local tasks remaining
* after execution.
- *
- * @return true unless terminating
*/
- final boolean runTask(ForkJoinTask<?> t) {
- boolean alive = true;
+ final void runTask(ForkJoinTask<?> t) {
if (t != null) {
currentSteal = t;
t.doExec();
- if (top != base) // conservative guard
- runLocalTasks();
+ if (top != base) { // process remaining local tasks
+ if (mode == 0)
+ popAndExecAll();
+ else
+ pollAndExecAll();
+ }
++nsteals;
currentSteal = null;
}
- else if (runState < 0) // terminating
- alive = false;
- return alive;
}
/**
@@ -1072,7 +1047,6 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
}
}
-
/**
* Per-thread records for threads that submit to pools. Currently
* holds only pseudo-random seed / index that is used to choose
@@ -1165,7 +1139,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
* traversal parameters at the expense of sometimes blocking when
* we could be helping.
*/
- private static final int MAX_HELP = 32;
+ private static final int MAX_HELP = 64;
/**
* Secondary time-based bound (in nanosecs) for helping attempts
@@ -1175,7 +1149,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
* value should roughly approximate the time required to create
* and/or activate a worker thread.
*/
- private static final long COMPENSATION_DELAY = 100L * 1000L; // 0.1 millisec
+ private static final long COMPENSATION_DELAY = 1L << 18; // ~0.25 millisec
/**
* Increment for seed generators. See class ThreadLocal for
@@ -1326,22 +1300,28 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
*
* @param w the worker's queue
*/
+
final void registerWorker(WorkQueue w) {
Mutex lock = this.lock;
lock.lock();
try {
WorkQueue[] ws = workQueues;
if (w != null && ws != null) { // skip on shutdown/failure
- int rs, n;
- while ((n = ws.length) < // ensure can hold total
- (parallelism + (short)(ctl >>> TC_SHIFT) << 1))
- workQueues = ws = Arrays.copyOf(ws, n << 1);
- int m = n - 1;
+ int rs, n = ws.length, m = n - 1;
int s = nextSeed += SEED_INCREMENT; // rarely-colliding sequence
w.seed = (s == 0) ? 1 : s; // ensure non-zero seed
int r = (s << 1) | 1; // use odd-numbered indices
- while (ws[r &= m] != null) // step by approx half size
- r += ((n >>> 1) & SQMASK) + 2;
+ if (ws[r &= m] != null) { // collision
+ int probes = 0; // step by approx half size
+ int step = (n <= 4) ? 2 : ((n >>> 1) & SQMASK) + 2;
+ while (ws[r = (r + step) & m] != null) {
+ if (++probes >= n) {
+ workQueues = ws = Arrays.copyOf(ws, n <<= 1);
+ m = n - 1;
+ probes = 0;
+ }
+ }
+ }
w.eventCount = w.poolIndex = r; // establish before recording
ws[r] = w; // also update seq
runState = ((rs = runState) & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN);
@@ -1488,7 +1468,6 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
}
}
-
// Scanning for tasks
/**
@@ -1496,7 +1475,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
*/
final void runWorker(WorkQueue w) {
w.growArray(false); // initialize queue array in this thread
- do {} while (w.runTask(scan(w)));
+ do { w.runTask(scan(w)); } while (w.runState >= 0);
}
/**
@@ -1559,8 +1538,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
q.base = b + 1; // specialization of pollAt
return t;
}
- else if ((t != null || b + 1 != q.top) &&
- (ec < 0 || j <= m)) {
+ else if (ec < 0 || j <= m) {
rs = 0; // mark scan as imcomplete
break; // caller can retry after release
}
@@ -1568,6 +1546,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
if (--j < 0)
break;
}
+
long c = ctl; int e = (int)c, a = (int)(c >> AC_SHIFT), nr, ns;
if (e < 0) // decode ctl on empty scan
w.runState = -1; // pool is terminating
@@ -1635,7 +1614,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
*/
private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
if (w.eventCount < 0 && !tryTerminate(false, false) &&
- (int)prevCtl != 0 && ctl == currentCtl) {
+ (int)prevCtl != 0 && !hasQueuedSubmissions() && ctl == currentCtl) {
Thread wt = Thread.currentThread();
Thread.yield(); // yield before block
while (ctl == currentCtl) {
@@ -1670,70 +1649,79 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
* leaves hints in workers to speed up subsequent calls. The
* implementation is very branchy to cope with potential
* inconsistencies or loops encountering chains that are stale,
- * unknown, or so long that they are likely cyclic. All of these
- * cases are dealt with by just retrying by caller.
+ * unknown, or so long that they are likely cyclic.
*
* @param joiner the joining worker
* @param task the task to join
- * @return true if found or ran a task (and so is immediately retryable)
- */
- private boolean tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
- WorkQueue[] ws;
- int m, depth = MAX_HELP; // remaining chain depth
- boolean progress = false;
- if ((ws = workQueues) != null && (m = ws.length - 1) > 0 &&
- task.status >= 0) {
- ForkJoinTask<?> subtask = task; // current target
- outer: for (WorkQueue j = joiner;;) {
- WorkQueue stealer = null; // find stealer of subtask
- WorkQueue v = ws[j.stealHint & m]; // try hint
- if (v != null && v.currentSteal == subtask)
- stealer = v;
- else { // scan
- for (int i = 1; i <= m; i += 2) {
- if ((v = ws[i]) != null && v.currentSteal == subtask &&
- v != joiner) {
- stealer = v;
- j.stealHint = i; // save hint
- break;
- }
+ * @return 0 if no progress can be made, negative if task
+ * known complete, else positive
+ */
+ private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
+ int stat = 0, steps = 0; // bound to avoid cycles
+ if (joiner != null && task != null) { // hoist null checks
+ restart: for (;;) {
+ ForkJoinTask<?> subtask = task; // current target
+ for (WorkQueue j = joiner, v;;) { // v is stealer of subtask
+ WorkQueue[] ws; int m, s, h;
+ if ((s = task.status) < 0) {
+ stat = s;
+ break restart;
}
- if (stealer == null)
- break;
- }
-
- for (WorkQueue q = stealer;;) { // try to help stealer
- ForkJoinTask[] a; ForkJoinTask<?> t; int b;
- if (task.status < 0)
- break outer;
- if ((b = q.base) - q.top < 0 && (a = q.array) != null) {
- progress = true;
- int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
- t = (ForkJoinTask<?>)U.getObjectVolatile(a, i);
- if (subtask.status < 0) // must recheck before taking
- break outer;
- if (t != null &&
- q.base == b &&
- U.compareAndSwapObject(a, i, t, null)) {
- q.base = b + 1;
- joiner.runSubtask(t);
+ if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
+ break restart; // shutting down
+ if ((v = ws[h = (j.stealHint | 1) & m]) == null ||
+ v.currentSteal != subtask) {
+ for (int origin = h;;) { // find stealer
+ if (((h = (h + 2) & m) & 15) == 1 &&
+ (subtask.status < 0 || j.currentJoin != subtask))
+ continue restart; // occasional staleness check
+ if ((v = ws[h]) != null &&
+ v.currentSteal == subtask) {
+ j.stealHint = h; // save hint
+ break;
+ }
+ if (h == origin)
+ break restart; // cannot find stealer
}
- else if (q.base == b)
- break outer; // possibly stalled
}
- else { // descend
- ForkJoinTask<?> next = stealer.currentJoin;
- if (--depth <= 0 || subtask.status < 0 ||
- next == null || next == subtask)
- break outer; // stale, dead-end, or cyclic
- subtask = next;
- j = stealer;
- break;
+ for (;;) { // help stealer or descend to its stealer
+ ForkJoinTask[] a; int b;
+ if (subtask.status < 0) // surround probes with
+ continue restart; // consistency checks
+ if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
+ int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
+ ForkJoinTask<?> t =
+ (ForkJoinTask<?>)U.getObjectVolatile(a, i);
+ if (subtask.status < 0 || j.currentJoin != subtask ||
+ v.currentSteal != subtask)
+ continue restart; // stale
+ stat = 1; // apparent progress
+ if (t != null && v.base == b &&
+ U.compareAndSwapObject(a, i, t, null)) {
+ v.base = b + 1; // help stealer
+ joiner.runSubtask(t);
+ }
+ else if (v.base == b && ++steps == MAX_HELP)
+ break restart; // v apparently stalled
+ }
+ else { // empty -- try to descend
+ ForkJoinTask<?> next = v.currentJoin;
+ if (subtask.status < 0 || j.currentJoin != subtask ||
+ v.currentSteal != subtask)
+ continue restart; // stale
+ else if (next == null || ++steps == MAX_HELP)
+ break restart; // dead-end or maybe cyclic
+ else {
+ subtask = next;
+ j = v;
+ break;
+ }
+ }
}
}
}
}
- return progress;
+ return stat;
}
/**
@@ -1833,44 +1821,50 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
* @return task status on exit
*/
final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
- ForkJoinTask<?> prevJoin = joiner.currentJoin;
- joiner.currentJoin = task;
- long startTime = 0L;
- for (int k = 0, s; ; ++k) {
- if ((joiner.isEmpty() ? // try to help
- !tryHelpStealer(joiner, task) :
- !joiner.tryRemoveAndExec(task))) {
- if (k == 0) {
- startTime = System.nanoTime();
- tryPollForAndExec(joiner, task); // check uncommon case
- }
- else if ((k & (MAX_HELP - 1)) == 0 &&
- System.nanoTime() - startTime >= COMPENSATION_DELAY &&
- tryCompensate(task, null)) {
- if (task.trySetSignal() && task.status >= 0) {
- synchronized (task) {
- if (task.status >= 0) {
- try { // see ForkJoinTask
- task.wait(); // for explanation
- } catch (InterruptedException ie) {
+ int s;
+ if ((s = task.status) >= 0) {
+ ForkJoinTask<?> prevJoin = joiner.currentJoin;
+ joiner.currentJoin = task;
+ long startTime = 0L;
+ for (int k = 0;;) {
+ if ((s = (joiner.isEmpty() ? // try to help
+ tryHelpStealer(joiner, task) :
+ joiner.tryRemoveAndExec(task))) == 0 &&
+ (s = task.status) >= 0) {
+ if (k == 0) {
+ startTime = System.nanoTime();
+ tryPollForAndExec(joiner, task); // check uncommon case
+ }
+ else if ((k & (MAX_HELP - 1)) == 0 &&
+ System.nanoTime() - startTime >=
+ COMPENSATION_DELAY &&
+ tryCompensate(task, null)) {
+ if (task.trySetSignal()) {
+ synchronized (task) {
+ if (task.status >= 0) {
+ try { // see ForkJoinTask
+ task.wait(); // for explanation
+ } catch (InterruptedException ie) {
+ }
}
+ else
+ task.notifyAll();
}
- else
- task.notifyAll();
}
+ long c; // re-activate
+ do {} while (!U.compareAndSwapLong
+ (this, CTL, c = ctl, c + AC_UNIT));
}
- long c; // re-activate
- do {} while (!U.compareAndSwapLong
- (this, CTL, c = ctl, c + AC_UNIT));
}
+ if (s < 0 || (s = task.status) < 0) {
+ joiner.currentJoin = prevJoin;
+ break;
+ }
+ else if ((k++ & (MAX_HELP - 1)) == MAX_HELP >>> 1)
+ Thread.yield(); // for politeness
}
- if ((s = task.status) < 0) {
- joiner.currentJoin = prevJoin;
- return s;
- }
- else if ((k & (MAX_HELP - 1)) == MAX_HELP >>> 1)
- Thread.yield(); // for politeness
}
+ return s;
}
/**
@@ -1887,7 +1881,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
while ((s = task.status) >= 0 &&
(joiner.isEmpty() ?
tryHelpStealer(joiner, task) :
- joiner.tryRemoveAndExec(task)))
+ joiner.tryRemoveAndExec(task)) != 0)
;
return s;
}
@@ -1919,6 +1913,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
}
}
+
/**
* Runs tasks until {@code isQuiescent()}. We piggyback on
* active count ctl maintenance, but rather than blocking
@@ -1927,8 +1922,9 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
*/
final void helpQuiescePool(WorkQueue w) {
for (boolean active = true;;) {
- if (w.base - w.top < 0)
- w.runLocalTasks(); // exhaust local queue
+ ForkJoinTask<?> localTask; // exhaust local queue
+ while ((localTask = w.nextLocalTask()) != null)
+ localTask.doExec();
WorkQueue q = findNonEmptyStealQueue(w);
if (q != null) {
ForkJoinTask<?> t; int b;
diff --git a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java
index 344f6887a6..2ba146c4da 100644
--- a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java
+++ b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java
@@ -16,7 +16,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
-//import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
@@ -115,18 +115,19 @@ import java.lang.reflect.Constructor;
* <p>The ForkJoinTask class is not usually directly subclassed.
* Instead, you subclass one of the abstract classes that support a
* particular style of fork/join processing, typically {@link
- * RecursiveAction} for computations that do not return results, or
- * {@link RecursiveTask} for those that do. Normally, a concrete
- * ForkJoinTask subclass declares fields comprising its parameters,
- * established in a constructor, and then defines a {@code compute}
- * method that somehow uses the control methods supplied by this base
- * class. While these methods have {@code public} access (to allow
- * instances of different task subclasses to call each other's
- * methods), some of them may only be called from within other
- * ForkJoinTasks (as may be determined using method {@link
- * #inForkJoinPool}). Attempts to invoke them in other contexts
- * result in exceptions or errors, possibly including
- * {@code ClassCastException}.
+ * RecursiveAction} for most computations that do not return results,
+ * {@link RecursiveTask} for those that do, and {@link
+ * CountedCompleter} for those in which completed actions trigger
+ * other actions. Normally, a concrete ForkJoinTask subclass declares
+ * fields comprising its parameters, established in a constructor, and
+ * then defines a {@code compute} method that somehow uses the control
+ * methods supplied by this base class. While these methods have
+ * {@code public} access (to allow instances of different task
+ * subclasses to call each other's methods), some of them may only be
+ * called from within other ForkJoinTasks (as may be determined using
+ * method {@link #inForkJoinPool}). Attempts to invoke them in other
+ * contexts result in exceptions or errors, possibly including {@code
+ * ClassCastException}.
*
* <p>Method {@link #join} and its variants are appropriate for use
* only when completion dependencies are acyclic; that is, the
@@ -137,17 +138,17 @@ import java.lang.reflect.Constructor;
* {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
* may be of use in constructing custom subclasses for problems that
* are not statically structured as DAGs. To support such usages a
- * ForkJoinTask may be atomically <em>marked</em> using {@link
- * #markForkJoinTask} and checked for marking using {@link
- * #isMarkedForkJoinTask}. The ForkJoinTask implementation does not
- * use these {@code protected} methods or marks for any purpose, but
+ * ForkJoinTask may be atomically <em>tagged</em> with a {@code
+ * short} value using {@link #setForkJoinTaskTag} or {@link
+ * #compareAndSetForkJoinTaskTag} and checked using {@link
+ * #getForkJoinTaskTag}. The ForkJoinTask implementation does not
+ * use these {@code protected} methods or tags for any purpose, but
* they may be of use in the construction of specialized subclasses.
* For example, parallel graph traversals can use the supplied methods
* to avoid revisiting nodes/tasks that have already been processed.
- * Also, completion based designs can use them to record that one
- * subtask has completed. (Method names for marking are bulky in part
- * to encourage definition of methods that reflect their usage
- * patterns.)
+ * Also, completion based designs can use them to record that subtasks
+ * have completed. (Method names for tagging are bulky in part to
+ * encourage definition of methods that reflect their usage patterns.)
*
* <p>Most base support methods are {@code final}, to prevent
* overriding of implementations that are intrinsically tied to the
@@ -213,6 +214,10 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* thin-lock techniques, so use some odd coding idioms that tend
* to avoid them, mainly by arranging that every synchronized
* block performs a wait, notifyAll or both.
+ *
+ * These control bits occupy only (some of) the upper half (16
+ * bits) of status field. The lower bits are used for user-defined
+ * tags.
*/
/** The run status of this task */
@@ -221,13 +226,12 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
static final int NORMAL = 0xf0000000; // must be negative
static final int CANCELLED = 0xc0000000; // must be < NORMAL
static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
- static final int SIGNAL = 0x00000001;
- static final int MARKED = 0x00000002;
+ static final int SIGNAL = 0x00010000; // must be >= 1 << 16
+ static final int SMASK = 0x0000ffff; // short bits for tags
/**
* Marks completion and wakes up threads waiting to join this
- * task. A specialization for NORMAL completion is in method
- * doExec.
+ * task.
*
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
* @return completion status on exit
@@ -237,7 +241,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
if ((s = status) < 0)
return s;
if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
- if ((s & SIGNAL) != 0)
+ if ((s >>> 16) != 0)
synchronized (this) { notifyAll(); }
return completion;
}
@@ -259,26 +263,22 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
- while ((s = status) >= 0 && completed) {
- if (U.compareAndSwapInt(this, STATUS, s, s | NORMAL)) {
- if ((s & SIGNAL) != 0)
- synchronized (this) { notifyAll(); }
- return NORMAL;
- }
- }
+ if (completed)
+ s = setCompletion(NORMAL);
}
return s;
}
/**
- * Tries to set SIGNAL status. Used by ForkJoinPool. Other
- * variants are directly incorporated into externalAwaitDone etc.
+ * Tries to set SIGNAL status unless already completed. Used by
+ * ForkJoinPool. Other variants are directly incorporated into
+ * externalAwaitDone etc.
*
* @return true if successful
*/
final boolean trySetSignal() {
- int s;
- return U.compareAndSwapInt(this, STATUS, s = status, s | SIGNAL);
+ int s = status;
+ return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL);
}
/**
@@ -328,7 +328,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
return s;
}
-
/**
* Implementation for join, get, quietlyJoin. Directly handles
* only cases of already-completed, external wait, and
@@ -417,25 +416,39 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return status on exit
*/
private int setExceptionalCompletion(Throwable ex) {
- int h = System.identityHashCode(this);
- final ReentrantLock lock = exceptionTableLock;
- lock.lock();
- try {
- expungeStaleExceptions();
- ExceptionNode[] t = exceptionTable;
- int i = h & (t.length - 1);
- for (ExceptionNode e = t[i]; ; e = e.next) {
- if (e == null) {
- t[i] = new ExceptionNode(this, ex, t[i]);
- break;
+ int s;
+ if ((s = status) >= 0) {
+ int h = System.identityHashCode(this);
+ final ReentrantLock lock = exceptionTableLock;
+ lock.lock();
+ try {
+ expungeStaleExceptions();
+ ExceptionNode[] t = exceptionTable;
+ int i = h & (t.length - 1);
+ for (ExceptionNode e = t[i]; ; e = e.next) {
+ if (e == null) {
+ t[i] = new ExceptionNode(this, ex, t[i]);
+ break;
+ }
+ if (e.get() == this) // already present
+ break;
}
- if (e.get() == this) // already present
- break;
+ } finally {
+ lock.unlock();
}
- } finally {
- lock.unlock();
+ s = setCompletion(EXCEPTIONAL);
}
- return setCompletion(EXCEPTIONAL);
+ ForkJoinTask<?> p = internalGetCompleter(); // propagate
+ if (p != null && p.status >= 0)
+ p.setExceptionalCompletion(ex);
+ return s;
+ }
+
+ /**
+ * Exception propagation support for tasks with completers.
+ */
+ ForkJoinTask<?> internalGetCompleter() {
+ return null;
}
/**
@@ -517,7 +530,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
Throwable ex;
if (e == null || (ex = e.ex) == null)
return null;
- if (e.thrower != Thread.currentThread().getId()) {
+ if (false && e.thrower != Thread.currentThread().getId()) {
Class<? extends Throwable> ec = ex.getClass();
try {
Constructor<?> noArgCtor = null;
@@ -907,6 +920,18 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
+ * Completes this task normally without setting a value. The most
+ * recent value established by {@link #setRawResult} (or {@code
+ * null} by default) will be returned as the result of subsequent
+ * invocations of {@code join} and related operations.
+ *
+ * @since 1.8
+ */
+ public final void quietlyComplete() {
+ setCompletion(NORMAL);
+ }
+
+ /**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
@@ -1225,15 +1250,18 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
protected abstract void setRawResult(V value);
/**
- * Immediately performs the base action of this task. This method
- * is designed to support extensions, and should not in general be
- * called otherwise. The return value controls whether this task
- * is considered to be done normally. It may return false in
+ * Immediately performs the base action of this task and returns
+ * true if, upon return from this method, this task is guaranteed
+ * to have completed normally. This method may return false
+ * otherwise, to indicate that this task is not necessarily
+ * complete (or is not known to be complete), for example in
* asynchronous actions that require explicit invocations of
- * {@link #complete} to become joinable. It may also throw an
- * (unchecked) exception to indicate abnormal exit.
+ * completion methods. This method may also throw an (unchecked)
+ * exception to indicate abnormal exit. This method is designed to
+ * support extensions, and should not in general be called
+ * otherwise.
*
- * @return {@code true} if completed normally
+ * @return {@code true} if this task is known to have completed normally
*/
protected abstract boolean exec();
@@ -1302,44 +1330,53 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
return wt.pool.nextTaskFor(wt.workQueue);
}
- // Mark-bit operations
+ // tag operations
/**
- * Returns true if this task is marked.
+ * Returns the tag for this task.
*
- * @return true if this task is marked
+ * @return the tag for this task
* @since 1.8
*/
- public final boolean isMarkedForkJoinTask() {
- return (status & MARKED) != 0;
+ public final short getForkJoinTaskTag() {
+ return (short)status;
}
/**
- * Atomically sets the mark on this task.
+ * Atomically sets the tag value for this task.
*
- * @return true if this task was previously unmarked
+ * @param tag the tag value
+ * @return the previous value of the tag
* @since 1.8
*/
- public final boolean markForkJoinTask() {
+ public final short setForkJoinTaskTag(short tag) {
for (int s;;) {
- if (((s = status) & MARKED) != 0)
- return false;
- if (U.compareAndSwapInt(this, STATUS, s, s | MARKED))
- return true;
+ if (U.compareAndSwapInt(this, STATUS, s = status,
+ (s & ~SMASK) | (tag & SMASK)))
+ return (short)s;
}
}
/**
- * Atomically clears the mark on this task.
+ * Atomically conditionally sets the tag value for this task.
+ * Among other applications, tags can be used as visit markers
+ * in tasks operating on graphs, as in methods that check: {@code
+ * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))}
+ * before processing, otherwise exiting because the node has
+ * already been visited.
*
- * @return true if this task was previously marked
+ * @param e the expected tag value
+ * @param tag the new tag value
+ * @return true if successful; i.e., the current value was
+ * equal to e and is now tag.
* @since 1.8
*/
- public final boolean unmarkForkJoinTask() {
+ public final boolean compareAndSetForkJoinTaskTag(short e, short tag) {
for (int s;;) {
- if (((s = status) & MARKED) == 0)
+ if ((short)(s = status) != e)
return false;
- if (U.compareAndSwapInt(this, STATUS, s, s & ~MARKED))
+ if (U.compareAndSwapInt(this, STATUS, s,
+ (s & ~SMASK) | (tag & SMASK)))
return true;
}
}