Diffstat (limited to 'src/forkjoin/scala/concurrent/forkjoin/ForkJoinWorkerThread.java')
-rw-r--r--  src/forkjoin/scala/concurrent/forkjoin/ForkJoinWorkerThread.java | 1240 ++++++--------
 1 file changed, 522 insertions(+), 718 deletions(-)
diff --git a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinWorkerThread.java b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinWorkerThread.java
index 79879b19c7..b4d889750c 100644
--- a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinWorkerThread.java
+++ b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinWorkerThread.java
@@ -1,287 +1,224 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
- * http://creativecommons.org/publicdomain/zero/1.0/
+ * http://creativecommons.org/licenses/publicdomain
*/
package scala.concurrent.forkjoin;
-
-import java.util.Collection;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+import sun.misc.Unsafe;
+import java.lang.reflect.*;
/**
- * A thread managed by a {@link ForkJoinPool}, which executes
- * {@link ForkJoinTask}s.
- * This class is subclassable solely for the sake of adding
- * functionality -- there are no overridable methods dealing with
- * scheduling or execution. However, you can override initialization
- * and termination methods surrounding the main task processing loop.
- * If you do create such a subclass, you will also need to supply a
- * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
- * in a {@code ForkJoinPool}.
+ * A thread managed by a {@link ForkJoinPool}. This class is
+ * subclassable solely for the sake of adding functionality -- there
+ * are no overridable methods dealing with scheduling or
+ * execution. However, you can override initialization and termination
+ * methods surrounding the main task processing loop. If you do
+ * create such a subclass, you will also need to supply a custom
+ * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool.
*
- * @since 1.7
- * @author Doug Lea
*/
public class ForkJoinWorkerThread extends Thread {
/*
- * Overview:
- *
- * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
- * ForkJoinTasks. This class includes bookkeeping in support of
- * worker activation, suspension, and lifecycle control described
- * in more detail in the internal documentation of class
- * ForkJoinPool. And as described further below, this class also
- * includes special-cased support for some ForkJoinTask
- * methods. But the main mechanics involve work-stealing:
+ * Algorithm overview:
*
- * Work-stealing queues are special forms of Deques that support
- * only three of the four possible end-operations -- push, pop,
- * and deq (aka steal), under the further constraints that push
- * and pop are called only from the owning thread, while deq may
- * be called from other threads. (If you are unfamiliar with
- * them, you probably want to read Herlihy and Shavit's book "The
- * Art of Multiprocessor programming", chapter 16 describing these
- * in more detail before proceeding.) The main work-stealing
- * queue design is roughly similar to those in the papers "Dynamic
- * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
- * (http://research.sun.com/scalable/pubs/index.html) and
- * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
- * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
- * The main differences ultimately stem from gc requirements that
- * we null out taken slots as soon as we can, to maintain as small
- * a footprint as possible even in programs generating huge
- * numbers of tasks. To accomplish this, we shift the CAS
- * arbitrating pop vs deq (steal) from being on the indices
- * ("queueBase" and "queueTop") to the slots themselves (mainly
- * via method "casSlotNull()"). So, both a successful pop and deq
- * mainly entail a CAS of a slot from non-null to null. Because
- * we rely on CASes of references, we do not need tag bits on
- * queueBase or queueTop. They are simple ints as used in any
- * circular array-based queue (see for example ArrayDeque).
+ * 1. Work-Stealing: Work-stealing queues are special forms of
+ * Deques that support only three of the four possible
+ * end-operations -- push, pop, and deq (aka steal), and only do
+ * so under the constraints that push and pop are called only from
+ * the owning thread, while deq may be called from other threads.
+ * (If you are unfamiliar with them, you probably want to read
+ * Herlihy and Shavit's book "The Art of Multiprocessor
+ * programming", chapter 16 describing these in more detail before
+ * proceeding.) The main work-stealing queue design is roughly
+ * similar to "Dynamic Circular Work-Stealing Deque" by David
+ * Chase and Yossi Lev, SPAA 2005
+ * (http://research.sun.com/scalable/pubs/index.html). The main
+ * difference ultimately stems from gc requirements that we null
+ * out taken slots as soon as we can, to maintain as small a
+ * footprint as possible even in programs generating huge numbers
+ * of tasks. To accomplish this, we shift the CAS arbitrating pop
+ * vs deq (steal) from being on the indices ("base" and "sp") to
+ * the slots themselves (mainly via method "casSlotNull()"). So,
+ * both a successful pop and deq mainly entail CAS'ing a nonnull
+ * slot to null. Because we rely on CASes of references, we do
+ * not need tag bits on base or sp. They are simple ints as used
+ * in any circular array-based queue (see for example ArrayDeque).
* Updates to the indices must still be ordered in a way that
- * guarantees that queueTop == queueBase means the queue is empty,
- * but otherwise may err on the side of possibly making the queue
+ * guarantees that sp == base means the queue is empty, but
+ * otherwise may err on the side of possibly making the queue
* appear nonempty when a push, pop, or deq have not fully
* committed. Note that this means that the deq operation,
* considered individually, is not wait-free. One thief cannot
* successfully continue until another in-progress one (or, if
* previously empty, a push) completes. However, in the
- * aggregate, we ensure at least probabilistic non-blockingness.
- * If an attempted steal fails, a thief always chooses a different
+ * aggregate, we ensure at least probabilistic non-blockingness. If
+ * an attempted steal fails, a thief always chooses a different
* random victim target to try next. So, in order for one thief to
* progress, it suffices for any in-progress deq or new push on
- * any empty queue to complete.
- *
- * This approach also enables support for "async mode" where local
- * task processing is in FIFO, not LIFO order; simply by using a
- * version of deq rather than pop when locallyFifo is true (as set
- * by the ForkJoinPool). This allows use in message-passing
- * frameworks in which tasks are never joined. However neither
- * mode considers affinities, loads, cache localities, etc, so
- * rarely provide the best possible performance on a given
- * machine, but portably provide good throughput by averaging over
- * these factors. (Further, even if we did try to use such
- * information, we do not usually have a basis for exploiting
- * it. For example, some sets of tasks profit from cache
- * affinities, but others are harmed by cache pollution effects.)
- *
- * When a worker would otherwise be blocked waiting to join a
- * task, it first tries a form of linear helping: Each worker
- * records (in field currentSteal) the most recent task it stole
- * from some other worker. Plus, it records (in field currentJoin)
- * the task it is currently actively joining. Method joinTask uses
- * these markers to try to find a worker to help (i.e., steal back
- * a task from and execute it) that could hasten completion of the
- * actively joined task. In essence, the joiner executes a task
- * that would be on its own local deque had the to-be-joined task
- * not been stolen. This may be seen as a conservative variant of
- * the approach in Wagner & Calder "Leapfrogging: a portable
- * technique for implementing efficient futures" SIGPLAN Notices,
- * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
- * in that: (1) We only maintain dependency links across workers
- * upon steals, rather than use per-task bookkeeping. This may
- * require a linear scan of workers array to locate stealers, but
- * usually doesn't because stealers leave hints (that may become
- * stale/wrong) of where to locate them. This isolates cost to
- * when it is needed, rather than adding to per-task overhead.
- * (2) It is "shallow", ignoring nesting and potentially cyclic
- * mutual steals. (3) It is intentionally racy: field currentJoin
- * is updated only while actively joining, which means that we
- * miss links in the chain during long-lived tasks, GC stalls etc
- * (which is OK since blocking in such cases is usually a good
- * idea). (4) We bound the number of attempts to find work (see
- * MAX_HELP) and fall back to suspending the worker and if
- * necessary replacing it with another.
+ * any empty queue to complete. One reason this works well here is
+ * that apparently-nonempty often means soon-to-be-stealable,
+ * which gives threads a chance to activate if necessary before
+ * stealing (see below).
*
- * Efficient implementation of these algorithms currently relies
- * on an uncomfortable amount of "Unsafe" mechanics. To maintain
- * correct orderings, reads and writes of variable queueBase
- * require volatile ordering. Variable queueTop need not be
- * volatile because non-local reads always follow those of
- * queueBase. Similarly, because they are protected by volatile
- * queueBase reads, reads of the queue array and its slots by
- * other threads do not need volatile load semantics, but writes
- * (in push) require store order and CASes (in pop and deq)
- * require (volatile) CAS semantics. (Michael, Saraswat, and
- * Vechev's algorithm has similar properties, but without support
- * for nulling slots.) Since these combinations aren't supported
- * using ordinary volatiles, the only way to accomplish these
- * efficiently is to use direct Unsafe calls. (Using external
+ * Efficient implementation of this approach currently relies on
+ * an uncomfortable amount of "Unsafe" mechanics. To maintain
+ * correct orderings, reads and writes of variable base require
+ * volatile ordering. Variable sp does not require volatile write
+ * but needs cheaper store-ordering on writes. Because they are
+ * protected by volatile base reads, reads of the queue array and
+ * its slots do not need volatile load semantics, but writes (in
+ * push) require store order and CASes (in pop and deq) require
+ * (volatile) CAS semantics. Since these combinations aren't
+ * supported using ordinary volatiles, the only way to accomplish
+ * these efficiently is to use direct Unsafe calls. (Using external
* AtomicIntegers and AtomicReferenceArrays for the indices and
* array is significantly slower because of memory locality and
- * indirection effects.)
- *
- * Further, performance on most platforms is very sensitive to
- * placement and sizing of the (resizable) queue array. Even
- * though these queues don't usually become all that big, the
- * initial size must be large enough to counteract cache
+ * indirection effects.) Further, performance on most platforms is
+ * very sensitive to placement and sizing of the (resizable) queue
+ * array. Even though these queues don't usually become all that
+ * big, the initial size must be large enough to counteract cache
* contention effects across multiple queues (especially in the
* presence of GC cardmarking). Also, to improve thread-locality,
- * queues are initialized after starting.
- */
-
- /**
- * Mask for pool indices encoded as shorts
+ * queues are currently initialized immediately after the thread
+ * gets the initial signal to start processing tasks. However,
+ * all queue-related methods except pushTask are written in a way
+ * that allows them to instead be lazily allocated and/or disposed
+ * of when empty. All together, these low-level implementation
+ * choices produce as much as a factor of 4 performance
+ * improvement compared to naive implementations, and enable the
+ * processing of billions of tasks per second, sometimes at the
+ * expense of ugliness.
+ *
+ * 2. Run control: The primary run control is based on a global
+ * counter (activeCount) held by the pool. It uses an algorithm
+ * similar to that in Herlihy and Shavit section 17.6 to cause
+ * threads to eventually block when all threads declare they are
+ * inactive. (See variable "scans".) For this to work, threads
+ * must be declared active when executing tasks, and before
+ * stealing a task. They must be inactive before blocking on the
+ * Pool Barrier (awaiting a new submission or other Pool
+ * event). In between, there is some free play which we take
+ * advantage of to avoid contention and rapid flickering of the
+ * global activeCount: If inactive, we activate only if a victim
+ * queue appears to be nonempty (see above). Similarly, a thread
+ * tries to inactivate only after a full scan of other threads.
+ * The net effect is that contention on activeCount is rarely a
+ * measurable performance issue. (There are also a few other cases
+ * where we scan for work rather than retry/block upon
+ * contention.)
+ *
+ * 3. Selection control. We maintain a policy of always choosing to
+ * run local tasks rather than stealing, and always trying to
+ * steal tasks before trying to run a new submission. All steals
+ * are currently performed in randomly-chosen deq-order. It may be
+ * worthwhile to bias these with locality / anti-locality
+ * information, but doing this well probably requires more
+ * lower-level information from JVMs than currently provided.
*/
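To make the overview above concrete, here is a minimal sketch of the CAS-on-slot discipline using standard atomics (the class name is illustrative; resizing, overflow, and signalling are omitted). The real code below uses Unsafe precisely because, as the overview notes, the AtomicReferenceArray route is measurably slower:

    import java.util.concurrent.atomic.AtomicReferenceArray;

    class WorkStealingDequeSketch<T> {
        final AtomicReferenceArray<T> slots =
            new AtomicReferenceArray<T>(1 << 13);       // power-of-two capacity
        volatile int base;  // next slot to steal from; advanced only by thieves
        volatile int sp;    // next slot to push to; written only by the owner

        void push(T t) {                                // owner thread only
            int s = sp;
            slots.lazySet(s & (slots.length() - 1), t); // store-ordered slot write
            sp = s + 1;                                 // publish after the slot
        }

        T pop() {                                       // owner thread, LIFO end
            int s = sp;
            if (s != base) {
                int i = (s - 1) & (slots.length() - 1);
                T t = slots.get(i);
                // The CAS arbitrates pop vs steal on the slot, not the index:
                // losing it means a thief already took this element.
                if (t != null && slots.compareAndSet(i, t, null)) {
                    sp = s - 1;
                    return t;
                }
            }
            return null;
        }

        T steal() {                                     // any thread, FIFO end
            int b = base;
            if (sp != b) {
                int i = b & (slots.length() - 1);
                T t = slots.get(i);
                if (t != null && slots.compareAndSet(i, t, null)) {
                    base = b + 1;                       // advance only after winning
                    return t;
                }
            }
            return null;
        }
    }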
- private static final int SMASK = 0xffff;
/**
* Capacity of work-stealing queue array upon initialization.
- * Must be a power of two. Initial size must be at least 4, but is
+ * Must be a power of two. Initial size must be at least 2, but is
* padded to minimize cache effects.
*/
private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
/**
- * Maximum size for queue array. Must be a power of two
- * less than or equal to 1 << (31 - width of array entry) to
- * ensure lack of index wraparound, but is capped at a lower
- * value to help users trap runaway computations.
+ * Maximum work-stealing queue array size. Must be less than or
+ * equal to 1 << 28 to ensure lack of index wraparound. (This
+ * is less than usual bounds, because we need leftshift by 3
+ * to be in int range).
*/
- private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M
+ private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
/**
- * The work-stealing queue array. Size must be a power of two.
- * Initialized when started (as opposed to when constructed), to
- * improve memory locality.
+ * The pool this thread works in. Accessed directly by ForkJoinTask
*/
- ForkJoinTask<?>[] queue;
+ final ForkJoinPool pool;
/**
- * The pool this thread works in. Accessed directly by ForkJoinTask.
+ * The work-stealing queue array. Size must be a power of two.
+ * Initialized when thread starts, to improve memory locality.
*/
- final ForkJoinPool pool;
+ private ForkJoinTask<?>[] queue;
/**
* Index (mod queue.length) of next queue slot to push to or pop
- * from. It is written only by owner thread, and accessed by other
- * threads only after reading (volatile) queueBase. Both queueTop
- * and queueBase are allowed to wrap around on overflow, but
- * (queueTop - queueBase) still estimates size.
+ * from. It is written only by owner thread, via ordered store.
+ * Both sp and base are allowed to wrap around on overflow, but
+ * (sp - base) still estimates size.
*/
- int queueTop;
+ private volatile int sp;
/**
* Index (mod queue.length) of least valid queue slot, which is
* always the next position to steal from if nonempty.
*/
- volatile int queueBase;
-
- /**
- * The index of most recent stealer, used as a hint to avoid
- * traversal in method helpJoinTask. This is only a hint because a
- * worker might have had multiple steals and this only holds one
- * of them (usually the most current). Declared non-volatile,
- * relying on other prevailing sync to keep reasonably current.
- */
- int stealHint;
-
- /**
- * Index of this worker in pool array. Set once by pool before
- * running, and accessed directly by pool to locate this worker in
- * its workers array.
- */
- final int poolIndex;
+ private volatile int base;
/**
- * Encoded record for pool task waits. Usages are always
- * surrounded by volatile reads/writes
+ * Activity status. When true, this worker is considered active.
+ * Must be false upon construction. It must be true when executing
+ * tasks, and BEFORE stealing a task. It must be false before
+ * calling pool.sync
*/
- int nextWait;
+ private boolean active;
/**
- * Complement of poolIndex, offset by count of entries of task
- * waits. Accessed by ForkJoinPool to manage event waiters.
+ * Run state of this worker. Supports simple versions of the usual
+ * shutdown/shutdownNow control.
*/
- volatile int eventCount;
+ private volatile int runState;
/**
* Seed for random number generator for choosing steal victims.
- * Uses Marsaglia xorshift. Must be initialized as nonzero.
+ * Uses Marsaglia xorshift. Must be nonzero upon initialization.
*/
- int seed;
+ private int seed;
/**
- * Number of steals. Directly accessed (and reset) by pool when
- * idle.
+ * Number of steals, transferred to pool when idle
*/
- int stealCount;
+ private int stealCount;
/**
- * True if this worker should or did terminate
- */
- volatile boolean terminate;
-
- /**
- * Set to true before LockSupport.park; false on return
- */
- volatile boolean parked;
-
- /**
- * True if use local fifo, not default lifo, for local polling.
- * Shadows value from ForkJoinPool.
+ * Index of this worker in pool array. Set once by pool before
+ * running, and accessed directly by pool during cleanup etc
*/
- final boolean locallyFifo;
+ int poolIndex;
/**
- * The task most recently stolen from another worker (or
- * submission queue). All uses are surrounded by enough volatile
- * reads/writes to maintain as non-volatile.
+ * The last barrier event waited for. Accessed in pool callback
+ * methods, but only by current thread.
*/
- ForkJoinTask<?> currentSteal;
+ long lastEventCount;
/**
- * The task currently being joined, set only when actively trying
- * to help other stealers in helpJoinTask. All uses are surrounded
- * by enough volatile reads/writes to maintain as non-volatile.
+ * True if use local fifo, not default lifo, for local polling
*/
- ForkJoinTask<?> currentJoin;
+ private boolean locallyFifo;
/**
* Creates a ForkJoinWorkerThread operating in the given pool.
- *
* @param pool the pool this thread works in
* @throws NullPointerException if pool is null
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
- super(pool.nextWorkerName());
+ if (pool == null) throw new NullPointerException();
this.pool = pool;
- int k = pool.registerWorker(this);
- poolIndex = k;
- eventCount = ~k & SMASK; // clear wait count
- locallyFifo = pool.locallyFifo;
- Thread.UncaughtExceptionHandler ueh = pool.ueh;
- if (ueh != null)
- setUncaughtExceptionHandler(ueh);
- setDaemon(true);
+ // Note: poolIndex is set by pool during construction
+ // Remaining initialization is deferred to onStart
}
- // Public methods
+ // Public access methods
/**
- * Returns the pool hosting this thread.
- *
+ * Returns the pool hosting this thread
* @return the pool
*/
public ForkJoinPool getPool() {
@@ -294,676 +231,543 @@ public class ForkJoinWorkerThread extends Thread {
* threads (minus one) that have ever been created in the pool.
* This method may be useful for applications that track status or
* collect results per-worker rather than per-task.
- *
- * @return the index number
+ * @return the index number.
*/
public int getPoolIndex() {
return poolIndex;
}
- // Randomization
+ /**
+ * Establishes local first-in-first-out scheduling mode for forked
+ * tasks that are never joined.
+ * @param async if true, use locally FIFO scheduling
+ */
+ void setAsyncMode(boolean async) {
+ locallyFifo = async;
+ }
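The async mode set here only changes which end of the deque local polling takes from; a short trace of the difference, in terms of popTask and deqTask as defined later in this file:

    // After pushTask(t1); pushTask(t2); pushTask(t3):
    //   locallyFifo == false: popTask() returns t3, t2, t1  (LIFO; suits joined tasks)
    //   locallyFifo == true:  deqTask() returns t1, t2, t3  (FIFO; suits message-style
    //                                                        tasks that are never joined)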
+
+ // Runstate management
+
+ // Runstate values. Order matters
+ private static final int RUNNING = 0;
+ private static final int SHUTDOWN = 1;
+ private static final int TERMINATING = 2;
+ private static final int TERMINATED = 3;
+
+ final boolean isShutdown() { return runState >= SHUTDOWN; }
+ final boolean isTerminating() { return runState >= TERMINATING; }
+ final boolean isTerminated() { return runState == TERMINATED; }
+ final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); }
+ final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); }
+
+ /**
+ * Transition to at least the given state. Return true if not
+ * already at least given state.
+ */
+ private boolean transitionRunStateTo(int state) {
+ for (;;) {
+ int s = runState;
+ if (s >= state)
+ return false;
+ if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state))
+ return true;
+ }
+ }
+
+ /**
+ * Try to set status to active; fail on contention
+ */
+ private boolean tryActivate() {
+ if (!active) {
+ if (!pool.tryIncrementActiveCount())
+ return false;
+ active = true;
+ }
+ return true;
+ }
/**
- * Computes next value for random victim probes and backoffs.
- * Scans don't require a very high quality generator, but also not
- * a crummy one. Marsaglia xor-shift is cheap and works well
- * enough. Note: This is manually inlined in FJP.scan() to avoid
- * writes inside busy loops.
+ * Try to set status to inactive; fail on contention
*/
- private int nextSeed() {
- int r = seed;
- r ^= r << 13;
- r ^= r >>> 17;
- r ^= r << 5;
- return seed = r;
+ private boolean tryInactivate() {
+ if (active) {
+ if (!pool.tryDecrementActiveCount())
+ return false;
+ active = false;
+ }
+ return true;
}
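A sketch of the pool-side counter these two methods race on (ActiveCountSketch is hypothetical; the real pool keeps the count in its own fields). The single CAS attempt, rather than a retry loop, is what makes both methods fail-on-contention:

    import java.util.concurrent.atomic.AtomicInteger;

    class ActiveCountSketch {
        final AtomicInteger activeCount = new AtomicInteger();

        boolean tryIncrementActiveCount() {       // one attempt only
            int c = activeCount.get();
            return activeCount.compareAndSet(c, c + 1);
        }

        boolean tryDecrementActiveCount() {       // fails on any contention
            int c = activeCount.get();
            return activeCount.compareAndSet(c, c - 1);
        }
    }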
- // Run State management
+ /**
+ * Computes next value for random victim probe. Scans don't
+ * require a very high quality generator, but also not a crummy
+ * one. Marsaglia xor-shift is cheap and works well.
+ */
+ private static int xorShift(int r) {
+ r ^= r << 1;
+ r ^= r >>> 3;
+ r ^= r << 10;
+ return r;
+ }
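A worked trace of the generator, which also shows why the seed must be nonzero:

    // Starting from r = 1:
    //   r ^= r << 1;   // r = 1 ^ 2    = 3
    //   r ^= r >>> 3;  // r = 3 ^ 0    = 3
    //   r ^= r << 10;  // r = 3 ^ 3072 = 3075
    // A victim index is then (r & mask), e.g. 3075 & 15 = 3 for 16 workers.
    // Zero is a fixed point (xorShift(0) == 0), hence the nonzero-seed rule.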
+
+ // Lifecycle methods
+
+ /**
+ * This method is required to be public, but should never be
+ * called explicitly. It performs the main run loop to execute
+ * ForkJoinTasks.
+ */
+ public void run() {
+ Throwable exception = null;
+ try {
+ onStart();
+ pool.sync(this); // await first pool event
+ mainLoop();
+ } catch (Throwable ex) {
+ exception = ex;
+ } finally {
+ onTermination(exception);
+ }
+ }
+
+ /**
+ * Execute tasks until shut down.
+ */
+ private void mainLoop() {
+ while (!isShutdown()) {
+ ForkJoinTask<?> t = pollTask();
+ if (t != null || (t = pollSubmission()) != null)
+ t.quietlyExec();
+ else if (tryInactivate())
+ pool.sync(this);
+ }
+ }
/**
* Initializes internal state after construction but before
* processing any tasks. If you override this method, you must
- * invoke {@code super.onStart()} at the beginning of the method.
+ * invoke super.onStart() at the beginning of the method.
* Initialization requires care: Most fields must have legal
* default values, to ensure that attempted accesses from other
* threads work correctly even before this thread starts
* processing tasks.
*/
protected void onStart() {
+ // Allocate while starting to improve chances of thread-local
+ // isolation
queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
- int r = pool.workerSeedGenerator.nextInt();
- seed = (r == 0) ? 1 : r; // must be nonzero
+ // Initial value of seed need not be especially random but
+ // should differ across workers and must be nonzero
+ int p = poolIndex + 1;
+ seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits
}
/**
- * Performs cleanup associated with termination of this worker
+ * Perform cleanup associated with termination of this worker
* thread. If you override this method, you must invoke
- * {@code super.onTermination} at the end of the overridden method.
+ * super.onTermination at the end of the overridden method.
*
* @param exception the exception causing this thread to abort due
- * to an unrecoverable error, or {@code null} if completed normally
+ * to an unrecoverable error, or null if completed normally.
*/
protected void onTermination(Throwable exception) {
+ // Execute remaining local tasks unless aborting or terminating
+ while (exception == null && !pool.isTerminating() && base != sp) {
+ try {
+ ForkJoinTask<?> t = popTask();
+ if (t != null)
+ t.quietlyExec();
+ } catch(Throwable ex) {
+ exception = ex;
+ }
+ }
+ // Cancel other tasks, transition status, notify pool, and
+ // propagate exception to uncaught exception handler
try {
- terminate = true;
+ do;while (!tryInactivate()); // ensure inactive
cancelTasks();
- pool.deregisterWorker(this, exception);
+ runState = TERMINATED;
+ pool.workerTerminated(this);
} catch (Throwable ex) { // Shouldn't ever happen
if (exception == null) // but if so, at least rethrow it
exception = ex;
} finally {
if (exception != null)
- UNSAFE.throwException(exception);
+ ForkJoinTask.rethrowException(exception);
}
}
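As the class javadoc says, a subclass may hook only these two lifecycle methods and must be installed through a factory. A minimal sketch (class names illustrative; the two-argument ForkJoinPool constructor is assumed from the surrounding API):

    class LoggingWorker extends ForkJoinWorkerThread {
        LoggingWorker(ForkJoinPool pool) { super(pool); }
        protected void onStart() {
            super.onStart();                  // must be invoked first, per the javadoc
            System.out.println(getName() + " started");
        }
        protected void onTermination(Throwable exception) {
            System.out.println(getName() + " exiting; exception = " + exception);
            super.onTermination(exception);   // must be invoked last, per the javadoc
        }
    }

    class LoggingWorkerFactory
            implements ForkJoinPool.ForkJoinWorkerThreadFactory {
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new LoggingWorker(pool);
        }
    }
    // Assumed usage: new ForkJoinPool(parallelism, new LoggingWorkerFactory())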
+ // Intrinsics-based support for queue operations.
+
/**
- * This method is required to be public, but should never be
- * called explicitly. It performs the main run loop to execute
- * {@link ForkJoinTask}s.
+ * Adds, in store-order, the given task at the given slot of q.
+ * Caller must ensure q is nonnull and index is in range.
*/
- public void run() {
- Throwable exception = null;
- try {
- onStart();
- pool.work(this);
- } catch (Throwable ex) {
- exception = ex;
- } finally {
- onTermination(exception);
- }
+ private static void setSlot(ForkJoinTask<?>[] q, int i,
+ ForkJoinTask<?> t){
+ _unsafe.putOrderedObject(q, (i << qShift) + qBase, t);
}
- /*
- * Intrinsics-based atomic writes for queue slots. These are
- * basically the same as methods in AtomicReferenceArray, but
- * specialized for (1) ForkJoinTask elements (2) requirement that
- * nullness and bounds checks have already been performed by
- * callers and (3) effective offsets are known not to overflow
- * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
- * need corresponding version for reads: plain array reads are OK
- * because they are protected by other volatile reads and are
- * confirmed by CASes.
- *
- * Most uses don't actually call these methods, but instead
- * contain inlined forms that enable more predictable
- * optimization. We don't define the version of write used in
- * pushTask at all, but instead inline there a store-fenced array
- * slot write.
- *
- * Also in most methods, as a performance (not correctness) issue,
- * we'd like to encourage compilers not to arbitrarily postpone
- * setting queueTop after writing slot. Currently there is no
- * intrinsic for arranging this, but using Unsafe putOrderedInt
- * may be a preferable strategy on some compilers even though its
- * main effect is a pre-, not post- fence. To simplify possible
- * changes, the option is left in comments next to the associated
- * assignments.
- */
-
/**
- * CASes slot i of array q from t to null. Caller must ensure q is
- * non-null and index is in range.
+ * CAS given slot of q to null. Caller must ensure q is nonnull
+ * and index is in range.
*/
- private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
- ForkJoinTask<?> t) {
- return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null);
+ private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
+ ForkJoinTask<?> t) {
+ return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
}
/**
- * Performs a volatile write of the given task at given slot of
- * array q. Caller must ensure q is non-null and index is in
- * range. This method is used only during resets and backouts.
+ * Sets sp in store-order.
*/
- private static final void writeSlot(ForkJoinTask<?>[] q, int i,
- ForkJoinTask<?> t) {
- UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t);
+ private void storeSp(int s) {
+ _unsafe.putOrderedInt(this, spOffset, s);
}
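The same three operations restated against standard atomics, the alternative the overview rejects as slower (names illustrative):

    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    import java.util.concurrent.atomic.AtomicReferenceArray;

    class SlotOpsSketch {
        final AtomicReferenceArray<Object> q = new AtomicReferenceArray<Object>(8);
        volatile int sp;
        static final AtomicIntegerFieldUpdater<SlotOpsSketch> SP =
            AtomicIntegerFieldUpdater.newUpdater(SlotOpsSketch.class, "sp");

        void setSlot(int i, Object t) { q.lazySet(i, t); }      // ~ putOrderedObject
        boolean casSlotNull(int i, Object t) {
            return q.compareAndSet(i, t, null);                 // ~ compareAndSwapObject
        }
        void storeSp(int s) { SP.lazySet(this, s); }            // ~ putOrderedInt
    }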
- // queue methods
+ // Main queue methods
/**
- * Pushes a task. Call only from this thread.
- *
- * @param t the task. Caller must ensure non-null.
+ * Pushes a task. Called only by current thread.
+ * @param t the task. Caller must ensure nonnull
*/
final void pushTask(ForkJoinTask<?> t) {
- ForkJoinTask<?>[] q; int s, m;
- if ((q = queue) != null) { // ignore if queue removed
- long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
- UNSAFE.putOrderedObject(q, u, t);
- queueTop = s + 1; // or use putOrderedInt
- if ((s -= queueBase) <= 2)
- pool.signalWork();
- else if (s == m)
- growQueue();
- }
- }
-
- /**
- * Creates or doubles queue array. Transfers elements by
- * emulating steals (deqs) from old array and placing, oldest
- * first, into new array.
- */
- private void growQueue() {
- ForkJoinTask<?>[] oldQ = queue;
- int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
- if (size > MAXIMUM_QUEUE_CAPACITY)
- throw new RejectedExecutionException("Queue capacity exceeded");
- if (size < INITIAL_QUEUE_CAPACITY)
- size = INITIAL_QUEUE_CAPACITY;
- ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
- int mask = size - 1;
- int top = queueTop;
- int oldMask;
- if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
- for (int b = queueBase; b != top; ++b) {
- long u = ((b & oldMask) << ASHIFT) + ABASE;
- Object x = UNSAFE.getObjectVolatile(oldQ, u);
- if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
- UNSAFE.putObjectVolatile
- (q, ((b & mask) << ASHIFT) + ABASE, x);
- }
- }
+ ForkJoinTask<?>[] q = queue;
+ int mask = q.length - 1;
+ int s = sp;
+ setSlot(q, s & mask, t);
+ storeSp(++s);
+ if ((s -= base) == 1)
+ pool.signalWork();
+ else if (s >= mask)
+ growQueue();
}
/**
* Tries to take a task from the base of the queue, failing if
- * empty or contended. Note: Specializations of this code appear
- * in locallyDeqTask and elsewhere.
- *
- * @return a task, or null if none or contended
+ * either empty or contended.
+ * @return a task, or null if none or contended.
*/
final ForkJoinTask<?> deqTask() {
- ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
- if (queueTop != (b = queueBase) &&
+ ForkJoinTask<?> t;
+ ForkJoinTask<?>[] q;
+ int i;
+ int b;
+ if (sp != (b = base) &&
(q = queue) != null && // must read q after b
- (i = (q.length - 1) & b) >= 0 &&
- (t = q[i]) != null && queueBase == b &&
- UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null)) {
- queueBase = b + 1;
+ (t = q[i = (q.length - 1) & b]) != null &&
+ casSlotNull(q, i, t)) {
+ base = b + 1;
return t;
}
return null;
}
/**
- * Tries to take a task from the base of own queue. Called only
- * by this thread.
- *
- * @return a task, or null if none
- */
- final ForkJoinTask<?> locallyDeqTask() {
- ForkJoinTask<?> t; int m, b, i;
- ForkJoinTask<?>[] q = queue;
- if (q != null && (m = q.length - 1) >= 0) {
- while (queueTop != (b = queueBase)) {
- if ((t = q[i = m & b]) != null &&
- queueBase == b &&
- UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
- t, null)) {
- queueBase = b + 1;
- return t;
- }
- }
- }
- return null;
- }
-
- /**
- * Returns a popped task, or null if empty.
- * Called only by this thread.
+ * Returns a popped task, or null if empty. Ensures active status
+ * if nonnull. Called only by current thread.
*/
- private ForkJoinTask<?> popTask() {
- int m;
- ForkJoinTask<?>[] q = queue;
- if (q != null && (m = q.length - 1) >= 0) {
- for (int s; (s = queueTop) != queueBase;) {
- int i = m & --s;
- long u = (i << ASHIFT) + ABASE; // raw offset
+ final ForkJoinTask<?> popTask() {
+ int s = sp;
+ while (s != base) {
+ if (tryActivate()) {
+ ForkJoinTask<?>[] q = queue;
+ int mask = q.length - 1;
+ int i = (s - 1) & mask;
ForkJoinTask<?> t = q[i];
- if (t == null) // lost to stealer
+ if (t == null || !casSlotNull(q, i, t))
break;
- if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
- queueTop = s; // or putOrderedInt
- return t;
- }
+ storeSp(s - 1);
+ return t;
}
}
return null;
}
/**
- * Specialized version of popTask to pop only if topmost element
- * is the given task. Called only by this thread.
- *
- * @param t the task. Caller must ensure non-null.
+ * Specialized version of popTask to pop only if
+ * topmost element is the given task. Called only
+ * by current thread while active.
+ * @param t the task. Caller must ensure nonnull
*/
final boolean unpushTask(ForkJoinTask<?> t) {
- ForkJoinTask<?>[] q;
- int s;
- if ((q = queue) != null && (s = queueTop) != queueBase &&
- UNSAFE.compareAndSwapObject
- (q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
- queueTop = s; // or putOrderedInt
+ ForkJoinTask<?>[] q = queue;
+ int mask = q.length - 1;
+ int s = sp - 1;
+ if (casSlotNull(q, s & mask, t)) {
+ storeSp(s);
return true;
}
return false;
}
/**
- * Returns next task, or null if empty or contended.
+ * Returns next task.
*/
final ForkJoinTask<?> peekTask() {
- int m;
ForkJoinTask<?>[] q = queue;
- if (q == null || (m = q.length - 1) < 0)
+ if (q == null)
return null;
- int i = locallyFifo ? queueBase : (queueTop - 1);
- return q[i & m];
- }
-
- // Support methods for ForkJoinPool
-
- /**
- * Runs the given task, plus any local tasks until queue is empty
- */
- final void execTask(ForkJoinTask<?> t) {
- currentSteal = t;
- for (;;) {
- if (t != null)
- t.doExec();
- if (queueTop == queueBase)
- break;
- t = locallyFifo ? locallyDeqTask() : popTask();
- }
- ++stealCount;
- currentSteal = null;
+ int mask = q.length - 1;
+ int i = locallyFifo? base : (sp - 1);
+ return q[i & mask];
}
/**
- * Removes and cancels all tasks in queue. Can be called from any
- * thread.
+ * Doubles queue array size. Transfers elements by emulating
+ * steals (deqs) from old array and placing, oldest first, into
+ * new array.
*/
- final void cancelTasks() {
- ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
- if (cj != null && cj.status >= 0)
- cj.cancelIgnoringExceptions();
- ForkJoinTask<?> cs = currentSteal;
- if (cs != null && cs.status >= 0)
- cs.cancelIgnoringExceptions();
- while (queueBase != queueTop) {
- ForkJoinTask<?> t = deqTask();
- if (t != null)
- t.cancelIgnoringExceptions();
- }
+ private void growQueue() {
+ ForkJoinTask<?>[] oldQ = queue;
+ int oldSize = oldQ.length;
+ int newSize = oldSize << 1;
+ if (newSize > MAXIMUM_QUEUE_CAPACITY)
+ throw new RejectedExecutionException("Queue capacity exceeded");
+ ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];
+
+ int b = base;
+ int bf = b + oldSize;
+ int oldMask = oldSize - 1;
+ int newMask = newSize - 1;
+ do {
+ int oldIndex = b & oldMask;
+ ForkJoinTask<?> t = oldQ[oldIndex];
+ if (t != null && !casSlotNull(oldQ, oldIndex, t))
+ t = null;
+ setSlot(newQ, b & newMask, t);
+ } while (++b != bf);
+ pool.signalWork();
}
/**
- * Drains tasks to given collection c.
+ * Tries to steal a task from another worker. Starts at a random
+ * index of workers array, and probes workers until finding one
+ * with non-empty queue or finding that all are empty. It
+ * randomly selects the first n probes. If these are empty, it
+ * resorts to a full circular traversal, which is necessary to
+ * accurately set active status by caller. Also restarts if pool
+ * events occurred since last scan, which forces refresh of
+ * workers array, in case barrier was associated with resize.
*
- * @return the number of tasks drained
- */
- final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
- int n = 0;
- while (queueBase != queueTop) {
- ForkJoinTask<?> t = deqTask();
- if (t != null) {
- c.add(t);
- ++n;
+ * This method must be both fast and quiet -- usually avoiding
+ * memory accesses that could disrupt cache sharing etc other than
+ * those needed to check for and take tasks. This accounts for,
+ * among other things, updating random seed in place without
+ * storing it until exit.
+ *
+ * @return a task, or null if none found
+ */
+ private ForkJoinTask<?> scan() {
+ ForkJoinTask<?> t = null;
+ int r = seed; // extract once to keep scan quiet
+ ForkJoinWorkerThread[] ws; // refreshed on outer loop
+ int mask; // must be power 2 minus 1 and > 0
+ outer:do {
+ if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) {
+ int idx = r;
+ int probes = ~mask; // use random index while negative
+ for (;;) {
+ r = xorShift(r); // update random seed
+ ForkJoinWorkerThread v = ws[mask & idx];
+ if (v == null || v.sp == v.base) {
+ if (probes <= mask)
+ idx = (probes++ < 0)? r : (idx + 1);
+ else
+ break;
+ }
+ else if (!tryActivate() || (t = v.deqTask()) == null)
+ continue outer; // restart on contention
+ else
+ break outer;
+ }
}
- }
- return n;
+ } while (pool.hasNewSyncEvent(this)); // retry on pool events
+ seed = r;
+ return t;
}
- // Support methods for ForkJoinTask
-
/**
- * Returns an estimate of the number of tasks in the queue.
+ * Gets and removes a local or stolen task.
+ * @return a task, if available
*/
- final int getQueueSize() {
- return queueTop - queueBase;
+ final ForkJoinTask<?> pollTask() {
+ ForkJoinTask<?> t = locallyFifo? deqTask() : popTask();
+ if (t == null && (t = scan()) != null)
+ ++stealCount;
+ return t;
}
/**
- * Gets and removes a local task.
- *
+ * Gets a local task.
* @return a task, if available
*/
final ForkJoinTask<?> pollLocalTask() {
- return locallyFifo ? locallyDeqTask() : popTask();
+ return locallyFifo? deqTask() : popTask();
}
/**
- * Gets and removes a local or stolen task.
- *
- * @return a task, if available
+ * Returns a pool submission, if one exists, activating first.
+ * @return a submission, if available
*/
- final ForkJoinTask<?> pollTask() {
- ForkJoinWorkerThread[] ws;
- ForkJoinTask<?> t = pollLocalTask();
- if (t != null || (ws = pool.workers) == null)
- return t;
- int n = ws.length; // cheap version of FJP.scan
- int steps = n << 1;
- int r = nextSeed();
- int i = 0;
- while (i < steps) {
- ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];
- if (w != null && w.queueBase != w.queueTop && w.queue != null) {
- if ((t = w.deqTask()) != null)
- return t;
- i = 0;
- }
+ private ForkJoinTask<?> pollSubmission() {
+ ForkJoinPool p = pool;
+ while (p.hasQueuedSubmissions()) {
+ ForkJoinTask<?> t;
+ if (tryActivate() && (t = p.pollSubmission()) != null)
+ return t;
}
return null;
}
+ // Methods accessed only by Pool
+
/**
- * The maximum stolen->joining link depth allowed in helpJoinTask,
- * as well as the maximum number of retries (allowing on average
- * one staleness retry per level) per attempt to instead try
- * compensation. Depths for legitimate chains are unbounded, but
- * we use a fixed constant to avoid (otherwise unchecked) cycles
- * and bound staleness of traversal parameters at the expense of
- * sometimes blocking when we could be helping.
+ * Removes and cancels all tasks in queue. Can be called from any
+ * thread.
*/
- private static final int MAX_HELP = 16;
+ final void cancelTasks() {
+ ForkJoinTask<?> t;
+ while (base != sp && (t = deqTask()) != null)
+ t.cancelIgnoringExceptions();
+ }
/**
- * Possibly runs some tasks and/or blocks, until joinMe is done.
- *
- * @param joinMe the task to join
- * @return completion status on exit
- */
- final int joinTask(ForkJoinTask<?> joinMe) {
- ForkJoinTask<?> prevJoin = currentJoin;
- currentJoin = joinMe;
- for (int s, retries = MAX_HELP;;) {
- if ((s = joinMe.status) < 0) {
- currentJoin = prevJoin;
- return s;
- }
- if (retries > 0) {
- if (queueTop != queueBase) {
- if (!localHelpJoinTask(joinMe))
- retries = 0; // cannot help
- }
- else if (retries == MAX_HELP >>> 1) {
- --retries; // check uncommon case
- if (tryDeqAndExec(joinMe) >= 0)
- Thread.yield(); // for politeness
- }
- else
- retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
- }
- else {
- retries = MAX_HELP; // restart if not done
- pool.tryAwaitJoin(joinMe);
- }
+ * Drains tasks to given collection c
+ * @return the number of tasks drained
+ */
+ final int drainTasksTo(Collection<ForkJoinTask<?>> c) {
+ int n = 0;
+ ForkJoinTask<?> t;
+ while (base != sp && (t = deqTask()) != null) {
+ c.add(t);
+ ++n;
}
+ return n;
}
/**
- * If present, pops and executes the given task, or any other
- * cancelled task
- *
- * @return false if any other non-cancelled task exists in local queue
- */
- private boolean localHelpJoinTask(ForkJoinTask<?> joinMe) {
- int s, i; ForkJoinTask<?>[] q; ForkJoinTask<?> t;
- if ((s = queueTop) != queueBase && (q = queue) != null &&
- (i = (q.length - 1) & --s) >= 0 &&
- (t = q[i]) != null) {
- if (t != joinMe && t.status >= 0)
- return false;
- if (UNSAFE.compareAndSwapObject
- (q, (i << ASHIFT) + ABASE, t, null)) {
- queueTop = s; // or putOrderedInt
- t.doExec();
- }
- }
- return true;
+ * Get and clear steal count for accumulation by pool. Called
+ * only when known to be idle (in pool.sync and termination).
+ */
+ final int getAndClearStealCount() {
+ int sc = stealCount;
+ stealCount = 0;
+ return sc;
}
/**
- * Tries to locate and execute tasks for a stealer of the given
- * task, or in turn one of its stealers, Traces
- * currentSteal->currentJoin links looking for a thread working on
- * a descendant of the given task and with a non-empty queue to
- * steal back and execute tasks from. The implementation is very
- * branchy to cope with potential inconsistencies or loops
- * encountering chains that are stale, unknown, or of length
- * greater than MAX_HELP links. All of these cases are dealt with
- * by just retrying by caller.
- *
- * @param joinMe the task to join
- * @param canSteal true if local queue is empty
- * @return true if ran a task
- */
- private boolean helpJoinTask(ForkJoinTask<?> joinMe) {
- boolean helped = false;
- int m = pool.scanGuard & SMASK;
- ForkJoinWorkerThread[] ws = pool.workers;
- if (ws != null && ws.length > m && joinMe.status >= 0) {
- int levels = MAX_HELP; // remaining chain length
- ForkJoinTask<?> task = joinMe; // base of chain
- outer:for (ForkJoinWorkerThread thread = this;;) {
- // Try to find v, the stealer of task, by first using hint
- ForkJoinWorkerThread v = ws[thread.stealHint & m];
- if (v == null || v.currentSteal != task) {
- for (int j = 0; ;) { // search array
- if ((v = ws[j]) != null && v.currentSteal == task) {
- thread.stealHint = j;
- break; // save hint for next time
- }
- if (++j > m)
- break outer; // can't find stealer
- }
- }
- // Try to help v, using specialized form of deqTask
- for (;;) {
- ForkJoinTask<?>[] q; int b, i;
- if (joinMe.status < 0)
- break outer;
- if ((b = v.queueBase) == v.queueTop ||
- (q = v.queue) == null ||
- (i = (q.length-1) & b) < 0)
- break; // empty
- long u = (i << ASHIFT) + ABASE;
- ForkJoinTask<?> t = q[i];
- if (task.status < 0)
- break outer; // stale
- if (t != null && v.queueBase == b &&
- UNSAFE.compareAndSwapObject(q, u, t, null)) {
- v.queueBase = b + 1;
- v.stealHint = poolIndex;
- ForkJoinTask<?> ps = currentSteal;
- currentSteal = t;
- t.doExec();
- currentSteal = ps;
- helped = true;
- }
- }
- // Try to descend to find v's stealer
- ForkJoinTask<?> next = v.currentJoin;
- if (--levels > 0 && task.status >= 0 &&
- next != null && next != task) {
- task = next;
- thread = v;
+ * Returns true if at least one worker in the given array appears
+ * to have at least one queued task.
+ * @param ws array of workers
+ */
+ static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) {
+ if (ws != null) {
+ int len = ws.length;
+ for (int j = 0; j < 2; ++j) { // need two passes for clean sweep
+ for (int i = 0; i < len; ++i) {
+ ForkJoinWorkerThread w = ws[i];
+ if (w != null && w.sp != w.base)
+ return true;
}
- else
- break; // max levels, stale, dead-end, or cyclic
}
}
- return helped;
+ return false;
}
+ // Support methods for ForkJoinTask
+
/**
- * Performs an uncommon case for joinTask: If task t is at base of
- * some workers queue, steals and executes it.
- *
- * @param t the task
- * @return t's status
- */
- private int tryDeqAndExec(ForkJoinTask<?> t) {
- int m = pool.scanGuard & SMASK;
- ForkJoinWorkerThread[] ws = pool.workers;
- if (ws != null && ws.length > m && t.status >= 0) {
- for (int j = 0; j <= m; ++j) {
- ForkJoinTask<?>[] q; int b, i;
- ForkJoinWorkerThread v = ws[j];
- if (v != null &&
- (b = v.queueBase) != v.queueTop &&
- (q = v.queue) != null &&
- (i = (q.length - 1) & b) >= 0 &&
- q[i] == t) {
- long u = (i << ASHIFT) + ABASE;
- if (v.queueBase == b &&
- UNSAFE.compareAndSwapObject(q, u, t, null)) {
- v.queueBase = b + 1;
- v.stealHint = poolIndex;
- ForkJoinTask<?> ps = currentSteal;
- currentSteal = t;
- t.doExec();
- currentSteal = ps;
- }
- break;
- }
- }
- }
- return t.status;
+ * Returns an estimate of the number of tasks in the queue.
+ */
+ final int getQueueSize() {
+ int n = sp - base;
+ return n < 0? 0 : n; // suppress momentarily negative values
}
/**
- * Implements ForkJoinTask.getSurplusQueuedTaskCount(). Returns
- * an estimate of the number of tasks, offset by a function of
- * number of idle workers.
- *
- * This method provides a cheap heuristic guide for task
- * partitioning when programmers, frameworks, tools, or languages
- * have little or no idea about task granularity. In essence by
- * offering this method, we ask users only about tradeoffs in
- * overhead vs expected throughput and its variance, rather than
- * how finely to partition tasks.
- *
- * In a steady state strict (tree-structured) computation, each
- * thread makes available for stealing enough tasks for other
- * threads to remain active. Inductively, if all threads play by
- * the same rules, each thread should make available only a
- * constant number of tasks.
- *
- * The minimum useful constant is just 1. But using a value of 1
- * would require immediate replenishment upon each steal to
- * maintain enough tasks, which is infeasible. Further,
- * partitionings/granularities of offered tasks should minimize
- * steal rates, which in general means that threads nearer the top
- * of computation tree should generate more than those nearer the
- * bottom. In perfect steady state, each thread is at
- * approximately the same level of computation tree. However,
- * producing extra tasks amortizes the uncertainty of progress and
- * diffusion assumptions.
- *
- * So, users will want to use values larger, but not much larger
- * than 1 to both smooth over transient shortages and hedge
- * against uneven progress; as traded off against the cost of
- * extra task overhead. We leave the user to pick a threshold
- * value to compare with the results of this call to guide
- * decisions, but recommend values such as 3.
- *
- * When all threads are active, it is on average OK to estimate
- * surplus strictly locally. In steady-state, if one thread is
- * maintaining say 2 surplus tasks, then so are others. So we can
- * just use estimated queue length (although note that (queueTop -
- * queueBase) can be an overestimate because of stealers lagging
- * increments of queueBase). However, this strategy alone leads
- * to serious mis-estimates in some non-steady-state conditions
- * (ramp-up, ramp-down, other stalls). We can detect many of these
- * by further considering the number of "idle" threads, that are
- * known to have zero queued tasks, so compensate by a factor of
- * (#idle/#active) threads.
+ * Returns an estimate of the number of tasks, offset by a
+ * function of number of idle workers.
*/
final int getEstimatedSurplusTaskCount() {
- return queueTop - queueBase - pool.idlePerActive();
+ // The halving approximates weighting idle vs non-idle workers
+ return (sp - base) - (pool.getIdleThreadCount() >>> 1);
}
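The comment removed above recommends comparing this estimate against a small threshold such as 3 when deciding whether to keep forking. A sketch of that pattern, assuming the public ForkJoinTask.getSurplusQueuedTaskCount() wrapper from the mainline API (task, sizes, and threshold are illustrative):

    class SumTask extends RecursiveTask<Long> {
        final long[] a; final int lo, hi;
        SumTask(long[] a, int lo, int hi) { this.a = a; this.lo = lo; this.hi = hi; }
        protected Long compute() {
            // Stop splitting once enough surplus work is already queued locally
            if (hi - lo <= 1024 || ForkJoinTask.getSurplusQueuedTaskCount() > 3) {
                long s = 0;
                for (int i = lo; i < hi; ++i) s += a[i];
                return s;
            }
            int mid = (lo + hi) >>> 1;
            SumTask left = new SumTask(a, lo, mid);
            left.fork();                          // make the left half stealable
            long right = new SumTask(a, mid, hi).compute();
            return right + left.join();
        }
    }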
/**
- * Runs tasks until {@code pool.isQuiescent()}. We piggyback on
- * pool's active count ctl maintenance, but rather than blocking
- * when tasks cannot be found, we rescan until all others cannot
- * find tasks either. The bracketing by pool quiescerCounts
- * updates suppresses pool auto-shutdown mechanics that could
- * otherwise prematurely terminate the pool because all threads
- * appear to be inactive.
+ * Scans, returning early if joinMe is done.
+ */
+ final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
+ ForkJoinTask<?> t = pollTask();
+ if (t != null && joinMe.status < 0 && sp == base) {
+ pushTask(t); // unsteal if done and this task would be stealable
+ t = null;
+ }
+ return t;
+ }
+
+ /**
+ * Runs tasks until pool.isQuiescent().
*/
final void helpQuiescePool() {
- boolean active = true;
- ForkJoinTask<?> ps = currentSteal; // to restore below
- ForkJoinPool p = pool;
- p.addQuiescerCount(1);
for (;;) {
- ForkJoinWorkerThread[] ws = p.workers;
- ForkJoinWorkerThread v = null;
- int n;
- if (queueTop != queueBase)
- v = this;
- else if (ws != null && (n = ws.length) > 1) {
- ForkJoinWorkerThread w;
- int r = nextSeed(); // cheap version of FJP.scan
- int steps = n << 1;
- for (int i = 0; i < steps; ++i) {
- if ((w = ws[(i + r) & (n - 1)]) != null &&
- w.queueBase != w.queueTop) {
- v = w;
- break;
- }
- }
- }
- if (v != null) {
- ForkJoinTask<?> t;
- if (!active) {
- active = true;
- p.addActiveCount(1);
- }
- if ((t = (v != this) ? v.deqTask() :
- locallyFifo ? locallyDeqTask() : popTask()) != null) {
- currentSteal = t;
- t.doExec();
- currentSteal = ps;
- }
- }
- else {
- if (active) {
- active = false;
- p.addActiveCount(-1);
- }
- if (p.isQuiescent()) {
- p.addActiveCount(1);
- p.addQuiescerCount(-1);
- break;
- }
+ ForkJoinTask<?> t = pollTask();
+ if (t != null)
+ t.quietlyExec();
+ else if (tryInactivate() && pool.isQuiescent())
+ break;
+ }
+ do;while (!tryActivate()); // re-activate on exit
+ }
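Task-level usage of this loop, assuming the public static ForkJoinTask.helpQuiesce() wrapper from the mainline API (the example task is illustrative): fork subtasks that are never joined, then help run tasks until the whole pool goes quiet.

    class VisitAll extends RecursiveAction {
        protected void compute() {
            for (int i = 0; i < 8; ++i) {
                new RecursiveAction() {           // deliberately never joined
                    protected void compute() { /* visit one node */ }
                }.fork();
            }
            helpQuiesce();  // runs local and stolen tasks until the pool is quiescent
        }
    }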
+
+ // Temporary Unsafe mechanics for preliminary release
+ private static Unsafe getUnsafe() throws Throwable {
+ try {
+ return Unsafe.getUnsafe();
+ } catch (SecurityException se) {
+ try {
+ return java.security.AccessController.doPrivileged
+ (new java.security.PrivilegedExceptionAction<Unsafe>() {
+ public Unsafe run() throws Exception {
+ return getUnsafePrivileged();
+ }});
+ } catch (java.security.PrivilegedActionException e) {
+ throw e.getCause();
}
}
}
- // Unsafe mechanics
- private static final sun.misc.Unsafe UNSAFE;
- private static final long ABASE;
- private static final int ASHIFT;
+ private static Unsafe getUnsafePrivileged()
+ throws NoSuchFieldException, IllegalAccessException {
+ Field f = Unsafe.class.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ return (Unsafe) f.get(null);
+ }
+ private static long fieldOffset(String fieldName)
+ throws NoSuchFieldException {
+ return _unsafe.objectFieldOffset
+ (ForkJoinWorkerThread.class.getDeclaredField(fieldName));
+ }
+
+ static final Unsafe _unsafe;
+ static final long baseOffset;
+ static final long spOffset;
+ static final long runStateOffset;
+ static final long qBase;
+ static final int qShift;
static {
- int s;
try {
- UNSAFE = sun.misc.Unsafe.getUnsafe();
- Class a = ForkJoinTask[].class;
- ABASE = UNSAFE.arrayBaseOffset(a);
- s = UNSAFE.arrayIndexScale(a);
- } catch (Exception e) {
- throw new Error(e);
+ _unsafe = getUnsafe();
+ baseOffset = fieldOffset("base");
+ spOffset = fieldOffset("sp");
+ runStateOffset = fieldOffset("runState");
+ qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class);
+ int s = _unsafe.arrayIndexScale(ForkJoinTask[].class);
+ if ((s & (s-1)) != 0)
+ throw new Error("data type scale not a power of two");
+ qShift = 31 - Integer.numberOfLeadingZeros(s);
+ } catch (Throwable e) {
+ throw new RuntimeException("Could not initialize intrinsics", e);
}
- if ((s & (s-1)) != 0)
- throw new Error("data type scale not a power of two");
- ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
}
-
}
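A worked example of the offset arithmetic this initializer sets up:

    // With a typical arrayIndexScale(ForkJoinTask[].class) of 4 bytes
    // (compressed oops), qShift = 31 - numberOfLeadingZeros(4) = 31 - 29 = 2,
    // so slot i lives at byte offset qBase + (i << 2), i.e. qBase + 4*i.
    // A scale of 8 (uncompressed 64-bit oops) gives qShift = 3 instead.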