From 5aa7e892bb693ba80cd372e46bdc6e9719d1bb82 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Wed, 30 Sep 2009 16:07:48 +0000 Subject: Copied 1.5 backport of ForkJoinPool to jvm15-li... Copied 1.5 backport of ForkJoinPool to jvm15-library. --- .../scala/concurrent/forkjoin/ForkJoinPool.java | 1870 ++++++++++++++++++++ .../scala/concurrent/forkjoin/ForkJoinTask.java | 1052 +++++++++++ .../concurrent/forkjoin/ForkJoinWorkerThread.java | 775 ++++++++ .../concurrent/forkjoin/LinkedTransferQueue.java | 840 +++++++++ .../scala/concurrent/forkjoin/RecursiveAction.java | 151 ++ .../scala/concurrent/forkjoin/RecursiveTask.java | 71 + .../concurrent/forkjoin/ThreadLocalRandom.java | 186 ++ .../scala/concurrent/forkjoin/TransferQueue.java | 118 ++ .../scala/concurrent/forkjoin/package-info.java | 29 + 9 files changed, 5092 insertions(+) create mode 100644 src/jvm15-library/scala/concurrent/forkjoin/ForkJoinPool.java create mode 100644 src/jvm15-library/scala/concurrent/forkjoin/ForkJoinTask.java create mode 100644 src/jvm15-library/scala/concurrent/forkjoin/ForkJoinWorkerThread.java create mode 100644 src/jvm15-library/scala/concurrent/forkjoin/LinkedTransferQueue.java create mode 100644 src/jvm15-library/scala/concurrent/forkjoin/RecursiveAction.java create mode 100644 src/jvm15-library/scala/concurrent/forkjoin/RecursiveTask.java create mode 100644 src/jvm15-library/scala/concurrent/forkjoin/ThreadLocalRandom.java create mode 100644 src/jvm15-library/scala/concurrent/forkjoin/TransferQueue.java create mode 100644 src/jvm15-library/scala/concurrent/forkjoin/package-info.java (limited to 'src') diff --git a/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinPool.java b/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinPool.java new file mode 100644 index 0000000000..ba30f3a161 --- /dev/null +++ b/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinPool.java @@ -0,0 +1,1870 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package scala.concurrent.forkjoin; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; +import java.util.concurrent.atomic.*; +import sun.misc.Unsafe; +import java.lang.reflect.*; + +interface RunnableFuture extends Runnable { + //TR placeholder for java.util.concurrent.RunnableFuture +} + +/** + * An {@link ExecutorService} for running {@link ForkJoinTask}s. A + * ForkJoinPool provides the entry point for submissions from + * non-ForkJoinTasks, as well as management and monitoring operations. + * Normally a single ForkJoinPool is used for a large number of + * submitted tasks. Otherwise, use would not usually outweigh the + * construction and bookkeeping overhead of creating a large set of + * threads. + * + *

ForkJoinPools differ from other kinds of Executors mainly in + * that they provide work-stealing: all threads in the pool + * attempt to find and execute subtasks created by other active tasks + * (eventually blocking if none exist). This makes them efficient when + * most tasks spawn other subtasks (as do most ForkJoinTasks), as well + * as the mixed execution of some plain Runnable- or Callable- based + * activities along with ForkJoinTasks. When setting + * setAsyncMode, a ForkJoinPools may also be appropriate for + * use with fine-grained tasks that are never joined. Otherwise, other + * ExecutorService implementations are typically more appropriate + * choices. + * + *

A ForkJoinPool may be constructed with a given parallelism level + * (target pool size), which it attempts to maintain by dynamically + * adding, suspending, or resuming threads, even if some tasks are + * waiting to join others. However, no such adjustments are performed + * in the face of blocked IO or other unmanaged synchronization. The + * nested ManagedBlocker interface enables extension of + * the kinds of synchronization accommodated. The target parallelism + * level may also be changed dynamically (setParallelism) + * and thread construction can be limited using methods + * setMaximumPoolSize and/or + * setMaintainsParallelism. + * + *

In addition to execution and lifecycle control methods, this + * class provides status check methods (for example + * getStealCount) that are intended to aid in developing, + * tuning, and monitoring fork/join applications. Also, method + * toString returns indications of pool state in a + * convenient form for informal monitoring. + * + *

Implementation notes: This implementation restricts the + * maximum number of running threads to 32767. Attempts to create + * pools with greater than the maximum result in + * IllegalArgumentExceptions. + */ +public class ForkJoinPool /*extends AbstractExecutorService*/ { + + /* + * See the extended comments interspersed below for design, + * rationale, and walkthroughs. + */ + + /** Mask for packing and unpacking shorts */ + private static final int shortMask = 0xffff; + + /** Max pool size -- must be a power of two minus 1 */ + private static final int MAX_THREADS = 0x7FFF; + + /** + * Factory for creating new ForkJoinWorkerThreads. A + * ForkJoinWorkerThreadFactory must be defined and used for + * ForkJoinWorkerThread subclasses that extend base functionality + * or initialize threads with different contexts. + */ + public static interface ForkJoinWorkerThreadFactory { + /** + * Returns a new worker thread operating in the given pool. + * + * @param pool the pool this thread works in + * @throws NullPointerException if pool is null; + */ + public ForkJoinWorkerThread newThread(ForkJoinPool pool); + } + + /** + * Default ForkJoinWorkerThreadFactory implementation, creates a + * new ForkJoinWorkerThread. + */ + static class DefaultForkJoinWorkerThreadFactory + implements ForkJoinWorkerThreadFactory { + public ForkJoinWorkerThread newThread(ForkJoinPool pool) { + try { + return new ForkJoinWorkerThread(pool); + } catch (OutOfMemoryError oom) { + return null; + } + } + } + + /** + * Creates a new ForkJoinWorkerThread. This factory is used unless + * overridden in ForkJoinPool constructors. + */ + public static final ForkJoinWorkerThreadFactory + defaultForkJoinWorkerThreadFactory = + new DefaultForkJoinWorkerThreadFactory(); + + /** + * Permission required for callers of methods that may start or + * kill threads. + */ + private static final RuntimePermission modifyThreadPermission = + new RuntimePermission("modifyThread"); + + /** + * If there is a security manager, makes sure caller has + * permission to modify threads. + */ + private static void checkPermission() { + SecurityManager security = System.getSecurityManager(); + if (security != null) + security.checkPermission(modifyThreadPermission); + } + + /** + * Generator for assigning sequence numbers as pool names. + */ + private static final AtomicInteger poolNumberGenerator = + new AtomicInteger(); + + /** + * Array holding all worker threads in the pool. Initialized upon + * first use. Array size must be a power of two. Updates and + * replacements are protected by workerLock, but it is always kept + * in a consistent enough state to be randomly accessed without + * locking by workers performing work-stealing. + */ + public volatile ForkJoinWorkerThread[] workers; + + /** + * Lock protecting access to workers. + */ + private final ReentrantLock workerLock; + + /** + * Condition for awaitTermination. + */ + private final Condition termination; + + /** + * The uncaught exception handler used when any worker + * abrupty terminates + */ + private Thread.UncaughtExceptionHandler ueh; + + /** + * Creation factory for worker threads. + */ + private final ForkJoinWorkerThreadFactory factory; + + /** + * Head of stack of threads that were created to maintain + * parallelism when other threads blocked, but have since + * suspended when the parallelism level rose. + */ + private volatile WaitQueueNode spareStack; + + /** + * Sum of per-thread steal counts, updated only when threads are + * idle or terminating. + */ + private final AtomicLong stealCount; + + /** + * Queue for external submissions. + */ + private final LinkedTransferQueue> submissionQueue; + + /** + * Head of Treiber stack for barrier sync. See below for explanation + */ + private volatile WaitQueueNode syncStack; + + /** + * The count for event barrier + */ + private volatile long eventCount; + + /** + * Pool number, just for assigning useful names to worker threads + */ + private final int poolNumber; + + /** + * The maximum allowed pool size + */ + private volatile int maxPoolSize; + + /** + * The desired parallelism level, updated only under workerLock. + */ + private volatile int parallelism; + + /** + * True if use local fifo, not default lifo, for local polling + */ + private volatile boolean locallyFifo; + + /** + * Holds number of total (i.e., created and not yet terminated) + * and running (i.e., not blocked on joins or other managed sync) + * threads, packed into one int to ensure consistent snapshot when + * making decisions about creating and suspending spare + * threads. Updated only by CAS. Note: CASes in + * updateRunningCount and preJoin running active count is in low + * word, so need to be modified if this changes + */ + private volatile int workerCounts; + + private static int totalCountOf(int s) { return s >>> 16; } + private static int runningCountOf(int s) { return s & shortMask; } + private static int workerCountsFor(int t, int r) { return (t << 16) + r; } + + /** + * Add delta (which may be negative) to running count. This must + * be called before (with negative arg) and after (with positive) + * any managed synchronization (i.e., mainly, joins) + * @param delta the number to add + */ + final void updateRunningCount(int delta) { + int s; + do;while (!casWorkerCounts(s = workerCounts, s + delta)); + } + + /** + * Add delta (which may be negative) to both total and running + * count. This must be called upon creation and termination of + * worker threads. + * @param delta the number to add + */ + private void updateWorkerCount(int delta) { + int d = delta + (delta << 16); // add to both lo and hi parts + int s; + do;while (!casWorkerCounts(s = workerCounts, s + d)); + } + + /** + * Lifecycle control. High word contains runState, low word + * contains the number of workers that are (probably) executing + * tasks. This value is atomically incremented before a worker + * gets a task to run, and decremented when worker has no tasks + * and cannot find any. These two fields are bundled together to + * support correct termination triggering. Note: activeCount + * CAS'es cheat by assuming active count is in low word, so need + * to be modified if this changes + */ + private volatile int runControl; + + // RunState values. Order among values matters + private static final int RUNNING = 0; + private static final int SHUTDOWN = 1; + private static final int TERMINATING = 2; + private static final int TERMINATED = 3; + + private static int runStateOf(int c) { return c >>> 16; } + private static int activeCountOf(int c) { return c & shortMask; } + private static int runControlFor(int r, int a) { return (r << 16) + a; } + + /** + * Try incrementing active count; fail on contention. Called by + * workers before/during executing tasks. + * @return true on success; + */ + final boolean tryIncrementActiveCount() { + int c = runControl; + return casRunControl(c, c+1); + } + + /** + * Try decrementing active count; fail on contention. + * Possibly trigger termination on success + * Called by workers when they can't find tasks. + * @return true on success + */ + final boolean tryDecrementActiveCount() { + int c = runControl; + int nextc = c - 1; + if (!casRunControl(c, nextc)) + return false; + if (canTerminateOnShutdown(nextc)) + terminateOnShutdown(); + return true; + } + + /** + * Return true if argument represents zero active count and + * nonzero runstate, which is the triggering condition for + * terminating on shutdown. + */ + private static boolean canTerminateOnShutdown(int c) { + return ((c & -c) >>> 16) != 0; // i.e. least bit is nonzero runState bit + } + + /** + * Transition run state to at least the given state. Return true + * if not already at least given state. + */ + private boolean transitionRunStateTo(int state) { + for (;;) { + int c = runControl; + if (runStateOf(c) >= state) + return false; + if (casRunControl(c, runControlFor(state, activeCountOf(c)))) + return true; + } + } + + /** + * Controls whether to add spares to maintain parallelism + */ + private volatile boolean maintainsParallelism; + + // Constructors + + /** + * Creates a ForkJoinPool with a pool size equal to the number of + * processors available on the system and using the default + * ForkJoinWorkerThreadFactory, + * @throws SecurityException if a security manager exists and + * the caller is not permitted to modify threads + * because it does not hold {@link + * java.lang.RuntimePermission}("modifyThread"), + */ + public ForkJoinPool() { + this(Runtime.getRuntime().availableProcessors(), + defaultForkJoinWorkerThreadFactory); + } + + /** + * Creates a ForkJoinPool with the indicated parellelism level + * threads, and using the default ForkJoinWorkerThreadFactory, + * @param parallelism the number of worker threads + * @throws IllegalArgumentException if parallelism less than or + * equal to zero + * @throws SecurityException if a security manager exists and + * the caller is not permitted to modify threads + * because it does not hold {@link + * java.lang.RuntimePermission}("modifyThread"), + */ + public ForkJoinPool(int parallelism) { + this(parallelism, defaultForkJoinWorkerThreadFactory); + } + + /** + * Creates a ForkJoinPool with parallelism equal to the number of + * processors available on the system and using the given + * ForkJoinWorkerThreadFactory, + * @param factory the factory for creating new threads + * @throws NullPointerException if factory is null + * @throws SecurityException if a security manager exists and + * the caller is not permitted to modify threads + * because it does not hold {@link + * java.lang.RuntimePermission}("modifyThread"), + */ + public ForkJoinPool(ForkJoinWorkerThreadFactory factory) { + this(Runtime.getRuntime().availableProcessors(), factory); + } + + /** + * Creates a ForkJoinPool with the given parallelism and factory. + * + * @param parallelism the targeted number of worker threads + * @param factory the factory for creating new threads + * @throws IllegalArgumentException if parallelism less than or + * equal to zero, or greater than implementation limit. + * @throws NullPointerException if factory is null + * @throws SecurityException if a security manager exists and + * the caller is not permitted to modify threads + * because it does not hold {@link + * java.lang.RuntimePermission}("modifyThread"), + */ + public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) { + if (parallelism <= 0 || parallelism > MAX_THREADS) + throw new IllegalArgumentException(); + if (factory == null) + throw new NullPointerException(); + checkPermission(); + this.factory = factory; + this.parallelism = parallelism; + this.maxPoolSize = MAX_THREADS; + this.maintainsParallelism = true; + this.poolNumber = poolNumberGenerator.incrementAndGet(); + this.workerLock = new ReentrantLock(); + this.termination = workerLock.newCondition(); + this.stealCount = new AtomicLong(); + this.submissionQueue = new LinkedTransferQueue>(); + // worker array and workers are lazily constructed + } + + /** + * Create new worker using factory. + * @param index the index to assign worker + * @return new worker, or null of factory failed + */ + private ForkJoinWorkerThread createWorker(int index) { + Thread.UncaughtExceptionHandler h = ueh; + ForkJoinWorkerThread w = factory.newThread(this); + if (w != null) { + w.poolIndex = index; + w.setDaemon(true); + w.setAsyncMode(locallyFifo); + w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index); + if (h != null) + w.setUncaughtExceptionHandler(h); + } + return w; + } + + /** + * Return a good size for worker array given pool size. + * Currently requires size to be a power of two. + */ + private static int arraySizeFor(int ps) { + return ps <= 1? 1 : (1 << (32 - Integer.numberOfLeadingZeros(ps-1))); + } + + public static ForkJoinWorkerThread[] copyOfWorkers(ForkJoinWorkerThread[] original, int newLength) { + ForkJoinWorkerThread[] copy = new ForkJoinWorkerThread[newLength]; + System.arraycopy(original, 0, copy, 0, Math.min(newLength, original.length)); + return copy; + } + + /** + * Create or resize array if necessary to hold newLength. + * Call only under exlusion or lock + * @return the array + */ + private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) { + ForkJoinWorkerThread[] ws = workers; + if (ws == null) + return workers = new ForkJoinWorkerThread[arraySizeFor(newLength)]; + else if (newLength > ws.length) + return workers = copyOfWorkers(ws, arraySizeFor(newLength)); + else + return ws; + } + + /** + * Try to shrink workers into smaller array after one or more terminate + */ + private void tryShrinkWorkerArray() { + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + int len = ws.length; + int last = len - 1; + while (last >= 0 && ws[last] == null) + --last; + int newLength = arraySizeFor(last+1); + if (newLength < len) + workers = copyOfWorkers(ws, newLength); + } + } + + /** + * Initialize workers if necessary + */ + final void ensureWorkerInitialization() { + ForkJoinWorkerThread[] ws = workers; + if (ws == null) { + final ReentrantLock lock = this.workerLock; + lock.lock(); + try { + ws = workers; + if (ws == null) { + int ps = parallelism; + ws = ensureWorkerArrayCapacity(ps); + for (int i = 0; i < ps; ++i) { + ForkJoinWorkerThread w = createWorker(i); + if (w != null) { + ws[i] = w; + w.start(); + updateWorkerCount(1); + } + } + } + } finally { + lock.unlock(); + } + } + } + + /** + * Worker creation and startup for threads added via setParallelism. + */ + private void createAndStartAddedWorkers() { + resumeAllSpares(); // Allow spares to convert to nonspare + int ps = parallelism; + ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(ps); + int len = ws.length; + // Sweep through slots, to keep lowest indices most populated + int k = 0; + while (k < len) { + if (ws[k] != null) { + ++k; + continue; + } + int s = workerCounts; + int tc = totalCountOf(s); + int rc = runningCountOf(s); + if (rc >= ps || tc >= ps) + break; + if (casWorkerCounts (s, workerCountsFor(tc+1, rc+1))) { + ForkJoinWorkerThread w = createWorker(k); + if (w != null) { + ws[k++] = w; + w.start(); + } + else { + updateWorkerCount(-1); // back out on failed creation + break; + } + } + } + } + + // Execution methods + + /** + * Common code for execute, invoke and submit + */ + private void doSubmit(ForkJoinTask task) { + if (isShutdown()) + throw new RejectedExecutionException(); + if (workers == null) + ensureWorkerInitialization(); + submissionQueue.offer(task); + signalIdleWorkers(); + } + + /** + * Performs the given task; returning its result upon completion + * @param task the task + * @return the task's result + * @throws NullPointerException if task is null + * @throws RejectedExecutionException if pool is shut down + */ + public T invoke(ForkJoinTask task) { + doSubmit(task); + return task.join(); + } + + /** + * Arranges for (asynchronous) execution of the given task. + * @param task the task + * @throws NullPointerException if task is null + * @throws RejectedExecutionException if pool is shut down + */ + public void execute(ForkJoinTask task) { + doSubmit(task); + } + + // AbstractExecutorService methods + + public void execute(Runnable task) { + doSubmit(new AdaptedRunnable(task, null)); + } + + public ForkJoinTask submit(Callable task) { + ForkJoinTask job = new AdaptedCallable(task); + doSubmit(job); + return job; + } + + public ForkJoinTask submit(Runnable task, T result) { + ForkJoinTask job = new AdaptedRunnable(task, result); + doSubmit(job); + return job; + } + + public ForkJoinTask submit(Runnable task) { + ForkJoinTask job = new AdaptedRunnable(task, null); + doSubmit(job); + return job; + } + + /** + * Adaptor for Runnables. This implements RunnableFuture + * to be compliant with AbstractExecutorService constraints + */ + static final class AdaptedRunnable extends ForkJoinTask + implements RunnableFuture { + final Runnable runnable; + final T resultOnCompletion; + T result; + AdaptedRunnable(Runnable runnable, T result) { + if (runnable == null) throw new NullPointerException(); + this.runnable = runnable; + this.resultOnCompletion = result; + } + public T getRawResult() { return result; } + public void setRawResult(T v) { result = v; } + public boolean exec() { + runnable.run(); + result = resultOnCompletion; + return true; + } + public void run() { invoke(); } + } + + /** + * Adaptor for Callables + */ + static final class AdaptedCallable extends ForkJoinTask + implements RunnableFuture { + final Callable callable; + T result; + AdaptedCallable(Callable callable) { + if (callable == null) throw new NullPointerException(); + this.callable = callable; + } + public T getRawResult() { return result; } + public void setRawResult(T v) { result = v; } + public boolean exec() { + try { + result = callable.call(); + return true; + } catch (Error err) { + throw err; + } catch (RuntimeException rex) { + throw rex; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + public void run() { invoke(); } + } + + public List> invokeAll(Collection> tasks) { + ArrayList> ts = + new ArrayList>(tasks.size()); + for (Callable c : tasks) + ts.add(new AdaptedCallable(c)); + invoke(new InvokeAll(ts)); + return (List>)(List)ts; + } + + static final class InvokeAll extends RecursiveAction { + final ArrayList> tasks; + InvokeAll(ArrayList> tasks) { this.tasks = tasks; } + public void compute() { + try { invokeAll(tasks); } catch(Exception ignore) {} + } + } + + // Configuration and status settings and queries + + /** + * Returns the factory used for constructing new workers + * + * @return the factory used for constructing new workers + */ + public ForkJoinWorkerThreadFactory getFactory() { + return factory; + } + + /** + * Returns the handler for internal worker threads that terminate + * due to unrecoverable errors encountered while executing tasks. + * @return the handler, or null if none + */ + public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { + Thread.UncaughtExceptionHandler h; + final ReentrantLock lock = this.workerLock; + lock.lock(); + try { + h = ueh; + } finally { + lock.unlock(); + } + return h; + } + + /** + * Sets the handler for internal worker threads that terminate due + * to unrecoverable errors encountered while executing tasks. + * Unless set, the current default or ThreadGroup handler is used + * as handler. + * + * @param h the new handler + * @return the old handler, or null if none + * @throws SecurityException if a security manager exists and + * the caller is not permitted to modify threads + * because it does not hold {@link + * java.lang.RuntimePermission}("modifyThread"), + */ + public Thread.UncaughtExceptionHandler + setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) { + checkPermission(); + Thread.UncaughtExceptionHandler old = null; + final ReentrantLock lock = this.workerLock; + lock.lock(); + try { + old = ueh; + ueh = h; + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread w = ws[i]; + if (w != null) + w.setUncaughtExceptionHandler(h); + } + } + } finally { + lock.unlock(); + } + return old; + } + + + /** + * Sets the target paralleism level of this pool. + * @param parallelism the target parallelism + * @throws IllegalArgumentException if parallelism less than or + * equal to zero or greater than maximum size bounds. + * @throws SecurityException if a security manager exists and + * the caller is not permitted to modify threads + * because it does not hold {@link + * java.lang.RuntimePermission}("modifyThread"), + */ + public void setParallelism(int parallelism) { + checkPermission(); + if (parallelism <= 0 || parallelism > maxPoolSize) + throw new IllegalArgumentException(); + final ReentrantLock lock = this.workerLock; + lock.lock(); + try { + if (!isTerminating()) { + int p = this.parallelism; + this.parallelism = parallelism; + if (parallelism > p) + createAndStartAddedWorkers(); + else + trimSpares(); + } + } finally { + lock.unlock(); + } + signalIdleWorkers(); + } + + /** + * Returns the targeted number of worker threads in this pool. + * + * @return the targeted number of worker threads in this pool + */ + public int getParallelism() { + return parallelism; + } + + /** + * Returns the number of worker threads that have started but not + * yet terminated. This result returned by this method may differ + * from getParallelism when threads are created to + * maintain parallelism when others are cooperatively blocked. + * + * @return the number of worker threads + */ + public int getPoolSize() { + return totalCountOf(workerCounts); + } + + /** + * Returns the maximum number of threads allowed to exist in the + * pool, even if there are insufficient unblocked running threads. + * @return the maximum + */ + public int getMaximumPoolSize() { + return maxPoolSize; + } + + /** + * Sets the maximum number of threads allowed to exist in the + * pool, even if there are insufficient unblocked running threads. + * Setting this value has no effect on current pool size. It + * controls construction of new threads. + * @throws IllegalArgumentException if negative or greater then + * internal implementation limit. + */ + public void setMaximumPoolSize(int newMax) { + if (newMax < 0 || newMax > MAX_THREADS) + throw new IllegalArgumentException(); + maxPoolSize = newMax; + } + + + /** + * Returns true if this pool dynamically maintains its target + * parallelism level. If false, new threads are added only to + * avoid possible starvation. + * This setting is by default true; + * @return true if maintains parallelism + */ + public boolean getMaintainsParallelism() { + return maintainsParallelism; + } + + /** + * Sets whether this pool dynamically maintains its target + * parallelism level. If false, new threads are added only to + * avoid possible starvation. + * @param enable true to maintains parallelism + */ + public void setMaintainsParallelism(boolean enable) { + maintainsParallelism = enable; + } + + /** + * Establishes local first-in-first-out scheduling mode for forked + * tasks that are never joined. This mode may be more appropriate + * than default locally stack-based mode in applications in which + * worker threads only process asynchronous tasks. This method is + * designed to be invoked only when pool is quiescent, and + * typically only before any tasks are submitted. The effects of + * invocations at ather times may be unpredictable. + * + * @param async if true, use locally FIFO scheduling + * @return the previous mode. + */ + public boolean setAsyncMode(boolean async) { + boolean oldMode = locallyFifo; + locallyFifo = async; + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + t.setAsyncMode(async); + } + } + return oldMode; + } + + /** + * Returns true if this pool uses local first-in-first-out + * scheduling mode for forked tasks that are never joined. + * + * @return true if this pool uses async mode. + */ + public boolean getAsyncMode() { + return locallyFifo; + } + + /** + * Returns an estimate of the number of worker threads that are + * not blocked waiting to join tasks or for other managed + * synchronization. + * + * @return the number of worker threads + */ + public int getRunningThreadCount() { + return runningCountOf(workerCounts); + } + + /** + * Returns an estimate of the number of threads that are currently + * stealing or executing tasks. This method may overestimate the + * number of active threads. + * @return the number of active threads. + */ + public int getActiveThreadCount() { + return activeCountOf(runControl); + } + + /** + * Returns an estimate of the number of threads that are currently + * idle waiting for tasks. This method may underestimate the + * number of idle threads. + * @return the number of idle threads. + */ + final int getIdleThreadCount() { + int c = runningCountOf(workerCounts) - activeCountOf(runControl); + return (c <= 0)? 0 : c; + } + + /** + * Returns true if all worker threads are currently idle. An idle + * worker is one that cannot obtain a task to execute because none + * are available to steal from other threads, and there are no + * pending submissions to the pool. This method is conservative: + * It might not return true immediately upon idleness of all + * threads, but will eventually become true if threads remain + * inactive. + * @return true if all threads are currently idle + */ + public boolean isQuiescent() { + return activeCountOf(runControl) == 0; + } + + /** + * Returns an estimate of the total number of tasks stolen from + * one thread's work queue by another. The reported value + * underestimates the actual total number of steals when the pool + * is not quiescent. This value may be useful for monitoring and + * tuning fork/join programs: In general, steal counts should be + * high enough to keep threads busy, but low enough to avoid + * overhead and contention across threads. + * @return the number of steals. + */ + public long getStealCount() { + return stealCount.get(); + } + + /** + * Accumulate steal count from a worker. Call only + * when worker known to be idle. + */ + private void updateStealCount(ForkJoinWorkerThread w) { + int sc = w.getAndClearStealCount(); + if (sc != 0) + stealCount.addAndGet(sc); + } + + /** + * Returns an estimate of the total number of tasks currently held + * in queues by worker threads (but not including tasks submitted + * to the pool that have not begun executing). This value is only + * an approximation, obtained by iterating across all threads in + * the pool. This method may be useful for tuning task + * granularities. + * @return the number of queued tasks. + */ + public long getQueuedTaskCount() { + long count = 0; + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + count += t.getQueueSize(); + } + } + return count; + } + + /** + * Returns an estimate of the number tasks submitted to this pool + * that have not yet begun executing. This method takes time + * proportional to the number of submissions. + * @return the number of queued submissions. + */ + public int getQueuedSubmissionCount() { + return submissionQueue.size(); + } + + /** + * Returns true if there are any tasks submitted to this pool + * that have not yet begun executing. + * @return true if there are any queued submissions. + */ + public boolean hasQueuedSubmissions() { + return !submissionQueue.isEmpty(); + } + + /** + * Removes and returns the next unexecuted submission if one is + * available. This method may be useful in extensions to this + * class that re-assign work in systems with multiple pools. + * @return the next submission, or null if none + */ + protected ForkJoinTask pollSubmission() { + return submissionQueue.poll(); + } + + /** + * Removes all available unexecuted submitted and forked tasks + * from scheduling queues and adds them to the given collection, + * without altering their execution status. These may include + * artifically generated or wrapped tasks. This method id designed + * to be invoked only when the pool is known to be + * quiescent. Invocations at other times may not remove all + * tasks. A failure encountered while attempting to add elements + * to collection c may result in elements being in + * neither, either or both collections when the associated + * exception is thrown. The behavior of this operation is + * undefined if the specified collection is modified while the + * operation is in progress. + * @param c the collection to transfer elements into + * @return the number of elements transferred + */ + protected int drainTasksTo(Collection> c) { + int n = submissionQueue.drainTo(c); + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread w = ws[i]; + if (w != null) + n += w.drainTasksTo(c); + } + } + return n; + } + + /** + * Returns a string identifying this pool, as well as its state, + * including indications of run state, parallelism level, and + * worker and task counts. + * + * @return a string identifying this pool, as well as its state + */ + public String toString() { + int ps = parallelism; + int wc = workerCounts; + int rc = runControl; + long st = getStealCount(); + long qt = getQueuedTaskCount(); + long qs = getQueuedSubmissionCount(); + return super.toString() + + "[" + runStateToString(runStateOf(rc)) + + ", parallelism = " + ps + + ", size = " + totalCountOf(wc) + + ", active = " + activeCountOf(rc) + + ", running = " + runningCountOf(wc) + + ", steals = " + st + + ", tasks = " + qt + + ", submissions = " + qs + + "]"; + } + + private static String runStateToString(int rs) { + switch(rs) { + case RUNNING: return "Running"; + case SHUTDOWN: return "Shutting down"; + case TERMINATING: return "Terminating"; + case TERMINATED: return "Terminated"; + default: throw new Error("Unknown run state"); + } + } + + // lifecycle control + + /** + * Initiates an orderly shutdown in which previously submitted + * tasks are executed, but no new tasks will be accepted. + * Invocation has no additional effect if already shut down. + * Tasks that are in the process of being submitted concurrently + * during the course of this method may or may not be rejected. + * @throws SecurityException if a security manager exists and + * the caller is not permitted to modify threads + * because it does not hold {@link + * java.lang.RuntimePermission}("modifyThread"), + */ + public void shutdown() { + checkPermission(); + transitionRunStateTo(SHUTDOWN); + if (canTerminateOnShutdown(runControl)) + terminateOnShutdown(); + } + + /** + * Attempts to stop all actively executing tasks, and cancels all + * waiting tasks. Tasks that are in the process of being + * submitted or executed concurrently during the course of this + * method may or may not be rejected. Unlike some other executors, + * this method cancels rather than collects non-executed tasks + * upon termination, so always returns an empty list. However, you + * can use method drainTasksTo before invoking this + * method to transfer unexecuted tasks to another collection. + * @return an empty list + * @throws SecurityException if a security manager exists and + * the caller is not permitted to modify threads + * because it does not hold {@link + * java.lang.RuntimePermission}("modifyThread"), + */ + public List shutdownNow() { + checkPermission(); + terminate(); + return Collections.emptyList(); + } + + /** + * Returns true if all tasks have completed following shut down. + * + * @return true if all tasks have completed following shut down + */ + public boolean isTerminated() { + return runStateOf(runControl) == TERMINATED; + } + + /** + * Returns true if the process of termination has + * commenced but possibly not yet completed. + * + * @return true if terminating + */ + public boolean isTerminating() { + return runStateOf(runControl) >= TERMINATING; + } + + /** + * Returns true if this pool has been shut down. + * + * @return true if this pool has been shut down + */ + public boolean isShutdown() { + return runStateOf(runControl) >= SHUTDOWN; + } + + /** + * Blocks until all tasks have completed execution after a shutdown + * request, or the timeout occurs, or the current thread is + * interrupted, whichever happens first. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return true if this executor terminated and + * false if the timeout elapsed before termination + * @throws InterruptedException if interrupted while waiting + */ + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.workerLock; + lock.lock(); + try { + for (;;) { + if (isTerminated()) + return true; + if (nanos <= 0) + return false; + nanos = termination.awaitNanos(nanos); + } + } finally { + lock.unlock(); + } + } + + // Shutdown and termination support + + /** + * Callback from terminating worker. Null out the corresponding + * workers slot, and if terminating, try to terminate, else try to + * shrink workers array. + * @param w the worker + */ + final void workerTerminated(ForkJoinWorkerThread w) { + updateStealCount(w); + updateWorkerCount(-1); + final ReentrantLock lock = this.workerLock; + lock.lock(); + try { + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + int idx = w.poolIndex; + if (idx >= 0 && idx < ws.length && ws[idx] == w) + ws[idx] = null; + if (totalCountOf(workerCounts) == 0) { + terminate(); // no-op if already terminating + transitionRunStateTo(TERMINATED); + termination.signalAll(); + } + else if (!isTerminating()) { + tryShrinkWorkerArray(); + tryResumeSpare(true); // allow replacement + } + } + } finally { + lock.unlock(); + } + signalIdleWorkers(); + } + + /** + * Initiate termination. + */ + private void terminate() { + if (transitionRunStateTo(TERMINATING)) { + stopAllWorkers(); + resumeAllSpares(); + signalIdleWorkers(); + cancelQueuedSubmissions(); + cancelQueuedWorkerTasks(); + interruptUnterminatedWorkers(); + signalIdleWorkers(); // resignal after interrupt + } + } + + /** + * Possibly terminate when on shutdown state + */ + private void terminateOnShutdown() { + if (!hasQueuedSubmissions() && canTerminateOnShutdown(runControl)) + terminate(); + } + + /** + * Clear out and cancel submissions + */ + private void cancelQueuedSubmissions() { + ForkJoinTask task; + while ((task = pollSubmission()) != null) + task.cancel(false); + } + + /** + * Clean out worker queues. + */ + private void cancelQueuedWorkerTasks() { + final ReentrantLock lock = this.workerLock; + lock.lock(); + try { + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + t.cancelTasks(); + } + } + } finally { + lock.unlock(); + } + } + + /** + * Set each worker's status to terminating. Requires lock to avoid + * conflicts with add/remove + */ + private void stopAllWorkers() { + final ReentrantLock lock = this.workerLock; + lock.lock(); + try { + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + t.shutdownNow(); + } + } + } finally { + lock.unlock(); + } + } + + /** + * Interrupt all unterminated workers. This is not required for + * sake of internal control, but may help unstick user code during + * shutdown. + */ + private void interruptUnterminatedWorkers() { + final ReentrantLock lock = this.workerLock; + lock.lock(); + try { + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null && !t.isTerminated()) { + try { + t.interrupt(); + } catch (SecurityException ignore) { + } + } + } + } + } finally { + lock.unlock(); + } + } + + + /* + * Nodes for event barrier to manage idle threads. Queue nodes + * are basic Treiber stack nodes, also used for spare stack. + * + * The event barrier has an event count and a wait queue (actually + * a Treiber stack). Workers are enabled to look for work when + * the eventCount is incremented. If they fail to find work, they + * may wait for next count. Upon release, threads help others wake + * up. + * + * Synchronization events occur only in enough contexts to + * maintain overall liveness: + * + * - Submission of a new task to the pool + * - Resizes or other changes to the workers array + * - pool termination + * - A worker pushing a task on an empty queue + * + * The case of pushing a task occurs often enough, and is heavy + * enough compared to simple stack pushes, to require special + * handling: Method signalWork returns without advancing count if + * the queue appears to be empty. This would ordinarily result in + * races causing some queued waiters not to be woken up. To avoid + * this, the first worker enqueued in method sync (see + * syncIsReleasable) rescans for tasks after being enqueued, and + * helps signal if any are found. This works well because the + * worker has nothing better to do, and so might as well help + * alleviate the overhead and contention on the threads actually + * doing work. Also, since event counts increments on task + * availability exist to maintain liveness (rather than to force + * refreshes etc), it is OK for callers to exit early if + * contending with another signaller. + */ + static final class WaitQueueNode { + WaitQueueNode next; // only written before enqueued + volatile ForkJoinWorkerThread thread; // nulled to cancel wait + final long count; // unused for spare stack + + WaitQueueNode(long c, ForkJoinWorkerThread w) { + count = c; + thread = w; + } + + /** + * Wake up waiter, returning false if known to already + */ + boolean signal() { + ForkJoinWorkerThread t = thread; + if (t == null) + return false; + thread = null; + LockSupport.unpark(t); + return true; + } + + /** + * Await release on sync + */ + void awaitSyncRelease(ForkJoinPool p) { + while (thread != null && !p.syncIsReleasable(this)) + LockSupport.park();//TR park(this); + } + + /** + * Await resumption as spare + */ + void awaitSpareRelease() { + while (thread != null) { + if (!Thread.interrupted()) + LockSupport.park();//TR park(this); + } + } + } + + /** + * Ensures that no thread is waiting for count to advance from the + * current value of eventCount read on entry to this method, by + * releasing waiting threads if necessary. + * @return the count + */ + final long ensureSync() { + long c = eventCount; + WaitQueueNode q; + while ((q = syncStack) != null && q.count < c) { + if (casBarrierStack(q, null)) { + do { + q.signal(); + } while ((q = q.next) != null); + break; + } + } + return c; + } + + /** + * Increments event count and releases waiting threads. + */ + private void signalIdleWorkers() { + long c; + do;while (!casEventCount(c = eventCount, c+1)); + ensureSync(); + } + + /** + * Signal threads waiting to poll a task. Because method sync + * rechecks availability, it is OK to only proceed if queue + * appears to be non-empty, and OK to skip under contention to + * increment count (since some other thread succeeded). + */ + final void signalWork() { + long c; + WaitQueueNode q; + if (syncStack != null && + casEventCount(c = eventCount, c+1) && + (((q = syncStack) != null && q.count <= c) && + (!casBarrierStack(q, q.next) || !q.signal()))) + ensureSync(); + } + + /** + * Waits until event count advances from last value held by + * caller, or if excess threads, caller is resumed as spare, or + * caller or pool is terminating. Updates caller's event on exit. + * @param w the calling worker thread + */ + final void sync(ForkJoinWorkerThread w) { + updateStealCount(w); // Transfer w's count while it is idle + + while (!w.isShutdown() && !isTerminating() && !suspendIfSpare(w)) { + long prev = w.lastEventCount; + WaitQueueNode node = null; + WaitQueueNode h; + while (eventCount == prev && + ((h = syncStack) == null || h.count == prev)) { + if (node == null) + node = new WaitQueueNode(prev, w); + if (casBarrierStack(node.next = h, node)) { + node.awaitSyncRelease(this); + break; + } + } + long ec = ensureSync(); + if (ec != prev) { + w.lastEventCount = ec; + break; + } + } + } + + /** + * Returns true if worker waiting on sync can proceed: + * - on signal (thread == null) + * - on event count advance (winning race to notify vs signaller) + * - on Interrupt + * - if the first queued node, we find work available + * If node was not signalled and event count not advanced on exit, + * then we also help advance event count. + * @return true if node can be released + */ + final boolean syncIsReleasable(WaitQueueNode node) { + long prev = node.count; + if (!Thread.interrupted() && node.thread != null && + (node.next != null || + !ForkJoinWorkerThread.hasQueuedTasks(workers)) && + eventCount == prev) + return false; + if (node.thread != null) { + node.thread = null; + long ec = eventCount; + if (prev <= ec) // help signal + casEventCount(ec, ec+1); + } + return true; + } + + /** + * Returns true if a new sync event occurred since last call to + * sync or this method, if so, updating caller's count. + */ + final boolean hasNewSyncEvent(ForkJoinWorkerThread w) { + long lc = w.lastEventCount; + long ec = ensureSync(); + if (ec == lc) + return false; + w.lastEventCount = ec; + return true; + } + + // Parallelism maintenance + + /** + * Decrement running count; if too low, add spare. + * + * Conceptually, all we need to do here is add or resume a + * spare thread when one is about to block (and remove or + * suspend it later when unblocked -- see suspendIfSpare). + * However, implementing this idea requires coping with + * several problems: We have imperfect information about the + * states of threads. Some count updates can and usually do + * lag run state changes, despite arrangements to keep them + * accurate (for example, when possible, updating counts + * before signalling or resuming), especially when running on + * dynamic JVMs that don't optimize the infrequent paths that + * update counts. Generating too many threads can make these + * problems become worse, because excess threads are more + * likely to be context-switched with others, slowing them all + * down, especially if there is no work available, so all are + * busy scanning or idling. Also, excess spare threads can + * only be suspended or removed when they are idle, not + * immediately when they aren't needed. So adding threads will + * raise parallelism level for longer than necessary. Also, + * FJ applications often enounter highly transient peaks when + * many threads are blocked joining, but for less time than it + * takes to create or resume spares. + * + * @param joinMe if non-null, return early if done + * @param maintainParallelism if true, try to stay within + * target counts, else create only to avoid starvation + * @return true if joinMe known to be done + */ + final boolean preJoin(ForkJoinTask joinMe, boolean maintainParallelism) { + maintainParallelism &= maintainsParallelism; // overrride + boolean dec = false; // true when running count decremented + while (spareStack == null || !tryResumeSpare(dec)) { + int counts = workerCounts; + if (dec || (dec = casWorkerCounts(counts, --counts))) { // CAS cheat + if (!needSpare(counts, maintainParallelism)) + break; + if (joinMe.status < 0) + return true; + if (tryAddSpare(counts)) + break; + } + } + return false; + } + + /** + * Same idea as preJoin + */ + final boolean preBlock(ManagedBlocker blocker, boolean maintainParallelism){ + maintainParallelism &= maintainsParallelism; + boolean dec = false; + while (spareStack == null || !tryResumeSpare(dec)) { + int counts = workerCounts; + if (dec || (dec = casWorkerCounts(counts, --counts))) { + if (!needSpare(counts, maintainParallelism)) + break; + if (blocker.isReleasable()) + return true; + if (tryAddSpare(counts)) + break; + } + } + return false; + } + + /** + * Returns true if a spare thread appears to be needed. If + * maintaining parallelism, returns true when the deficit in + * running threads is more than the surplus of total threads, and + * there is apparently some work to do. This self-limiting rule + * means that the more threads that have already been added, the + * less parallelism we will tolerate before adding another. + * @param counts current worker counts + * @param maintainParallelism try to maintain parallelism + */ + private boolean needSpare(int counts, boolean maintainParallelism) { + int ps = parallelism; + int rc = runningCountOf(counts); + int tc = totalCountOf(counts); + int runningDeficit = ps - rc; + int totalSurplus = tc - ps; + return (tc < maxPoolSize && + (rc == 0 || totalSurplus < 0 || + (maintainParallelism && + runningDeficit > totalSurplus && + ForkJoinWorkerThread.hasQueuedTasks(workers)))); + } + + /** + * Add a spare worker if lock available and no more than the + * expected numbers of threads exist + * @return true if successful + */ + private boolean tryAddSpare(int expectedCounts) { + final ReentrantLock lock = this.workerLock; + int expectedRunning = runningCountOf(expectedCounts); + int expectedTotal = totalCountOf(expectedCounts); + boolean success = false; + boolean locked = false; + // confirm counts while locking; CAS after obtaining lock + try { + for (;;) { + int s = workerCounts; + int tc = totalCountOf(s); + int rc = runningCountOf(s); + if (rc > expectedRunning || tc > expectedTotal) + break; + if (!locked && !(locked = lock.tryLock())) + break; + if (casWorkerCounts(s, workerCountsFor(tc+1, rc+1))) { + createAndStartSpare(tc); + success = true; + break; + } + } + } finally { + if (locked) + lock.unlock(); + } + return success; + } + + /** + * Add the kth spare worker. On entry, pool coounts are already + * adjusted to reflect addition. + */ + private void createAndStartSpare(int k) { + ForkJoinWorkerThread w = null; + ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(k + 1); + int len = ws.length; + // Probably, we can place at slot k. If not, find empty slot + if (k < len && ws[k] != null) { + for (k = 0; k < len && ws[k] != null; ++k) + ; + } + if (k < len && !isTerminating() && (w = createWorker(k)) != null) { + ws[k] = w; + w.start(); + } + else + updateWorkerCount(-1); // adjust on failure + signalIdleWorkers(); + } + + /** + * Suspend calling thread w if there are excess threads. Called + * only from sync. Spares are enqueued in a Treiber stack + * using the same WaitQueueNodes as barriers. They are resumed + * mainly in preJoin, but are also woken on pool events that + * require all threads to check run state. + * @param w the caller + */ + private boolean suspendIfSpare(ForkJoinWorkerThread w) { + WaitQueueNode node = null; + int s; + while (parallelism < runningCountOf(s = workerCounts)) { + if (node == null) + node = new WaitQueueNode(0, w); + if (casWorkerCounts(s, s-1)) { // representation-dependent + // push onto stack + do;while (!casSpareStack(node.next = spareStack, node)); + // block until released by resumeSpare + node.awaitSpareRelease(); + return true; + } + } + return false; + } + + /** + * Try to pop and resume a spare thread. + * @param updateCount if true, increment running count on success + * @return true if successful + */ + private boolean tryResumeSpare(boolean updateCount) { + WaitQueueNode q; + while ((q = spareStack) != null) { + if (casSpareStack(q, q.next)) { + if (updateCount) + updateRunningCount(1); + q.signal(); + return true; + } + } + return false; + } + + /** + * Pop and resume all spare threads. Same idea as ensureSync. + * @return true if any spares released + */ + private boolean resumeAllSpares() { + WaitQueueNode q; + while ( (q = spareStack) != null) { + if (casSpareStack(q, null)) { + do { + updateRunningCount(1); + q.signal(); + } while ((q = q.next) != null); + return true; + } + } + return false; + } + + /** + * Pop and shutdown excessive spare threads. Call only while + * holding lock. This is not guaranteed to eliminate all excess + * threads, only those suspended as spares, which are the ones + * unlikely to be needed in the future. + */ + private void trimSpares() { + int surplus = totalCountOf(workerCounts) - parallelism; + WaitQueueNode q; + while (surplus > 0 && (q = spareStack) != null) { + if (casSpareStack(q, null)) { + do { + updateRunningCount(1); + ForkJoinWorkerThread w = q.thread; + if (w != null && surplus > 0 && + runningCountOf(workerCounts) > 0 && w.shutdown()) + --surplus; + q.signal(); + } while ((q = q.next) != null); + } + } + } + + /** + * Interface for extending managed parallelism for tasks running + * in ForkJoinPools. A ManagedBlocker provides two methods. + * Method isReleasable must return true if blocking is not + * necessary. Method block blocks the current thread + * if necessary (perhaps internally invoking isReleasable before + * actually blocking.). + *

For example, here is a ManagedBlocker based on a + * ReentrantLock: + *

+     *   class ManagedLocker implements ManagedBlocker {
+     *     final ReentrantLock lock;
+     *     boolean hasLock = false;
+     *     ManagedLocker(ReentrantLock lock) { this.lock = lock; }
+     *     public boolean block() {
+     *        if (!hasLock)
+     *           lock.lock();
+     *        return true;
+     *     }
+     *     public boolean isReleasable() {
+     *        return hasLock || (hasLock = lock.tryLock());
+     *     }
+     *   }
+     * 
+ */ + public static interface ManagedBlocker { + /** + * Possibly blocks the current thread, for example waiting for + * a lock or condition. + * @return true if no additional blocking is necessary (i.e., + * if isReleasable would return true). + * @throws InterruptedException if interrupted while waiting + * (the method is not required to do so, but is allowe to). + */ + boolean block() throws InterruptedException; + + /** + * Returns true if blocking is unnecessary. + */ + boolean isReleasable(); + } + + /** + * Blocks in accord with the given blocker. If the current thread + * is a ForkJoinWorkerThread, this method possibly arranges for a + * spare thread to be activated if necessary to ensure parallelism + * while the current thread is blocked. If + * maintainParallelism is true and the pool supports + * it ({@link #getMaintainsParallelism}), this method attempts to + * maintain the pool's nominal parallelism. Otherwise if activates + * a thread only if necessary to avoid complete starvation. This + * option may be preferable when blockages use timeouts, or are + * almost always brief. + * + *

If the caller is not a ForkJoinTask, this method is behaviorally + * equivalent to + *

+     *   while (!blocker.isReleasable())
+     *      if (blocker.block())
+     *         return;
+     * 
+ * If the caller is a ForkJoinTask, then the pool may first + * be expanded to ensure parallelism, and later adjusted. + * + * @param blocker the blocker + * @param maintainParallelism if true and supported by this pool, + * attempt to maintain the pool's nominal parallelism; otherwise + * activate a thread only if necessary to avoid complete + * starvation. + * @throws InterruptedException if blocker.block did so. + */ + public static void managedBlock(ManagedBlocker blocker, + boolean maintainParallelism) + throws InterruptedException { + Thread t = Thread.currentThread(); + ForkJoinPool pool = (t instanceof ForkJoinWorkerThread? + ((ForkJoinWorkerThread)t).pool : null); + if (!blocker.isReleasable()) { + try { + if (pool == null || + !pool.preBlock(blocker, maintainParallelism)) + awaitBlocker(blocker); + } finally { + if (pool != null) + pool.updateRunningCount(1); + } + } + } + + private static void awaitBlocker(ManagedBlocker blocker) + throws InterruptedException { + do;while (!blocker.isReleasable() && !blocker.block()); + } + + // AbstractExecutorService overrides + + protected RunnableFuture newTaskFor(Runnable runnable, T value) { + return new AdaptedRunnable(runnable, value); + } + + protected RunnableFuture newTaskFor(Callable callable) { + return new AdaptedCallable(callable); + } + + + // Temporary Unsafe mechanics for preliminary release + private static Unsafe getUnsafe() throws Throwable { + try { + return Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public Unsafe run() throws Exception { + return getUnsafePrivileged(); + }}); + } catch (java.security.PrivilegedActionException e) { + throw e.getCause(); + } + } + } + + private static Unsafe getUnsafePrivileged() + throws NoSuchFieldException, IllegalAccessException { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (Unsafe) f.get(null); + } + + private static long fieldOffset(String fieldName) + throws NoSuchFieldException { + return _unsafe.objectFieldOffset + (ForkJoinPool.class.getDeclaredField(fieldName)); + } + + static final Unsafe _unsafe; + static final long eventCountOffset; + static final long workerCountsOffset; + static final long runControlOffset; + static final long syncStackOffset; + static final long spareStackOffset; + + static { + try { + _unsafe = getUnsafe(); + eventCountOffset = fieldOffset("eventCount"); + workerCountsOffset = fieldOffset("workerCounts"); + runControlOffset = fieldOffset("runControl"); + syncStackOffset = fieldOffset("syncStack"); + spareStackOffset = fieldOffset("spareStack"); + } catch (Throwable e) { + throw new RuntimeException("Could not initialize intrinsics", e); + } + } + + private boolean casEventCount(long cmp, long val) { + return _unsafe.compareAndSwapLong(this, eventCountOffset, cmp, val); + } + private boolean casWorkerCounts(int cmp, int val) { + return _unsafe.compareAndSwapInt(this, workerCountsOffset, cmp, val); + } + private boolean casRunControl(int cmp, int val) { + return _unsafe.compareAndSwapInt(this, runControlOffset, cmp, val); + } + private boolean casSpareStack(WaitQueueNode cmp, WaitQueueNode val) { + return _unsafe.compareAndSwapObject(this, spareStackOffset, cmp, val); + } + private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) { + return _unsafe.compareAndSwapObject(this, syncStackOffset, cmp, val); + } +} diff --git a/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinTask.java b/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinTask.java new file mode 100644 index 0000000000..e6c0fa7bb4 --- /dev/null +++ b/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinTask.java @@ -0,0 +1,1052 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package scala.concurrent.forkjoin; +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import sun.misc.Unsafe; +import java.lang.reflect.*; + +/** + * Abstract base class for tasks that run within a {@link + * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much + * lighter weight than a normal thread. Huge numbers of tasks and + * subtasks may be hosted by a small number of actual threads in a + * ForkJoinPool, at the price of some usage limitations. + * + *

A "main" ForkJoinTask begins execution when submitted to a + * {@link ForkJoinPool}. Once started, it will usually in turn start + * other subtasks. As indicated by the name of this class, many + * programs using ForkJoinTasks employ only methods fork + * and join, or derivatives such as + * invokeAll. However, this class also provides a number + * of other methods that can come into play in advanced usages, as + * well as extension mechanics that allow support of new forms of + * fork/join processing. + * + *

A ForkJoinTask is a lightweight form of {@link Future}. The + * efficiency of ForkJoinTasks stems from a set of restrictions (that + * are only partially statically enforceable) reflecting their + * intended use as computational tasks calculating pure functions or + * operating on purely isolated objects. The primary coordination + * mechanisms are {@link #fork}, that arranges asynchronous execution, + * and {@link #join}, that doesn't proceed until the task's result has + * been computed. Computations should avoid synchronized + * methods or blocks, and should minimize other blocking + * synchronization apart from joining other tasks or using + * synchronizers such as Phasers that are advertised to cooperate with + * fork/join scheduling. Tasks should also not perform blocking IO, + * and should ideally access variables that are completely independent + * of those accessed by other running tasks. Minor breaches of these + * restrictions, for example using shared output streams, may be + * tolerable in practice, but frequent use may result in poor + * performance, and the potential to indefinitely stall if the number + * of threads not waiting for IO or other external synchronization + * becomes exhausted. This usage restriction is in part enforced by + * not permitting checked exceptions such as IOExceptions + * to be thrown. However, computations may still encounter unchecked + * exceptions, that are rethrown to callers attempting join + * them. These exceptions may additionally include + * RejectedExecutionExceptions stemming from internal resource + * exhaustion such as failure to allocate internal task queues. + * + *

The primary method for awaiting completion and extracting + * results of a task is {@link #join}, but there are several variants: + * The {@link Future#get} methods support interruptible and/or timed + * waits for completion and report results using Future + * conventions. Method {@link #helpJoin} enables callers to actively + * execute other tasks while awaiting joins, which is sometimes more + * efficient but only applies when all subtasks are known to be + * strictly tree-structured. Method {@link #invoke} is semantically + * equivalent to fork(); join() but always attempts to + * begin execution in the current thread. The "quiet" forms + * of these methods do not extract results or report exceptions. These + * may be useful when a set of tasks are being executed, and you need + * to delay processing of results or exceptions until all complete. + * Method invokeAll (available in multiple versions) + * performs the most common form of parallel invocation: forking a set + * of tasks and joining them all. + * + *

The ForkJoinTask class is not usually directly subclassed. + * Instead, you subclass one of the abstract classes that support a + * particular style of fork/join processing. Normally, a concrete + * ForkJoinTask subclass declares fields comprising its parameters, + * established in a constructor, and then defines a compute + * method that somehow uses the control methods supplied by this base + * class. While these methods have public access (to allow + * instances of different task subclasses to call each others + * methods), some of them may only be called from within other + * ForkJoinTasks. Attempts to invoke them in other contexts result in + * exceptions or errors possibly including ClassCastException. + * + *

Most base support methods are final because their + * implementations are intrinsically tied to the underlying + * lightweight task scheduling framework, and so cannot be overridden. + * Developers creating new basic styles of fork/join processing should + * minimally implement protected methods + * exec, setRawResult, and + * getRawResult, while also introducing an abstract + * computational method that can be implemented in its subclasses, + * possibly relying on other protected methods provided + * by this class. + * + *

ForkJoinTasks should perform relatively small amounts of + * computations, othewise splitting into smaller tasks. As a very + * rough rule of thumb, a task should perform more than 100 and less + * than 10000 basic computational steps. If tasks are too big, then + * parellelism cannot improve throughput. If too small, then memory + * and internal task maintenance overhead may overwhelm processing. + * + *

ForkJoinTasks are Serializable, which enables them + * to be used in extensions such as remote execution frameworks. It is + * in general sensible to serialize tasks only before or after, but + * not during execution. Serialization is not relied on during + * execution itself. + */ +public abstract class ForkJoinTask implements Future, Serializable { + + /** + * Run control status bits packed into a single int to minimize + * footprint and to ensure atomicity (via CAS). Status is + * initially zero, and takes on nonnegative values until + * completed, upon which status holds COMPLETED. CANCELLED, or + * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing + * blocking waits by other threads have SIGNAL_MASK bits set -- + * bit 15 for external (nonFJ) waits, and the rest a count of + * waiting FJ threads. (This representation relies on + * ForkJoinPool max thread limits). Completion of a stolen task + * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even + * though suboptimal for some purposes, we use basic builtin + * wait/notify to take advantage of "monitor inflation" in JVMs + * that we would otherwise need to emulate to avoid adding further + * per-task bookkeeping overhead. Note that bits 16-28 are + * currently unused. Also value 0x80000000 is available as spare + * completion value. + */ + volatile int status; // accessed directy by pool and workers + + static final int COMPLETION_MASK = 0xe0000000; + static final int NORMAL = 0xe0000000; // == mask + static final int CANCELLED = 0xc0000000; + static final int EXCEPTIONAL = 0xa0000000; + static final int SIGNAL_MASK = 0x0000ffff; + static final int INTERNAL_SIGNAL_MASK = 0x00007fff; + static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word + + /** + * Table of exceptions thrown by tasks, to enable reporting by + * callers. Because exceptions are rare, we don't directly keep + * them with task objects, but instead us a weak ref table. Note + * that cancellation exceptions don't appear in the table, but are + * instead recorded as status values. + * Todo: Use ConcurrentReferenceHashMap + */ + static final Map, Throwable> exceptionMap = + Collections.synchronizedMap + (new WeakHashMap, Throwable>()); + + // within-package utilities + + /** + * Get current worker thread, or null if not a worker thread + */ + static ForkJoinWorkerThread getWorker() { + Thread t = Thread.currentThread(); + return ((t instanceof ForkJoinWorkerThread)? + (ForkJoinWorkerThread)t : null); + } + + final boolean casStatus(int cmp, int val) { + return _unsafe.compareAndSwapInt(this, statusOffset, cmp, val); + } + + /** + * Workaround for not being able to rethrow unchecked exceptions. + */ + static void rethrowException(Throwable ex) { + if (ex != null) + _unsafe.throwException(ex); + } + + // Setting completion status + + /** + * Mark completion and wake up threads waiting to join this task. + * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL + */ + final void setCompletion(int completion) { + ForkJoinPool pool = getPool(); + if (pool != null) { + int s; // Clear signal bits while setting completion status + do;while ((s = status) >= 0 && !casStatus(s, completion)); + + if ((s & SIGNAL_MASK) != 0) { + if ((s &= INTERNAL_SIGNAL_MASK) != 0) + pool.updateRunningCount(s); + synchronized(this) { notifyAll(); } + } + } + else + externallySetCompletion(completion); + } + + /** + * Version of setCompletion for non-FJ threads. Leaves signal + * bits for unblocked threads to adjust, and always notifies. + */ + private void externallySetCompletion(int completion) { + int s; + do;while ((s = status) >= 0 && + !casStatus(s, (s & SIGNAL_MASK) | completion)); + synchronized(this) { notifyAll(); } + } + + /** + * Sets status to indicate normal completion + */ + final void setNormalCompletion() { + // Try typical fast case -- single CAS, no signal, not already done. + // Manually expand casStatus to improve chances of inlining it + if (!_unsafe.compareAndSwapInt(this, statusOffset, 0, NORMAL)) + setCompletion(NORMAL); + } + + // internal waiting and notification + + /** + * Performs the actual monitor wait for awaitDone + */ + private void doAwaitDone() { + // Minimize lock bias and in/de-flation effects by maximizing + // chances of waiting inside sync + try { + while (status >= 0) + synchronized(this) { if (status >= 0) wait(); } + } catch (InterruptedException ie) { + onInterruptedWait(); + } + } + + /** + * Performs the actual monitor wait for awaitDone + */ + private void doAwaitDone(long startTime, long nanos) { + synchronized(this) { + try { + while (status >= 0) { + long nt = nanos - System.nanoTime() - startTime; + if (nt <= 0) + break; + wait(nt / 1000000, (int)(nt % 1000000)); + } + } catch (InterruptedException ie) { + onInterruptedWait(); + } + } + } + + // Awaiting completion + + /** + * Sets status to indicate there is joiner, then waits for join, + * surrounded with pool notifications. + * @return status upon exit + */ + private int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) { + ForkJoinPool pool = w == null? null : w.pool; + int s; + while ((s = status) >= 0) { + if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) { + if (pool == null || !pool.preJoin(this, maintainParallelism)) + doAwaitDone(); + if (((s = status) & INTERNAL_SIGNAL_MASK) != 0) + adjustPoolCountsOnUnblock(pool); + break; + } + } + return s; + } + + /** + * Timed version of awaitDone + * @return status upon exit + */ + private int awaitDone(ForkJoinWorkerThread w, long nanos) { + ForkJoinPool pool = w == null? null : w.pool; + int s; + while ((s = status) >= 0) { + if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) { + long startTime = System.nanoTime(); + if (pool == null || !pool.preJoin(this, false)) + doAwaitDone(startTime, nanos); + if ((s = status) >= 0) { + adjustPoolCountsOnCancelledWait(pool); + s = status; + } + if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0) + adjustPoolCountsOnUnblock(pool); + break; + } + } + return s; + } + + /** + * Notify pool that thread is unblocked. Called by signalled + * threads when woken by non-FJ threads (which is atypical). + */ + private void adjustPoolCountsOnUnblock(ForkJoinPool pool) { + int s; + do;while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK)); + if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0) + pool.updateRunningCount(s); + } + + /** + * Notify pool to adjust counts on cancelled or timed out wait + */ + private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) { + if (pool != null) { + int s; + while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) { + if (casStatus(s, s - 1)) { + pool.updateRunningCount(1); + break; + } + } + } + } + + /** + * Handle interruptions during waits. + */ + private void onInterruptedWait() { + ForkJoinWorkerThread w = getWorker(); + if (w == null) + Thread.currentThread().interrupt(); // re-interrupt + else if (w.isTerminating()) + cancelIgnoringExceptions(); + // else if FJworker, ignore interrupt + } + + // Recording and reporting exceptions + + private void setDoneExceptionally(Throwable rex) { + exceptionMap.put(this, rex); + setCompletion(EXCEPTIONAL); + } + + /** + * Throws the exception associated with status s; + * @throws the exception + */ + private void reportException(int s) { + if ((s &= COMPLETION_MASK) < NORMAL) { + if (s == CANCELLED) + throw new CancellationException(); + else + rethrowException(exceptionMap.get(this)); + } + } + + /** + * Returns result or throws exception using j.u.c.Future conventions + * Only call when isDone known to be true. + */ + private V reportFutureResult() + throws ExecutionException, InterruptedException { + int s = status & COMPLETION_MASK; + if (s < NORMAL) { + Throwable ex; + if (s == CANCELLED) + throw new CancellationException(); + if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null) + throw new ExecutionException(ex); + if (Thread.interrupted()) + throw new InterruptedException(); + } + return getRawResult(); + } + + /** + * Returns result or throws exception using j.u.c.Future conventions + * with timeouts + */ + private V reportTimedFutureResult() + throws InterruptedException, ExecutionException, TimeoutException { + Throwable ex; + int s = status & COMPLETION_MASK; + if (s == NORMAL) + return getRawResult(); + if (s == CANCELLED) + throw new CancellationException(); + if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null) + throw new ExecutionException(ex); + if (Thread.interrupted()) + throw new InterruptedException(); + throw new TimeoutException(); + } + + // internal execution methods + + /** + * Calls exec, recording completion, and rethrowing exception if + * encountered. Caller should normally check status before calling + * @return true if completed normally + */ + private boolean tryExec() { + try { // try block must contain only call to exec + if (!exec()) + return false; + } catch (Throwable rex) { + setDoneExceptionally(rex); + rethrowException(rex); + return false; // not reached + } + setNormalCompletion(); + return true; + } + + /** + * Main execution method used by worker threads. Invokes + * base computation unless already complete + */ + final void quietlyExec() { + if (status >= 0) { + try { + if (!exec()) + return; + } catch(Throwable rex) { + setDoneExceptionally(rex); + return; + } + setNormalCompletion(); + } + } + + /** + * Calls exec, recording but not rethrowing exception + * Caller should normally check status before calling + * @return true if completed normally + */ + private boolean tryQuietlyInvoke() { + try { + if (!exec()) + return false; + } catch (Throwable rex) { + setDoneExceptionally(rex); + return false; + } + setNormalCompletion(); + return true; + } + + /** + * Cancel, ignoring any exceptions it throws + */ + final void cancelIgnoringExceptions() { + try { + cancel(false); + } catch(Throwable ignore) { + } + } + + /** + * Main implementation of helpJoin + */ + private int busyJoin(ForkJoinWorkerThread w) { + int s; + ForkJoinTask t; + while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null) + t.quietlyExec(); + return (s >= 0)? awaitDone(w, false) : s; // block if no work + } + + // public methods + + /** + * Arranges to asynchronously execute this task. While it is not + * necessarily enforced, it is a usage error to fork a task more + * than once unless it has completed and been reinitialized. This + * method may be invoked only from within ForkJoinTask + * computations. Attempts to invoke in other contexts result in + * exceptions or errors possibly including ClassCastException. + */ + public final void fork() { + ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this); + } + + /** + * Returns the result of the computation when it is ready. + * This method differs from get in that abnormal + * completion results in RuntimeExceptions or Errors, not + * ExecutionExceptions. + * + * @return the computed result + */ + public final V join() { + ForkJoinWorkerThread w = getWorker(); + if (w == null || status < 0 || !w.unpushTask(this) || !tryExec()) + reportException(awaitDone(w, true)); + return getRawResult(); + } + + /** + * Commences performing this task, awaits its completion if + * necessary, and return its result. + * @throws Throwable (a RuntimeException, Error, or unchecked + * exception) if the underlying computation did so. + * @return the computed result + */ + public final V invoke() { + if (status >= 0 && tryExec()) + return getRawResult(); + else + return join(); + } + + /** + * Forks both tasks, returning when isDone holds for + * both of them or an exception is encountered. This method may be + * invoked only from within ForkJoinTask computations. Attempts to + * invoke in other contexts result in exceptions or errors + * possibly including ClassCastException. + * @param t1 one task + * @param t2 the other task + * @throws NullPointerException if t1 or t2 are null + * @throws RuntimeException or Error if either task did so. + */ + public static void invokeAll(ForkJoinTaskt1, ForkJoinTask t2) { + t2.fork(); + t1.invoke(); + t2.join(); + } + + /** + * Forks the given tasks, returning when isDone holds + * for all of them. If any task encounters an exception, others + * may be cancelled. This method may be invoked only from within + * ForkJoinTask computations. Attempts to invoke in other contexts + * result in exceptions or errors possibly including ClassCastException. + * @param tasks the array of tasks + * @throws NullPointerException if tasks or any element are null. + * @throws RuntimeException or Error if any task did so. + */ + public static void invokeAll(ForkJoinTask... tasks) { + Throwable ex = null; + int last = tasks.length - 1; + for (int i = last; i >= 0; --i) { + ForkJoinTask t = tasks[i]; + if (t == null) { + if (ex == null) + ex = new NullPointerException(); + } + else if (i != 0) + t.fork(); + else { + t.quietlyInvoke(); + if (ex == null) + ex = t.getException(); + } + } + for (int i = 1; i <= last; ++i) { + ForkJoinTask t = tasks[i]; + if (t != null) { + if (ex != null) + t.cancel(false); + else { + t.quietlyJoin(); + if (ex == null) + ex = t.getException(); + } + } + } + if (ex != null) + rethrowException(ex); + } + + /** + * Forks all tasks in the collection, returning when + * isDone holds for all of them. If any task + * encounters an exception, others may be cancelled. This method + * may be invoked only from within ForkJoinTask + * computations. Attempts to invoke in other contexts resul!t in + * exceptions or errors possibly including ClassCastException. + * @param tasks the collection of tasks + * @throws NullPointerException if tasks or any element are null. + * @throws RuntimeException or Error if any task did so. + */ + public static void invokeAll(Collection> tasks) { + if (!(tasks instanceof List)) { + invokeAll(tasks.toArray(new ForkJoinTask[tasks.size()])); + return; + } + List> ts = + (List>)tasks; + Throwable ex = null; + int last = ts.size() - 1; + for (int i = last; i >= 0; --i) { + ForkJoinTask t = ts.get(i); + if (t == null) { + if (ex == null) + ex = new NullPointerException(); + } + else if (i != 0) + t.fork(); + else { + t.quietlyInvoke(); + if (ex == null) + ex = t.getException(); + } + } + for (int i = 1; i <= last; ++i) { + ForkJoinTask t = ts.get(i); + if (t != null) { + if (ex != null) + t.cancel(false); + else { + t.quietlyJoin(); + if (ex == null) + ex = t.getException(); + } + } + } + if (ex != null) + rethrowException(ex); + } + + /** + * Returns true if the computation performed by this task has + * completed (or has been cancelled). + * @return true if this computation has completed + */ + public final boolean isDone() { + return status < 0; + } + + /** + * Returns true if this task was cancelled. + * @return true if this task was cancelled + */ + public final boolean isCancelled() { + return (status & COMPLETION_MASK) == CANCELLED; + } + + /** + * Asserts that the results of this task's computation will not be + * used. If a cancellation occurs before atempting to execute this + * task, then execution will be suppressed, isCancelled + * will report true, and join will result in a + * CancellationException being thrown. Otherwise, when + * cancellation races with completion, there are no guarantees + * about whether isCancelled will report true, whether + * join will return normally or via an exception, or + * whether these behaviors will remain consistent upon repeated + * invocation. + * + *

This method may be overridden in subclasses, but if so, must + * still ensure that these minimal properties hold. In particular, + * the cancel method itself must not throw exceptions. + * + *

This method is designed to be invoked by other + * tasks. To terminate the current task, you can just return or + * throw an unchecked exception from its computation method, or + * invoke completeExceptionally. + * + * @param mayInterruptIfRunning this value is ignored in the + * default implementation because tasks are not in general + * cancelled via interruption. + * + * @return true if this task is now cancelled + */ + public boolean cancel(boolean mayInterruptIfRunning) { + setCompletion(CANCELLED); + return (status & COMPLETION_MASK) == CANCELLED; + } + + /** + * Returns true if this task threw an exception or was cancelled + * @return true if this task threw an exception or was cancelled + */ + public final boolean isCompletedAbnormally() { + return (status & COMPLETION_MASK) < NORMAL; + } + + /** + * Returns the exception thrown by the base computation, or a + * CancellationException if cancelled, or null if none or if the + * method has not yet completed. + * @return the exception, or null if none + */ + public final Throwable getException() { + int s = status & COMPLETION_MASK; + if (s >= NORMAL) + return null; + if (s == CANCELLED) + return new CancellationException(); + return exceptionMap.get(this); + } + + /** + * Completes this task abnormally, and if not already aborted or + * cancelled, causes it to throw the given exception upon + * join and related operations. This method may be used + * to induce exceptions in asynchronous tasks, or to force + * completion of tasks that would not otherwise complete. Its use + * in other situations is likely to be wrong. This method is + * overridable, but overridden versions must invoke super + * implementation to maintain guarantees. + * + * @param ex the exception to throw. If this exception is + * not a RuntimeException or Error, the actual exception thrown + * will be a RuntimeException with cause ex. + */ + public void completeExceptionally(Throwable ex) { + setDoneExceptionally((ex instanceof RuntimeException) || + (ex instanceof Error)? ex : + new RuntimeException(ex)); + } + + /** + * Completes this task, and if not already aborted or cancelled, + * returning a null result upon join and related + * operations. This method may be used to provide results for + * asynchronous tasks, or to provide alternative handling for + * tasks that would not otherwise complete normally. Its use in + * other situations is likely to be wrong. This method is + * overridable, but overridden versions must invoke super + * implementation to maintain guarantees. + * + * @param value the result value for this task. + */ + public void complete(V value) { + try { + setRawResult(value); + } catch(Throwable rex) { + setDoneExceptionally(rex); + return; + } + setNormalCompletion(); + } + + public final V get() throws InterruptedException, ExecutionException { + ForkJoinWorkerThread w = getWorker(); + if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke()) + awaitDone(w, true); + return reportFutureResult(); + } + + public final V get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + ForkJoinWorkerThread w = getWorker(); + if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke()) + awaitDone(w, unit.toNanos(timeout)); + return reportTimedFutureResult(); + } + + /** + * Possibly executes other tasks until this task is ready, then + * returns the result of the computation. This method may be more + * efficient than join, but is only applicable when + * there are no potemtial dependencies between continuation of the + * current task and that of any other task that might be executed + * while helping. (This usually holds for pure divide-and-conquer + * tasks). This method may be invoked only from within + * ForkJoinTask computations. Attempts to invoke in other contexts + * resul!t in exceptions or errors possibly including ClassCastException. + * @return the computed result + */ + public final V helpJoin() { + ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread()); + if (status < 0 || !w.unpushTask(this) || !tryExec()) + reportException(busyJoin(w)); + return getRawResult(); + } + + /** + * Possibly executes other tasks until this task is ready. This + * method may be invoked only from within ForkJoinTask + * computations. Attempts to invoke in other contexts resul!t in + * exceptions or errors possibly including ClassCastException. + */ + public final void quietlyHelpJoin() { + if (status >= 0) { + ForkJoinWorkerThread w = + (ForkJoinWorkerThread)(Thread.currentThread()); + if (!w.unpushTask(this) || !tryQuietlyInvoke()) + busyJoin(w); + } + } + + /** + * Joins this task, without returning its result or throwing an + * exception. This method may be useful when processing + * collections of tasks when some have been cancelled or otherwise + * known to have aborted. + */ + public final void quietlyJoin() { + if (status >= 0) { + ForkJoinWorkerThread w = getWorker(); + if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke()) + awaitDone(w, true); + } + } + + /** + * Commences performing this task and awaits its completion if + * necessary, without returning its result or throwing an + * exception. This method may be useful when processing + * collections of tasks when some have been cancelled or otherwise + * known to have aborted. + */ + public final void quietlyInvoke() { + if (status >= 0 && !tryQuietlyInvoke()) + quietlyJoin(); + } + + /** + * Possibly executes tasks until the pool hosting the current task + * {@link ForkJoinPool#isQuiescent}. This method may be of use in + * designs in which many tasks are forked, but none are explicitly + * joined, instead executing them until all are processed. + */ + public static void helpQuiesce() { + ((ForkJoinWorkerThread)(Thread.currentThread())). + helpQuiescePool(); + } + + /** + * Resets the internal bookkeeping state of this task, allowing a + * subsequent fork. This method allows repeated reuse of + * this task, but only if reuse occurs when this task has either + * never been forked, or has been forked, then completed and all + * outstanding joins of this task have also completed. Effects + * under any other usage conditions are not guaranteed, and are + * almost surely wrong. This method may be useful when executing + * pre-constructed trees of subtasks in loops. + */ + public void reinitialize() { + if ((status & COMPLETION_MASK) == EXCEPTIONAL) + exceptionMap.remove(this); + status = 0; + } + + /** + * Returns the pool hosting the current task execution, or null + * if this task is executing outside of any pool. + * @return the pool, or null if none. + */ + public static ForkJoinPool getPool() { + Thread t = Thread.currentThread(); + return ((t instanceof ForkJoinWorkerThread)? + ((ForkJoinWorkerThread)t).pool : null); + } + + /** + * Tries to unschedule this task for execution. This method will + * typically succeed if this task is the most recently forked task + * by the current thread, and has not commenced executing in + * another thread. This method may be useful when arranging + * alternative local processing of tasks that could have been, but + * were not, stolen. This method may be invoked only from within + * ForkJoinTask computations. Attempts to invoke in other contexts + * result in exceptions or errors possibly including ClassCastException. + * @return true if unforked + */ + public boolean tryUnfork() { + return ((ForkJoinWorkerThread)(Thread.currentThread())).unpushTask(this); + } + + /** + * Returns an estimate of the number of tasks that have been + * forked by the current worker thread but not yet executed. This + * value may be useful for heuristic decisions about whether to + * fork other tasks. + * @return the number of tasks + */ + public static int getQueuedTaskCount() { + return ((ForkJoinWorkerThread)(Thread.currentThread())). + getQueueSize(); + } + + /** + * Returns a estimate of how many more locally queued tasks are + * held by the current worker thread than there are other worker + * threads that might steal them. This value may be useful for + * heuristic decisions about whether to fork other tasks. In many + * usages of ForkJoinTasks, at steady state, each worker should + * aim to maintain a small constant surplus (for example, 3) of + * tasks, and to process computations locally if this threshold is + * exceeded. + * @return the surplus number of tasks, which may be negative + */ + public static int getSurplusQueuedTaskCount() { + return ((ForkJoinWorkerThread)(Thread.currentThread())) + .getEstimatedSurplusTaskCount(); + } + + // Extension methods + + /** + * Returns the result that would be returned by join, + * even if this task completed abnormally, or null if this task is + * not known to have been completed. This method is designed to + * aid debugging, as well as to support extensions. Its use in any + * other context is discouraged. + * + * @return the result, or null if not completed. + */ + public abstract V getRawResult(); + + /** + * Forces the given value to be returned as a result. This method + * is designed to support extensions, and should not in general be + * called otherwise. + * + * @param value the value + */ + protected abstract void setRawResult(V value); + + /** + * Immediately performs the base action of this task. This method + * is designed to support extensions, and should not in general be + * called otherwise. The return value controls whether this task + * is considered to be done normally. It may return false in + * asynchronous actions that require explicit invocations of + * complete to become joinable. It may throw exceptions + * to indicate abnormal exit. + * @return true if completed normally + * @throws Error or RuntimeException if encountered during computation + */ + protected abstract boolean exec(); + + /** + * Returns, but does not unschedule or execute, the task queued by + * the current thread but not yet executed, if one is + * available. There is no guarantee that this task will actually + * be polled or executed next. This method is designed primarily + * to support extensions, and is unlikely to be useful otherwise. + * This method may be invoked only from within ForkJoinTask + * computations. Attempts to invoke in other contexts result in + * exceptions or errors possibly including ClassCastException. + * + * @return the next task, or null if none are available + */ + protected static ForkJoinTask peekNextLocalTask() { + return ((ForkJoinWorkerThread)(Thread.currentThread())).peekTask(); + } + + /** + * Unschedules and returns, without executing, the next task + * queued by the current thread but not yet executed. This method + * is designed primarily to support extensions, and is unlikely to + * be useful otherwise. This method may be invoked only from + * within ForkJoinTask computations. Attempts to invoke in other + * contexts result in exceptions or errors possibly including + * ClassCastException. + * + * @return the next task, or null if none are available + */ + protected static ForkJoinTask pollNextLocalTask() { + return ((ForkJoinWorkerThread)(Thread.currentThread())).pollLocalTask(); + } + + /** + * Unschedules and returns, without executing, the next task + * queued by the current thread but not yet executed, if one is + * available, or if not available, a task that was forked by some + * other thread, if available. Availability may be transient, so a + * null result does not necessarily imply quiecence + * of the pool this task is operating in. This method is designed + * primarily to support extensions, and is unlikely to be useful + * otherwise. This method may be invoked only from within + * ForkJoinTask computations. Attempts to invoke in other contexts + * result in exceptions or errors possibly including + * ClassCastException. + * + * @return a task, or null if none are available + */ + protected static ForkJoinTask pollTask() { + return ((ForkJoinWorkerThread)(Thread.currentThread())). + pollTask(); + } + + // Serialization support + + private static final long serialVersionUID = -7721805057305804111L; + + /** + * Save the state to a stream. + * + * @serialData the current run status and the exception thrown + * during execution, or null if none. + * @param s the stream + */ + private void writeObject(java.io.ObjectOutputStream s) + throws java.io.IOException { + s.defaultWriteObject(); + s.writeObject(getException()); + } + + /** + * Reconstitute the instance from a stream. + * @param s the stream + */ + private void readObject(java.io.ObjectInputStream s) + throws java.io.IOException, ClassNotFoundException { + s.defaultReadObject(); + status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts + status |= EXTERNAL_SIGNAL; // conservatively set external signal + Object ex = s.readObject(); + if (ex != null) + setDoneExceptionally((Throwable)ex); + } + + // Temporary Unsafe mechanics for preliminary release + private static Unsafe getUnsafe() throws Throwable { + try { + return Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public Unsafe run() throws Exception { + return getUnsafePrivileged(); + }}); + } catch (java.security.PrivilegedActionException e) { + throw e.getCause(); + } + } + } + + private static Unsafe getUnsafePrivileged() + throws NoSuchFieldException, IllegalAccessException { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (Unsafe) f.get(null); + } + + private static long fieldOffset(String fieldName) + throws NoSuchFieldException { + return _unsafe.objectFieldOffset + (ForkJoinTask.class.getDeclaredField(fieldName)); + } + + static final Unsafe _unsafe; + static final long statusOffset; + + static { + try { + _unsafe = getUnsafe(); + statusOffset = fieldOffset("status"); + } catch (Throwable e) { + throw new RuntimeException("Could not initialize intrinsics", e); + } + } + +} diff --git a/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinWorkerThread.java b/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinWorkerThread.java new file mode 100644 index 0000000000..941f5ec0cb --- /dev/null +++ b/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinWorkerThread.java @@ -0,0 +1,775 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package scala.concurrent.forkjoin; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; +import sun.misc.Unsafe; +import java.lang.reflect.*; + +/** + * A thread managed by a {@link ForkJoinPool}. This class is + * subclassable solely for the sake of adding functionality -- there + * are no overridable methods dealing with scheduling or + * execution. However, you can override initialization and termination + * methods surrounding the main task processing loop. If you do + * create such a subclass, you will also need to supply a custom + * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool. + * + */ +public class ForkJoinWorkerThread extends Thread { + /* + * Algorithm overview: + * + * 1. Work-Stealing: Work-stealing queues are special forms of + * Deques that support only three of the four possible + * end-operations -- push, pop, and deq (aka steal), and only do + * so under the constraints that push and pop are called only from + * the owning thread, while deq may be called from other threads. + * (If you are unfamiliar with them, you probably want to read + * Herlihy and Shavit's book "The Art of Multiprocessor + * programming", chapter 16 describing these in more detail before + * proceeding.) The main work-stealing queue design is roughly + * similar to "Dynamic Circular Work-Stealing Deque" by David + * Chase and Yossi Lev, SPAA 2005 + * (http://research.sun.com/scalable/pubs/index.html). The main + * difference ultimately stems from gc requirements that we null + * out taken slots as soon as we can, to maintain as small a + * footprint as possible even in programs generating huge numbers + * of tasks. To accomplish this, we shift the CAS arbitrating pop + * vs deq (steal) from being on the indices ("base" and "sp") to + * the slots themselves (mainly via method "casSlotNull()"). So, + * both a successful pop and deq mainly entail CAS'ing a nonnull + * slot to null. Because we rely on CASes of references, we do + * not need tag bits on base or sp. They are simple ints as used + * in any circular array-based queue (see for example ArrayDeque). + * Updates to the indices must still be ordered in a way that + * guarantees that (sp - base) > 0 means the queue is empty, but + * otherwise may err on the side of possibly making the queue + * appear nonempty when a push, pop, or deq have not fully + * committed. Note that this means that the deq operation, + * considered individually, is not wait-free. One thief cannot + * successfully continue until another in-progress one (or, if + * previously empty, a push) completes. However, in the + * aggregate, we ensure at least probablistic non-blockingness. If + * an attempted steal fails, a thief always chooses a different + * random victim target to try next. So, in order for one thief to + * progress, it suffices for any in-progress deq or new push on + * any empty queue to complete. One reason this works well here is + * that apparently-nonempty often means soon-to-be-stealable, + * which gives threads a chance to activate if necessary before + * stealing (see below). + * + * Efficient implementation of this approach currently relies on + * an uncomfortable amount of "Unsafe" mechanics. To maintain + * correct orderings, reads and writes of variable base require + * volatile ordering. Variable sp does not require volatile write + * but needs cheaper store-ordering on writes. Because they are + * protected by volatile base reads, reads of the queue array and + * its slots do not need volatile load semantics, but writes (in + * push) require store order and CASes (in pop and deq) require + * (volatile) CAS semantics. Since these combinations aren't + * supported using ordinary volatiles, the only way to accomplish + * these effciently is to use direct Unsafe calls. (Using external + * AtomicIntegers and AtomicReferenceArrays for the indices and + * array is significantly slower because of memory locality and + * indirection effects.) Further, performance on most platforms is + * very sensitive to placement and sizing of the (resizable) queue + * array. Even though these queues don't usually become all that + * big, the initial size must be large enough to counteract cache + * contention effects across multiple queues (especially in the + * presence of GC cardmarking). Also, to improve thread-locality, + * queues are currently initialized immediately after the thread + * gets the initial signal to start processing tasks. However, + * all queue-related methods except pushTask are written in a way + * that allows them to instead be lazily allocated and/or disposed + * of when empty. All together, these low-level implementation + * choices produce as much as a factor of 4 performance + * improvement compared to naive implementations, and enable the + * processing of billions of tasks per second, sometimes at the + * expense of ugliness. + * + * 2. Run control: The primary run control is based on a global + * counter (activeCount) held by the pool. It uses an algorithm + * similar to that in Herlihy and Shavit section 17.6 to cause + * threads to eventually block when all threads declare they are + * inactive. (See variable "scans".) For this to work, threads + * must be declared active when executing tasks, and before + * stealing a task. They must be inactive before blocking on the + * Pool Barrier (awaiting a new submission or other Pool + * event). In between, there is some free play which we take + * advantage of to avoid contention and rapid flickering of the + * global activeCount: If inactive, we activate only if a victim + * queue appears to be nonempty (see above). Similarly, a thread + * tries to inactivate only after a full scan of other threads. + * The net effect is that contention on activeCount is rarely a + * measurable performance issue. (There are also a few other cases + * where we scan for work rather than retry/block upon + * contention.) + * + * 3. Selection control. We maintain policy of always choosing to + * run local tasks rather than stealing, and always trying to + * steal tasks before trying to run a new submission. All steals + * are currently performed in randomly-chosen deq-order. It may be + * worthwhile to bias these with locality / anti-locality + * information, but doing this well probably requires more + * lower-level information from JVMs than currently provided. + */ + + /** + * Capacity of work-stealing queue array upon initialization. + * Must be a power of two. Initial size must be at least 2, but is + * padded to minimize cache effects. + */ + private static final int INITIAL_QUEUE_CAPACITY = 1 << 13; + + /** + * Maximum work-stealing queue array size. Must be less than or + * equal to 1 << 28 to ensure lack of index wraparound. (This + * is less than usual bounds, because we need leftshift by 3 + * to be in int range). + */ + private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28; + + /** + * The pool this thread works in. Accessed directly by ForkJoinTask + */ + final ForkJoinPool pool; + + /** + * The work-stealing queue array. Size must be a power of two. + * Initialized when thread starts, to improve memory locality. + */ + private ForkJoinTask[] queue; + + /** + * Index (mod queue.length) of next queue slot to push to or pop + * from. It is written only by owner thread, via ordered store. + * Both sp and base are allowed to wrap around on overflow, but + * (sp - base) still estimates size. + */ + private volatile int sp; + + /** + * Index (mod queue.length) of least valid queue slot, which is + * always the next position to steal from if nonempty. + */ + private volatile int base; + + /** + * Activity status. When true, this worker is considered active. + * Must be false upon construction. It must be true when executing + * tasks, and BEFORE stealing a task. It must be false before + * calling pool.sync + */ + private boolean active; + + /** + * Run state of this worker. Supports simple versions of the usual + * shutdown/shutdownNow control. + */ + private volatile int runState; + + /** + * Seed for random number generator for choosing steal victims. + * Uses Marsaglia xorshift. Must be nonzero upon initialization. + */ + private int seed; + + /** + * Number of steals, transferred to pool when idle + */ + private int stealCount; + + /** + * Index of this worker in pool array. Set once by pool before + * running, and accessed directly by pool during cleanup etc + */ + int poolIndex; + + /** + * The last barrier event waited for. Accessed in pool callback + * methods, but only by current thread. + */ + long lastEventCount; + + /** + * True if use local fifo, not default lifo, for local polling + */ + private boolean locallyFifo; + + /** + * Creates a ForkJoinWorkerThread operating in the given pool. + * @param pool the pool this thread works in + * @throws NullPointerException if pool is null + */ + protected ForkJoinWorkerThread(ForkJoinPool pool) { + if (pool == null) throw new NullPointerException(); + this.pool = pool; + // Note: poolIndex is set by pool during construction + // Remaining initialization is deferred to onStart + } + + // Public access methods + + /** + * Returns the pool hosting this thread + * @return the pool + */ + public ForkJoinPool getPool() { + return pool; + } + + /** + * Returns the index number of this thread in its pool. The + * returned value ranges from zero to the maximum number of + * threads (minus one) that have ever been created in the pool. + * This method may be useful for applications that track status or + * collect results per-worker rather than per-task. + * @return the index number. + */ + public int getPoolIndex() { + return poolIndex; + } + + /** + * Establishes local first-in-first-out scheduling mode for forked + * tasks that are never joined. + * @param async if true, use locally FIFO scheduling + */ + void setAsyncMode(boolean async) { + locallyFifo = async; + } + + // Runstate management + + // Runstate values. Order matters + private static final int RUNNING = 0; + private static final int SHUTDOWN = 1; + private static final int TERMINATING = 2; + private static final int TERMINATED = 3; + + final boolean isShutdown() { return runState >= SHUTDOWN; } + final boolean isTerminating() { return runState >= TERMINATING; } + final boolean isTerminated() { return runState == TERMINATED; } + final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); } + final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); } + + /** + * Transition to at least the given state. Return true if not + * already at least given state. + */ + private boolean transitionRunStateTo(int state) { + for (;;) { + int s = runState; + if (s >= state) + return false; + if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state)) + return true; + } + } + + /** + * Try to set status to active; fail on contention + */ + private boolean tryActivate() { + if (!active) { + if (!pool.tryIncrementActiveCount()) + return false; + active = true; + } + return true; + } + + /** + * Try to set status to active; fail on contention + */ + private boolean tryInactivate() { + if (active) { + if (!pool.tryDecrementActiveCount()) + return false; + active = false; + } + return true; + } + + /** + * Computes next value for random victim probe. Scans don't + * require a very high quality generator, but also not a crummy + * one. Marsaglia xor-shift is cheap and works well. + */ + private static int xorShift(int r) { + r ^= r << 1; + r ^= r >>> 3; + r ^= r << 10; + return r; + } + + // Lifecycle methods + + /** + * This method is required to be public, but should never be + * called explicitly. It performs the main run loop to execute + * ForkJoinTasks. + */ + public void run() { + Throwable exception = null; + try { + onStart(); + pool.sync(this); // await first pool event + mainLoop(); + } catch (Throwable ex) { + exception = ex; + } finally { + onTermination(exception); + } + } + + /** + * Execute tasks until shut down. + */ + private void mainLoop() { + while (!isShutdown()) { + ForkJoinTask t = pollTask(); + if (t != null || (t = pollSubmission()) != null) + t.quietlyExec(); + else if (tryInactivate()) + pool.sync(this); + } + } + + /** + * Initializes internal state after construction but before + * processing any tasks. If you override this method, you must + * invoke super.onStart() at the beginning of the method. + * Initialization requires care: Most fields must have legal + * default values, to ensure that attempted accesses from other + * threads work correctly even before this thread starts + * processing tasks. + */ + protected void onStart() { + // Allocate while starting to improve chances of thread-local + // isolation + queue = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; + // Initial value of seed need not be especially random but + // should differ across workers and must be nonzero + int p = poolIndex + 1; + seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits + } + + /** + * Perform cleanup associated with termination of this worker + * thread. If you override this method, you must invoke + * super.onTermination at the end of the overridden method. + * + * @param exception the exception causing this thread to abort due + * to an unrecoverable error, or null if completed normally. + */ + protected void onTermination(Throwable exception) { + // Execute remaining local tasks unless aborting or terminating + while (exception == null && !pool.isTerminating() && base != sp) { + try { + ForkJoinTask t = popTask(); + if (t != null) + t.quietlyExec(); + } catch(Throwable ex) { + exception = ex; + } + } + // Cancel other tasks, transition status, notify pool, and + // propagate exception to uncaught exception handler + try { + do;while (!tryInactivate()); // ensure inactive + cancelTasks(); + runState = TERMINATED; + pool.workerTerminated(this); + } catch (Throwable ex) { // Shouldn't ever happen + if (exception == null) // but if so, at least rethrown + exception = ex; + } finally { + if (exception != null) + ForkJoinTask.rethrowException(exception); + } + } + + // Intrinsics-based support for queue operations. + + /** + * Add in store-order the given task at given slot of q to + * null. Caller must ensure q is nonnull and index is in range. + */ + private static void setSlot(ForkJoinTask[] q, int i, + ForkJoinTask t){ +//TR _unsafe.putOrderedObject(q, (i << qShift) + qBase, t); + _unsafe.putObjectVolatile((Object)q, (i << qShift) + qBase, (Object)t); + } + + /** + * CAS given slot of q to null. Caller must ensure q is nonnull + * and index is in range. + */ + private static boolean casSlotNull(ForkJoinTask[] q, int i, + ForkJoinTask t) { + return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null); + } + + /** + * Sets sp in store-order. + */ + private void storeSp(int s) { +//TR _unsafe.putOrderedInt(this, spOffset, s); + _unsafe.putIntVolatile(this, spOffset, s); + } + + // Main queue methods + + /** + * Pushes a task. Called only by current thread. + * @param t the task. Caller must ensure nonnull + */ + final void pushTask(ForkJoinTask t) { + ForkJoinTask[] q = queue; + int mask = q.length - 1; + int s = sp; + setSlot(q, s & mask, t); + storeSp(++s); + if ((s -= base) == 1) + pool.signalWork(); + else if (s >= mask) + growQueue(); + } + + /** + * Tries to take a task from the base of the queue, failing if + * either empty or contended. + * @return a task, or null if none or contended. + */ + final ForkJoinTask deqTask() { + ForkJoinTask t; + ForkJoinTask[] q; + int i; + int b; + if (sp != (b = base) && + (q = queue) != null && // must read q after b + (t = q[i = (q.length - 1) & b]) != null && + casSlotNull(q, i, t)) { + base = b + 1; + return t; + } + return null; + } + + /** + * Returns a popped task, or null if empty. Ensures active status + * if nonnull. Called only by current thread. + */ + final ForkJoinTask popTask() { + int s = sp; + while (s != base) { + if (tryActivate()) { + ForkJoinTask[] q = queue; + int mask = q.length - 1; + int i = (s - 1) & mask; + ForkJoinTask t = q[i]; + if (t == null || !casSlotNull(q, i, t)) + break; + storeSp(s - 1); + return t; + } + } + return null; + } + + /** + * Specialized version of popTask to pop only if + * topmost element is the given task. Called only + * by current thread while active. + * @param t the task. Caller must ensure nonnull + */ + final boolean unpushTask(ForkJoinTask t) { + ForkJoinTask[] q = queue; + int mask = q.length - 1; + int s = sp - 1; + if (casSlotNull(q, s & mask, t)) { + storeSp(s); + return true; + } + return false; + } + + /** + * Returns next task. + */ + final ForkJoinTask peekTask() { + ForkJoinTask[] q = queue; + if (q == null) + return null; + int mask = q.length - 1; + int i = locallyFifo? base : (sp - 1); + return q[i & mask]; + } + + /** + * Doubles queue array size. Transfers elements by emulating + * steals (deqs) from old array and placing, oldest first, into + * new array. + */ + private void growQueue() { + ForkJoinTask[] oldQ = queue; + int oldSize = oldQ.length; + int newSize = oldSize << 1; + if (newSize > MAXIMUM_QUEUE_CAPACITY) + throw new RejectedExecutionException("Queue capacity exceeded"); + ForkJoinTask[] newQ = queue = new ForkJoinTask[newSize]; + + int b = base; + int bf = b + oldSize; + int oldMask = oldSize - 1; + int newMask = newSize - 1; + do { + int oldIndex = b & oldMask; + ForkJoinTask t = oldQ[oldIndex]; + if (t != null && !casSlotNull(oldQ, oldIndex, t)) + t = null; + setSlot(newQ, b & newMask, t); + } while (++b != bf); + pool.signalWork(); + } + + /** + * Tries to steal a task from another worker. Starts at a random + * index of workers array, and probes workers until finding one + * with non-empty queue or finding that all are empty. It + * randomly selects the first n probes. If these are empty, it + * resorts to a full circular traversal, which is necessary to + * accurately set active status by caller. Also restarts if pool + * events occurred since last scan, which forces refresh of + * workers array, in case barrier was associated with resize. + * + * This method must be both fast and quiet -- usually avoiding + * memory accesses that could disrupt cache sharing etc other than + * those needed to check for and take tasks. This accounts for, + * among other things, updating random seed in place without + * storing it until exit. + * + * @return a task, or null if none found + */ + private ForkJoinTask scan() { + ForkJoinTask t = null; + int r = seed; // extract once to keep scan quiet + ForkJoinWorkerThread[] ws; // refreshed on outer loop + int mask; // must be power 2 minus 1 and > 0 + outer:do { + if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) { + int idx = r; + int probes = ~mask; // use random index while negative + for (;;) { + r = xorShift(r); // update random seed + ForkJoinWorkerThread v = ws[mask & idx]; + if (v == null || v.sp == v.base) { + if (probes <= mask) + idx = (probes++ < 0)? r : (idx + 1); + else + break; + } + else if (!tryActivate() || (t = v.deqTask()) == null) + continue outer; // restart on contention + else + break outer; + } + } + } while (pool.hasNewSyncEvent(this)); // retry on pool events + seed = r; + return t; + } + + /** + * gets and removes a local or stolen a task + * @return a task, if available + */ + final ForkJoinTask pollTask() { + ForkJoinTask t = locallyFifo? deqTask() : popTask(); + if (t == null && (t = scan()) != null) + ++stealCount; + return t; + } + + /** + * gets a local task + * @return a task, if available + */ + final ForkJoinTask pollLocalTask() { + return locallyFifo? deqTask() : popTask(); + } + + /** + * Returns a pool submission, if one exists, activating first. + * @return a submission, if available + */ + private ForkJoinTask pollSubmission() { + ForkJoinPool p = pool; + while (p.hasQueuedSubmissions()) { + ForkJoinTask t; + if (tryActivate() && (t = p.pollSubmission()) != null) + return t; + } + return null; + } + + // Methods accessed only by Pool + + /** + * Removes and cancels all tasks in queue. Can be called from any + * thread. + */ + final void cancelTasks() { + ForkJoinTask t; + while (base != sp && (t = deqTask()) != null) + t.cancelIgnoringExceptions(); + } + + /** + * Drains tasks to given collection c + * @return the number of tasks drained + */ + final int drainTasksTo(Collection> c) { + int n = 0; + ForkJoinTask t; + while (base != sp && (t = deqTask()) != null) { + c.add(t); + ++n; + } + return n; + } + + /** + * Get and clear steal count for accumulation by pool. Called + * only when known to be idle (in pool.sync and termination). + */ + final int getAndClearStealCount() { + int sc = stealCount; + stealCount = 0; + return sc; + } + + /** + * Returns true if at least one worker in the given array appears + * to have at least one queued task. + * @param ws array of workers + */ + static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) { + if (ws != null) { + int len = ws.length; + for (int j = 0; j < 2; ++j) { // need two passes for clean sweep + for (int i = 0; i < len; ++i) { + ForkJoinWorkerThread w = ws[i]; + if (w != null && w.sp != w.base) + return true; + } + } + } + return false; + } + + // Support methods for ForkJoinTask + + /** + * Returns an estimate of the number of tasks in the queue. + */ + final int getQueueSize() { + int n = sp - base; + return n < 0? 0 : n; // suppress momentarily negative values + } + + /** + * Returns an estimate of the number of tasks, offset by a + * function of number of idle workers. + */ + final int getEstimatedSurplusTaskCount() { + // The halving approximates weighting idle vs non-idle workers + return (sp - base) - (pool.getIdleThreadCount() >>> 1); + } + + /** + * Scan, returning early if joinMe done + */ + final ForkJoinTask scanWhileJoining(ForkJoinTask joinMe) { + ForkJoinTask t = pollTask(); + if (t != null && joinMe.status < 0 && sp == base) { + pushTask(t); // unsteal if done and this task would be stealable + t = null; + } + return t; + } + + /** + * Runs tasks until pool isQuiescent + */ + final void helpQuiescePool() { + for (;;) { + ForkJoinTask t = pollTask(); + if (t != null) + t.quietlyExec(); + else if (tryInactivate() && pool.isQuiescent()) + break; + } + do;while (!tryActivate()); // re-activate on exit + } + + // Temporary Unsafe mechanics for preliminary release + private static Unsafe getUnsafe() throws Throwable { + try { + return Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public Unsafe run() throws Exception { + return getUnsafePrivileged(); + }}); + } catch (java.security.PrivilegedActionException e) { + throw e.getCause(); + } + } + } + + private static Unsafe getUnsafePrivileged() + throws NoSuchFieldException, IllegalAccessException { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (Unsafe) f.get(null); + } + + private static long fieldOffset(String fieldName) + throws NoSuchFieldException { + return _unsafe.objectFieldOffset + (ForkJoinWorkerThread.class.getDeclaredField(fieldName)); + } + + static final Unsafe _unsafe; + static final long baseOffset; + static final long spOffset; + static final long runStateOffset; + static final long qBase; + static final int qShift; + static { + try { + _unsafe = getUnsafe(); + baseOffset = fieldOffset("base"); + spOffset = fieldOffset("sp"); + runStateOffset = fieldOffset("runState"); + qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class); + int s = _unsafe.arrayIndexScale(ForkJoinTask[].class); + if ((s & (s-1)) != 0) + throw new Error("data type scale not a power of two"); + qShift = 31 - Integer.numberOfLeadingZeros(s); + } catch (Throwable e) { + throw new RuntimeException("Could not initialize intrinsics", e); + } + } +} diff --git a/src/jvm15-library/scala/concurrent/forkjoin/LinkedTransferQueue.java b/src/jvm15-library/scala/concurrent/forkjoin/LinkedTransferQueue.java new file mode 100644 index 0000000000..3055e3b68f --- /dev/null +++ b/src/jvm15-library/scala/concurrent/forkjoin/LinkedTransferQueue.java @@ -0,0 +1,840 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package scala.concurrent.forkjoin; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; +import java.util.concurrent.atomic.*; +import java.util.*; +import java.io.*; +import sun.misc.Unsafe; +import java.lang.reflect.*; + +/** + * An unbounded {@linkplain TransferQueue} based on linked nodes. + * This queue orders elements FIFO (first-in-first-out) with respect + * to any given producer. The head of the queue is that + * element that has been on the queue the longest time for some + * producer. The tail of the queue is that element that has + * been on the queue the shortest time for some producer. + * + *

Beware that, unlike in most collections, the {@code size} + * method is NOT a constant-time operation. Because of the + * asynchronous nature of these queues, determining the current number + * of elements requires a traversal of the elements. + * + *

This class and its iterator implement all of the + * optional methods of the {@link Collection} and {@link + * Iterator} interfaces. + * + *

Memory consistency effects: As with other concurrent + * collections, actions in a thread prior to placing an object into a + * {@code LinkedTransferQueue} + * happen-before + * actions subsequent to the access or removal of that element from + * the {@code LinkedTransferQueue} in another thread. + * + *

This class is a member of the + * + * Java Collections Framework. + * + * @since 1.7 + * @author Doug Lea + * @param the type of elements held in this collection + * + */ +public class LinkedTransferQueue extends AbstractQueue + implements TransferQueue, java.io.Serializable { + private static final long serialVersionUID = -3223113410248163686L; + + /* + * This class extends the approach used in FIFO-mode + * SynchronousQueues. See the internal documentation, as well as + * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer, + * Lea & Scott + * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf) + * + * The main extension is to provide different Wait modes for the + * main "xfer" method that puts or takes items. These don't + * impact the basic dual-queue logic, but instead control whether + * or how threads block upon insertion of request or data nodes + * into the dual queue. It also uses slightly different + * conventions for tracking whether nodes are off-list or + * cancelled. + */ + + // Wait modes for xfer method + static final int NOWAIT = 0; + static final int TIMEOUT = 1; + static final int WAIT = 2; + + /** The number of CPUs, for spin control */ + static final int NCPUS = Runtime.getRuntime().availableProcessors(); + + /** + * The number of times to spin before blocking in timed waits. + * The value is empirically derived -- it works well across a + * variety of processors and OSes. Empirically, the best value + * seems not to vary with number of CPUs (beyond 2) so is just + * a constant. + */ + static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; + + /** + * The number of times to spin before blocking in untimed waits. + * This is greater than timed value because untimed waits spin + * faster since they don't need to check times on each spin. + */ + static final int maxUntimedSpins = maxTimedSpins * 16; + + /** + * The number of nanoseconds for which it is faster to spin + * rather than to use timed park. A rough estimate suffices. + */ + static final long spinForTimeoutThreshold = 1000L; + + /** + * Node class for LinkedTransferQueue. Opportunistically + * subclasses from AtomicReference to represent item. Uses Object, + * not E, to allow setting item to "this" after use, to avoid + * garbage retention. Similarly, setting the next field to this is + * used as sentinel that node is off list. + */ + static final class QNode extends AtomicReference { + volatile QNode next; + volatile Thread waiter; // to control park/unpark + final boolean isData; + QNode(Object item, boolean isData) { + super(item); + this.isData = isData; + } + + static final AtomicReferenceFieldUpdater + nextUpdater = AtomicReferenceFieldUpdater.newUpdater + (QNode.class, QNode.class, "next"); + + final boolean casNext(QNode cmp, QNode val) { + return nextUpdater.compareAndSet(this, cmp, val); + } + + final void clearNext() { + nextUpdater.set(this,this);//TR;lazySet(this, this); + } + + } + + /** + * Padded version of AtomicReference used for head, tail and + * cleanMe, to alleviate contention across threads CASing one vs + * the other. + */ + static final class PaddedAtomicReference extends AtomicReference { + // enough padding for 64bytes with 4byte refs + Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe; + PaddedAtomicReference(T r) { super(r); } + } + + + /** head of the queue */ + private transient final PaddedAtomicReference head; + /** tail of the queue */ + private transient final PaddedAtomicReference tail; + + /** + * Reference to a cancelled node that might not yet have been + * unlinked from queue because it was the last inserted node + * when it cancelled. + */ + private transient final PaddedAtomicReference cleanMe; + + /** + * Tries to cas nh as new head; if successful, unlink + * old head's next node to avoid garbage retention. + */ + private boolean advanceHead(QNode h, QNode nh) { + if (h == head.get() && head.compareAndSet(h, nh)) { + h.clearNext(); // forget old next + return true; + } + return false; + } + + /** + * Puts or takes an item. Used for most queue operations (except + * poll() and tryTransfer()). See the similar code in + * SynchronousQueue for detailed explanation. + * + * @param e the item or if null, signifies that this is a take + * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT + * @param nanos timeout in nanosecs, used only if mode is TIMEOUT + * @return an item, or null on failure + */ + private Object xfer(Object e, int mode, long nanos) { + boolean isData = (e != null); + QNode s = null; + final PaddedAtomicReference head = this.head; + final PaddedAtomicReference tail = this.tail; + + for (;;) { + QNode t = tail.get(); + QNode h = head.get(); + + if (t != null && (t == h || t.isData == isData)) { + if (s == null) + s = new QNode(e, isData); + QNode last = t.next; + if (last != null) { + if (t == tail.get()) + tail.compareAndSet(t, last); + } + else if (t.casNext(null, s)) { + tail.compareAndSet(t, s); + return awaitFulfill(t, s, e, mode, nanos); + } + } + + else if (h != null) { + QNode first = h.next; + if (t == tail.get() && first != null && + advanceHead(h, first)) { + Object x = first.get(); + if (x != first && first.compareAndSet(x, e)) { + LockSupport.unpark(first.waiter); + return isData? e : x; + } + } + } + } + } + + + /** + * Version of xfer for poll() and tryTransfer, which + * simplifies control paths both here and in xfer. + */ + private Object fulfill(Object e) { + boolean isData = (e != null); + final PaddedAtomicReference head = this.head; + final PaddedAtomicReference tail = this.tail; + + for (;;) { + QNode t = tail.get(); + QNode h = head.get(); + + if (t != null && (t == h || t.isData == isData)) { + QNode last = t.next; + if (t == tail.get()) { + if (last != null) + tail.compareAndSet(t, last); + else + return null; + } + } + else if (h != null) { + QNode first = h.next; + if (t == tail.get() && + first != null && + advanceHead(h, first)) { + Object x = first.get(); + if (x != first && first.compareAndSet(x, e)) { + LockSupport.unpark(first.waiter); + return isData? e : x; + } + } + } + } + } + + /** + * Spins/blocks until node s is fulfilled or caller gives up, + * depending on wait mode. + * + * @param pred the predecessor of waiting node + * @param s the waiting node + * @param e the comparison value for checking match + * @param mode mode + * @param nanos timeout value + * @return matched item, or s if cancelled + */ + private Object awaitFulfill(QNode pred, QNode s, Object e, + int mode, long nanos) { + if (mode == NOWAIT) + return null; + + long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0; + Thread w = Thread.currentThread(); + int spins = -1; // set to desired spin count below + for (;;) { + if (w.isInterrupted()) + s.compareAndSet(e, s); + Object x = s.get(); + if (x != e) { // Node was matched or cancelled + advanceHead(pred, s); // unlink if head + if (x == s) { // was cancelled + clean(pred, s); + return null; + } + else if (x != null) { + s.set(s); // avoid garbage retention + return x; + } + else + return e; + } + if (mode == TIMEOUT) { + long now = System.nanoTime(); + nanos -= now - lastTime; + lastTime = now; + if (nanos <= 0) { + s.compareAndSet(e, s); // try to cancel + continue; + } + } + if (spins < 0) { + QNode h = head.get(); // only spin if at head + spins = ((h != null && h.next == s) ? + (mode == TIMEOUT? + maxTimedSpins : maxUntimedSpins) : 0); + } + if (spins > 0) + --spins; + else if (s.waiter == null) + s.waiter = w; + else if (mode != TIMEOUT) { + LockSupport.park();//TR(this); + s.waiter = null; + spins = -1; + } + else if (nanos > spinForTimeoutThreshold) { + LockSupport.parkNanos(nanos);//(this, nanos); + s.waiter = null; + spins = -1; + } + } + } + + /** + * Returns validated tail for use in cleaning methods. + */ + private QNode getValidatedTail() { + for (;;) { + QNode h = head.get(); + QNode first = h.next; + if (first != null && first.next == first) { // help advance + advanceHead(h, first); + continue; + } + QNode t = tail.get(); + QNode last = t.next; + if (t == tail.get()) { + if (last != null) + tail.compareAndSet(t, last); // help advance + else + return t; + } + } + } + + /** + * Gets rid of cancelled node s with original predecessor pred. + * + * @param pred predecessor of cancelled node + * @param s the cancelled node + */ + private void clean(QNode pred, QNode s) { + Thread w = s.waiter; + if (w != null) { // Wake up thread + s.waiter = null; + if (w != Thread.currentThread()) + LockSupport.unpark(w); + } + + if (pred == null) + return; + + /* + * At any given time, exactly one node on list cannot be + * deleted -- the last inserted node. To accommodate this, if + * we cannot delete s, we save its predecessor as "cleanMe", + * processing the previously saved version first. At least one + * of node s or the node previously saved can always be + * processed, so this always terminates. + */ + while (pred.next == s) { + QNode oldpred = reclean(); // First, help get rid of cleanMe + QNode t = getValidatedTail(); + if (s != t) { // If not tail, try to unsplice + QNode sn = s.next; // s.next == s means s already off list + if (sn == s || pred.casNext(s, sn)) + break; + } + else if (oldpred == pred || // Already saved + (oldpred == null && cleanMe.compareAndSet(null, pred))) + break; // Postpone cleaning + } + } + + /** + * Tries to unsplice the cancelled node held in cleanMe that was + * previously uncleanable because it was at tail. + * + * @return current cleanMe node (or null) + */ + private QNode reclean() { + /* + * cleanMe is, or at one time was, predecessor of cancelled + * node s that was the tail so could not be unspliced. If s + * is no longer the tail, try to unsplice if necessary and + * make cleanMe slot available. This differs from similar + * code in clean() because we must check that pred still + * points to a cancelled node that must be unspliced -- if + * not, we can (must) clear cleanMe without unsplicing. + * This can loop only due to contention on casNext or + * clearing cleanMe. + */ + QNode pred; + while ((pred = cleanMe.get()) != null) { + QNode t = getValidatedTail(); + QNode s = pred.next; + if (s != t) { + QNode sn; + if (s == null || s == pred || s.get() != s || + (sn = s.next) == s || pred.casNext(s, sn)) + cleanMe.compareAndSet(pred, null); + } + else // s is still tail; cannot clean + break; + } + return pred; + } + + /** + * Creates an initially empty {@code LinkedTransferQueue}. + */ + public LinkedTransferQueue() { + QNode dummy = new QNode(null, false); + head = new PaddedAtomicReference(dummy); + tail = new PaddedAtomicReference(dummy); + cleanMe = new PaddedAtomicReference(null); + } + + /** + * Creates a {@code LinkedTransferQueue} + * initially containing the elements of the given collection, + * added in traversal order of the collection's iterator. + * + * @param c the collection of elements to initially contain + * @throws NullPointerException if the specified collection or any + * of its elements are null + */ + public LinkedTransferQueue(Collection c) { + this(); + addAll(c); + } + + public void put(E e) throws InterruptedException { + if (e == null) throw new NullPointerException(); + if (Thread.interrupted()) throw new InterruptedException(); + xfer(e, NOWAIT, 0); + } + + public boolean offer(E e, long timeout, TimeUnit unit) + throws InterruptedException { + if (e == null) throw new NullPointerException(); + if (Thread.interrupted()) throw new InterruptedException(); + xfer(e, NOWAIT, 0); + return true; + } + + public boolean offer(E e) { + if (e == null) throw new NullPointerException(); + xfer(e, NOWAIT, 0); + return true; + } + + public boolean add(E e) { + if (e == null) throw new NullPointerException(); + xfer(e, NOWAIT, 0); + return true; + } + + public void transfer(E e) throws InterruptedException { + if (e == null) throw new NullPointerException(); + if (xfer(e, WAIT, 0) == null) { + Thread.interrupted(); + throw new InterruptedException(); + } + } + + public boolean tryTransfer(E e, long timeout, TimeUnit unit) + throws InterruptedException { + if (e == null) throw new NullPointerException(); + if (xfer(e, TIMEOUT, unit.toNanos(timeout)) != null) + return true; + if (!Thread.interrupted()) + return false; + throw new InterruptedException(); + } + + public boolean tryTransfer(E e) { + if (e == null) throw new NullPointerException(); + return fulfill(e) != null; + } + + public E take() throws InterruptedException { + Object e = xfer(null, WAIT, 0); + if (e != null) + return (E)e; + Thread.interrupted(); + throw new InterruptedException(); + } + + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + Object e = xfer(null, TIMEOUT, unit.toNanos(timeout)); + if (e != null || !Thread.interrupted()) + return (E)e; + throw new InterruptedException(); + } + + public E poll() { + return (E)fulfill(null); + } + + public int drainTo(Collection c) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + int n = 0; + E e; + while ( (e = poll()) != null) { + c.add(e); + ++n; + } + return n; + } + + public int drainTo(Collection c, int maxElements) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + int n = 0; + E e; + while (n < maxElements && (e = poll()) != null) { + c.add(e); + ++n; + } + return n; + } + + // Traversal-based methods + + /** + * Returns head after performing any outstanding helping steps. + */ + private QNode traversalHead() { + for (;;) { + QNode t = tail.get(); + QNode h = head.get(); + if (h != null && t != null) { + QNode last = t.next; + QNode first = h.next; + if (t == tail.get()) { + if (last != null) + tail.compareAndSet(t, last); + else if (first != null) { + Object x = first.get(); + if (x == first) + advanceHead(h, first); + else + return h; + } + else + return h; + } + } + reclean(); + } + } + + + public Iterator iterator() { + return new Itr(); + } + + /** + * Iterators. Basic strategy is to traverse list, treating + * non-data (i.e., request) nodes as terminating list. + * Once a valid data node is found, the item is cached + * so that the next call to next() will return it even + * if subsequently removed. + */ + class Itr implements Iterator { + QNode next; // node to return next + QNode pnext; // predecessor of next + QNode snext; // successor of next + QNode curr; // last returned node, for remove() + QNode pcurr; // predecessor of curr, for remove() + E nextItem; // Cache of next item, once commited to in next + + Itr() { + findNext(); + } + + /** + * Ensures next points to next valid node, or null if none. + */ + void findNext() { + for (;;) { + QNode pred = pnext; + QNode q = next; + if (pred == null || pred == q) { + pred = traversalHead(); + q = pred.next; + } + if (q == null || !q.isData) { + next = null; + return; + } + Object x = q.get(); + QNode s = q.next; + if (x != null && q != x && q != s) { + nextItem = (E)x; + snext = s; + pnext = pred; + next = q; + return; + } + pnext = q; + next = s; + } + } + + public boolean hasNext() { + return next != null; + } + + public E next() { + if (next == null) throw new NoSuchElementException(); + pcurr = pnext; + curr = next; + pnext = next; + next = snext; + E x = nextItem; + findNext(); + return x; + } + + public void remove() { + QNode p = curr; + if (p == null) + throw new IllegalStateException(); + Object x = p.get(); + if (x != null && x != p && p.compareAndSet(x, p)) + clean(pcurr, p); + } + } + + public E peek() { + for (;;) { + QNode h = traversalHead(); + QNode p = h.next; + if (p == null) + return null; + Object x = p.get(); + if (p != x) { + if (!p.isData) + return null; + if (x != null) + return (E)x; + } + } + } + + public boolean isEmpty() { + for (;;) { + QNode h = traversalHead(); + QNode p = h.next; + if (p == null) + return true; + Object x = p.get(); + if (p != x) { + if (!p.isData) + return true; + if (x != null) + return false; + } + } + } + + public boolean hasWaitingConsumer() { + for (;;) { + QNode h = traversalHead(); + QNode p = h.next; + if (p == null) + return false; + Object x = p.get(); + if (p != x) + return !p.isData; + } + } + + /** + * Returns the number of elements in this queue. If this queue + * contains more than {@code Integer.MAX_VALUE} elements, returns + * {@code Integer.MAX_VALUE}. + * + *

Beware that, unlike in most collections, this method is + * NOT a constant-time operation. Because of the + * asynchronous nature of these queues, determining the current + * number of elements requires an O(n) traversal. + * + * @return the number of elements in this queue + */ + public int size() { + int count = 0; + QNode h = traversalHead(); + for (QNode p = h.next; p != null && p.isData; p = p.next) { + Object x = p.get(); + if (x != null && x != p) { + if (++count == Integer.MAX_VALUE) // saturated + break; + } + } + return count; + } + + public int getWaitingConsumerCount() { + int count = 0; + QNode h = traversalHead(); + for (QNode p = h.next; p != null && !p.isData; p = p.next) { + if (p.get() == null) { + if (++count == Integer.MAX_VALUE) + break; + } + } + return count; + } + + public int remainingCapacity() { + return Integer.MAX_VALUE; + } + + public boolean remove(Object o) { + if (o == null) + return false; + for (;;) { + QNode pred = traversalHead(); + for (;;) { + QNode q = pred.next; + if (q == null || !q.isData) + return false; + if (q == pred) // restart + break; + Object x = q.get(); + if (x != null && x != q && o.equals(x) && + q.compareAndSet(x, q)) { + clean(pred, q); + return true; + } + pred = q; + } + } + } + + /** + * Save the state to a stream (that is, serialize it). + * + * @serialData All of the elements (each an {@code E}) in + * the proper order, followed by a null + * @param s the stream + */ + private void writeObject(java.io.ObjectOutputStream s) + throws java.io.IOException { + s.defaultWriteObject(); + for (E e : this) + s.writeObject(e); + // Use trailing null as sentinel + s.writeObject(null); + } + + /** + * Reconstitute the Queue instance from a stream (that is, + * deserialize it). + * @param s the stream + */ + private void readObject(java.io.ObjectInputStream s) + throws java.io.IOException, ClassNotFoundException { + s.defaultReadObject(); + resetHeadAndTail(); + for (;;) { + E item = (E)s.readObject(); + if (item == null) + break; + else + offer(item); + } + } + + + // Support for resetting head/tail while deserializing + private void resetHeadAndTail() { + QNode dummy = new QNode(null, false); + _unsafe.putObjectVolatile(this, headOffset, + new PaddedAtomicReference(dummy)); + _unsafe.putObjectVolatile(this, tailOffset, + new PaddedAtomicReference(dummy)); + _unsafe.putObjectVolatile(this, cleanMeOffset, + new PaddedAtomicReference(null)); + } + + // Temporary Unsafe mechanics for preliminary release + private static Unsafe getUnsafe() throws Throwable { + try { + return Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public Unsafe run() throws Exception { + return getUnsafePrivileged(); + }}); + } catch (java.security.PrivilegedActionException e) { + throw e.getCause(); + } + } + } + + private static Unsafe getUnsafePrivileged() + throws NoSuchFieldException, IllegalAccessException { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (Unsafe) f.get(null); + } + + private static long fieldOffset(String fieldName) + throws NoSuchFieldException { + return _unsafe.objectFieldOffset + (LinkedTransferQueue.class.getDeclaredField(fieldName)); + } + + private static final Unsafe _unsafe; + private static final long headOffset; + private static final long tailOffset; + private static final long cleanMeOffset; + static { + try { + _unsafe = getUnsafe(); + headOffset = fieldOffset("head"); + tailOffset = fieldOffset("tail"); + cleanMeOffset = fieldOffset("cleanMe"); + } catch (Throwable e) { + throw new RuntimeException("Could not initialize intrinsics", e); + } + } + +} diff --git a/src/jvm15-library/scala/concurrent/forkjoin/RecursiveAction.java b/src/jvm15-library/scala/concurrent/forkjoin/RecursiveAction.java new file mode 100644 index 0000000000..2d36f7eb33 --- /dev/null +++ b/src/jvm15-library/scala/concurrent/forkjoin/RecursiveAction.java @@ -0,0 +1,151 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package scala.concurrent.forkjoin; + +/** + * Recursive resultless ForkJoinTasks. This class establishes + * conventions to parameterize resultless actions as Void + * ForkJoinTasks. Because null is the only valid value of + * Void, methods such as join always return null + * upon completion. + * + *

Sample Usages. Here is a sketch of a ForkJoin sort that + * sorts a given long[] array: + * + *

+ * class SortTask extends RecursiveAction {
+ *   final long[] array; final int lo; final int hi;
+ *   SortTask(long[] array, int lo, int hi) {
+ *     this.array = array; this.lo = lo; this.hi = hi;
+ *   }
+ *   protected void compute() {
+ *     if (hi - lo < THRESHOLD)
+ *       sequentiallySort(array, lo, hi);
+ *     else {
+ *       int mid = (lo + hi) >>> 1;
+ *       invokeAll(new SortTask(array, lo, mid),
+ *                 new SortTask(array, mid, hi));
+ *       merge(array, lo, hi);
+ *     }
+ *   }
+ * }
+ * 
+ * + * You could then sort anArray by creating new SortTask(anArray, 0, + * anArray.length-1) and invoking it in a ForkJoinPool. + * As a more concrete simple example, the following task increments + * each element of an array: + *
+ * class IncrementTask extends RecursiveAction {
+ *   final long[] array; final int lo; final int hi;
+ *   IncrementTask(long[] array, int lo, int hi) {
+ *     this.array = array; this.lo = lo; this.hi = hi;
+ *   }
+ *   protected void compute() {
+ *     if (hi - lo < THRESHOLD) {
+ *       for (int i = lo; i < hi; ++i)
+ *         array[i]++;
+ *     }
+ *     else {
+ *       int mid = (lo + hi) >>> 1;
+ *       invokeAll(new IncrementTask(array, lo, mid),
+ *                 new IncrementTask(array, mid, hi));
+ *     }
+ *   }
+ * }
+ * 
+ * + * + *

The following example illustrates some refinements and idioms + * that may lead to better performance: RecursiveActions need not be + * fully recursive, so long as they maintain the basic + * divide-and-conquer approach. Here is a class that sums the squares + * of each element of a double array, by subdividing out only the + * right-hand-sides of repeated divisions by two, and keeping track of + * them with a chain of next references. It uses a dynamic + * threshold based on method surplus, but counterbalances + * potential excess partitioning by directly performing leaf actions + * on unstolen tasks rather than further subdividing. + * + *

+ * double sumOfSquares(ForkJoinPool pool, double[] array) {
+ *   int n = array.length;
+ *   int seqSize = 1 + n / (8 * pool.getParallelism());
+ *   Applyer a = new Applyer(array, 0, n, seqSize, null);
+ *   pool.invoke(a);
+ *   return a.result;
+ * }
+ *
+ * class Applyer extends RecursiveAction {
+ *   final double[] array;
+ *   final int lo, hi, seqSize;
+ *   double result;
+ *   Applyer next; // keeps track of right-hand-side tasks
+ *   Applyer(double[] array, int lo, int hi, int seqSize, Applyer next) {
+ *     this.array = array; this.lo = lo; this.hi = hi;
+ *     this.seqSize = seqSize; this.next = next;
+ *   }
+ *
+ *   double atLeaf(int l, int r) {
+ *     double sum = 0;
+ *     for (int i = l; i < h; ++i) // perform leftmost base step
+ *       sum += array[i] * array[i];
+ *     return sum;
+ *   }
+ *
+ *   protected void compute() {
+ *     int l = lo;
+ *     int h = hi;
+ *     Applyer right = null;
+ *     while (h - l > 1 &&
+ *        ForkJoinWorkerThread.getEstimatedSurplusTaskCount() <= 3) {
+ *        int mid = (l + h) >>> 1;
+ *        right = new Applyer(array, mid, h, seqSize, right);
+ *        right.fork();
+ *        h = mid;
+ *     }
+ *     double sum = atLeaf(l, h);
+ *     while (right != null) {
+ *        if (right.tryUnfork()) // directly calculate if not stolen
+ *          sum += right.atLeaf(right.lo, right.hi);
+ *       else {
+ *          right.helpJoin();
+ *          sum += right.result;
+ *        }
+ *        right = right.next;
+ *      }
+ *     result = sum;
+ *   }
+ * }
+ * 
+ */ +public abstract class RecursiveAction extends ForkJoinTask { + + /** + * The main computation performed by this task. + */ + protected abstract void compute(); + + /** + * Always returns null + */ + public final Void getRawResult() { return null; } + + /** + * Requires null completion value. + */ + protected final void setRawResult(Void mustBeNull) { } + + /** + * Implements execution conventions for RecursiveActions + */ + protected final boolean exec() { + compute(); + return true; + } + +} diff --git a/src/jvm15-library/scala/concurrent/forkjoin/RecursiveTask.java b/src/jvm15-library/scala/concurrent/forkjoin/RecursiveTask.java new file mode 100644 index 0000000000..1f3110580b --- /dev/null +++ b/src/jvm15-library/scala/concurrent/forkjoin/RecursiveTask.java @@ -0,0 +1,71 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package scala.concurrent.forkjoin; + +/** + * Recursive result-bearing ForkJoinTasks. + *

For a classic example, here is a task computing Fibonacci numbers: + * + *

+ * class Fibonacci extends RecursiveTask<Integer> {
+ *   final int n;
+ *   Fibonnaci(int n) { this.n = n; }
+ *   Integer compute() {
+ *     if (n <= 1)
+ *        return n;
+ *     Fibonacci f1 = new Fibonacci(n - 1);
+ *     f1.fork();
+ *     Fibonacci f2 = new Fibonacci(n - 2);
+ *     return f2.compute() + f1.join();
+ *   }
+ * }
+ * 
+ * + * However, besides being a dumb way to compute Fibonacci functions + * (there is a simple fast linear algorithm that you'd use in + * practice), this is likely to perform poorly because the smallest + * subtasks are too small to be worthwhile splitting up. Instead, as + * is the case for nearly all fork/join applications, you'd pick some + * minimum granularity size (for example 10 here) for which you always + * sequentially solve rather than subdividing. + * + */ +public abstract class RecursiveTask extends ForkJoinTask { + + /** + * Empty contructor for use by subclasses. + */ + protected RecursiveTask() { + } + + /** + * The result returned by compute method. + */ + V result; + + /** + * The main computation performed by this task. + */ + protected abstract V compute(); + + public final V getRawResult() { + return result; + } + + protected final void setRawResult(V value) { + result = value; + } + + /** + * Implements execution conventions for RecursiveTask + */ + protected final boolean exec() { + result = compute(); + return true; + } + +} diff --git a/src/jvm15-library/scala/concurrent/forkjoin/ThreadLocalRandom.java b/src/jvm15-library/scala/concurrent/forkjoin/ThreadLocalRandom.java new file mode 100644 index 0000000000..34e2e37f37 --- /dev/null +++ b/src/jvm15-library/scala/concurrent/forkjoin/ThreadLocalRandom.java @@ -0,0 +1,186 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package scala.concurrent.forkjoin; +import java.util.*; + +/** + * A random number generator with the same properties as class {@link + * Random} but isolated to the current Thread. Like the global + * generator used by the {@link java.lang.Math} class, a + * ThreadLocalRandom is initialized with an internally generated seed + * that may not otherwise be modified. When applicable, use of + * ThreadLocalRandom rather than shared Random objects in concurrent + * programs will typically encounter much less overhead and + * contention. ThreadLocalRandoms are particularly appropriate when + * multiple tasks (for example, each a {@link ForkJoinTask}), use + * random numbers in parallel in thread pools. + * + *

Usages of this class should typically be of the form: + * ThreadLocalRandom.current().nextX(...) (where + * X is Int, Long, etc). + * When all usages are of this form, it is never possible to + * accidently share ThreadLocalRandoms across multiple threads. + * + *

This class also provides additional commonly used bounded random + * generation methods. + */ +public class ThreadLocalRandom extends Random { + // same constants as Random, but must be redeclared because private + private final static long multiplier = 0x5DEECE66DL; + private final static long addend = 0xBL; + private final static long mask = (1L << 48) - 1; + + /** + * The random seed. We can't use super.seed + */ + private long rnd; + + /** + * Initialization flag to permit the first and only allowed call + * to setSeed (inside Random constructor) to succeed. We can't + * allow others since it would cause setting seed in one part of a + * program to unintentionally impact other usages by the thread. + */ + boolean initialized; + + // Padding to help avoid memory contention among seed updates in + // different TLRs in the common case that they are located near + // each other. + private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; + + /** + * The actual ThreadLocal + */ + private static final ThreadLocal localRandom = + new ThreadLocal() { + protected ThreadLocalRandom initialValue() { + return new ThreadLocalRandom(); + } + }; + + + /** + * Constructor called only by localRandom.initialValue. + * We rely on the fact that the superclass no-arg constructor + * invokes setSeed exactly once to initialize. + */ + ThreadLocalRandom() { + super(); + } + + /** + * Returns the current Thread's ThreadLocalRandom + * @return the current Thread's ThreadLocalRandom + */ + public static ThreadLocalRandom current() { + return localRandom.get(); + } + + /** + * Throws UnsupportedOperationException. Setting seeds in this + * generator is unsupported. + * @throws UnsupportedOperationException always + */ + public void setSeed(long seed) { + if (initialized) + throw new UnsupportedOperationException(); + initialized = true; + rnd = (seed ^ multiplier) & mask; + } + + protected int next(int bits) { + return (int)((rnd = (rnd * multiplier + addend) & mask) >>> (48-bits)); + } + + /** + * Returns a pseudorandom, uniformly distributed value between the + * given least value (inclusive) and bound (exclusive). + * @param least the least value returned + * @param bound the upper bound (exclusive) + * @throws IllegalArgumentException if least greater than or equal + * to bound + * @return the next value + */ + public int nextInt(int least, int bound) { + if (least >= bound) + throw new IllegalArgumentException(); + return nextInt(bound - least) + least; + } + + /** + * Returns a pseudorandom, uniformly distributed value + * between 0 (inclusive) and the specified value (exclusive) + * @param n the bound on the random number to be returned. Must be + * positive. + * @return the next value + * @throws IllegalArgumentException if n is not positive + */ + public long nextLong(long n) { + if (n <= 0) + throw new IllegalArgumentException("n must be positive"); + // Divide n by two until small enough for nextInt. On each + // iteration (at most 31 of them but usually much less), + // randomly choose both whether to include high bit in result + // (offset) and whether to continue with the lower vs upper + // half (which makes a difference only if odd). + long offset = 0; + while (n >= Integer.MAX_VALUE) { + int bits = next(2); + long half = n >>> 1; + long nextn = ((bits & 2) == 0)? half : n - half; + if ((bits & 1) == 0) + offset += n - nextn; + n = nextn; + } + return offset + nextInt((int)n); + } + + /** + * Returns a pseudorandom, uniformly distributed value between the + * given least value (inclusive) and bound (exclusive). + * @param least the least value returned + * @param bound the upper bound (exclusive) + * @return the next value + * @throws IllegalArgumentException if least greater than or equal + * to bound + */ + public long nextLong(long least, long bound) { + if (least >= bound) + throw new IllegalArgumentException(); + return nextLong(bound - least) + least; + } + + /** + * Returns a pseudorandom, uniformly distributed {@code double} value + * between 0 (inclusive) and the specified value (exclusive) + * @param n the bound on the random number to be returned. Must be + * positive. + * @return the next value + * @throws IllegalArgumentException if n is not positive + */ + public double nextDouble(double n) { + if (n <= 0) + throw new IllegalArgumentException("n must be positive"); + return nextDouble() * n; + } + + /** + * Returns a pseudorandom, uniformly distributed value between the + * given least value (inclusive) and bound (exclusive). + * @param least the least value returned + * @param bound the upper bound (exclusive) + * @return the next value + * @throws IllegalArgumentException if least greater than or equal + * to bound + */ + public double nextDouble(double least, double bound) { + if (least >= bound) + throw new IllegalArgumentException(); + return nextDouble() * (bound - least) + least; + } + +} diff --git a/src/jvm15-library/scala/concurrent/forkjoin/TransferQueue.java b/src/jvm15-library/scala/concurrent/forkjoin/TransferQueue.java new file mode 100644 index 0000000000..9c7b2289c4 --- /dev/null +++ b/src/jvm15-library/scala/concurrent/forkjoin/TransferQueue.java @@ -0,0 +1,118 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package scala.concurrent.forkjoin; +import java.util.concurrent.*; + +/** + * A {@link BlockingQueue} in which producers may wait for consumers + * to receive elements. A {@code TransferQueue} may be useful for + * example in message passing applications in which producers + * sometimes (using method {@code transfer}) await receipt of + * elements by consumers invoking {@code take} or {@code poll}, + * while at other times enqueue elements (via method {@code put}) + * without waiting for receipt. Non-blocking and time-out versions of + * {@code tryTransfer} are also available. A TransferQueue may also + * be queried via {@code hasWaitingConsumer} whether there are any + * threads waiting for items, which is a converse analogy to a + * {@code peek} operation. + * + *

Like any {@code BlockingQueue}, a {@code TransferQueue} may be + * capacity bounded. If so, an attempted {@code transfer} operation + * may initially block waiting for available space, and/or + * subsequently block waiting for reception by a consumer. Note that + * in a queue with zero capacity, such as {@link SynchronousQueue}, + * {@code put} and {@code transfer} are effectively synonymous. + * + *

This interface is a member of the + * + * Java Collections Framework. + * + * @since 1.7 + * @author Doug Lea + * @param the type of elements held in this collection + */ +public interface TransferQueue extends BlockingQueue { + /** + * Transfers the specified element if there exists a consumer + * already waiting to receive it, otherwise returning {@code false} + * without enqueuing the element. + * + * @param e the element to transfer + * @return {@code true} if the element was transferred, else + * {@code false} + * @throws ClassCastException if the class of the specified element + * prevents it from being added to this queue + * @throws NullPointerException if the specified element is null + * @throws IllegalArgumentException if some property of the specified + * element prevents it from being added to this queue + */ + boolean tryTransfer(E e); + + /** + * Inserts the specified element into this queue, waiting if + * necessary for space to become available and the element to be + * dequeued by a consumer invoking {@code take} or {@code poll}. + * + * @param e the element to transfer + * @throws InterruptedException if interrupted while waiting, + * in which case the element is not enqueued. + * @throws ClassCastException if the class of the specified element + * prevents it from being added to this queue + * @throws NullPointerException if the specified element is null + * @throws IllegalArgumentException if some property of the specified + * element prevents it from being added to this queue + */ + void transfer(E e) throws InterruptedException; + + /** + * Inserts the specified element into this queue, waiting up to + * the specified wait time if necessary for space to become + * available and the element to be dequeued by a consumer invoking + * {@code take} or {@code poll}. + * + * @param e the element to transfer + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return {@code true} if successful, or {@code false} if + * the specified waiting time elapses before completion, + * in which case the element is not enqueued. + * @throws InterruptedException if interrupted while waiting, + * in which case the element is not enqueued. + * @throws ClassCastException if the class of the specified element + * prevents it from being added to this queue + * @throws NullPointerException if the specified element is null + * @throws IllegalArgumentException if some property of the specified + * element prevents it from being added to this queue + */ + boolean tryTransfer(E e, long timeout, TimeUnit unit) + throws InterruptedException; + + /** + * Returns {@code true} if there is at least one consumer waiting + * to dequeue an element via {@code take} or {@code poll}. + * The return value represents a momentary state of affairs. + * + * @return {@code true} if there is at least one waiting consumer + */ + boolean hasWaitingConsumer(); + + /** + * Returns an estimate of the number of consumers waiting to + * dequeue elements via {@code take} or {@code poll}. The return + * value is an approximation of a momentary state of affairs, that + * may be inaccurate if consumers have completed or given up + * waiting. The value may be useful for monitoring and heuristics, + * but not for synchronization control. Implementations of this + * method are likely to be noticeably slower than those for + * {@link #hasWaitingConsumer}. + * + * @return the number of consumers waiting to dequeue elements + */ + int getWaitingConsumerCount(); +} diff --git a/src/jvm15-library/scala/concurrent/forkjoin/package-info.java b/src/jvm15-library/scala/concurrent/forkjoin/package-info.java new file mode 100644 index 0000000000..b8fa0fad02 --- /dev/null +++ b/src/jvm15-library/scala/concurrent/forkjoin/package-info.java @@ -0,0 +1,29 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + + +/** + * Preview versions of classes targeted for Java 7. Includes a + * fine-grained parallel computation framework: ForkJoinTasks and + * their related support classes provide a very efficient basis for + * obtaining platform-independent parallel speed-ups of + * computation-intensive operations. They are not a full substitute + * for the kinds of arbitrary processing supported by Executors or + * Threads. However, when applicable, they typically provide + * significantly greater performance on multiprocessor platforms. + * + *

Candidates for fork/join processing mainly include those that + * can be expressed using parallel divide-and-conquer techniques: To + * solve a problem, break it in two (or more) parts, and then solve + * those parts in parallel, continuing on in this way until the + * problem is too small to be broken up, so is solved directly. The + * underlying work-stealing framework makes subtasks + * available to other threads (normally one per CPU), that help + * complete the tasks. In general, the most efficient ForkJoinTasks + * are those that directly implement this algorithmic design pattern. + * + */ +package scala.concurrent.forkjoin; -- cgit v1.2.3