From 054c404c03847d205dc67668ac34b1d722e1f414 Mon Sep 17 00:00:00 2001 From: Lukas Rytz Date: Fri, 9 Oct 2009 07:35:10 +0000 Subject: removing jvm5 stuff from trunk, now in branches... removing jvm5 stuff from trunk, now in branches/jvm5 --- build.xml | 30 - src/build/five.xml | 519 ------ .../scala/concurrent/forkjoin/ForkJoinPool.java | 1870 -------------------- .../scala/concurrent/forkjoin/ForkJoinTask.java | 1052 ----------- .../concurrent/forkjoin/ForkJoinWorkerThread.java | 775 -------- .../concurrent/forkjoin/LinkedTransferQueue.java | 840 --------- .../scala/concurrent/forkjoin/RecursiveAction.java | 151 -- .../scala/concurrent/forkjoin/RecursiveTask.java | 71 - .../concurrent/forkjoin/ThreadLocalRandom.java | 186 -- .../scala/concurrent/forkjoin/TransferQueue.java | 118 -- .../scala/concurrent/forkjoin/package-info.java | 29 - 11 files changed, 5641 deletions(-) delete mode 100644 src/build/five.xml delete mode 100644 src/jvm15-library/scala/concurrent/forkjoin/ForkJoinPool.java delete mode 100644 src/jvm15-library/scala/concurrent/forkjoin/ForkJoinTask.java delete mode 100644 src/jvm15-library/scala/concurrent/forkjoin/ForkJoinWorkerThread.java delete mode 100644 src/jvm15-library/scala/concurrent/forkjoin/LinkedTransferQueue.java delete mode 100644 src/jvm15-library/scala/concurrent/forkjoin/RecursiveAction.java delete mode 100644 src/jvm15-library/scala/concurrent/forkjoin/RecursiveTask.java delete mode 100644 src/jvm15-library/scala/concurrent/forkjoin/ThreadLocalRandom.java delete mode 100644 src/jvm15-library/scala/concurrent/forkjoin/TransferQueue.java delete mode 100644 src/jvm15-library/scala/concurrent/forkjoin/package-info.java diff --git a/build.xml b/build.xml index 342d8b7e4c..b1fbcefb15 100644 --- a/build.xml +++ b/build.xml @@ -1529,36 +1529,6 @@ STABLE REFERENCE (STARR) - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/src/build/five.xml b/src/build/five.xml deleted file mode 100644 index 5c8bf2c180..0000000000 --- a/src/build/five.xml +++ /dev/null @@ -1,519 +0,0 @@ - - - - - - SuperSabbus extension for the Scala library and compiler targeted for the JVM 1.5. THIS FILE IS NOT STAND-ALONE AND SHOULD ONLY BE USED THROUGH ENTRY POINTS IN SUPERSABBUS. - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinPool.java b/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinPool.java deleted file mode 100644 index ba30f3a161..0000000000 --- a/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinPool.java +++ /dev/null @@ -1,1870 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - */ - -package scala.concurrent.forkjoin; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.locks.*; -import java.util.concurrent.atomic.*; -import sun.misc.Unsafe; -import java.lang.reflect.*; - -interface RunnableFuture extends Runnable { - //TR placeholder for java.util.concurrent.RunnableFuture -} - -/** - * An {@link ExecutorService} for running {@link ForkJoinTask}s. A - * ForkJoinPool provides the entry point for submissions from - * non-ForkJoinTasks, as well as management and monitoring operations. - * Normally a single ForkJoinPool is used for a large number of - * submitted tasks. Otherwise, use would not usually outweigh the - * construction and bookkeeping overhead of creating a large set of - * threads. - * - *

ForkJoinPools differ from other kinds of Executors mainly in - * that they provide work-stealing: all threads in the pool - * attempt to find and execute subtasks created by other active tasks - * (eventually blocking if none exist). This makes them efficient when - * most tasks spawn other subtasks (as do most ForkJoinTasks), as well - * as the mixed execution of some plain Runnable- or Callable- based - * activities along with ForkJoinTasks. When setting - * setAsyncMode, a ForkJoinPools may also be appropriate for - * use with fine-grained tasks that are never joined. Otherwise, other - * ExecutorService implementations are typically more appropriate - * choices. - * - *

A ForkJoinPool may be constructed with a given parallelism level - * (target pool size), which it attempts to maintain by dynamically - * adding, suspending, or resuming threads, even if some tasks are - * waiting to join others. However, no such adjustments are performed - * in the face of blocked IO or other unmanaged synchronization. The - * nested ManagedBlocker interface enables extension of - * the kinds of synchronization accommodated. The target parallelism - * level may also be changed dynamically (setParallelism) - * and thread construction can be limited using methods - * setMaximumPoolSize and/or - * setMaintainsParallelism. - * - *

In addition to execution and lifecycle control methods, this - * class provides status check methods (for example - * getStealCount) that are intended to aid in developing, - * tuning, and monitoring fork/join applications. Also, method - * toString returns indications of pool state in a - * convenient form for informal monitoring. - * - *

Implementation notes: This implementation restricts the - * maximum number of running threads to 32767. Attempts to create - * pools with greater than the maximum result in - * IllegalArgumentExceptions. - */ -public class ForkJoinPool /*extends AbstractExecutorService*/ { - - /* - * See the extended comments interspersed below for design, - * rationale, and walkthroughs. - */ - - /** Mask for packing and unpacking shorts */ - private static final int shortMask = 0xffff; - - /** Max pool size -- must be a power of two minus 1 */ - private static final int MAX_THREADS = 0x7FFF; - - /** - * Factory for creating new ForkJoinWorkerThreads. A - * ForkJoinWorkerThreadFactory must be defined and used for - * ForkJoinWorkerThread subclasses that extend base functionality - * or initialize threads with different contexts. - */ - public static interface ForkJoinWorkerThreadFactory { - /** - * Returns a new worker thread operating in the given pool. - * - * @param pool the pool this thread works in - * @throws NullPointerException if pool is null; - */ - public ForkJoinWorkerThread newThread(ForkJoinPool pool); - } - - /** - * Default ForkJoinWorkerThreadFactory implementation, creates a - * new ForkJoinWorkerThread. - */ - static class DefaultForkJoinWorkerThreadFactory - implements ForkJoinWorkerThreadFactory { - public ForkJoinWorkerThread newThread(ForkJoinPool pool) { - try { - return new ForkJoinWorkerThread(pool); - } catch (OutOfMemoryError oom) { - return null; - } - } - } - - /** - * Creates a new ForkJoinWorkerThread. This factory is used unless - * overridden in ForkJoinPool constructors. - */ - public static final ForkJoinWorkerThreadFactory - defaultForkJoinWorkerThreadFactory = - new DefaultForkJoinWorkerThreadFactory(); - - /** - * Permission required for callers of methods that may start or - * kill threads. - */ - private static final RuntimePermission modifyThreadPermission = - new RuntimePermission("modifyThread"); - - /** - * If there is a security manager, makes sure caller has - * permission to modify threads. - */ - private static void checkPermission() { - SecurityManager security = System.getSecurityManager(); - if (security != null) - security.checkPermission(modifyThreadPermission); - } - - /** - * Generator for assigning sequence numbers as pool names. - */ - private static final AtomicInteger poolNumberGenerator = - new AtomicInteger(); - - /** - * Array holding all worker threads in the pool. Initialized upon - * first use. Array size must be a power of two. Updates and - * replacements are protected by workerLock, but it is always kept - * in a consistent enough state to be randomly accessed without - * locking by workers performing work-stealing. - */ - public volatile ForkJoinWorkerThread[] workers; - - /** - * Lock protecting access to workers. - */ - private final ReentrantLock workerLock; - - /** - * Condition for awaitTermination. - */ - private final Condition termination; - - /** - * The uncaught exception handler used when any worker - * abrupty terminates - */ - private Thread.UncaughtExceptionHandler ueh; - - /** - * Creation factory for worker threads. - */ - private final ForkJoinWorkerThreadFactory factory; - - /** - * Head of stack of threads that were created to maintain - * parallelism when other threads blocked, but have since - * suspended when the parallelism level rose. - */ - private volatile WaitQueueNode spareStack; - - /** - * Sum of per-thread steal counts, updated only when threads are - * idle or terminating. - */ - private final AtomicLong stealCount; - - /** - * Queue for external submissions. - */ - private final LinkedTransferQueue> submissionQueue; - - /** - * Head of Treiber stack for barrier sync. See below for explanation - */ - private volatile WaitQueueNode syncStack; - - /** - * The count for event barrier - */ - private volatile long eventCount; - - /** - * Pool number, just for assigning useful names to worker threads - */ - private final int poolNumber; - - /** - * The maximum allowed pool size - */ - private volatile int maxPoolSize; - - /** - * The desired parallelism level, updated only under workerLock. - */ - private volatile int parallelism; - - /** - * True if use local fifo, not default lifo, for local polling - */ - private volatile boolean locallyFifo; - - /** - * Holds number of total (i.e., created and not yet terminated) - * and running (i.e., not blocked on joins or other managed sync) - * threads, packed into one int to ensure consistent snapshot when - * making decisions about creating and suspending spare - * threads. Updated only by CAS. Note: CASes in - * updateRunningCount and preJoin running active count is in low - * word, so need to be modified if this changes - */ - private volatile int workerCounts; - - private static int totalCountOf(int s) { return s >>> 16; } - private static int runningCountOf(int s) { return s & shortMask; } - private static int workerCountsFor(int t, int r) { return (t << 16) + r; } - - /** - * Add delta (which may be negative) to running count. This must - * be called before (with negative arg) and after (with positive) - * any managed synchronization (i.e., mainly, joins) - * @param delta the number to add - */ - final void updateRunningCount(int delta) { - int s; - do;while (!casWorkerCounts(s = workerCounts, s + delta)); - } - - /** - * Add delta (which may be negative) to both total and running - * count. This must be called upon creation and termination of - * worker threads. - * @param delta the number to add - */ - private void updateWorkerCount(int delta) { - int d = delta + (delta << 16); // add to both lo and hi parts - int s; - do;while (!casWorkerCounts(s = workerCounts, s + d)); - } - - /** - * Lifecycle control. High word contains runState, low word - * contains the number of workers that are (probably) executing - * tasks. This value is atomically incremented before a worker - * gets a task to run, and decremented when worker has no tasks - * and cannot find any. These two fields are bundled together to - * support correct termination triggering. Note: activeCount - * CAS'es cheat by assuming active count is in low word, so need - * to be modified if this changes - */ - private volatile int runControl; - - // RunState values. Order among values matters - private static final int RUNNING = 0; - private static final int SHUTDOWN = 1; - private static final int TERMINATING = 2; - private static final int TERMINATED = 3; - - private static int runStateOf(int c) { return c >>> 16; } - private static int activeCountOf(int c) { return c & shortMask; } - private static int runControlFor(int r, int a) { return (r << 16) + a; } - - /** - * Try incrementing active count; fail on contention. Called by - * workers before/during executing tasks. - * @return true on success; - */ - final boolean tryIncrementActiveCount() { - int c = runControl; - return casRunControl(c, c+1); - } - - /** - * Try decrementing active count; fail on contention. - * Possibly trigger termination on success - * Called by workers when they can't find tasks. - * @return true on success - */ - final boolean tryDecrementActiveCount() { - int c = runControl; - int nextc = c - 1; - if (!casRunControl(c, nextc)) - return false; - if (canTerminateOnShutdown(nextc)) - terminateOnShutdown(); - return true; - } - - /** - * Return true if argument represents zero active count and - * nonzero runstate, which is the triggering condition for - * terminating on shutdown. - */ - private static boolean canTerminateOnShutdown(int c) { - return ((c & -c) >>> 16) != 0; // i.e. least bit is nonzero runState bit - } - - /** - * Transition run state to at least the given state. Return true - * if not already at least given state. - */ - private boolean transitionRunStateTo(int state) { - for (;;) { - int c = runControl; - if (runStateOf(c) >= state) - return false; - if (casRunControl(c, runControlFor(state, activeCountOf(c)))) - return true; - } - } - - /** - * Controls whether to add spares to maintain parallelism - */ - private volatile boolean maintainsParallelism; - - // Constructors - - /** - * Creates a ForkJoinPool with a pool size equal to the number of - * processors available on the system and using the default - * ForkJoinWorkerThreadFactory, - * @throws SecurityException if a security manager exists and - * the caller is not permitted to modify threads - * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), - */ - public ForkJoinPool() { - this(Runtime.getRuntime().availableProcessors(), - defaultForkJoinWorkerThreadFactory); - } - - /** - * Creates a ForkJoinPool with the indicated parellelism level - * threads, and using the default ForkJoinWorkerThreadFactory, - * @param parallelism the number of worker threads - * @throws IllegalArgumentException if parallelism less than or - * equal to zero - * @throws SecurityException if a security manager exists and - * the caller is not permitted to modify threads - * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), - */ - public ForkJoinPool(int parallelism) { - this(parallelism, defaultForkJoinWorkerThreadFactory); - } - - /** - * Creates a ForkJoinPool with parallelism equal to the number of - * processors available on the system and using the given - * ForkJoinWorkerThreadFactory, - * @param factory the factory for creating new threads - * @throws NullPointerException if factory is null - * @throws SecurityException if a security manager exists and - * the caller is not permitted to modify threads - * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), - */ - public ForkJoinPool(ForkJoinWorkerThreadFactory factory) { - this(Runtime.getRuntime().availableProcessors(), factory); - } - - /** - * Creates a ForkJoinPool with the given parallelism and factory. - * - * @param parallelism the targeted number of worker threads - * @param factory the factory for creating new threads - * @throws IllegalArgumentException if parallelism less than or - * equal to zero, or greater than implementation limit. - * @throws NullPointerException if factory is null - * @throws SecurityException if a security manager exists and - * the caller is not permitted to modify threads - * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), - */ - public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) { - if (parallelism <= 0 || parallelism > MAX_THREADS) - throw new IllegalArgumentException(); - if (factory == null) - throw new NullPointerException(); - checkPermission(); - this.factory = factory; - this.parallelism = parallelism; - this.maxPoolSize = MAX_THREADS; - this.maintainsParallelism = true; - this.poolNumber = poolNumberGenerator.incrementAndGet(); - this.workerLock = new ReentrantLock(); - this.termination = workerLock.newCondition(); - this.stealCount = new AtomicLong(); - this.submissionQueue = new LinkedTransferQueue>(); - // worker array and workers are lazily constructed - } - - /** - * Create new worker using factory. - * @param index the index to assign worker - * @return new worker, or null of factory failed - */ - private ForkJoinWorkerThread createWorker(int index) { - Thread.UncaughtExceptionHandler h = ueh; - ForkJoinWorkerThread w = factory.newThread(this); - if (w != null) { - w.poolIndex = index; - w.setDaemon(true); - w.setAsyncMode(locallyFifo); - w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index); - if (h != null) - w.setUncaughtExceptionHandler(h); - } - return w; - } - - /** - * Return a good size for worker array given pool size. - * Currently requires size to be a power of two. - */ - private static int arraySizeFor(int ps) { - return ps <= 1? 1 : (1 << (32 - Integer.numberOfLeadingZeros(ps-1))); - } - - public static ForkJoinWorkerThread[] copyOfWorkers(ForkJoinWorkerThread[] original, int newLength) { - ForkJoinWorkerThread[] copy = new ForkJoinWorkerThread[newLength]; - System.arraycopy(original, 0, copy, 0, Math.min(newLength, original.length)); - return copy; - } - - /** - * Create or resize array if necessary to hold newLength. - * Call only under exlusion or lock - * @return the array - */ - private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) { - ForkJoinWorkerThread[] ws = workers; - if (ws == null) - return workers = new ForkJoinWorkerThread[arraySizeFor(newLength)]; - else if (newLength > ws.length) - return workers = copyOfWorkers(ws, arraySizeFor(newLength)); - else - return ws; - } - - /** - * Try to shrink workers into smaller array after one or more terminate - */ - private void tryShrinkWorkerArray() { - ForkJoinWorkerThread[] ws = workers; - if (ws != null) { - int len = ws.length; - int last = len - 1; - while (last >= 0 && ws[last] == null) - --last; - int newLength = arraySizeFor(last+1); - if (newLength < len) - workers = copyOfWorkers(ws, newLength); - } - } - - /** - * Initialize workers if necessary - */ - final void ensureWorkerInitialization() { - ForkJoinWorkerThread[] ws = workers; - if (ws == null) { - final ReentrantLock lock = this.workerLock; - lock.lock(); - try { - ws = workers; - if (ws == null) { - int ps = parallelism; - ws = ensureWorkerArrayCapacity(ps); - for (int i = 0; i < ps; ++i) { - ForkJoinWorkerThread w = createWorker(i); - if (w != null) { - ws[i] = w; - w.start(); - updateWorkerCount(1); - } - } - } - } finally { - lock.unlock(); - } - } - } - - /** - * Worker creation and startup for threads added via setParallelism. - */ - private void createAndStartAddedWorkers() { - resumeAllSpares(); // Allow spares to convert to nonspare - int ps = parallelism; - ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(ps); - int len = ws.length; - // Sweep through slots, to keep lowest indices most populated - int k = 0; - while (k < len) { - if (ws[k] != null) { - ++k; - continue; - } - int s = workerCounts; - int tc = totalCountOf(s); - int rc = runningCountOf(s); - if (rc >= ps || tc >= ps) - break; - if (casWorkerCounts (s, workerCountsFor(tc+1, rc+1))) { - ForkJoinWorkerThread w = createWorker(k); - if (w != null) { - ws[k++] = w; - w.start(); - } - else { - updateWorkerCount(-1); // back out on failed creation - break; - } - } - } - } - - // Execution methods - - /** - * Common code for execute, invoke and submit - */ - private void doSubmit(ForkJoinTask task) { - if (isShutdown()) - throw new RejectedExecutionException(); - if (workers == null) - ensureWorkerInitialization(); - submissionQueue.offer(task); - signalIdleWorkers(); - } - - /** - * Performs the given task; returning its result upon completion - * @param task the task - * @return the task's result - * @throws NullPointerException if task is null - * @throws RejectedExecutionException if pool is shut down - */ - public T invoke(ForkJoinTask task) { - doSubmit(task); - return task.join(); - } - - /** - * Arranges for (asynchronous) execution of the given task. - * @param task the task - * @throws NullPointerException if task is null - * @throws RejectedExecutionException if pool is shut down - */ - public void execute(ForkJoinTask task) { - doSubmit(task); - } - - // AbstractExecutorService methods - - public void execute(Runnable task) { - doSubmit(new AdaptedRunnable(task, null)); - } - - public ForkJoinTask submit(Callable task) { - ForkJoinTask job = new AdaptedCallable(task); - doSubmit(job); - return job; - } - - public ForkJoinTask submit(Runnable task, T result) { - ForkJoinTask job = new AdaptedRunnable(task, result); - doSubmit(job); - return job; - } - - public ForkJoinTask submit(Runnable task) { - ForkJoinTask job = new AdaptedRunnable(task, null); - doSubmit(job); - return job; - } - - /** - * Adaptor for Runnables. This implements RunnableFuture - * to be compliant with AbstractExecutorService constraints - */ - static final class AdaptedRunnable extends ForkJoinTask - implements RunnableFuture { - final Runnable runnable; - final T resultOnCompletion; - T result; - AdaptedRunnable(Runnable runnable, T result) { - if (runnable == null) throw new NullPointerException(); - this.runnable = runnable; - this.resultOnCompletion = result; - } - public T getRawResult() { return result; } - public void setRawResult(T v) { result = v; } - public boolean exec() { - runnable.run(); - result = resultOnCompletion; - return true; - } - public void run() { invoke(); } - } - - /** - * Adaptor for Callables - */ - static final class AdaptedCallable extends ForkJoinTask - implements RunnableFuture { - final Callable callable; - T result; - AdaptedCallable(Callable callable) { - if (callable == null) throw new NullPointerException(); - this.callable = callable; - } - public T getRawResult() { return result; } - public void setRawResult(T v) { result = v; } - public boolean exec() { - try { - result = callable.call(); - return true; - } catch (Error err) { - throw err; - } catch (RuntimeException rex) { - throw rex; - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - public void run() { invoke(); } - } - - public List> invokeAll(Collection> tasks) { - ArrayList> ts = - new ArrayList>(tasks.size()); - for (Callable c : tasks) - ts.add(new AdaptedCallable(c)); - invoke(new InvokeAll(ts)); - return (List>)(List)ts; - } - - static final class InvokeAll extends RecursiveAction { - final ArrayList> tasks; - InvokeAll(ArrayList> tasks) { this.tasks = tasks; } - public void compute() { - try { invokeAll(tasks); } catch(Exception ignore) {} - } - } - - // Configuration and status settings and queries - - /** - * Returns the factory used for constructing new workers - * - * @return the factory used for constructing new workers - */ - public ForkJoinWorkerThreadFactory getFactory() { - return factory; - } - - /** - * Returns the handler for internal worker threads that terminate - * due to unrecoverable errors encountered while executing tasks. - * @return the handler, or null if none - */ - public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { - Thread.UncaughtExceptionHandler h; - final ReentrantLock lock = this.workerLock; - lock.lock(); - try { - h = ueh; - } finally { - lock.unlock(); - } - return h; - } - - /** - * Sets the handler for internal worker threads that terminate due - * to unrecoverable errors encountered while executing tasks. - * Unless set, the current default or ThreadGroup handler is used - * as handler. - * - * @param h the new handler - * @return the old handler, or null if none - * @throws SecurityException if a security manager exists and - * the caller is not permitted to modify threads - * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), - */ - public Thread.UncaughtExceptionHandler - setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) { - checkPermission(); - Thread.UncaughtExceptionHandler old = null; - final ReentrantLock lock = this.workerLock; - lock.lock(); - try { - old = ueh; - ueh = h; - ForkJoinWorkerThread[] ws = workers; - if (ws != null) { - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread w = ws[i]; - if (w != null) - w.setUncaughtExceptionHandler(h); - } - } - } finally { - lock.unlock(); - } - return old; - } - - - /** - * Sets the target paralleism level of this pool. - * @param parallelism the target parallelism - * @throws IllegalArgumentException if parallelism less than or - * equal to zero or greater than maximum size bounds. - * @throws SecurityException if a security manager exists and - * the caller is not permitted to modify threads - * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), - */ - public void setParallelism(int parallelism) { - checkPermission(); - if (parallelism <= 0 || parallelism > maxPoolSize) - throw new IllegalArgumentException(); - final ReentrantLock lock = this.workerLock; - lock.lock(); - try { - if (!isTerminating()) { - int p = this.parallelism; - this.parallelism = parallelism; - if (parallelism > p) - createAndStartAddedWorkers(); - else - trimSpares(); - } - } finally { - lock.unlock(); - } - signalIdleWorkers(); - } - - /** - * Returns the targeted number of worker threads in this pool. - * - * @return the targeted number of worker threads in this pool - */ - public int getParallelism() { - return parallelism; - } - - /** - * Returns the number of worker threads that have started but not - * yet terminated. This result returned by this method may differ - * from getParallelism when threads are created to - * maintain parallelism when others are cooperatively blocked. - * - * @return the number of worker threads - */ - public int getPoolSize() { - return totalCountOf(workerCounts); - } - - /** - * Returns the maximum number of threads allowed to exist in the - * pool, even if there are insufficient unblocked running threads. - * @return the maximum - */ - public int getMaximumPoolSize() { - return maxPoolSize; - } - - /** - * Sets the maximum number of threads allowed to exist in the - * pool, even if there are insufficient unblocked running threads. - * Setting this value has no effect on current pool size. It - * controls construction of new threads. - * @throws IllegalArgumentException if negative or greater then - * internal implementation limit. - */ - public void setMaximumPoolSize(int newMax) { - if (newMax < 0 || newMax > MAX_THREADS) - throw new IllegalArgumentException(); - maxPoolSize = newMax; - } - - - /** - * Returns true if this pool dynamically maintains its target - * parallelism level. If false, new threads are added only to - * avoid possible starvation. - * This setting is by default true; - * @return true if maintains parallelism - */ - public boolean getMaintainsParallelism() { - return maintainsParallelism; - } - - /** - * Sets whether this pool dynamically maintains its target - * parallelism level. If false, new threads are added only to - * avoid possible starvation. - * @param enable true to maintains parallelism - */ - public void setMaintainsParallelism(boolean enable) { - maintainsParallelism = enable; - } - - /** - * Establishes local first-in-first-out scheduling mode for forked - * tasks that are never joined. This mode may be more appropriate - * than default locally stack-based mode in applications in which - * worker threads only process asynchronous tasks. This method is - * designed to be invoked only when pool is quiescent, and - * typically only before any tasks are submitted. The effects of - * invocations at ather times may be unpredictable. - * - * @param async if true, use locally FIFO scheduling - * @return the previous mode. - */ - public boolean setAsyncMode(boolean async) { - boolean oldMode = locallyFifo; - locallyFifo = async; - ForkJoinWorkerThread[] ws = workers; - if (ws != null) { - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null) - t.setAsyncMode(async); - } - } - return oldMode; - } - - /** - * Returns true if this pool uses local first-in-first-out - * scheduling mode for forked tasks that are never joined. - * - * @return true if this pool uses async mode. - */ - public boolean getAsyncMode() { - return locallyFifo; - } - - /** - * Returns an estimate of the number of worker threads that are - * not blocked waiting to join tasks or for other managed - * synchronization. - * - * @return the number of worker threads - */ - public int getRunningThreadCount() { - return runningCountOf(workerCounts); - } - - /** - * Returns an estimate of the number of threads that are currently - * stealing or executing tasks. This method may overestimate the - * number of active threads. - * @return the number of active threads. - */ - public int getActiveThreadCount() { - return activeCountOf(runControl); - } - - /** - * Returns an estimate of the number of threads that are currently - * idle waiting for tasks. This method may underestimate the - * number of idle threads. - * @return the number of idle threads. - */ - final int getIdleThreadCount() { - int c = runningCountOf(workerCounts) - activeCountOf(runControl); - return (c <= 0)? 0 : c; - } - - /** - * Returns true if all worker threads are currently idle. An idle - * worker is one that cannot obtain a task to execute because none - * are available to steal from other threads, and there are no - * pending submissions to the pool. This method is conservative: - * It might not return true immediately upon idleness of all - * threads, but will eventually become true if threads remain - * inactive. - * @return true if all threads are currently idle - */ - public boolean isQuiescent() { - return activeCountOf(runControl) == 0; - } - - /** - * Returns an estimate of the total number of tasks stolen from - * one thread's work queue by another. The reported value - * underestimates the actual total number of steals when the pool - * is not quiescent. This value may be useful for monitoring and - * tuning fork/join programs: In general, steal counts should be - * high enough to keep threads busy, but low enough to avoid - * overhead and contention across threads. - * @return the number of steals. - */ - public long getStealCount() { - return stealCount.get(); - } - - /** - * Accumulate steal count from a worker. Call only - * when worker known to be idle. - */ - private void updateStealCount(ForkJoinWorkerThread w) { - int sc = w.getAndClearStealCount(); - if (sc != 0) - stealCount.addAndGet(sc); - } - - /** - * Returns an estimate of the total number of tasks currently held - * in queues by worker threads (but not including tasks submitted - * to the pool that have not begun executing). This value is only - * an approximation, obtained by iterating across all threads in - * the pool. This method may be useful for tuning task - * granularities. - * @return the number of queued tasks. - */ - public long getQueuedTaskCount() { - long count = 0; - ForkJoinWorkerThread[] ws = workers; - if (ws != null) { - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null) - count += t.getQueueSize(); - } - } - return count; - } - - /** - * Returns an estimate of the number tasks submitted to this pool - * that have not yet begun executing. This method takes time - * proportional to the number of submissions. - * @return the number of queued submissions. - */ - public int getQueuedSubmissionCount() { - return submissionQueue.size(); - } - - /** - * Returns true if there are any tasks submitted to this pool - * that have not yet begun executing. - * @return true if there are any queued submissions. - */ - public boolean hasQueuedSubmissions() { - return !submissionQueue.isEmpty(); - } - - /** - * Removes and returns the next unexecuted submission if one is - * available. This method may be useful in extensions to this - * class that re-assign work in systems with multiple pools. - * @return the next submission, or null if none - */ - protected ForkJoinTask pollSubmission() { - return submissionQueue.poll(); - } - - /** - * Removes all available unexecuted submitted and forked tasks - * from scheduling queues and adds them to the given collection, - * without altering their execution status. These may include - * artifically generated or wrapped tasks. This method id designed - * to be invoked only when the pool is known to be - * quiescent. Invocations at other times may not remove all - * tasks. A failure encountered while attempting to add elements - * to collection c may result in elements being in - * neither, either or both collections when the associated - * exception is thrown. The behavior of this operation is - * undefined if the specified collection is modified while the - * operation is in progress. - * @param c the collection to transfer elements into - * @return the number of elements transferred - */ - protected int drainTasksTo(Collection> c) { - int n = submissionQueue.drainTo(c); - ForkJoinWorkerThread[] ws = workers; - if (ws != null) { - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread w = ws[i]; - if (w != null) - n += w.drainTasksTo(c); - } - } - return n; - } - - /** - * Returns a string identifying this pool, as well as its state, - * including indications of run state, parallelism level, and - * worker and task counts. - * - * @return a string identifying this pool, as well as its state - */ - public String toString() { - int ps = parallelism; - int wc = workerCounts; - int rc = runControl; - long st = getStealCount(); - long qt = getQueuedTaskCount(); - long qs = getQueuedSubmissionCount(); - return super.toString() + - "[" + runStateToString(runStateOf(rc)) + - ", parallelism = " + ps + - ", size = " + totalCountOf(wc) + - ", active = " + activeCountOf(rc) + - ", running = " + runningCountOf(wc) + - ", steals = " + st + - ", tasks = " + qt + - ", submissions = " + qs + - "]"; - } - - private static String runStateToString(int rs) { - switch(rs) { - case RUNNING: return "Running"; - case SHUTDOWN: return "Shutting down"; - case TERMINATING: return "Terminating"; - case TERMINATED: return "Terminated"; - default: throw new Error("Unknown run state"); - } - } - - // lifecycle control - - /** - * Initiates an orderly shutdown in which previously submitted - * tasks are executed, but no new tasks will be accepted. - * Invocation has no additional effect if already shut down. - * Tasks that are in the process of being submitted concurrently - * during the course of this method may or may not be rejected. - * @throws SecurityException if a security manager exists and - * the caller is not permitted to modify threads - * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), - */ - public void shutdown() { - checkPermission(); - transitionRunStateTo(SHUTDOWN); - if (canTerminateOnShutdown(runControl)) - terminateOnShutdown(); - } - - /** - * Attempts to stop all actively executing tasks, and cancels all - * waiting tasks. Tasks that are in the process of being - * submitted or executed concurrently during the course of this - * method may or may not be rejected. Unlike some other executors, - * this method cancels rather than collects non-executed tasks - * upon termination, so always returns an empty list. However, you - * can use method drainTasksTo before invoking this - * method to transfer unexecuted tasks to another collection. - * @return an empty list - * @throws SecurityException if a security manager exists and - * the caller is not permitted to modify threads - * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), - */ - public List shutdownNow() { - checkPermission(); - terminate(); - return Collections.emptyList(); - } - - /** - * Returns true if all tasks have completed following shut down. - * - * @return true if all tasks have completed following shut down - */ - public boolean isTerminated() { - return runStateOf(runControl) == TERMINATED; - } - - /** - * Returns true if the process of termination has - * commenced but possibly not yet completed. - * - * @return true if terminating - */ - public boolean isTerminating() { - return runStateOf(runControl) >= TERMINATING; - } - - /** - * Returns true if this pool has been shut down. - * - * @return true if this pool has been shut down - */ - public boolean isShutdown() { - return runStateOf(runControl) >= SHUTDOWN; - } - - /** - * Blocks until all tasks have completed execution after a shutdown - * request, or the timeout occurs, or the current thread is - * interrupted, whichever happens first. - * - * @param timeout the maximum time to wait - * @param unit the time unit of the timeout argument - * @return true if this executor terminated and - * false if the timeout elapsed before termination - * @throws InterruptedException if interrupted while waiting - */ - public boolean awaitTermination(long timeout, TimeUnit unit) - throws InterruptedException { - long nanos = unit.toNanos(timeout); - final ReentrantLock lock = this.workerLock; - lock.lock(); - try { - for (;;) { - if (isTerminated()) - return true; - if (nanos <= 0) - return false; - nanos = termination.awaitNanos(nanos); - } - } finally { - lock.unlock(); - } - } - - // Shutdown and termination support - - /** - * Callback from terminating worker. Null out the corresponding - * workers slot, and if terminating, try to terminate, else try to - * shrink workers array. - * @param w the worker - */ - final void workerTerminated(ForkJoinWorkerThread w) { - updateStealCount(w); - updateWorkerCount(-1); - final ReentrantLock lock = this.workerLock; - lock.lock(); - try { - ForkJoinWorkerThread[] ws = workers; - if (ws != null) { - int idx = w.poolIndex; - if (idx >= 0 && idx < ws.length && ws[idx] == w) - ws[idx] = null; - if (totalCountOf(workerCounts) == 0) { - terminate(); // no-op if already terminating - transitionRunStateTo(TERMINATED); - termination.signalAll(); - } - else if (!isTerminating()) { - tryShrinkWorkerArray(); - tryResumeSpare(true); // allow replacement - } - } - } finally { - lock.unlock(); - } - signalIdleWorkers(); - } - - /** - * Initiate termination. - */ - private void terminate() { - if (transitionRunStateTo(TERMINATING)) { - stopAllWorkers(); - resumeAllSpares(); - signalIdleWorkers(); - cancelQueuedSubmissions(); - cancelQueuedWorkerTasks(); - interruptUnterminatedWorkers(); - signalIdleWorkers(); // resignal after interrupt - } - } - - /** - * Possibly terminate when on shutdown state - */ - private void terminateOnShutdown() { - if (!hasQueuedSubmissions() && canTerminateOnShutdown(runControl)) - terminate(); - } - - /** - * Clear out and cancel submissions - */ - private void cancelQueuedSubmissions() { - ForkJoinTask task; - while ((task = pollSubmission()) != null) - task.cancel(false); - } - - /** - * Clean out worker queues. - */ - private void cancelQueuedWorkerTasks() { - final ReentrantLock lock = this.workerLock; - lock.lock(); - try { - ForkJoinWorkerThread[] ws = workers; - if (ws != null) { - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null) - t.cancelTasks(); - } - } - } finally { - lock.unlock(); - } - } - - /** - * Set each worker's status to terminating. Requires lock to avoid - * conflicts with add/remove - */ - private void stopAllWorkers() { - final ReentrantLock lock = this.workerLock; - lock.lock(); - try { - ForkJoinWorkerThread[] ws = workers; - if (ws != null) { - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null) - t.shutdownNow(); - } - } - } finally { - lock.unlock(); - } - } - - /** - * Interrupt all unterminated workers. This is not required for - * sake of internal control, but may help unstick user code during - * shutdown. - */ - private void interruptUnterminatedWorkers() { - final ReentrantLock lock = this.workerLock; - lock.lock(); - try { - ForkJoinWorkerThread[] ws = workers; - if (ws != null) { - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null && !t.isTerminated()) { - try { - t.interrupt(); - } catch (SecurityException ignore) { - } - } - } - } - } finally { - lock.unlock(); - } - } - - - /* - * Nodes for event barrier to manage idle threads. Queue nodes - * are basic Treiber stack nodes, also used for spare stack. - * - * The event barrier has an event count and a wait queue (actually - * a Treiber stack). Workers are enabled to look for work when - * the eventCount is incremented. If they fail to find work, they - * may wait for next count. Upon release, threads help others wake - * up. - * - * Synchronization events occur only in enough contexts to - * maintain overall liveness: - * - * - Submission of a new task to the pool - * - Resizes or other changes to the workers array - * - pool termination - * - A worker pushing a task on an empty queue - * - * The case of pushing a task occurs often enough, and is heavy - * enough compared to simple stack pushes, to require special - * handling: Method signalWork returns without advancing count if - * the queue appears to be empty. This would ordinarily result in - * races causing some queued waiters not to be woken up. To avoid - * this, the first worker enqueued in method sync (see - * syncIsReleasable) rescans for tasks after being enqueued, and - * helps signal if any are found. This works well because the - * worker has nothing better to do, and so might as well help - * alleviate the overhead and contention on the threads actually - * doing work. Also, since event counts increments on task - * availability exist to maintain liveness (rather than to force - * refreshes etc), it is OK for callers to exit early if - * contending with another signaller. - */ - static final class WaitQueueNode { - WaitQueueNode next; // only written before enqueued - volatile ForkJoinWorkerThread thread; // nulled to cancel wait - final long count; // unused for spare stack - - WaitQueueNode(long c, ForkJoinWorkerThread w) { - count = c; - thread = w; - } - - /** - * Wake up waiter, returning false if known to already - */ - boolean signal() { - ForkJoinWorkerThread t = thread; - if (t == null) - return false; - thread = null; - LockSupport.unpark(t); - return true; - } - - /** - * Await release on sync - */ - void awaitSyncRelease(ForkJoinPool p) { - while (thread != null && !p.syncIsReleasable(this)) - LockSupport.park();//TR park(this); - } - - /** - * Await resumption as spare - */ - void awaitSpareRelease() { - while (thread != null) { - if (!Thread.interrupted()) - LockSupport.park();//TR park(this); - } - } - } - - /** - * Ensures that no thread is waiting for count to advance from the - * current value of eventCount read on entry to this method, by - * releasing waiting threads if necessary. - * @return the count - */ - final long ensureSync() { - long c = eventCount; - WaitQueueNode q; - while ((q = syncStack) != null && q.count < c) { - if (casBarrierStack(q, null)) { - do { - q.signal(); - } while ((q = q.next) != null); - break; - } - } - return c; - } - - /** - * Increments event count and releases waiting threads. - */ - private void signalIdleWorkers() { - long c; - do;while (!casEventCount(c = eventCount, c+1)); - ensureSync(); - } - - /** - * Signal threads waiting to poll a task. Because method sync - * rechecks availability, it is OK to only proceed if queue - * appears to be non-empty, and OK to skip under contention to - * increment count (since some other thread succeeded). - */ - final void signalWork() { - long c; - WaitQueueNode q; - if (syncStack != null && - casEventCount(c = eventCount, c+1) && - (((q = syncStack) != null && q.count <= c) && - (!casBarrierStack(q, q.next) || !q.signal()))) - ensureSync(); - } - - /** - * Waits until event count advances from last value held by - * caller, or if excess threads, caller is resumed as spare, or - * caller or pool is terminating. Updates caller's event on exit. - * @param w the calling worker thread - */ - final void sync(ForkJoinWorkerThread w) { - updateStealCount(w); // Transfer w's count while it is idle - - while (!w.isShutdown() && !isTerminating() && !suspendIfSpare(w)) { - long prev = w.lastEventCount; - WaitQueueNode node = null; - WaitQueueNode h; - while (eventCount == prev && - ((h = syncStack) == null || h.count == prev)) { - if (node == null) - node = new WaitQueueNode(prev, w); - if (casBarrierStack(node.next = h, node)) { - node.awaitSyncRelease(this); - break; - } - } - long ec = ensureSync(); - if (ec != prev) { - w.lastEventCount = ec; - break; - } - } - } - - /** - * Returns true if worker waiting on sync can proceed: - * - on signal (thread == null) - * - on event count advance (winning race to notify vs signaller) - * - on Interrupt - * - if the first queued node, we find work available - * If node was not signalled and event count not advanced on exit, - * then we also help advance event count. - * @return true if node can be released - */ - final boolean syncIsReleasable(WaitQueueNode node) { - long prev = node.count; - if (!Thread.interrupted() && node.thread != null && - (node.next != null || - !ForkJoinWorkerThread.hasQueuedTasks(workers)) && - eventCount == prev) - return false; - if (node.thread != null) { - node.thread = null; - long ec = eventCount; - if (prev <= ec) // help signal - casEventCount(ec, ec+1); - } - return true; - } - - /** - * Returns true if a new sync event occurred since last call to - * sync or this method, if so, updating caller's count. - */ - final boolean hasNewSyncEvent(ForkJoinWorkerThread w) { - long lc = w.lastEventCount; - long ec = ensureSync(); - if (ec == lc) - return false; - w.lastEventCount = ec; - return true; - } - - // Parallelism maintenance - - /** - * Decrement running count; if too low, add spare. - * - * Conceptually, all we need to do here is add or resume a - * spare thread when one is about to block (and remove or - * suspend it later when unblocked -- see suspendIfSpare). - * However, implementing this idea requires coping with - * several problems: We have imperfect information about the - * states of threads. Some count updates can and usually do - * lag run state changes, despite arrangements to keep them - * accurate (for example, when possible, updating counts - * before signalling or resuming), especially when running on - * dynamic JVMs that don't optimize the infrequent paths that - * update counts. Generating too many threads can make these - * problems become worse, because excess threads are more - * likely to be context-switched with others, slowing them all - * down, especially if there is no work available, so all are - * busy scanning or idling. Also, excess spare threads can - * only be suspended or removed when they are idle, not - * immediately when they aren't needed. So adding threads will - * raise parallelism level for longer than necessary. Also, - * FJ applications often enounter highly transient peaks when - * many threads are blocked joining, but for less time than it - * takes to create or resume spares. - * - * @param joinMe if non-null, return early if done - * @param maintainParallelism if true, try to stay within - * target counts, else create only to avoid starvation - * @return true if joinMe known to be done - */ - final boolean preJoin(ForkJoinTask joinMe, boolean maintainParallelism) { - maintainParallelism &= maintainsParallelism; // overrride - boolean dec = false; // true when running count decremented - while (spareStack == null || !tryResumeSpare(dec)) { - int counts = workerCounts; - if (dec || (dec = casWorkerCounts(counts, --counts))) { // CAS cheat - if (!needSpare(counts, maintainParallelism)) - break; - if (joinMe.status < 0) - return true; - if (tryAddSpare(counts)) - break; - } - } - return false; - } - - /** - * Same idea as preJoin - */ - final boolean preBlock(ManagedBlocker blocker, boolean maintainParallelism){ - maintainParallelism &= maintainsParallelism; - boolean dec = false; - while (spareStack == null || !tryResumeSpare(dec)) { - int counts = workerCounts; - if (dec || (dec = casWorkerCounts(counts, --counts))) { - if (!needSpare(counts, maintainParallelism)) - break; - if (blocker.isReleasable()) - return true; - if (tryAddSpare(counts)) - break; - } - } - return false; - } - - /** - * Returns true if a spare thread appears to be needed. If - * maintaining parallelism, returns true when the deficit in - * running threads is more than the surplus of total threads, and - * there is apparently some work to do. This self-limiting rule - * means that the more threads that have already been added, the - * less parallelism we will tolerate before adding another. - * @param counts current worker counts - * @param maintainParallelism try to maintain parallelism - */ - private boolean needSpare(int counts, boolean maintainParallelism) { - int ps = parallelism; - int rc = runningCountOf(counts); - int tc = totalCountOf(counts); - int runningDeficit = ps - rc; - int totalSurplus = tc - ps; - return (tc < maxPoolSize && - (rc == 0 || totalSurplus < 0 || - (maintainParallelism && - runningDeficit > totalSurplus && - ForkJoinWorkerThread.hasQueuedTasks(workers)))); - } - - /** - * Add a spare worker if lock available and no more than the - * expected numbers of threads exist - * @return true if successful - */ - private boolean tryAddSpare(int expectedCounts) { - final ReentrantLock lock = this.workerLock; - int expectedRunning = runningCountOf(expectedCounts); - int expectedTotal = totalCountOf(expectedCounts); - boolean success = false; - boolean locked = false; - // confirm counts while locking; CAS after obtaining lock - try { - for (;;) { - int s = workerCounts; - int tc = totalCountOf(s); - int rc = runningCountOf(s); - if (rc > expectedRunning || tc > expectedTotal) - break; - if (!locked && !(locked = lock.tryLock())) - break; - if (casWorkerCounts(s, workerCountsFor(tc+1, rc+1))) { - createAndStartSpare(tc); - success = true; - break; - } - } - } finally { - if (locked) - lock.unlock(); - } - return success; - } - - /** - * Add the kth spare worker. On entry, pool coounts are already - * adjusted to reflect addition. - */ - private void createAndStartSpare(int k) { - ForkJoinWorkerThread w = null; - ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(k + 1); - int len = ws.length; - // Probably, we can place at slot k. If not, find empty slot - if (k < len && ws[k] != null) { - for (k = 0; k < len && ws[k] != null; ++k) - ; - } - if (k < len && !isTerminating() && (w = createWorker(k)) != null) { - ws[k] = w; - w.start(); - } - else - updateWorkerCount(-1); // adjust on failure - signalIdleWorkers(); - } - - /** - * Suspend calling thread w if there are excess threads. Called - * only from sync. Spares are enqueued in a Treiber stack - * using the same WaitQueueNodes as barriers. They are resumed - * mainly in preJoin, but are also woken on pool events that - * require all threads to check run state. - * @param w the caller - */ - private boolean suspendIfSpare(ForkJoinWorkerThread w) { - WaitQueueNode node = null; - int s; - while (parallelism < runningCountOf(s = workerCounts)) { - if (node == null) - node = new WaitQueueNode(0, w); - if (casWorkerCounts(s, s-1)) { // representation-dependent - // push onto stack - do;while (!casSpareStack(node.next = spareStack, node)); - // block until released by resumeSpare - node.awaitSpareRelease(); - return true; - } - } - return false; - } - - /** - * Try to pop and resume a spare thread. - * @param updateCount if true, increment running count on success - * @return true if successful - */ - private boolean tryResumeSpare(boolean updateCount) { - WaitQueueNode q; - while ((q = spareStack) != null) { - if (casSpareStack(q, q.next)) { - if (updateCount) - updateRunningCount(1); - q.signal(); - return true; - } - } - return false; - } - - /** - * Pop and resume all spare threads. Same idea as ensureSync. - * @return true if any spares released - */ - private boolean resumeAllSpares() { - WaitQueueNode q; - while ( (q = spareStack) != null) { - if (casSpareStack(q, null)) { - do { - updateRunningCount(1); - q.signal(); - } while ((q = q.next) != null); - return true; - } - } - return false; - } - - /** - * Pop and shutdown excessive spare threads. Call only while - * holding lock. This is not guaranteed to eliminate all excess - * threads, only those suspended as spares, which are the ones - * unlikely to be needed in the future. - */ - private void trimSpares() { - int surplus = totalCountOf(workerCounts) - parallelism; - WaitQueueNode q; - while (surplus > 0 && (q = spareStack) != null) { - if (casSpareStack(q, null)) { - do { - updateRunningCount(1); - ForkJoinWorkerThread w = q.thread; - if (w != null && surplus > 0 && - runningCountOf(workerCounts) > 0 && w.shutdown()) - --surplus; - q.signal(); - } while ((q = q.next) != null); - } - } - } - - /** - * Interface for extending managed parallelism for tasks running - * in ForkJoinPools. A ManagedBlocker provides two methods. - * Method isReleasable must return true if blocking is not - * necessary. Method block blocks the current thread - * if necessary (perhaps internally invoking isReleasable before - * actually blocking.). - *

For example, here is a ManagedBlocker based on a - * ReentrantLock: - *

-     *   class ManagedLocker implements ManagedBlocker {
-     *     final ReentrantLock lock;
-     *     boolean hasLock = false;
-     *     ManagedLocker(ReentrantLock lock) { this.lock = lock; }
-     *     public boolean block() {
-     *        if (!hasLock)
-     *           lock.lock();
-     *        return true;
-     *     }
-     *     public boolean isReleasable() {
-     *        return hasLock || (hasLock = lock.tryLock());
-     *     }
-     *   }
-     * 
- */ - public static interface ManagedBlocker { - /** - * Possibly blocks the current thread, for example waiting for - * a lock or condition. - * @return true if no additional blocking is necessary (i.e., - * if isReleasable would return true). - * @throws InterruptedException if interrupted while waiting - * (the method is not required to do so, but is allowe to). - */ - boolean block() throws InterruptedException; - - /** - * Returns true if blocking is unnecessary. - */ - boolean isReleasable(); - } - - /** - * Blocks in accord with the given blocker. If the current thread - * is a ForkJoinWorkerThread, this method possibly arranges for a - * spare thread to be activated if necessary to ensure parallelism - * while the current thread is blocked. If - * maintainParallelism is true and the pool supports - * it ({@link #getMaintainsParallelism}), this method attempts to - * maintain the pool's nominal parallelism. Otherwise if activates - * a thread only if necessary to avoid complete starvation. This - * option may be preferable when blockages use timeouts, or are - * almost always brief. - * - *

If the caller is not a ForkJoinTask, this method is behaviorally - * equivalent to - *

-     *   while (!blocker.isReleasable())
-     *      if (blocker.block())
-     *         return;
-     * 
- * If the caller is a ForkJoinTask, then the pool may first - * be expanded to ensure parallelism, and later adjusted. - * - * @param blocker the blocker - * @param maintainParallelism if true and supported by this pool, - * attempt to maintain the pool's nominal parallelism; otherwise - * activate a thread only if necessary to avoid complete - * starvation. - * @throws InterruptedException if blocker.block did so. - */ - public static void managedBlock(ManagedBlocker blocker, - boolean maintainParallelism) - throws InterruptedException { - Thread t = Thread.currentThread(); - ForkJoinPool pool = (t instanceof ForkJoinWorkerThread? - ((ForkJoinWorkerThread)t).pool : null); - if (!blocker.isReleasable()) { - try { - if (pool == null || - !pool.preBlock(blocker, maintainParallelism)) - awaitBlocker(blocker); - } finally { - if (pool != null) - pool.updateRunningCount(1); - } - } - } - - private static void awaitBlocker(ManagedBlocker blocker) - throws InterruptedException { - do;while (!blocker.isReleasable() && !blocker.block()); - } - - // AbstractExecutorService overrides - - protected RunnableFuture newTaskFor(Runnable runnable, T value) { - return new AdaptedRunnable(runnable, value); - } - - protected RunnableFuture newTaskFor(Callable callable) { - return new AdaptedCallable(callable); - } - - - // Temporary Unsafe mechanics for preliminary release - private static Unsafe getUnsafe() throws Throwable { - try { - return Unsafe.getUnsafe(); - } catch (SecurityException se) { - try { - return java.security.AccessController.doPrivileged - (new java.security.PrivilegedExceptionAction() { - public Unsafe run() throws Exception { - return getUnsafePrivileged(); - }}); - } catch (java.security.PrivilegedActionException e) { - throw e.getCause(); - } - } - } - - private static Unsafe getUnsafePrivileged() - throws NoSuchFieldException, IllegalAccessException { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe) f.get(null); - } - - private static long fieldOffset(String fieldName) - throws NoSuchFieldException { - return _unsafe.objectFieldOffset - (ForkJoinPool.class.getDeclaredField(fieldName)); - } - - static final Unsafe _unsafe; - static final long eventCountOffset; - static final long workerCountsOffset; - static final long runControlOffset; - static final long syncStackOffset; - static final long spareStackOffset; - - static { - try { - _unsafe = getUnsafe(); - eventCountOffset = fieldOffset("eventCount"); - workerCountsOffset = fieldOffset("workerCounts"); - runControlOffset = fieldOffset("runControl"); - syncStackOffset = fieldOffset("syncStack"); - spareStackOffset = fieldOffset("spareStack"); - } catch (Throwable e) { - throw new RuntimeException("Could not initialize intrinsics", e); - } - } - - private boolean casEventCount(long cmp, long val) { - return _unsafe.compareAndSwapLong(this, eventCountOffset, cmp, val); - } - private boolean casWorkerCounts(int cmp, int val) { - return _unsafe.compareAndSwapInt(this, workerCountsOffset, cmp, val); - } - private boolean casRunControl(int cmp, int val) { - return _unsafe.compareAndSwapInt(this, runControlOffset, cmp, val); - } - private boolean casSpareStack(WaitQueueNode cmp, WaitQueueNode val) { - return _unsafe.compareAndSwapObject(this, spareStackOffset, cmp, val); - } - private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) { - return _unsafe.compareAndSwapObject(this, syncStackOffset, cmp, val); - } -} diff --git a/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinTask.java b/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinTask.java deleted file mode 100644 index e6c0fa7bb4..0000000000 --- a/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinTask.java +++ /dev/null @@ -1,1052 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - */ - -package scala.concurrent.forkjoin; -import java.io.Serializable; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import sun.misc.Unsafe; -import java.lang.reflect.*; - -/** - * Abstract base class for tasks that run within a {@link - * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much - * lighter weight than a normal thread. Huge numbers of tasks and - * subtasks may be hosted by a small number of actual threads in a - * ForkJoinPool, at the price of some usage limitations. - * - *

A "main" ForkJoinTask begins execution when submitted to a - * {@link ForkJoinPool}. Once started, it will usually in turn start - * other subtasks. As indicated by the name of this class, many - * programs using ForkJoinTasks employ only methods fork - * and join, or derivatives such as - * invokeAll. However, this class also provides a number - * of other methods that can come into play in advanced usages, as - * well as extension mechanics that allow support of new forms of - * fork/join processing. - * - *

A ForkJoinTask is a lightweight form of {@link Future}. The - * efficiency of ForkJoinTasks stems from a set of restrictions (that - * are only partially statically enforceable) reflecting their - * intended use as computational tasks calculating pure functions or - * operating on purely isolated objects. The primary coordination - * mechanisms are {@link #fork}, that arranges asynchronous execution, - * and {@link #join}, that doesn't proceed until the task's result has - * been computed. Computations should avoid synchronized - * methods or blocks, and should minimize other blocking - * synchronization apart from joining other tasks or using - * synchronizers such as Phasers that are advertised to cooperate with - * fork/join scheduling. Tasks should also not perform blocking IO, - * and should ideally access variables that are completely independent - * of those accessed by other running tasks. Minor breaches of these - * restrictions, for example using shared output streams, may be - * tolerable in practice, but frequent use may result in poor - * performance, and the potential to indefinitely stall if the number - * of threads not waiting for IO or other external synchronization - * becomes exhausted. This usage restriction is in part enforced by - * not permitting checked exceptions such as IOExceptions - * to be thrown. However, computations may still encounter unchecked - * exceptions, that are rethrown to callers attempting join - * them. These exceptions may additionally include - * RejectedExecutionExceptions stemming from internal resource - * exhaustion such as failure to allocate internal task queues. - * - *

The primary method for awaiting completion and extracting - * results of a task is {@link #join}, but there are several variants: - * The {@link Future#get} methods support interruptible and/or timed - * waits for completion and report results using Future - * conventions. Method {@link #helpJoin} enables callers to actively - * execute other tasks while awaiting joins, which is sometimes more - * efficient but only applies when all subtasks are known to be - * strictly tree-structured. Method {@link #invoke} is semantically - * equivalent to fork(); join() but always attempts to - * begin execution in the current thread. The "quiet" forms - * of these methods do not extract results or report exceptions. These - * may be useful when a set of tasks are being executed, and you need - * to delay processing of results or exceptions until all complete. - * Method invokeAll (available in multiple versions) - * performs the most common form of parallel invocation: forking a set - * of tasks and joining them all. - * - *

The ForkJoinTask class is not usually directly subclassed. - * Instead, you subclass one of the abstract classes that support a - * particular style of fork/join processing. Normally, a concrete - * ForkJoinTask subclass declares fields comprising its parameters, - * established in a constructor, and then defines a compute - * method that somehow uses the control methods supplied by this base - * class. While these methods have public access (to allow - * instances of different task subclasses to call each others - * methods), some of them may only be called from within other - * ForkJoinTasks. Attempts to invoke them in other contexts result in - * exceptions or errors possibly including ClassCastException. - * - *

Most base support methods are final because their - * implementations are intrinsically tied to the underlying - * lightweight task scheduling framework, and so cannot be overridden. - * Developers creating new basic styles of fork/join processing should - * minimally implement protected methods - * exec, setRawResult, and - * getRawResult, while also introducing an abstract - * computational method that can be implemented in its subclasses, - * possibly relying on other protected methods provided - * by this class. - * - *

ForkJoinTasks should perform relatively small amounts of - * computations, othewise splitting into smaller tasks. As a very - * rough rule of thumb, a task should perform more than 100 and less - * than 10000 basic computational steps. If tasks are too big, then - * parellelism cannot improve throughput. If too small, then memory - * and internal task maintenance overhead may overwhelm processing. - * - *

ForkJoinTasks are Serializable, which enables them - * to be used in extensions such as remote execution frameworks. It is - * in general sensible to serialize tasks only before or after, but - * not during execution. Serialization is not relied on during - * execution itself. - */ -public abstract class ForkJoinTask implements Future, Serializable { - - /** - * Run control status bits packed into a single int to minimize - * footprint and to ensure atomicity (via CAS). Status is - * initially zero, and takes on nonnegative values until - * completed, upon which status holds COMPLETED. CANCELLED, or - * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing - * blocking waits by other threads have SIGNAL_MASK bits set -- - * bit 15 for external (nonFJ) waits, and the rest a count of - * waiting FJ threads. (This representation relies on - * ForkJoinPool max thread limits). Completion of a stolen task - * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even - * though suboptimal for some purposes, we use basic builtin - * wait/notify to take advantage of "monitor inflation" in JVMs - * that we would otherwise need to emulate to avoid adding further - * per-task bookkeeping overhead. Note that bits 16-28 are - * currently unused. Also value 0x80000000 is available as spare - * completion value. - */ - volatile int status; // accessed directy by pool and workers - - static final int COMPLETION_MASK = 0xe0000000; - static final int NORMAL = 0xe0000000; // == mask - static final int CANCELLED = 0xc0000000; - static final int EXCEPTIONAL = 0xa0000000; - static final int SIGNAL_MASK = 0x0000ffff; - static final int INTERNAL_SIGNAL_MASK = 0x00007fff; - static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word - - /** - * Table of exceptions thrown by tasks, to enable reporting by - * callers. Because exceptions are rare, we don't directly keep - * them with task objects, but instead us a weak ref table. Note - * that cancellation exceptions don't appear in the table, but are - * instead recorded as status values. - * Todo: Use ConcurrentReferenceHashMap - */ - static final Map, Throwable> exceptionMap = - Collections.synchronizedMap - (new WeakHashMap, Throwable>()); - - // within-package utilities - - /** - * Get current worker thread, or null if not a worker thread - */ - static ForkJoinWorkerThread getWorker() { - Thread t = Thread.currentThread(); - return ((t instanceof ForkJoinWorkerThread)? - (ForkJoinWorkerThread)t : null); - } - - final boolean casStatus(int cmp, int val) { - return _unsafe.compareAndSwapInt(this, statusOffset, cmp, val); - } - - /** - * Workaround for not being able to rethrow unchecked exceptions. - */ - static void rethrowException(Throwable ex) { - if (ex != null) - _unsafe.throwException(ex); - } - - // Setting completion status - - /** - * Mark completion and wake up threads waiting to join this task. - * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL - */ - final void setCompletion(int completion) { - ForkJoinPool pool = getPool(); - if (pool != null) { - int s; // Clear signal bits while setting completion status - do;while ((s = status) >= 0 && !casStatus(s, completion)); - - if ((s & SIGNAL_MASK) != 0) { - if ((s &= INTERNAL_SIGNAL_MASK) != 0) - pool.updateRunningCount(s); - synchronized(this) { notifyAll(); } - } - } - else - externallySetCompletion(completion); - } - - /** - * Version of setCompletion for non-FJ threads. Leaves signal - * bits for unblocked threads to adjust, and always notifies. - */ - private void externallySetCompletion(int completion) { - int s; - do;while ((s = status) >= 0 && - !casStatus(s, (s & SIGNAL_MASK) | completion)); - synchronized(this) { notifyAll(); } - } - - /** - * Sets status to indicate normal completion - */ - final void setNormalCompletion() { - // Try typical fast case -- single CAS, no signal, not already done. - // Manually expand casStatus to improve chances of inlining it - if (!_unsafe.compareAndSwapInt(this, statusOffset, 0, NORMAL)) - setCompletion(NORMAL); - } - - // internal waiting and notification - - /** - * Performs the actual monitor wait for awaitDone - */ - private void doAwaitDone() { - // Minimize lock bias and in/de-flation effects by maximizing - // chances of waiting inside sync - try { - while (status >= 0) - synchronized(this) { if (status >= 0) wait(); } - } catch (InterruptedException ie) { - onInterruptedWait(); - } - } - - /** - * Performs the actual monitor wait for awaitDone - */ - private void doAwaitDone(long startTime, long nanos) { - synchronized(this) { - try { - while (status >= 0) { - long nt = nanos - System.nanoTime() - startTime; - if (nt <= 0) - break; - wait(nt / 1000000, (int)(nt % 1000000)); - } - } catch (InterruptedException ie) { - onInterruptedWait(); - } - } - } - - // Awaiting completion - - /** - * Sets status to indicate there is joiner, then waits for join, - * surrounded with pool notifications. - * @return status upon exit - */ - private int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) { - ForkJoinPool pool = w == null? null : w.pool; - int s; - while ((s = status) >= 0) { - if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) { - if (pool == null || !pool.preJoin(this, maintainParallelism)) - doAwaitDone(); - if (((s = status) & INTERNAL_SIGNAL_MASK) != 0) - adjustPoolCountsOnUnblock(pool); - break; - } - } - return s; - } - - /** - * Timed version of awaitDone - * @return status upon exit - */ - private int awaitDone(ForkJoinWorkerThread w, long nanos) { - ForkJoinPool pool = w == null? null : w.pool; - int s; - while ((s = status) >= 0) { - if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) { - long startTime = System.nanoTime(); - if (pool == null || !pool.preJoin(this, false)) - doAwaitDone(startTime, nanos); - if ((s = status) >= 0) { - adjustPoolCountsOnCancelledWait(pool); - s = status; - } - if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0) - adjustPoolCountsOnUnblock(pool); - break; - } - } - return s; - } - - /** - * Notify pool that thread is unblocked. Called by signalled - * threads when woken by non-FJ threads (which is atypical). - */ - private void adjustPoolCountsOnUnblock(ForkJoinPool pool) { - int s; - do;while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK)); - if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0) - pool.updateRunningCount(s); - } - - /** - * Notify pool to adjust counts on cancelled or timed out wait - */ - private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) { - if (pool != null) { - int s; - while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) { - if (casStatus(s, s - 1)) { - pool.updateRunningCount(1); - break; - } - } - } - } - - /** - * Handle interruptions during waits. - */ - private void onInterruptedWait() { - ForkJoinWorkerThread w = getWorker(); - if (w == null) - Thread.currentThread().interrupt(); // re-interrupt - else if (w.isTerminating()) - cancelIgnoringExceptions(); - // else if FJworker, ignore interrupt - } - - // Recording and reporting exceptions - - private void setDoneExceptionally(Throwable rex) { - exceptionMap.put(this, rex); - setCompletion(EXCEPTIONAL); - } - - /** - * Throws the exception associated with status s; - * @throws the exception - */ - private void reportException(int s) { - if ((s &= COMPLETION_MASK) < NORMAL) { - if (s == CANCELLED) - throw new CancellationException(); - else - rethrowException(exceptionMap.get(this)); - } - } - - /** - * Returns result or throws exception using j.u.c.Future conventions - * Only call when isDone known to be true. - */ - private V reportFutureResult() - throws ExecutionException, InterruptedException { - int s = status & COMPLETION_MASK; - if (s < NORMAL) { - Throwable ex; - if (s == CANCELLED) - throw new CancellationException(); - if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null) - throw new ExecutionException(ex); - if (Thread.interrupted()) - throw new InterruptedException(); - } - return getRawResult(); - } - - /** - * Returns result or throws exception using j.u.c.Future conventions - * with timeouts - */ - private V reportTimedFutureResult() - throws InterruptedException, ExecutionException, TimeoutException { - Throwable ex; - int s = status & COMPLETION_MASK; - if (s == NORMAL) - return getRawResult(); - if (s == CANCELLED) - throw new CancellationException(); - if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null) - throw new ExecutionException(ex); - if (Thread.interrupted()) - throw new InterruptedException(); - throw new TimeoutException(); - } - - // internal execution methods - - /** - * Calls exec, recording completion, and rethrowing exception if - * encountered. Caller should normally check status before calling - * @return true if completed normally - */ - private boolean tryExec() { - try { // try block must contain only call to exec - if (!exec()) - return false; - } catch (Throwable rex) { - setDoneExceptionally(rex); - rethrowException(rex); - return false; // not reached - } - setNormalCompletion(); - return true; - } - - /** - * Main execution method used by worker threads. Invokes - * base computation unless already complete - */ - final void quietlyExec() { - if (status >= 0) { - try { - if (!exec()) - return; - } catch(Throwable rex) { - setDoneExceptionally(rex); - return; - } - setNormalCompletion(); - } - } - - /** - * Calls exec, recording but not rethrowing exception - * Caller should normally check status before calling - * @return true if completed normally - */ - private boolean tryQuietlyInvoke() { - try { - if (!exec()) - return false; - } catch (Throwable rex) { - setDoneExceptionally(rex); - return false; - } - setNormalCompletion(); - return true; - } - - /** - * Cancel, ignoring any exceptions it throws - */ - final void cancelIgnoringExceptions() { - try { - cancel(false); - } catch(Throwable ignore) { - } - } - - /** - * Main implementation of helpJoin - */ - private int busyJoin(ForkJoinWorkerThread w) { - int s; - ForkJoinTask t; - while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null) - t.quietlyExec(); - return (s >= 0)? awaitDone(w, false) : s; // block if no work - } - - // public methods - - /** - * Arranges to asynchronously execute this task. While it is not - * necessarily enforced, it is a usage error to fork a task more - * than once unless it has completed and been reinitialized. This - * method may be invoked only from within ForkJoinTask - * computations. Attempts to invoke in other contexts result in - * exceptions or errors possibly including ClassCastException. - */ - public final void fork() { - ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this); - } - - /** - * Returns the result of the computation when it is ready. - * This method differs from get in that abnormal - * completion results in RuntimeExceptions or Errors, not - * ExecutionExceptions. - * - * @return the computed result - */ - public final V join() { - ForkJoinWorkerThread w = getWorker(); - if (w == null || status < 0 || !w.unpushTask(this) || !tryExec()) - reportException(awaitDone(w, true)); - return getRawResult(); - } - - /** - * Commences performing this task, awaits its completion if - * necessary, and return its result. - * @throws Throwable (a RuntimeException, Error, or unchecked - * exception) if the underlying computation did so. - * @return the computed result - */ - public final V invoke() { - if (status >= 0 && tryExec()) - return getRawResult(); - else - return join(); - } - - /** - * Forks both tasks, returning when isDone holds for - * both of them or an exception is encountered. This method may be - * invoked only from within ForkJoinTask computations. Attempts to - * invoke in other contexts result in exceptions or errors - * possibly including ClassCastException. - * @param t1 one task - * @param t2 the other task - * @throws NullPointerException if t1 or t2 are null - * @throws RuntimeException or Error if either task did so. - */ - public static void invokeAll(ForkJoinTaskt1, ForkJoinTask t2) { - t2.fork(); - t1.invoke(); - t2.join(); - } - - /** - * Forks the given tasks, returning when isDone holds - * for all of them. If any task encounters an exception, others - * may be cancelled. This method may be invoked only from within - * ForkJoinTask computations. Attempts to invoke in other contexts - * result in exceptions or errors possibly including ClassCastException. - * @param tasks the array of tasks - * @throws NullPointerException if tasks or any element are null. - * @throws RuntimeException or Error if any task did so. - */ - public static void invokeAll(ForkJoinTask... tasks) { - Throwable ex = null; - int last = tasks.length - 1; - for (int i = last; i >= 0; --i) { - ForkJoinTask t = tasks[i]; - if (t == null) { - if (ex == null) - ex = new NullPointerException(); - } - else if (i != 0) - t.fork(); - else { - t.quietlyInvoke(); - if (ex == null) - ex = t.getException(); - } - } - for (int i = 1; i <= last; ++i) { - ForkJoinTask t = tasks[i]; - if (t != null) { - if (ex != null) - t.cancel(false); - else { - t.quietlyJoin(); - if (ex == null) - ex = t.getException(); - } - } - } - if (ex != null) - rethrowException(ex); - } - - /** - * Forks all tasks in the collection, returning when - * isDone holds for all of them. If any task - * encounters an exception, others may be cancelled. This method - * may be invoked only from within ForkJoinTask - * computations. Attempts to invoke in other contexts resul!t in - * exceptions or errors possibly including ClassCastException. - * @param tasks the collection of tasks - * @throws NullPointerException if tasks or any element are null. - * @throws RuntimeException or Error if any task did so. - */ - public static void invokeAll(Collection> tasks) { - if (!(tasks instanceof List)) { - invokeAll(tasks.toArray(new ForkJoinTask[tasks.size()])); - return; - } - List> ts = - (List>)tasks; - Throwable ex = null; - int last = ts.size() - 1; - for (int i = last; i >= 0; --i) { - ForkJoinTask t = ts.get(i); - if (t == null) { - if (ex == null) - ex = new NullPointerException(); - } - else if (i != 0) - t.fork(); - else { - t.quietlyInvoke(); - if (ex == null) - ex = t.getException(); - } - } - for (int i = 1; i <= last; ++i) { - ForkJoinTask t = ts.get(i); - if (t != null) { - if (ex != null) - t.cancel(false); - else { - t.quietlyJoin(); - if (ex == null) - ex = t.getException(); - } - } - } - if (ex != null) - rethrowException(ex); - } - - /** - * Returns true if the computation performed by this task has - * completed (or has been cancelled). - * @return true if this computation has completed - */ - public final boolean isDone() { - return status < 0; - } - - /** - * Returns true if this task was cancelled. - * @return true if this task was cancelled - */ - public final boolean isCancelled() { - return (status & COMPLETION_MASK) == CANCELLED; - } - - /** - * Asserts that the results of this task's computation will not be - * used. If a cancellation occurs before atempting to execute this - * task, then execution will be suppressed, isCancelled - * will report true, and join will result in a - * CancellationException being thrown. Otherwise, when - * cancellation races with completion, there are no guarantees - * about whether isCancelled will report true, whether - * join will return normally or via an exception, or - * whether these behaviors will remain consistent upon repeated - * invocation. - * - *

This method may be overridden in subclasses, but if so, must - * still ensure that these minimal properties hold. In particular, - * the cancel method itself must not throw exceptions. - * - *

This method is designed to be invoked by other - * tasks. To terminate the current task, you can just return or - * throw an unchecked exception from its computation method, or - * invoke completeExceptionally. - * - * @param mayInterruptIfRunning this value is ignored in the - * default implementation because tasks are not in general - * cancelled via interruption. - * - * @return true if this task is now cancelled - */ - public boolean cancel(boolean mayInterruptIfRunning) { - setCompletion(CANCELLED); - return (status & COMPLETION_MASK) == CANCELLED; - } - - /** - * Returns true if this task threw an exception or was cancelled - * @return true if this task threw an exception or was cancelled - */ - public final boolean isCompletedAbnormally() { - return (status & COMPLETION_MASK) < NORMAL; - } - - /** - * Returns the exception thrown by the base computation, or a - * CancellationException if cancelled, or null if none or if the - * method has not yet completed. - * @return the exception, or null if none - */ - public final Throwable getException() { - int s = status & COMPLETION_MASK; - if (s >= NORMAL) - return null; - if (s == CANCELLED) - return new CancellationException(); - return exceptionMap.get(this); - } - - /** - * Completes this task abnormally, and if not already aborted or - * cancelled, causes it to throw the given exception upon - * join and related operations. This method may be used - * to induce exceptions in asynchronous tasks, or to force - * completion of tasks that would not otherwise complete. Its use - * in other situations is likely to be wrong. This method is - * overridable, but overridden versions must invoke super - * implementation to maintain guarantees. - * - * @param ex the exception to throw. If this exception is - * not a RuntimeException or Error, the actual exception thrown - * will be a RuntimeException with cause ex. - */ - public void completeExceptionally(Throwable ex) { - setDoneExceptionally((ex instanceof RuntimeException) || - (ex instanceof Error)? ex : - new RuntimeException(ex)); - } - - /** - * Completes this task, and if not already aborted or cancelled, - * returning a null result upon join and related - * operations. This method may be used to provide results for - * asynchronous tasks, or to provide alternative handling for - * tasks that would not otherwise complete normally. Its use in - * other situations is likely to be wrong. This method is - * overridable, but overridden versions must invoke super - * implementation to maintain guarantees. - * - * @param value the result value for this task. - */ - public void complete(V value) { - try { - setRawResult(value); - } catch(Throwable rex) { - setDoneExceptionally(rex); - return; - } - setNormalCompletion(); - } - - public final V get() throws InterruptedException, ExecutionException { - ForkJoinWorkerThread w = getWorker(); - if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke()) - awaitDone(w, true); - return reportFutureResult(); - } - - public final V get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - ForkJoinWorkerThread w = getWorker(); - if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke()) - awaitDone(w, unit.toNanos(timeout)); - return reportTimedFutureResult(); - } - - /** - * Possibly executes other tasks until this task is ready, then - * returns the result of the computation. This method may be more - * efficient than join, but is only applicable when - * there are no potemtial dependencies between continuation of the - * current task and that of any other task that might be executed - * while helping. (This usually holds for pure divide-and-conquer - * tasks). This method may be invoked only from within - * ForkJoinTask computations. Attempts to invoke in other contexts - * resul!t in exceptions or errors possibly including ClassCastException. - * @return the computed result - */ - public final V helpJoin() { - ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread()); - if (status < 0 || !w.unpushTask(this) || !tryExec()) - reportException(busyJoin(w)); - return getRawResult(); - } - - /** - * Possibly executes other tasks until this task is ready. This - * method may be invoked only from within ForkJoinTask - * computations. Attempts to invoke in other contexts resul!t in - * exceptions or errors possibly including ClassCastException. - */ - public final void quietlyHelpJoin() { - if (status >= 0) { - ForkJoinWorkerThread w = - (ForkJoinWorkerThread)(Thread.currentThread()); - if (!w.unpushTask(this) || !tryQuietlyInvoke()) - busyJoin(w); - } - } - - /** - * Joins this task, without returning its result or throwing an - * exception. This method may be useful when processing - * collections of tasks when some have been cancelled or otherwise - * known to have aborted. - */ - public final void quietlyJoin() { - if (status >= 0) { - ForkJoinWorkerThread w = getWorker(); - if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke()) - awaitDone(w, true); - } - } - - /** - * Commences performing this task and awaits its completion if - * necessary, without returning its result or throwing an - * exception. This method may be useful when processing - * collections of tasks when some have been cancelled or otherwise - * known to have aborted. - */ - public final void quietlyInvoke() { - if (status >= 0 && !tryQuietlyInvoke()) - quietlyJoin(); - } - - /** - * Possibly executes tasks until the pool hosting the current task - * {@link ForkJoinPool#isQuiescent}. This method may be of use in - * designs in which many tasks are forked, but none are explicitly - * joined, instead executing them until all are processed. - */ - public static void helpQuiesce() { - ((ForkJoinWorkerThread)(Thread.currentThread())). - helpQuiescePool(); - } - - /** - * Resets the internal bookkeeping state of this task, allowing a - * subsequent fork. This method allows repeated reuse of - * this task, but only if reuse occurs when this task has either - * never been forked, or has been forked, then completed and all - * outstanding joins of this task have also completed. Effects - * under any other usage conditions are not guaranteed, and are - * almost surely wrong. This method may be useful when executing - * pre-constructed trees of subtasks in loops. - */ - public void reinitialize() { - if ((status & COMPLETION_MASK) == EXCEPTIONAL) - exceptionMap.remove(this); - status = 0; - } - - /** - * Returns the pool hosting the current task execution, or null - * if this task is executing outside of any pool. - * @return the pool, or null if none. - */ - public static ForkJoinPool getPool() { - Thread t = Thread.currentThread(); - return ((t instanceof ForkJoinWorkerThread)? - ((ForkJoinWorkerThread)t).pool : null); - } - - /** - * Tries to unschedule this task for execution. This method will - * typically succeed if this task is the most recently forked task - * by the current thread, and has not commenced executing in - * another thread. This method may be useful when arranging - * alternative local processing of tasks that could have been, but - * were not, stolen. This method may be invoked only from within - * ForkJoinTask computations. Attempts to invoke in other contexts - * result in exceptions or errors possibly including ClassCastException. - * @return true if unforked - */ - public boolean tryUnfork() { - return ((ForkJoinWorkerThread)(Thread.currentThread())).unpushTask(this); - } - - /** - * Returns an estimate of the number of tasks that have been - * forked by the current worker thread but not yet executed. This - * value may be useful for heuristic decisions about whether to - * fork other tasks. - * @return the number of tasks - */ - public static int getQueuedTaskCount() { - return ((ForkJoinWorkerThread)(Thread.currentThread())). - getQueueSize(); - } - - /** - * Returns a estimate of how many more locally queued tasks are - * held by the current worker thread than there are other worker - * threads that might steal them. This value may be useful for - * heuristic decisions about whether to fork other tasks. In many - * usages of ForkJoinTasks, at steady state, each worker should - * aim to maintain a small constant surplus (for example, 3) of - * tasks, and to process computations locally if this threshold is - * exceeded. - * @return the surplus number of tasks, which may be negative - */ - public static int getSurplusQueuedTaskCount() { - return ((ForkJoinWorkerThread)(Thread.currentThread())) - .getEstimatedSurplusTaskCount(); - } - - // Extension methods - - /** - * Returns the result that would be returned by join, - * even if this task completed abnormally, or null if this task is - * not known to have been completed. This method is designed to - * aid debugging, as well as to support extensions. Its use in any - * other context is discouraged. - * - * @return the result, or null if not completed. - */ - public abstract V getRawResult(); - - /** - * Forces the given value to be returned as a result. This method - * is designed to support extensions, and should not in general be - * called otherwise. - * - * @param value the value - */ - protected abstract void setRawResult(V value); - - /** - * Immediately performs the base action of this task. This method - * is designed to support extensions, and should not in general be - * called otherwise. The return value controls whether this task - * is considered to be done normally. It may return false in - * asynchronous actions that require explicit invocations of - * complete to become joinable. It may throw exceptions - * to indicate abnormal exit. - * @return true if completed normally - * @throws Error or RuntimeException if encountered during computation - */ - protected abstract boolean exec(); - - /** - * Returns, but does not unschedule or execute, the task queued by - * the current thread but not yet executed, if one is - * available. There is no guarantee that this task will actually - * be polled or executed next. This method is designed primarily - * to support extensions, and is unlikely to be useful otherwise. - * This method may be invoked only from within ForkJoinTask - * computations. Attempts to invoke in other contexts result in - * exceptions or errors possibly including ClassCastException. - * - * @return the next task, or null if none are available - */ - protected static ForkJoinTask peekNextLocalTask() { - return ((ForkJoinWorkerThread)(Thread.currentThread())).peekTask(); - } - - /** - * Unschedules and returns, without executing, the next task - * queued by the current thread but not yet executed. This method - * is designed primarily to support extensions, and is unlikely to - * be useful otherwise. This method may be invoked only from - * within ForkJoinTask computations. Attempts to invoke in other - * contexts result in exceptions or errors possibly including - * ClassCastException. - * - * @return the next task, or null if none are available - */ - protected static ForkJoinTask pollNextLocalTask() { - return ((ForkJoinWorkerThread)(Thread.currentThread())).pollLocalTask(); - } - - /** - * Unschedules and returns, without executing, the next task - * queued by the current thread but not yet executed, if one is - * available, or if not available, a task that was forked by some - * other thread, if available. Availability may be transient, so a - * null result does not necessarily imply quiecence - * of the pool this task is operating in. This method is designed - * primarily to support extensions, and is unlikely to be useful - * otherwise. This method may be invoked only from within - * ForkJoinTask computations. Attempts to invoke in other contexts - * result in exceptions or errors possibly including - * ClassCastException. - * - * @return a task, or null if none are available - */ - protected static ForkJoinTask pollTask() { - return ((ForkJoinWorkerThread)(Thread.currentThread())). - pollTask(); - } - - // Serialization support - - private static final long serialVersionUID = -7721805057305804111L; - - /** - * Save the state to a stream. - * - * @serialData the current run status and the exception thrown - * during execution, or null if none. - * @param s the stream - */ - private void writeObject(java.io.ObjectOutputStream s) - throws java.io.IOException { - s.defaultWriteObject(); - s.writeObject(getException()); - } - - /** - * Reconstitute the instance from a stream. - * @param s the stream - */ - private void readObject(java.io.ObjectInputStream s) - throws java.io.IOException, ClassNotFoundException { - s.defaultReadObject(); - status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts - status |= EXTERNAL_SIGNAL; // conservatively set external signal - Object ex = s.readObject(); - if (ex != null) - setDoneExceptionally((Throwable)ex); - } - - // Temporary Unsafe mechanics for preliminary release - private static Unsafe getUnsafe() throws Throwable { - try { - return Unsafe.getUnsafe(); - } catch (SecurityException se) { - try { - return java.security.AccessController.doPrivileged - (new java.security.PrivilegedExceptionAction() { - public Unsafe run() throws Exception { - return getUnsafePrivileged(); - }}); - } catch (java.security.PrivilegedActionException e) { - throw e.getCause(); - } - } - } - - private static Unsafe getUnsafePrivileged() - throws NoSuchFieldException, IllegalAccessException { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe) f.get(null); - } - - private static long fieldOffset(String fieldName) - throws NoSuchFieldException { - return _unsafe.objectFieldOffset - (ForkJoinTask.class.getDeclaredField(fieldName)); - } - - static final Unsafe _unsafe; - static final long statusOffset; - - static { - try { - _unsafe = getUnsafe(); - statusOffset = fieldOffset("status"); - } catch (Throwable e) { - throw new RuntimeException("Could not initialize intrinsics", e); - } - } - -} diff --git a/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinWorkerThread.java b/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinWorkerThread.java deleted file mode 100644 index 941f5ec0cb..0000000000 --- a/src/jvm15-library/scala/concurrent/forkjoin/ForkJoinWorkerThread.java +++ /dev/null @@ -1,775 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - */ - -package scala.concurrent.forkjoin; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; -import sun.misc.Unsafe; -import java.lang.reflect.*; - -/** - * A thread managed by a {@link ForkJoinPool}. This class is - * subclassable solely for the sake of adding functionality -- there - * are no overridable methods dealing with scheduling or - * execution. However, you can override initialization and termination - * methods surrounding the main task processing loop. If you do - * create such a subclass, you will also need to supply a custom - * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool. - * - */ -public class ForkJoinWorkerThread extends Thread { - /* - * Algorithm overview: - * - * 1. Work-Stealing: Work-stealing queues are special forms of - * Deques that support only three of the four possible - * end-operations -- push, pop, and deq (aka steal), and only do - * so under the constraints that push and pop are called only from - * the owning thread, while deq may be called from other threads. - * (If you are unfamiliar with them, you probably want to read - * Herlihy and Shavit's book "The Art of Multiprocessor - * programming", chapter 16 describing these in more detail before - * proceeding.) The main work-stealing queue design is roughly - * similar to "Dynamic Circular Work-Stealing Deque" by David - * Chase and Yossi Lev, SPAA 2005 - * (http://research.sun.com/scalable/pubs/index.html). The main - * difference ultimately stems from gc requirements that we null - * out taken slots as soon as we can, to maintain as small a - * footprint as possible even in programs generating huge numbers - * of tasks. To accomplish this, we shift the CAS arbitrating pop - * vs deq (steal) from being on the indices ("base" and "sp") to - * the slots themselves (mainly via method "casSlotNull()"). So, - * both a successful pop and deq mainly entail CAS'ing a nonnull - * slot to null. Because we rely on CASes of references, we do - * not need tag bits on base or sp. They are simple ints as used - * in any circular array-based queue (see for example ArrayDeque). - * Updates to the indices must still be ordered in a way that - * guarantees that (sp - base) > 0 means the queue is empty, but - * otherwise may err on the side of possibly making the queue - * appear nonempty when a push, pop, or deq have not fully - * committed. Note that this means that the deq operation, - * considered individually, is not wait-free. One thief cannot - * successfully continue until another in-progress one (or, if - * previously empty, a push) completes. However, in the - * aggregate, we ensure at least probablistic non-blockingness. If - * an attempted steal fails, a thief always chooses a different - * random victim target to try next. So, in order for one thief to - * progress, it suffices for any in-progress deq or new push on - * any empty queue to complete. One reason this works well here is - * that apparently-nonempty often means soon-to-be-stealable, - * which gives threads a chance to activate if necessary before - * stealing (see below). - * - * Efficient implementation of this approach currently relies on - * an uncomfortable amount of "Unsafe" mechanics. To maintain - * correct orderings, reads and writes of variable base require - * volatile ordering. Variable sp does not require volatile write - * but needs cheaper store-ordering on writes. Because they are - * protected by volatile base reads, reads of the queue array and - * its slots do not need volatile load semantics, but writes (in - * push) require store order and CASes (in pop and deq) require - * (volatile) CAS semantics. Since these combinations aren't - * supported using ordinary volatiles, the only way to accomplish - * these effciently is to use direct Unsafe calls. (Using external - * AtomicIntegers and AtomicReferenceArrays for the indices and - * array is significantly slower because of memory locality and - * indirection effects.) Further, performance on most platforms is - * very sensitive to placement and sizing of the (resizable) queue - * array. Even though these queues don't usually become all that - * big, the initial size must be large enough to counteract cache - * contention effects across multiple queues (especially in the - * presence of GC cardmarking). Also, to improve thread-locality, - * queues are currently initialized immediately after the thread - * gets the initial signal to start processing tasks. However, - * all queue-related methods except pushTask are written in a way - * that allows them to instead be lazily allocated and/or disposed - * of when empty. All together, these low-level implementation - * choices produce as much as a factor of 4 performance - * improvement compared to naive implementations, and enable the - * processing of billions of tasks per second, sometimes at the - * expense of ugliness. - * - * 2. Run control: The primary run control is based on a global - * counter (activeCount) held by the pool. It uses an algorithm - * similar to that in Herlihy and Shavit section 17.6 to cause - * threads to eventually block when all threads declare they are - * inactive. (See variable "scans".) For this to work, threads - * must be declared active when executing tasks, and before - * stealing a task. They must be inactive before blocking on the - * Pool Barrier (awaiting a new submission or other Pool - * event). In between, there is some free play which we take - * advantage of to avoid contention and rapid flickering of the - * global activeCount: If inactive, we activate only if a victim - * queue appears to be nonempty (see above). Similarly, a thread - * tries to inactivate only after a full scan of other threads. - * The net effect is that contention on activeCount is rarely a - * measurable performance issue. (There are also a few other cases - * where we scan for work rather than retry/block upon - * contention.) - * - * 3. Selection control. We maintain policy of always choosing to - * run local tasks rather than stealing, and always trying to - * steal tasks before trying to run a new submission. All steals - * are currently performed in randomly-chosen deq-order. It may be - * worthwhile to bias these with locality / anti-locality - * information, but doing this well probably requires more - * lower-level information from JVMs than currently provided. - */ - - /** - * Capacity of work-stealing queue array upon initialization. - * Must be a power of two. Initial size must be at least 2, but is - * padded to minimize cache effects. - */ - private static final int INITIAL_QUEUE_CAPACITY = 1 << 13; - - /** - * Maximum work-stealing queue array size. Must be less than or - * equal to 1 << 28 to ensure lack of index wraparound. (This - * is less than usual bounds, because we need leftshift by 3 - * to be in int range). - */ - private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28; - - /** - * The pool this thread works in. Accessed directly by ForkJoinTask - */ - final ForkJoinPool pool; - - /** - * The work-stealing queue array. Size must be a power of two. - * Initialized when thread starts, to improve memory locality. - */ - private ForkJoinTask[] queue; - - /** - * Index (mod queue.length) of next queue slot to push to or pop - * from. It is written only by owner thread, via ordered store. - * Both sp and base are allowed to wrap around on overflow, but - * (sp - base) still estimates size. - */ - private volatile int sp; - - /** - * Index (mod queue.length) of least valid queue slot, which is - * always the next position to steal from if nonempty. - */ - private volatile int base; - - /** - * Activity status. When true, this worker is considered active. - * Must be false upon construction. It must be true when executing - * tasks, and BEFORE stealing a task. It must be false before - * calling pool.sync - */ - private boolean active; - - /** - * Run state of this worker. Supports simple versions of the usual - * shutdown/shutdownNow control. - */ - private volatile int runState; - - /** - * Seed for random number generator for choosing steal victims. - * Uses Marsaglia xorshift. Must be nonzero upon initialization. - */ - private int seed; - - /** - * Number of steals, transferred to pool when idle - */ - private int stealCount; - - /** - * Index of this worker in pool array. Set once by pool before - * running, and accessed directly by pool during cleanup etc - */ - int poolIndex; - - /** - * The last barrier event waited for. Accessed in pool callback - * methods, but only by current thread. - */ - long lastEventCount; - - /** - * True if use local fifo, not default lifo, for local polling - */ - private boolean locallyFifo; - - /** - * Creates a ForkJoinWorkerThread operating in the given pool. - * @param pool the pool this thread works in - * @throws NullPointerException if pool is null - */ - protected ForkJoinWorkerThread(ForkJoinPool pool) { - if (pool == null) throw new NullPointerException(); - this.pool = pool; - // Note: poolIndex is set by pool during construction - // Remaining initialization is deferred to onStart - } - - // Public access methods - - /** - * Returns the pool hosting this thread - * @return the pool - */ - public ForkJoinPool getPool() { - return pool; - } - - /** - * Returns the index number of this thread in its pool. The - * returned value ranges from zero to the maximum number of - * threads (minus one) that have ever been created in the pool. - * This method may be useful for applications that track status or - * collect results per-worker rather than per-task. - * @return the index number. - */ - public int getPoolIndex() { - return poolIndex; - } - - /** - * Establishes local first-in-first-out scheduling mode for forked - * tasks that are never joined. - * @param async if true, use locally FIFO scheduling - */ - void setAsyncMode(boolean async) { - locallyFifo = async; - } - - // Runstate management - - // Runstate values. Order matters - private static final int RUNNING = 0; - private static final int SHUTDOWN = 1; - private static final int TERMINATING = 2; - private static final int TERMINATED = 3; - - final boolean isShutdown() { return runState >= SHUTDOWN; } - final boolean isTerminating() { return runState >= TERMINATING; } - final boolean isTerminated() { return runState == TERMINATED; } - final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); } - final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); } - - /** - * Transition to at least the given state. Return true if not - * already at least given state. - */ - private boolean transitionRunStateTo(int state) { - for (;;) { - int s = runState; - if (s >= state) - return false; - if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state)) - return true; - } - } - - /** - * Try to set status to active; fail on contention - */ - private boolean tryActivate() { - if (!active) { - if (!pool.tryIncrementActiveCount()) - return false; - active = true; - } - return true; - } - - /** - * Try to set status to active; fail on contention - */ - private boolean tryInactivate() { - if (active) { - if (!pool.tryDecrementActiveCount()) - return false; - active = false; - } - return true; - } - - /** - * Computes next value for random victim probe. Scans don't - * require a very high quality generator, but also not a crummy - * one. Marsaglia xor-shift is cheap and works well. - */ - private static int xorShift(int r) { - r ^= r << 1; - r ^= r >>> 3; - r ^= r << 10; - return r; - } - - // Lifecycle methods - - /** - * This method is required to be public, but should never be - * called explicitly. It performs the main run loop to execute - * ForkJoinTasks. - */ - public void run() { - Throwable exception = null; - try { - onStart(); - pool.sync(this); // await first pool event - mainLoop(); - } catch (Throwable ex) { - exception = ex; - } finally { - onTermination(exception); - } - } - - /** - * Execute tasks until shut down. - */ - private void mainLoop() { - while (!isShutdown()) { - ForkJoinTask t = pollTask(); - if (t != null || (t = pollSubmission()) != null) - t.quietlyExec(); - else if (tryInactivate()) - pool.sync(this); - } - } - - /** - * Initializes internal state after construction but before - * processing any tasks. If you override this method, you must - * invoke super.onStart() at the beginning of the method. - * Initialization requires care: Most fields must have legal - * default values, to ensure that attempted accesses from other - * threads work correctly even before this thread starts - * processing tasks. - */ - protected void onStart() { - // Allocate while starting to improve chances of thread-local - // isolation - queue = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; - // Initial value of seed need not be especially random but - // should differ across workers and must be nonzero - int p = poolIndex + 1; - seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits - } - - /** - * Perform cleanup associated with termination of this worker - * thread. If you override this method, you must invoke - * super.onTermination at the end of the overridden method. - * - * @param exception the exception causing this thread to abort due - * to an unrecoverable error, or null if completed normally. - */ - protected void onTermination(Throwable exception) { - // Execute remaining local tasks unless aborting or terminating - while (exception == null && !pool.isTerminating() && base != sp) { - try { - ForkJoinTask t = popTask(); - if (t != null) - t.quietlyExec(); - } catch(Throwable ex) { - exception = ex; - } - } - // Cancel other tasks, transition status, notify pool, and - // propagate exception to uncaught exception handler - try { - do;while (!tryInactivate()); // ensure inactive - cancelTasks(); - runState = TERMINATED; - pool.workerTerminated(this); - } catch (Throwable ex) { // Shouldn't ever happen - if (exception == null) // but if so, at least rethrown - exception = ex; - } finally { - if (exception != null) - ForkJoinTask.rethrowException(exception); - } - } - - // Intrinsics-based support for queue operations. - - /** - * Add in store-order the given task at given slot of q to - * null. Caller must ensure q is nonnull and index is in range. - */ - private static void setSlot(ForkJoinTask[] q, int i, - ForkJoinTask t){ -//TR _unsafe.putOrderedObject(q, (i << qShift) + qBase, t); - _unsafe.putObjectVolatile((Object)q, (i << qShift) + qBase, (Object)t); - } - - /** - * CAS given slot of q to null. Caller must ensure q is nonnull - * and index is in range. - */ - private static boolean casSlotNull(ForkJoinTask[] q, int i, - ForkJoinTask t) { - return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null); - } - - /** - * Sets sp in store-order. - */ - private void storeSp(int s) { -//TR _unsafe.putOrderedInt(this, spOffset, s); - _unsafe.putIntVolatile(this, spOffset, s); - } - - // Main queue methods - - /** - * Pushes a task. Called only by current thread. - * @param t the task. Caller must ensure nonnull - */ - final void pushTask(ForkJoinTask t) { - ForkJoinTask[] q = queue; - int mask = q.length - 1; - int s = sp; - setSlot(q, s & mask, t); - storeSp(++s); - if ((s -= base) == 1) - pool.signalWork(); - else if (s >= mask) - growQueue(); - } - - /** - * Tries to take a task from the base of the queue, failing if - * either empty or contended. - * @return a task, or null if none or contended. - */ - final ForkJoinTask deqTask() { - ForkJoinTask t; - ForkJoinTask[] q; - int i; - int b; - if (sp != (b = base) && - (q = queue) != null && // must read q after b - (t = q[i = (q.length - 1) & b]) != null && - casSlotNull(q, i, t)) { - base = b + 1; - return t; - } - return null; - } - - /** - * Returns a popped task, or null if empty. Ensures active status - * if nonnull. Called only by current thread. - */ - final ForkJoinTask popTask() { - int s = sp; - while (s != base) { - if (tryActivate()) { - ForkJoinTask[] q = queue; - int mask = q.length - 1; - int i = (s - 1) & mask; - ForkJoinTask t = q[i]; - if (t == null || !casSlotNull(q, i, t)) - break; - storeSp(s - 1); - return t; - } - } - return null; - } - - /** - * Specialized version of popTask to pop only if - * topmost element is the given task. Called only - * by current thread while active. - * @param t the task. Caller must ensure nonnull - */ - final boolean unpushTask(ForkJoinTask t) { - ForkJoinTask[] q = queue; - int mask = q.length - 1; - int s = sp - 1; - if (casSlotNull(q, s & mask, t)) { - storeSp(s); - return true; - } - return false; - } - - /** - * Returns next task. - */ - final ForkJoinTask peekTask() { - ForkJoinTask[] q = queue; - if (q == null) - return null; - int mask = q.length - 1; - int i = locallyFifo? base : (sp - 1); - return q[i & mask]; - } - - /** - * Doubles queue array size. Transfers elements by emulating - * steals (deqs) from old array and placing, oldest first, into - * new array. - */ - private void growQueue() { - ForkJoinTask[] oldQ = queue; - int oldSize = oldQ.length; - int newSize = oldSize << 1; - if (newSize > MAXIMUM_QUEUE_CAPACITY) - throw new RejectedExecutionException("Queue capacity exceeded"); - ForkJoinTask[] newQ = queue = new ForkJoinTask[newSize]; - - int b = base; - int bf = b + oldSize; - int oldMask = oldSize - 1; - int newMask = newSize - 1; - do { - int oldIndex = b & oldMask; - ForkJoinTask t = oldQ[oldIndex]; - if (t != null && !casSlotNull(oldQ, oldIndex, t)) - t = null; - setSlot(newQ, b & newMask, t); - } while (++b != bf); - pool.signalWork(); - } - - /** - * Tries to steal a task from another worker. Starts at a random - * index of workers array, and probes workers until finding one - * with non-empty queue or finding that all are empty. It - * randomly selects the first n probes. If these are empty, it - * resorts to a full circular traversal, which is necessary to - * accurately set active status by caller. Also restarts if pool - * events occurred since last scan, which forces refresh of - * workers array, in case barrier was associated with resize. - * - * This method must be both fast and quiet -- usually avoiding - * memory accesses that could disrupt cache sharing etc other than - * those needed to check for and take tasks. This accounts for, - * among other things, updating random seed in place without - * storing it until exit. - * - * @return a task, or null if none found - */ - private ForkJoinTask scan() { - ForkJoinTask t = null; - int r = seed; // extract once to keep scan quiet - ForkJoinWorkerThread[] ws; // refreshed on outer loop - int mask; // must be power 2 minus 1 and > 0 - outer:do { - if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) { - int idx = r; - int probes = ~mask; // use random index while negative - for (;;) { - r = xorShift(r); // update random seed - ForkJoinWorkerThread v = ws[mask & idx]; - if (v == null || v.sp == v.base) { - if (probes <= mask) - idx = (probes++ < 0)? r : (idx + 1); - else - break; - } - else if (!tryActivate() || (t = v.deqTask()) == null) - continue outer; // restart on contention - else - break outer; - } - } - } while (pool.hasNewSyncEvent(this)); // retry on pool events - seed = r; - return t; - } - - /** - * gets and removes a local or stolen a task - * @return a task, if available - */ - final ForkJoinTask pollTask() { - ForkJoinTask t = locallyFifo? deqTask() : popTask(); - if (t == null && (t = scan()) != null) - ++stealCount; - return t; - } - - /** - * gets a local task - * @return a task, if available - */ - final ForkJoinTask pollLocalTask() { - return locallyFifo? deqTask() : popTask(); - } - - /** - * Returns a pool submission, if one exists, activating first. - * @return a submission, if available - */ - private ForkJoinTask pollSubmission() { - ForkJoinPool p = pool; - while (p.hasQueuedSubmissions()) { - ForkJoinTask t; - if (tryActivate() && (t = p.pollSubmission()) != null) - return t; - } - return null; - } - - // Methods accessed only by Pool - - /** - * Removes and cancels all tasks in queue. Can be called from any - * thread. - */ - final void cancelTasks() { - ForkJoinTask t; - while (base != sp && (t = deqTask()) != null) - t.cancelIgnoringExceptions(); - } - - /** - * Drains tasks to given collection c - * @return the number of tasks drained - */ - final int drainTasksTo(Collection> c) { - int n = 0; - ForkJoinTask t; - while (base != sp && (t = deqTask()) != null) { - c.add(t); - ++n; - } - return n; - } - - /** - * Get and clear steal count for accumulation by pool. Called - * only when known to be idle (in pool.sync and termination). - */ - final int getAndClearStealCount() { - int sc = stealCount; - stealCount = 0; - return sc; - } - - /** - * Returns true if at least one worker in the given array appears - * to have at least one queued task. - * @param ws array of workers - */ - static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) { - if (ws != null) { - int len = ws.length; - for (int j = 0; j < 2; ++j) { // need two passes for clean sweep - for (int i = 0; i < len; ++i) { - ForkJoinWorkerThread w = ws[i]; - if (w != null && w.sp != w.base) - return true; - } - } - } - return false; - } - - // Support methods for ForkJoinTask - - /** - * Returns an estimate of the number of tasks in the queue. - */ - final int getQueueSize() { - int n = sp - base; - return n < 0? 0 : n; // suppress momentarily negative values - } - - /** - * Returns an estimate of the number of tasks, offset by a - * function of number of idle workers. - */ - final int getEstimatedSurplusTaskCount() { - // The halving approximates weighting idle vs non-idle workers - return (sp - base) - (pool.getIdleThreadCount() >>> 1); - } - - /** - * Scan, returning early if joinMe done - */ - final ForkJoinTask scanWhileJoining(ForkJoinTask joinMe) { - ForkJoinTask t = pollTask(); - if (t != null && joinMe.status < 0 && sp == base) { - pushTask(t); // unsteal if done and this task would be stealable - t = null; - } - return t; - } - - /** - * Runs tasks until pool isQuiescent - */ - final void helpQuiescePool() { - for (;;) { - ForkJoinTask t = pollTask(); - if (t != null) - t.quietlyExec(); - else if (tryInactivate() && pool.isQuiescent()) - break; - } - do;while (!tryActivate()); // re-activate on exit - } - - // Temporary Unsafe mechanics for preliminary release - private static Unsafe getUnsafe() throws Throwable { - try { - return Unsafe.getUnsafe(); - } catch (SecurityException se) { - try { - return java.security.AccessController.doPrivileged - (new java.security.PrivilegedExceptionAction() { - public Unsafe run() throws Exception { - return getUnsafePrivileged(); - }}); - } catch (java.security.PrivilegedActionException e) { - throw e.getCause(); - } - } - } - - private static Unsafe getUnsafePrivileged() - throws NoSuchFieldException, IllegalAccessException { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe) f.get(null); - } - - private static long fieldOffset(String fieldName) - throws NoSuchFieldException { - return _unsafe.objectFieldOffset - (ForkJoinWorkerThread.class.getDeclaredField(fieldName)); - } - - static final Unsafe _unsafe; - static final long baseOffset; - static final long spOffset; - static final long runStateOffset; - static final long qBase; - static final int qShift; - static { - try { - _unsafe = getUnsafe(); - baseOffset = fieldOffset("base"); - spOffset = fieldOffset("sp"); - runStateOffset = fieldOffset("runState"); - qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class); - int s = _unsafe.arrayIndexScale(ForkJoinTask[].class); - if ((s & (s-1)) != 0) - throw new Error("data type scale not a power of two"); - qShift = 31 - Integer.numberOfLeadingZeros(s); - } catch (Throwable e) { - throw new RuntimeException("Could not initialize intrinsics", e); - } - } -} diff --git a/src/jvm15-library/scala/concurrent/forkjoin/LinkedTransferQueue.java b/src/jvm15-library/scala/concurrent/forkjoin/LinkedTransferQueue.java deleted file mode 100644 index 3055e3b68f..0000000000 --- a/src/jvm15-library/scala/concurrent/forkjoin/LinkedTransferQueue.java +++ /dev/null @@ -1,840 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - */ - -package scala.concurrent.forkjoin; -import java.util.concurrent.*; -import java.util.concurrent.locks.*; -import java.util.concurrent.atomic.*; -import java.util.*; -import java.io.*; -import sun.misc.Unsafe; -import java.lang.reflect.*; - -/** - * An unbounded {@linkplain TransferQueue} based on linked nodes. - * This queue orders elements FIFO (first-in-first-out) with respect - * to any given producer. The head of the queue is that - * element that has been on the queue the longest time for some - * producer. The tail of the queue is that element that has - * been on the queue the shortest time for some producer. - * - *

Beware that, unlike in most collections, the {@code size} - * method is NOT a constant-time operation. Because of the - * asynchronous nature of these queues, determining the current number - * of elements requires a traversal of the elements. - * - *

This class and its iterator implement all of the - * optional methods of the {@link Collection} and {@link - * Iterator} interfaces. - * - *

Memory consistency effects: As with other concurrent - * collections, actions in a thread prior to placing an object into a - * {@code LinkedTransferQueue} - * happen-before - * actions subsequent to the access or removal of that element from - * the {@code LinkedTransferQueue} in another thread. - * - *

This class is a member of the - * - * Java Collections Framework. - * - * @since 1.7 - * @author Doug Lea - * @param the type of elements held in this collection - * - */ -public class LinkedTransferQueue extends AbstractQueue - implements TransferQueue, java.io.Serializable { - private static final long serialVersionUID = -3223113410248163686L; - - /* - * This class extends the approach used in FIFO-mode - * SynchronousQueues. See the internal documentation, as well as - * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer, - * Lea & Scott - * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf) - * - * The main extension is to provide different Wait modes for the - * main "xfer" method that puts or takes items. These don't - * impact the basic dual-queue logic, but instead control whether - * or how threads block upon insertion of request or data nodes - * into the dual queue. It also uses slightly different - * conventions for tracking whether nodes are off-list or - * cancelled. - */ - - // Wait modes for xfer method - static final int NOWAIT = 0; - static final int TIMEOUT = 1; - static final int WAIT = 2; - - /** The number of CPUs, for spin control */ - static final int NCPUS = Runtime.getRuntime().availableProcessors(); - - /** - * The number of times to spin before blocking in timed waits. - * The value is empirically derived -- it works well across a - * variety of processors and OSes. Empirically, the best value - * seems not to vary with number of CPUs (beyond 2) so is just - * a constant. - */ - static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; - - /** - * The number of times to spin before blocking in untimed waits. - * This is greater than timed value because untimed waits spin - * faster since they don't need to check times on each spin. - */ - static final int maxUntimedSpins = maxTimedSpins * 16; - - /** - * The number of nanoseconds for which it is faster to spin - * rather than to use timed park. A rough estimate suffices. - */ - static final long spinForTimeoutThreshold = 1000L; - - /** - * Node class for LinkedTransferQueue. Opportunistically - * subclasses from AtomicReference to represent item. Uses Object, - * not E, to allow setting item to "this" after use, to avoid - * garbage retention. Similarly, setting the next field to this is - * used as sentinel that node is off list. - */ - static final class QNode extends AtomicReference { - volatile QNode next; - volatile Thread waiter; // to control park/unpark - final boolean isData; - QNode(Object item, boolean isData) { - super(item); - this.isData = isData; - } - - static final AtomicReferenceFieldUpdater - nextUpdater = AtomicReferenceFieldUpdater.newUpdater - (QNode.class, QNode.class, "next"); - - final boolean casNext(QNode cmp, QNode val) { - return nextUpdater.compareAndSet(this, cmp, val); - } - - final void clearNext() { - nextUpdater.set(this,this);//TR;lazySet(this, this); - } - - } - - /** - * Padded version of AtomicReference used for head, tail and - * cleanMe, to alleviate contention across threads CASing one vs - * the other. - */ - static final class PaddedAtomicReference extends AtomicReference { - // enough padding for 64bytes with 4byte refs - Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe; - PaddedAtomicReference(T r) { super(r); } - } - - - /** head of the queue */ - private transient final PaddedAtomicReference head; - /** tail of the queue */ - private transient final PaddedAtomicReference tail; - - /** - * Reference to a cancelled node that might not yet have been - * unlinked from queue because it was the last inserted node - * when it cancelled. - */ - private transient final PaddedAtomicReference cleanMe; - - /** - * Tries to cas nh as new head; if successful, unlink - * old head's next node to avoid garbage retention. - */ - private boolean advanceHead(QNode h, QNode nh) { - if (h == head.get() && head.compareAndSet(h, nh)) { - h.clearNext(); // forget old next - return true; - } - return false; - } - - /** - * Puts or takes an item. Used for most queue operations (except - * poll() and tryTransfer()). See the similar code in - * SynchronousQueue for detailed explanation. - * - * @param e the item or if null, signifies that this is a take - * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT - * @param nanos timeout in nanosecs, used only if mode is TIMEOUT - * @return an item, or null on failure - */ - private Object xfer(Object e, int mode, long nanos) { - boolean isData = (e != null); - QNode s = null; - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; - - for (;;) { - QNode t = tail.get(); - QNode h = head.get(); - - if (t != null && (t == h || t.isData == isData)) { - if (s == null) - s = new QNode(e, isData); - QNode last = t.next; - if (last != null) { - if (t == tail.get()) - tail.compareAndSet(t, last); - } - else if (t.casNext(null, s)) { - tail.compareAndSet(t, s); - return awaitFulfill(t, s, e, mode, nanos); - } - } - - else if (h != null) { - QNode first = h.next; - if (t == tail.get() && first != null && - advanceHead(h, first)) { - Object x = first.get(); - if (x != first && first.compareAndSet(x, e)) { - LockSupport.unpark(first.waiter); - return isData? e : x; - } - } - } - } - } - - - /** - * Version of xfer for poll() and tryTransfer, which - * simplifies control paths both here and in xfer. - */ - private Object fulfill(Object e) { - boolean isData = (e != null); - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; - - for (;;) { - QNode t = tail.get(); - QNode h = head.get(); - - if (t != null && (t == h || t.isData == isData)) { - QNode last = t.next; - if (t == tail.get()) { - if (last != null) - tail.compareAndSet(t, last); - else - return null; - } - } - else if (h != null) { - QNode first = h.next; - if (t == tail.get() && - first != null && - advanceHead(h, first)) { - Object x = first.get(); - if (x != first && first.compareAndSet(x, e)) { - LockSupport.unpark(first.waiter); - return isData? e : x; - } - } - } - } - } - - /** - * Spins/blocks until node s is fulfilled or caller gives up, - * depending on wait mode. - * - * @param pred the predecessor of waiting node - * @param s the waiting node - * @param e the comparison value for checking match - * @param mode mode - * @param nanos timeout value - * @return matched item, or s if cancelled - */ - private Object awaitFulfill(QNode pred, QNode s, Object e, - int mode, long nanos) { - if (mode == NOWAIT) - return null; - - long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0; - Thread w = Thread.currentThread(); - int spins = -1; // set to desired spin count below - for (;;) { - if (w.isInterrupted()) - s.compareAndSet(e, s); - Object x = s.get(); - if (x != e) { // Node was matched or cancelled - advanceHead(pred, s); // unlink if head - if (x == s) { // was cancelled - clean(pred, s); - return null; - } - else if (x != null) { - s.set(s); // avoid garbage retention - return x; - } - else - return e; - } - if (mode == TIMEOUT) { - long now = System.nanoTime(); - nanos -= now - lastTime; - lastTime = now; - if (nanos <= 0) { - s.compareAndSet(e, s); // try to cancel - continue; - } - } - if (spins < 0) { - QNode h = head.get(); // only spin if at head - spins = ((h != null && h.next == s) ? - (mode == TIMEOUT? - maxTimedSpins : maxUntimedSpins) : 0); - } - if (spins > 0) - --spins; - else if (s.waiter == null) - s.waiter = w; - else if (mode != TIMEOUT) { - LockSupport.park();//TR(this); - s.waiter = null; - spins = -1; - } - else if (nanos > spinForTimeoutThreshold) { - LockSupport.parkNanos(nanos);//(this, nanos); - s.waiter = null; - spins = -1; - } - } - } - - /** - * Returns validated tail for use in cleaning methods. - */ - private QNode getValidatedTail() { - for (;;) { - QNode h = head.get(); - QNode first = h.next; - if (first != null && first.next == first) { // help advance - advanceHead(h, first); - continue; - } - QNode t = tail.get(); - QNode last = t.next; - if (t == tail.get()) { - if (last != null) - tail.compareAndSet(t, last); // help advance - else - return t; - } - } - } - - /** - * Gets rid of cancelled node s with original predecessor pred. - * - * @param pred predecessor of cancelled node - * @param s the cancelled node - */ - private void clean(QNode pred, QNode s) { - Thread w = s.waiter; - if (w != null) { // Wake up thread - s.waiter = null; - if (w != Thread.currentThread()) - LockSupport.unpark(w); - } - - if (pred == null) - return; - - /* - * At any given time, exactly one node on list cannot be - * deleted -- the last inserted node. To accommodate this, if - * we cannot delete s, we save its predecessor as "cleanMe", - * processing the previously saved version first. At least one - * of node s or the node previously saved can always be - * processed, so this always terminates. - */ - while (pred.next == s) { - QNode oldpred = reclean(); // First, help get rid of cleanMe - QNode t = getValidatedTail(); - if (s != t) { // If not tail, try to unsplice - QNode sn = s.next; // s.next == s means s already off list - if (sn == s || pred.casNext(s, sn)) - break; - } - else if (oldpred == pred || // Already saved - (oldpred == null && cleanMe.compareAndSet(null, pred))) - break; // Postpone cleaning - } - } - - /** - * Tries to unsplice the cancelled node held in cleanMe that was - * previously uncleanable because it was at tail. - * - * @return current cleanMe node (or null) - */ - private QNode reclean() { - /* - * cleanMe is, or at one time was, predecessor of cancelled - * node s that was the tail so could not be unspliced. If s - * is no longer the tail, try to unsplice if necessary and - * make cleanMe slot available. This differs from similar - * code in clean() because we must check that pred still - * points to a cancelled node that must be unspliced -- if - * not, we can (must) clear cleanMe without unsplicing. - * This can loop only due to contention on casNext or - * clearing cleanMe. - */ - QNode pred; - while ((pred = cleanMe.get()) != null) { - QNode t = getValidatedTail(); - QNode s = pred.next; - if (s != t) { - QNode sn; - if (s == null || s == pred || s.get() != s || - (sn = s.next) == s || pred.casNext(s, sn)) - cleanMe.compareAndSet(pred, null); - } - else // s is still tail; cannot clean - break; - } - return pred; - } - - /** - * Creates an initially empty {@code LinkedTransferQueue}. - */ - public LinkedTransferQueue() { - QNode dummy = new QNode(null, false); - head = new PaddedAtomicReference(dummy); - tail = new PaddedAtomicReference(dummy); - cleanMe = new PaddedAtomicReference(null); - } - - /** - * Creates a {@code LinkedTransferQueue} - * initially containing the elements of the given collection, - * added in traversal order of the collection's iterator. - * - * @param c the collection of elements to initially contain - * @throws NullPointerException if the specified collection or any - * of its elements are null - */ - public LinkedTransferQueue(Collection c) { - this(); - addAll(c); - } - - public void put(E e) throws InterruptedException { - if (e == null) throw new NullPointerException(); - if (Thread.interrupted()) throw new InterruptedException(); - xfer(e, NOWAIT, 0); - } - - public boolean offer(E e, long timeout, TimeUnit unit) - throws InterruptedException { - if (e == null) throw new NullPointerException(); - if (Thread.interrupted()) throw new InterruptedException(); - xfer(e, NOWAIT, 0); - return true; - } - - public boolean offer(E e) { - if (e == null) throw new NullPointerException(); - xfer(e, NOWAIT, 0); - return true; - } - - public boolean add(E e) { - if (e == null) throw new NullPointerException(); - xfer(e, NOWAIT, 0); - return true; - } - - public void transfer(E e) throws InterruptedException { - if (e == null) throw new NullPointerException(); - if (xfer(e, WAIT, 0) == null) { - Thread.interrupted(); - throw new InterruptedException(); - } - } - - public boolean tryTransfer(E e, long timeout, TimeUnit unit) - throws InterruptedException { - if (e == null) throw new NullPointerException(); - if (xfer(e, TIMEOUT, unit.toNanos(timeout)) != null) - return true; - if (!Thread.interrupted()) - return false; - throw new InterruptedException(); - } - - public boolean tryTransfer(E e) { - if (e == null) throw new NullPointerException(); - return fulfill(e) != null; - } - - public E take() throws InterruptedException { - Object e = xfer(null, WAIT, 0); - if (e != null) - return (E)e; - Thread.interrupted(); - throw new InterruptedException(); - } - - public E poll(long timeout, TimeUnit unit) throws InterruptedException { - Object e = xfer(null, TIMEOUT, unit.toNanos(timeout)); - if (e != null || !Thread.interrupted()) - return (E)e; - throw new InterruptedException(); - } - - public E poll() { - return (E)fulfill(null); - } - - public int drainTo(Collection c) { - if (c == null) - throw new NullPointerException(); - if (c == this) - throw new IllegalArgumentException(); - int n = 0; - E e; - while ( (e = poll()) != null) { - c.add(e); - ++n; - } - return n; - } - - public int drainTo(Collection c, int maxElements) { - if (c == null) - throw new NullPointerException(); - if (c == this) - throw new IllegalArgumentException(); - int n = 0; - E e; - while (n < maxElements && (e = poll()) != null) { - c.add(e); - ++n; - } - return n; - } - - // Traversal-based methods - - /** - * Returns head after performing any outstanding helping steps. - */ - private QNode traversalHead() { - for (;;) { - QNode t = tail.get(); - QNode h = head.get(); - if (h != null && t != null) { - QNode last = t.next; - QNode first = h.next; - if (t == tail.get()) { - if (last != null) - tail.compareAndSet(t, last); - else if (first != null) { - Object x = first.get(); - if (x == first) - advanceHead(h, first); - else - return h; - } - else - return h; - } - } - reclean(); - } - } - - - public Iterator iterator() { - return new Itr(); - } - - /** - * Iterators. Basic strategy is to traverse list, treating - * non-data (i.e., request) nodes as terminating list. - * Once a valid data node is found, the item is cached - * so that the next call to next() will return it even - * if subsequently removed. - */ - class Itr implements Iterator { - QNode next; // node to return next - QNode pnext; // predecessor of next - QNode snext; // successor of next - QNode curr; // last returned node, for remove() - QNode pcurr; // predecessor of curr, for remove() - E nextItem; // Cache of next item, once commited to in next - - Itr() { - findNext(); - } - - /** - * Ensures next points to next valid node, or null if none. - */ - void findNext() { - for (;;) { - QNode pred = pnext; - QNode q = next; - if (pred == null || pred == q) { - pred = traversalHead(); - q = pred.next; - } - if (q == null || !q.isData) { - next = null; - return; - } - Object x = q.get(); - QNode s = q.next; - if (x != null && q != x && q != s) { - nextItem = (E)x; - snext = s; - pnext = pred; - next = q; - return; - } - pnext = q; - next = s; - } - } - - public boolean hasNext() { - return next != null; - } - - public E next() { - if (next == null) throw new NoSuchElementException(); - pcurr = pnext; - curr = next; - pnext = next; - next = snext; - E x = nextItem; - findNext(); - return x; - } - - public void remove() { - QNode p = curr; - if (p == null) - throw new IllegalStateException(); - Object x = p.get(); - if (x != null && x != p && p.compareAndSet(x, p)) - clean(pcurr, p); - } - } - - public E peek() { - for (;;) { - QNode h = traversalHead(); - QNode p = h.next; - if (p == null) - return null; - Object x = p.get(); - if (p != x) { - if (!p.isData) - return null; - if (x != null) - return (E)x; - } - } - } - - public boolean isEmpty() { - for (;;) { - QNode h = traversalHead(); - QNode p = h.next; - if (p == null) - return true; - Object x = p.get(); - if (p != x) { - if (!p.isData) - return true; - if (x != null) - return false; - } - } - } - - public boolean hasWaitingConsumer() { - for (;;) { - QNode h = traversalHead(); - QNode p = h.next; - if (p == null) - return false; - Object x = p.get(); - if (p != x) - return !p.isData; - } - } - - /** - * Returns the number of elements in this queue. If this queue - * contains more than {@code Integer.MAX_VALUE} elements, returns - * {@code Integer.MAX_VALUE}. - * - *

Beware that, unlike in most collections, this method is - * NOT a constant-time operation. Because of the - * asynchronous nature of these queues, determining the current - * number of elements requires an O(n) traversal. - * - * @return the number of elements in this queue - */ - public int size() { - int count = 0; - QNode h = traversalHead(); - for (QNode p = h.next; p != null && p.isData; p = p.next) { - Object x = p.get(); - if (x != null && x != p) { - if (++count == Integer.MAX_VALUE) // saturated - break; - } - } - return count; - } - - public int getWaitingConsumerCount() { - int count = 0; - QNode h = traversalHead(); - for (QNode p = h.next; p != null && !p.isData; p = p.next) { - if (p.get() == null) { - if (++count == Integer.MAX_VALUE) - break; - } - } - return count; - } - - public int remainingCapacity() { - return Integer.MAX_VALUE; - } - - public boolean remove(Object o) { - if (o == null) - return false; - for (;;) { - QNode pred = traversalHead(); - for (;;) { - QNode q = pred.next; - if (q == null || !q.isData) - return false; - if (q == pred) // restart - break; - Object x = q.get(); - if (x != null && x != q && o.equals(x) && - q.compareAndSet(x, q)) { - clean(pred, q); - return true; - } - pred = q; - } - } - } - - /** - * Save the state to a stream (that is, serialize it). - * - * @serialData All of the elements (each an {@code E}) in - * the proper order, followed by a null - * @param s the stream - */ - private void writeObject(java.io.ObjectOutputStream s) - throws java.io.IOException { - s.defaultWriteObject(); - for (E e : this) - s.writeObject(e); - // Use trailing null as sentinel - s.writeObject(null); - } - - /** - * Reconstitute the Queue instance from a stream (that is, - * deserialize it). - * @param s the stream - */ - private void readObject(java.io.ObjectInputStream s) - throws java.io.IOException, ClassNotFoundException { - s.defaultReadObject(); - resetHeadAndTail(); - for (;;) { - E item = (E)s.readObject(); - if (item == null) - break; - else - offer(item); - } - } - - - // Support for resetting head/tail while deserializing - private void resetHeadAndTail() { - QNode dummy = new QNode(null, false); - _unsafe.putObjectVolatile(this, headOffset, - new PaddedAtomicReference(dummy)); - _unsafe.putObjectVolatile(this, tailOffset, - new PaddedAtomicReference(dummy)); - _unsafe.putObjectVolatile(this, cleanMeOffset, - new PaddedAtomicReference(null)); - } - - // Temporary Unsafe mechanics for preliminary release - private static Unsafe getUnsafe() throws Throwable { - try { - return Unsafe.getUnsafe(); - } catch (SecurityException se) { - try { - return java.security.AccessController.doPrivileged - (new java.security.PrivilegedExceptionAction() { - public Unsafe run() throws Exception { - return getUnsafePrivileged(); - }}); - } catch (java.security.PrivilegedActionException e) { - throw e.getCause(); - } - } - } - - private static Unsafe getUnsafePrivileged() - throws NoSuchFieldException, IllegalAccessException { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe) f.get(null); - } - - private static long fieldOffset(String fieldName) - throws NoSuchFieldException { - return _unsafe.objectFieldOffset - (LinkedTransferQueue.class.getDeclaredField(fieldName)); - } - - private static final Unsafe _unsafe; - private static final long headOffset; - private static final long tailOffset; - private static final long cleanMeOffset; - static { - try { - _unsafe = getUnsafe(); - headOffset = fieldOffset("head"); - tailOffset = fieldOffset("tail"); - cleanMeOffset = fieldOffset("cleanMe"); - } catch (Throwable e) { - throw new RuntimeException("Could not initialize intrinsics", e); - } - } - -} diff --git a/src/jvm15-library/scala/concurrent/forkjoin/RecursiveAction.java b/src/jvm15-library/scala/concurrent/forkjoin/RecursiveAction.java deleted file mode 100644 index 2d36f7eb33..0000000000 --- a/src/jvm15-library/scala/concurrent/forkjoin/RecursiveAction.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - */ - -package scala.concurrent.forkjoin; - -/** - * Recursive resultless ForkJoinTasks. This class establishes - * conventions to parameterize resultless actions as Void - * ForkJoinTasks. Because null is the only valid value of - * Void, methods such as join always return null - * upon completion. - * - *

Sample Usages. Here is a sketch of a ForkJoin sort that - * sorts a given long[] array: - * - *

- * class SortTask extends RecursiveAction {
- *   final long[] array; final int lo; final int hi;
- *   SortTask(long[] array, int lo, int hi) {
- *     this.array = array; this.lo = lo; this.hi = hi;
- *   }
- *   protected void compute() {
- *     if (hi - lo < THRESHOLD)
- *       sequentiallySort(array, lo, hi);
- *     else {
- *       int mid = (lo + hi) >>> 1;
- *       invokeAll(new SortTask(array, lo, mid),
- *                 new SortTask(array, mid, hi));
- *       merge(array, lo, hi);
- *     }
- *   }
- * }
- * 
- * - * You could then sort anArray by creating new SortTask(anArray, 0, - * anArray.length-1) and invoking it in a ForkJoinPool. - * As a more concrete simple example, the following task increments - * each element of an array: - *
- * class IncrementTask extends RecursiveAction {
- *   final long[] array; final int lo; final int hi;
- *   IncrementTask(long[] array, int lo, int hi) {
- *     this.array = array; this.lo = lo; this.hi = hi;
- *   }
- *   protected void compute() {
- *     if (hi - lo < THRESHOLD) {
- *       for (int i = lo; i < hi; ++i)
- *         array[i]++;
- *     }
- *     else {
- *       int mid = (lo + hi) >>> 1;
- *       invokeAll(new IncrementTask(array, lo, mid),
- *                 new IncrementTask(array, mid, hi));
- *     }
- *   }
- * }
- * 
- * - * - *

The following example illustrates some refinements and idioms - * that may lead to better performance: RecursiveActions need not be - * fully recursive, so long as they maintain the basic - * divide-and-conquer approach. Here is a class that sums the squares - * of each element of a double array, by subdividing out only the - * right-hand-sides of repeated divisions by two, and keeping track of - * them with a chain of next references. It uses a dynamic - * threshold based on method surplus, but counterbalances - * potential excess partitioning by directly performing leaf actions - * on unstolen tasks rather than further subdividing. - * - *

- * double sumOfSquares(ForkJoinPool pool, double[] array) {
- *   int n = array.length;
- *   int seqSize = 1 + n / (8 * pool.getParallelism());
- *   Applyer a = new Applyer(array, 0, n, seqSize, null);
- *   pool.invoke(a);
- *   return a.result;
- * }
- *
- * class Applyer extends RecursiveAction {
- *   final double[] array;
- *   final int lo, hi, seqSize;
- *   double result;
- *   Applyer next; // keeps track of right-hand-side tasks
- *   Applyer(double[] array, int lo, int hi, int seqSize, Applyer next) {
- *     this.array = array; this.lo = lo; this.hi = hi;
- *     this.seqSize = seqSize; this.next = next;
- *   }
- *
- *   double atLeaf(int l, int r) {
- *     double sum = 0;
- *     for (int i = l; i < h; ++i) // perform leftmost base step
- *       sum += array[i] * array[i];
- *     return sum;
- *   }
- *
- *   protected void compute() {
- *     int l = lo;
- *     int h = hi;
- *     Applyer right = null;
- *     while (h - l > 1 &&
- *        ForkJoinWorkerThread.getEstimatedSurplusTaskCount() <= 3) {
- *        int mid = (l + h) >>> 1;
- *        right = new Applyer(array, mid, h, seqSize, right);
- *        right.fork();
- *        h = mid;
- *     }
- *     double sum = atLeaf(l, h);
- *     while (right != null) {
- *        if (right.tryUnfork()) // directly calculate if not stolen
- *          sum += right.atLeaf(right.lo, right.hi);
- *       else {
- *          right.helpJoin();
- *          sum += right.result;
- *        }
- *        right = right.next;
- *      }
- *     result = sum;
- *   }
- * }
- * 
- */ -public abstract class RecursiveAction extends ForkJoinTask { - - /** - * The main computation performed by this task. - */ - protected abstract void compute(); - - /** - * Always returns null - */ - public final Void getRawResult() { return null; } - - /** - * Requires null completion value. - */ - protected final void setRawResult(Void mustBeNull) { } - - /** - * Implements execution conventions for RecursiveActions - */ - protected final boolean exec() { - compute(); - return true; - } - -} diff --git a/src/jvm15-library/scala/concurrent/forkjoin/RecursiveTask.java b/src/jvm15-library/scala/concurrent/forkjoin/RecursiveTask.java deleted file mode 100644 index 1f3110580b..0000000000 --- a/src/jvm15-library/scala/concurrent/forkjoin/RecursiveTask.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - */ - -package scala.concurrent.forkjoin; - -/** - * Recursive result-bearing ForkJoinTasks. - *

For a classic example, here is a task computing Fibonacci numbers: - * - *

- * class Fibonacci extends RecursiveTask<Integer> {
- *   final int n;
- *   Fibonnaci(int n) { this.n = n; }
- *   Integer compute() {
- *     if (n <= 1)
- *        return n;
- *     Fibonacci f1 = new Fibonacci(n - 1);
- *     f1.fork();
- *     Fibonacci f2 = new Fibonacci(n - 2);
- *     return f2.compute() + f1.join();
- *   }
- * }
- * 
- * - * However, besides being a dumb way to compute Fibonacci functions - * (there is a simple fast linear algorithm that you'd use in - * practice), this is likely to perform poorly because the smallest - * subtasks are too small to be worthwhile splitting up. Instead, as - * is the case for nearly all fork/join applications, you'd pick some - * minimum granularity size (for example 10 here) for which you always - * sequentially solve rather than subdividing. - * - */ -public abstract class RecursiveTask extends ForkJoinTask { - - /** - * Empty contructor for use by subclasses. - */ - protected RecursiveTask() { - } - - /** - * The result returned by compute method. - */ - V result; - - /** - * The main computation performed by this task. - */ - protected abstract V compute(); - - public final V getRawResult() { - return result; - } - - protected final void setRawResult(V value) { - result = value; - } - - /** - * Implements execution conventions for RecursiveTask - */ - protected final boolean exec() { - result = compute(); - return true; - } - -} diff --git a/src/jvm15-library/scala/concurrent/forkjoin/ThreadLocalRandom.java b/src/jvm15-library/scala/concurrent/forkjoin/ThreadLocalRandom.java deleted file mode 100644 index 34e2e37f37..0000000000 --- a/src/jvm15-library/scala/concurrent/forkjoin/ThreadLocalRandom.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - */ - -package scala.concurrent.forkjoin; -import java.util.*; - -/** - * A random number generator with the same properties as class {@link - * Random} but isolated to the current Thread. Like the global - * generator used by the {@link java.lang.Math} class, a - * ThreadLocalRandom is initialized with an internally generated seed - * that may not otherwise be modified. When applicable, use of - * ThreadLocalRandom rather than shared Random objects in concurrent - * programs will typically encounter much less overhead and - * contention. ThreadLocalRandoms are particularly appropriate when - * multiple tasks (for example, each a {@link ForkJoinTask}), use - * random numbers in parallel in thread pools. - * - *

Usages of this class should typically be of the form: - * ThreadLocalRandom.current().nextX(...) (where - * X is Int, Long, etc). - * When all usages are of this form, it is never possible to - * accidently share ThreadLocalRandoms across multiple threads. - * - *

This class also provides additional commonly used bounded random - * generation methods. - */ -public class ThreadLocalRandom extends Random { - // same constants as Random, but must be redeclared because private - private final static long multiplier = 0x5DEECE66DL; - private final static long addend = 0xBL; - private final static long mask = (1L << 48) - 1; - - /** - * The random seed. We can't use super.seed - */ - private long rnd; - - /** - * Initialization flag to permit the first and only allowed call - * to setSeed (inside Random constructor) to succeed. We can't - * allow others since it would cause setting seed in one part of a - * program to unintentionally impact other usages by the thread. - */ - boolean initialized; - - // Padding to help avoid memory contention among seed updates in - // different TLRs in the common case that they are located near - // each other. - private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; - - /** - * The actual ThreadLocal - */ - private static final ThreadLocal localRandom = - new ThreadLocal() { - protected ThreadLocalRandom initialValue() { - return new ThreadLocalRandom(); - } - }; - - - /** - * Constructor called only by localRandom.initialValue. - * We rely on the fact that the superclass no-arg constructor - * invokes setSeed exactly once to initialize. - */ - ThreadLocalRandom() { - super(); - } - - /** - * Returns the current Thread's ThreadLocalRandom - * @return the current Thread's ThreadLocalRandom - */ - public static ThreadLocalRandom current() { - return localRandom.get(); - } - - /** - * Throws UnsupportedOperationException. Setting seeds in this - * generator is unsupported. - * @throws UnsupportedOperationException always - */ - public void setSeed(long seed) { - if (initialized) - throw new UnsupportedOperationException(); - initialized = true; - rnd = (seed ^ multiplier) & mask; - } - - protected int next(int bits) { - return (int)((rnd = (rnd * multiplier + addend) & mask) >>> (48-bits)); - } - - /** - * Returns a pseudorandom, uniformly distributed value between the - * given least value (inclusive) and bound (exclusive). - * @param least the least value returned - * @param bound the upper bound (exclusive) - * @throws IllegalArgumentException if least greater than or equal - * to bound - * @return the next value - */ - public int nextInt(int least, int bound) { - if (least >= bound) - throw new IllegalArgumentException(); - return nextInt(bound - least) + least; - } - - /** - * Returns a pseudorandom, uniformly distributed value - * between 0 (inclusive) and the specified value (exclusive) - * @param n the bound on the random number to be returned. Must be - * positive. - * @return the next value - * @throws IllegalArgumentException if n is not positive - */ - public long nextLong(long n) { - if (n <= 0) - throw new IllegalArgumentException("n must be positive"); - // Divide n by two until small enough for nextInt. On each - // iteration (at most 31 of them but usually much less), - // randomly choose both whether to include high bit in result - // (offset) and whether to continue with the lower vs upper - // half (which makes a difference only if odd). - long offset = 0; - while (n >= Integer.MAX_VALUE) { - int bits = next(2); - long half = n >>> 1; - long nextn = ((bits & 2) == 0)? half : n - half; - if ((bits & 1) == 0) - offset += n - nextn; - n = nextn; - } - return offset + nextInt((int)n); - } - - /** - * Returns a pseudorandom, uniformly distributed value between the - * given least value (inclusive) and bound (exclusive). - * @param least the least value returned - * @param bound the upper bound (exclusive) - * @return the next value - * @throws IllegalArgumentException if least greater than or equal - * to bound - */ - public long nextLong(long least, long bound) { - if (least >= bound) - throw new IllegalArgumentException(); - return nextLong(bound - least) + least; - } - - /** - * Returns a pseudorandom, uniformly distributed {@code double} value - * between 0 (inclusive) and the specified value (exclusive) - * @param n the bound on the random number to be returned. Must be - * positive. - * @return the next value - * @throws IllegalArgumentException if n is not positive - */ - public double nextDouble(double n) { - if (n <= 0) - throw new IllegalArgumentException("n must be positive"); - return nextDouble() * n; - } - - /** - * Returns a pseudorandom, uniformly distributed value between the - * given least value (inclusive) and bound (exclusive). - * @param least the least value returned - * @param bound the upper bound (exclusive) - * @return the next value - * @throws IllegalArgumentException if least greater than or equal - * to bound - */ - public double nextDouble(double least, double bound) { - if (least >= bound) - throw new IllegalArgumentException(); - return nextDouble() * (bound - least) + least; - } - -} diff --git a/src/jvm15-library/scala/concurrent/forkjoin/TransferQueue.java b/src/jvm15-library/scala/concurrent/forkjoin/TransferQueue.java deleted file mode 100644 index 9c7b2289c4..0000000000 --- a/src/jvm15-library/scala/concurrent/forkjoin/TransferQueue.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - */ - -package scala.concurrent.forkjoin; -import java.util.concurrent.*; - -/** - * A {@link BlockingQueue} in which producers may wait for consumers - * to receive elements. A {@code TransferQueue} may be useful for - * example in message passing applications in which producers - * sometimes (using method {@code transfer}) await receipt of - * elements by consumers invoking {@code take} or {@code poll}, - * while at other times enqueue elements (via method {@code put}) - * without waiting for receipt. Non-blocking and time-out versions of - * {@code tryTransfer} are also available. A TransferQueue may also - * be queried via {@code hasWaitingConsumer} whether there are any - * threads waiting for items, which is a converse analogy to a - * {@code peek} operation. - * - *

Like any {@code BlockingQueue}, a {@code TransferQueue} may be - * capacity bounded. If so, an attempted {@code transfer} operation - * may initially block waiting for available space, and/or - * subsequently block waiting for reception by a consumer. Note that - * in a queue with zero capacity, such as {@link SynchronousQueue}, - * {@code put} and {@code transfer} are effectively synonymous. - * - *

This interface is a member of the - * - * Java Collections Framework. - * - * @since 1.7 - * @author Doug Lea - * @param the type of elements held in this collection - */ -public interface TransferQueue extends BlockingQueue { - /** - * Transfers the specified element if there exists a consumer - * already waiting to receive it, otherwise returning {@code false} - * without enqueuing the element. - * - * @param e the element to transfer - * @return {@code true} if the element was transferred, else - * {@code false} - * @throws ClassCastException if the class of the specified element - * prevents it from being added to this queue - * @throws NullPointerException if the specified element is null - * @throws IllegalArgumentException if some property of the specified - * element prevents it from being added to this queue - */ - boolean tryTransfer(E e); - - /** - * Inserts the specified element into this queue, waiting if - * necessary for space to become available and the element to be - * dequeued by a consumer invoking {@code take} or {@code poll}. - * - * @param e the element to transfer - * @throws InterruptedException if interrupted while waiting, - * in which case the element is not enqueued. - * @throws ClassCastException if the class of the specified element - * prevents it from being added to this queue - * @throws NullPointerException if the specified element is null - * @throws IllegalArgumentException if some property of the specified - * element prevents it from being added to this queue - */ - void transfer(E e) throws InterruptedException; - - /** - * Inserts the specified element into this queue, waiting up to - * the specified wait time if necessary for space to become - * available and the element to be dequeued by a consumer invoking - * {@code take} or {@code poll}. - * - * @param e the element to transfer - * @param timeout how long to wait before giving up, in units of - * {@code unit} - * @param unit a {@code TimeUnit} determining how to interpret the - * {@code timeout} parameter - * @return {@code true} if successful, or {@code false} if - * the specified waiting time elapses before completion, - * in which case the element is not enqueued. - * @throws InterruptedException if interrupted while waiting, - * in which case the element is not enqueued. - * @throws ClassCastException if the class of the specified element - * prevents it from being added to this queue - * @throws NullPointerException if the specified element is null - * @throws IllegalArgumentException if some property of the specified - * element prevents it from being added to this queue - */ - boolean tryTransfer(E e, long timeout, TimeUnit unit) - throws InterruptedException; - - /** - * Returns {@code true} if there is at least one consumer waiting - * to dequeue an element via {@code take} or {@code poll}. - * The return value represents a momentary state of affairs. - * - * @return {@code true} if there is at least one waiting consumer - */ - boolean hasWaitingConsumer(); - - /** - * Returns an estimate of the number of consumers waiting to - * dequeue elements via {@code take} or {@code poll}. The return - * value is an approximation of a momentary state of affairs, that - * may be inaccurate if consumers have completed or given up - * waiting. The value may be useful for monitoring and heuristics, - * but not for synchronization control. Implementations of this - * method are likely to be noticeably slower than those for - * {@link #hasWaitingConsumer}. - * - * @return the number of consumers waiting to dequeue elements - */ - int getWaitingConsumerCount(); -} diff --git a/src/jvm15-library/scala/concurrent/forkjoin/package-info.java b/src/jvm15-library/scala/concurrent/forkjoin/package-info.java deleted file mode 100644 index b8fa0fad02..0000000000 --- a/src/jvm15-library/scala/concurrent/forkjoin/package-info.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - */ - - -/** - * Preview versions of classes targeted for Java 7. Includes a - * fine-grained parallel computation framework: ForkJoinTasks and - * their related support classes provide a very efficient basis for - * obtaining platform-independent parallel speed-ups of - * computation-intensive operations. They are not a full substitute - * for the kinds of arbitrary processing supported by Executors or - * Threads. However, when applicable, they typically provide - * significantly greater performance on multiprocessor platforms. - * - *

Candidates for fork/join processing mainly include those that - * can be expressed using parallel divide-and-conquer techniques: To - * solve a problem, break it in two (or more) parts, and then solve - * those parts in parallel, continuing on in this way until the - * problem is too small to be broken up, so is solved directly. The - * underlying work-stealing framework makes subtasks - * available to other threads (normally one per CPU), that help - * complete the tasks. In general, the most efficient ForkJoinTasks - * are those that directly implement this algorithmic design pattern. - * - */ -package scala.concurrent.forkjoin; -- cgit v1.2.3