diff options
Diffstat (limited to 'src/actors/scala/actors/threadpool/ThreadPoolExecutor.java')
-rw-r--r-- | src/actors/scala/actors/threadpool/ThreadPoolExecutor.java | 1968 |
1 files changed, 0 insertions, 1968 deletions
diff --git a/src/actors/scala/actors/threadpool/ThreadPoolExecutor.java b/src/actors/scala/actors/threadpool/ThreadPoolExecutor.java deleted file mode 100644 index 11e35b034c..0000000000 --- a/src/actors/scala/actors/threadpool/ThreadPoolExecutor.java +++ /dev/null @@ -1,1968 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - */ - -package scala.actors.threadpool; -import scala.actors.threadpool.locks.*; -import scala.actors.threadpool.helpers.Utils; -import java.util.HashSet; -import java.util.List; -import java.util.Iterator; -import java.util.ArrayList; -import java.util.ConcurrentModificationException; - -/** - * An {@link ExecutorService} that executes each submitted task using - * one of possibly several pooled threads, normally configured - * using {@link Executors} factory methods. - * - * <p>Thread pools address two different problems: they usually - * provide improved performance when executing large numbers of - * asynchronous tasks, due to reduced per-task invocation overhead, - * and they provide a means of bounding and managing the resources, - * including threads, consumed when executing a collection of tasks. - * Each {@code ThreadPoolExecutor} also maintains some basic - * statistics, such as the number of completed tasks. - * - * <p>To be useful across a wide range of contexts, this class - * provides many adjustable parameters and extensibility - * hooks. However, programmers are urged to use the more convenient - * {@link Executors} factory methods {@link - * Executors#newCachedThreadPool} (unbounded thread pool, with - * automatic thread reclamation), {@link Executors#newFixedThreadPool} - * (fixed size thread pool) and {@link - * Executors#newSingleThreadExecutor} (single background thread), that - * preconfigure settings for the most common usage - * scenarios. Otherwise, use the following guide when manually - * configuring and tuning this class: - * - * <dl> - * - * <dt>Core and maximum pool sizes</dt> - * - * <dd>A {@code ThreadPoolExecutor} will automatically adjust the - * pool size (see {@link #getPoolSize}) - * according to the bounds set by - * corePoolSize (see {@link #getCorePoolSize}) and - * maximumPoolSize (see {@link #getMaximumPoolSize}). - * - * When a new task is submitted in method {@link #execute}, and fewer - * than corePoolSize threads are running, a new thread is created to - * handle the request, even if other worker threads are idle. If - * there are more than corePoolSize but less than maximumPoolSize - * threads running, a new thread will be created only if the queue is - * full. By setting corePoolSize and maximumPoolSize the same, you - * create a fixed-size thread pool. By setting maximumPoolSize to an - * essentially unbounded value such as {@code Integer.MAX_VALUE}, you - * allow the pool to accommodate an arbitrary number of concurrent - * tasks. Most typically, core and maximum pool sizes are set only - * upon construction, but they may also be changed dynamically using - * {@link #setCorePoolSize} and {@link #setMaximumPoolSize}. </dd> - * - * <dt>On-demand construction</dt> - * - * <dd> By default, even core threads are initially created and - * started only when new tasks arrive, but this can be overridden - * dynamically using method {@link #prestartCoreThread} or {@link - * #prestartAllCoreThreads}. You probably want to prestart threads if - * you construct the pool with a non-empty queue. </dd> - * - * <dt>Creating new threads</dt> - * - * <dd>New threads are created using a {@link ThreadFactory}. If not - * otherwise specified, a {@link Executors#defaultThreadFactory} is - * used, that creates threads to all be in the same {@link - * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and - * non-daemon status. By supplying a different ThreadFactory, you can - * alter the thread's name, thread group, priority, daemon status, - * etc. If a {@code ThreadFactory} fails to create a thread when asked - * by returning null from {@code newThread}, the executor will - * continue, but might not be able to execute any tasks. Threads - * should possess the "modifyThread" {@code RuntimePermission}. If - * worker threads or other threads using the pool do not possess this - * permission, service may be degraded: configuration changes may not - * take effect in a timely manner, and a shutdown pool may remain in a - * state in which termination is possible but not completed.</dd> - * - * <dt>Keep-alive times</dt> - * - * <dd>If the pool currently has more than corePoolSize threads, - * excess threads will be terminated if they have been idle for more - * than the keepAliveTime (see {@link #getKeepAliveTime}). This - * provides a means of reducing resource consumption when the pool is - * not being actively used. If the pool becomes more active later, new - * threads will be constructed. This parameter can also be changed - * dynamically using method {@link #setKeepAliveTime}. Using a value - * of {@code Long.MAX_VALUE} {@link TimeUnit#NANOSECONDS} effectively - * disables idle threads from ever terminating prior to shut down. By - * default, the keep-alive policy applies only when there are more - * than corePoolSizeThreads. But method {@link - * #allowCoreThreadTimeOut(boolean)} can be used to apply this - * time-out policy to core threads as well, so long as the - * keepAliveTime value is non-zero. </dd> - * - * <dt>Queuing</dt> - * - * <dd>Any {@link BlockingQueue} may be used to transfer and hold - * submitted tasks. The use of this queue interacts with pool sizing: - * - * <ul> - * - * <li> If fewer than corePoolSize threads are running, the Executor - * always prefers adding a new thread - * rather than queuing.</li> - * - * <li> If corePoolSize or more threads are running, the Executor - * always prefers queuing a request rather than adding a new - * thread.</li> - * - * <li> If a request cannot be queued, a new thread is created unless - * this would exceed maximumPoolSize, in which case, the task will be - * rejected.</li> - * - * </ul> - * - * There are three general strategies for queuing: - * <ol> - * - * <li> <em> Direct handoffs.</em> A good default choice for a work - * queue is a {@link SynchronousQueue} that hands off tasks to threads - * without otherwise holding them. Here, an attempt to queue a task - * will fail if no threads are immediately available to run it, so a - * new thread will be constructed. This policy avoids lockups when - * handling sets of requests that might have internal dependencies. - * Direct handoffs generally require unbounded maximumPoolSizes to - * avoid rejection of new submitted tasks. This in turn admits the - * possibility of unbounded thread growth when commands continue to - * arrive on average faster than they can be processed. </li> - * - * <li><em> Unbounded queues.</em> Using an unbounded queue (for - * example a {@link LinkedBlockingQueue} without a predefined - * capacity) will cause new tasks to wait in the queue when all - * corePoolSize threads are busy. Thus, no more than corePoolSize - * threads will ever be created. (And the value of the maximumPoolSize - * therefore doesn't have any effect.) This may be appropriate when - * each task is completely independent of others, so tasks cannot - * affect each others execution; for example, in a web page server. - * While this style of queuing can be useful in smoothing out - * transient bursts of requests, it admits the possibility of - * unbounded work queue growth when commands continue to arrive on - * average faster than they can be processed. </li> - * - * <li><em>Bounded queues.</em> A bounded queue (for example, an - * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when - * used with finite maximumPoolSizes, but can be more difficult to - * tune and control. Queue sizes and maximum pool sizes may be traded - * off for each other: Using large queues and small pools minimizes - * CPU usage, OS resources, and context-switching overhead, but can - * lead to artificially low throughput. If tasks frequently block (for - * example if they are I/O bound), a system may be able to schedule - * time for more threads than you otherwise allow. Use of small queues - * generally requires larger pool sizes, which keeps CPUs busier but - * may encounter unacceptable scheduling overhead, which also - * decreases throughput. </li> - * - * </ol> - * - * </dd> - * - * <dt>Rejected tasks</dt> - * - * <dd> New tasks submitted in method {@link #execute} will be - * <em>rejected</em> when the Executor has been shut down, and also - * when the Executor uses finite bounds for both maximum threads and - * work queue capacity, and is saturated. In either case, the {@code - * execute} method invokes the {@link - * RejectedExecutionHandler#rejectedExecution} method of its {@link - * RejectedExecutionHandler}. Four predefined handler policies are - * provided: - * - * <ol> - * - * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the - * handler throws a runtime {@link RejectedExecutionException} upon - * rejection. </li> - * - * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread - * that invokes {@code execute} itself runs the task. This provides a - * simple feedback control mechanism that will slow down the rate that - * new tasks are submitted. </li> - * - * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that - * cannot be executed is simply dropped. </li> - * - * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the - * executor is not shut down, the task at the head of the work queue - * is dropped, and then execution is retried (which can fail again, - * causing this to be repeated.) </li> - * - * </ol> - * - * It is possible to define and use other kinds of {@link - * RejectedExecutionHandler} classes. Doing so requires some care - * especially when policies are designed to work only under particular - * capacity or queuing policies. </dd> - * - * <dt>Hook methods</dt> - * - * <dd>This class provides {@code protected} overridable {@link - * #beforeExecute} and {@link #afterExecute} methods that are called - * before and after execution of each task. These can be used to - * manipulate the execution environment; for example, reinitializing - * ThreadLocals, gathering statistics, or adding log - * entries. Additionally, method {@link #terminated} can be overridden - * to perform any special processing that needs to be done once the - * Executor has fully terminated. - * - * <p>If hook or callback methods throw exceptions, internal worker - * threads may in turn fail and abruptly terminate.</dd> - * - * <dt>Queue maintenance</dt> - * - * <dd> Method {@link #getQueue} allows access to the work queue for - * purposes of monitoring and debugging. Use of this method for any - * other purpose is strongly discouraged. Two supplied methods, - * {@link #remove} and {@link #purge} are available to assist in - * storage reclamation when large numbers of queued tasks become - * cancelled.</dd> - * - * <dt>Finalization</dt> - * - * <dd> A pool that is no longer referenced in a program <em>AND</em> - * has no remaining threads will be {@code shutdown} automatically. If - * you would like to ensure that unreferenced pools are reclaimed even - * if users forget to call {@link #shutdown}, then you must arrange - * that unused threads eventually die, by setting appropriate - * keep-alive times, using a lower bound of zero core threads and/or - * setting {@link #allowCoreThreadTimeOut(boolean)}. </dd> - * - * </dl> - * - * <p> <b>Extension example</b>. Most extensions of this class - * override one or more of the protected hook methods. For example, - * here is a subclass that adds a simple pause/resume feature: - * - * <pre> {@code - * class PausableThreadPoolExecutor extends ThreadPoolExecutor { - * private boolean isPaused; - * private ReentrantLock pauseLock = new ReentrantLock(); - * private Condition unpaused = pauseLock.newCondition(); - * - * public PausableThreadPoolExecutor(...) { super(...); } - * - * protected void beforeExecute(Thread t, Runnable r) { - * super.beforeExecute(t, r); - * pauseLock.lock(); - * try { - * while (isPaused) unpaused.await(); - * } catch (InterruptedException ie) { - * t.interrupt(); - * } finally { - * pauseLock.unlock(); - * } - * } - * - * public void pause() { - * pauseLock.lock(); - * try { - * isPaused = true; - * } finally { - * pauseLock.unlock(); - * } - * } - * - * public void resume() { - * pauseLock.lock(); - * try { - * isPaused = false; - * unpaused.signalAll(); - * } finally { - * pauseLock.unlock(); - * } - * } - * }}</pre> - * - * @since 1.5 - * @author Doug Lea - */ -public class ThreadPoolExecutor extends AbstractExecutorService { - /** - * The main pool control state, ctl, is an atomic integer packing - * two conceptual fields - * workerCount, indicating the effective number of threads - * runState, indicating whether running, shutting down etc - * - * In order to pack them into one int, we limit workerCount to - * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 - * billion) otherwise representable. If this is ever an issue in - * the future, the variable can be changed to be an AtomicLong, - * and the shift/mask constants below adjusted. But until the need - * arises, this code is a bit faster and simpler using an int. - * - * The workerCount is the number of workers that have been - * permitted to start and not permitted to stop. The value may be - * transiently different from the actual number of live threads, - * for example when a ThreadFactory fails to create a thread when - * asked, and when exiting threads are still performing - * bookkeeping before terminating. The user-visible pool size is - * reported as the current size of the workers set. - * - * The runState provides the main lifecyle control, taking on values: - * - * RUNNING: Accept new tasks and process queued tasks - * SHUTDOWN: Don't accept new tasks, but process queued tasks - * STOP: Don't accept new tasks, don't process queued tasks, - * and interrupt in-progress tasks - * TIDYING: All tasks have terminated, workerCount is zero, - * the thread transitioning to state TIDYING - * will run the terminated() hook method - * TERMINATED: terminated() has completed - * - * The numerical order among these values matters, to allow - * ordered comparisons. The runState monotonically increases over - * time, but need not hit each state. The transitions are: - * - * RUNNING -> SHUTDOWN - * On invocation of shutdown(), perhaps implicitly in finalize() - * (RUNNING or SHUTDOWN) -> STOP - * On invocation of shutdownNow() - * SHUTDOWN -> TIDYING - * When both queue and pool are empty - * STOP -> TIDYING - * When pool is empty - * TIDYING -> TERMINATED - * When the terminated() hook method has completed - * - * Threads waiting in awaitTermination() will return when the - * state reaches TERMINATED. - * - * Detecting the transition from SHUTDOWN to TIDYING is less - * straightforward than you'd like because the queue may become - * empty after non-empty and vice versa during SHUTDOWN state, but - * we can only terminate if, after seeing that it is empty, we see - * that workerCount is 0 (which sometimes entails a recheck -- see - * below). - */ - private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); - private static final int COUNT_BITS = 29; // Integer.SIZE - 3; - private static final int CAPACITY = (1 << COUNT_BITS) - 1; - - // runState is stored in the high-order bits - private static final int RUNNING = -1 << COUNT_BITS; - private static final int SHUTDOWN = 0 << COUNT_BITS; - private static final int STOP = 1 << COUNT_BITS; - private static final int TIDYING = 2 << COUNT_BITS; - private static final int TERMINATED = 3 << COUNT_BITS; - - // Packing and unpacking ctl - private static int runStateOf(int c) { return c & ~CAPACITY; } - private static int workerCountOf(int c) { return c & CAPACITY; } - private static int ctlOf(int rs, int wc) { return rs | wc; } - - /* - * Bit field accessors that don't require unpacking ctl. - * These depend on the bit layout and on workerCount being never negative. - */ - - private static boolean runStateLessThan(int c, int s) { - return c < s; - } - - private static boolean runStateAtLeast(int c, int s) { - return c >= s; - } - - private static boolean isRunning(int c) { - return c < SHUTDOWN; - } - - /** - * Attempt to CAS-increment the workerCount field of ctl. - */ - private boolean compareAndIncrementWorkerCount(int expect) { - return ctl.compareAndSet(expect, expect + 1); - } - - /** - * Attempt to CAS-decrement the workerCount field of ctl. - */ - private boolean compareAndDecrementWorkerCount(int expect) { - return ctl.compareAndSet(expect, expect - 1); - } - - /** - * Decrements the workerCount field of ctl. This is called only on - * abrupt termination of a thread (see processWorkerExit). Other - * decrements are performed within getTask. - */ - private void decrementWorkerCount() { - do {} while (! compareAndDecrementWorkerCount(ctl.get())); - } - - /** - * The queue used for holding tasks and handing off to worker - * threads. We do not require that workQueue.poll() returning - * null necessarily means that workQueue.isEmpty(), so rely - * solely on isEmpty to see if the queue is empty (which we must - * do for example when deciding whether to transition from - * SHUTDOWN to TIDYING). This accommodates special-purpose - * queues such as DelayQueues for which poll() is allowed to - * return null even if it may later return non-null when delays - * expire. - */ - private final BlockingQueue workQueue; - - // TODO: DK: mainLock is used in lock(); try { ... } finally { unlock(); } - // Consider replacing with synchronized {} if performance reasons exist - /** - * Lock held on access to workers set and related bookkeeping. - * While we could use a concurrent set of some sort, it turns out - * to be generally preferable to use a lock. Among the reasons is - * that this serializes interruptIdleWorkers, which avoids - * unnecessary interrupt storms, especially during shutdown. - * Otherwise exiting threads would concurrently interrupt those - * that have not yet interrupted. It also simplifies some of the - * associated statistics bookkeeping of largestPoolSize etc. We - * also hold mainLock on shutdown and shutdownNow, for the sake of - * ensuring workers set is stable while separately checking - * permission to interrupt and actually interrupting. - */ - public final ReentrantLock mainLock = new ReentrantLock(); - - /** - * Set containing all worker threads in pool. Accessed only when - * holding mainLock. - */ - public final HashSet workers = new HashSet(); - - /** - * Wait condition to support awaitTermination - */ - private final Condition termination = mainLock.newCondition(); - - /** - * Tracks largest attained pool size. Accessed only under - * mainLock. - */ - private int largestPoolSize; - - /** - * Counter for completed tasks. Updated only on termination of - * worker threads. Accessed only under mainLock. - */ - private long completedTaskCount; - - /* - * All user control parameters are declared as volatiles so that - * ongoing actions are based on freshest values, but without need - * for locking, since no internal invariants depend on them - * changing synchronously with respect to other actions. - */ - - /** - * Factory for new threads. All threads are created using this - * factory (via method addWorker). All callers must be prepared - * for addWorker to fail, which may reflect a system or user's - * policy limiting the number of threads. Even though it is not - * treated as an error, failure to create threads may result in - * new tasks being rejected or existing ones remaining stuck in - * the queue. On the other hand, no special precautions exist to - * handle OutOfMemoryErrors that might be thrown while trying to - * create threads, since there is generally no recourse from - * within this class. - */ - private volatile ThreadFactory threadFactory; - - /** - * Handler called when saturated or shutdown in execute. - */ - private volatile RejectedExecutionHandler handler; - - /** - * Timeout in nanoseconds for idle threads waiting for work. - * Threads use this timeout when there are more than corePoolSize - * present or if allowCoreThreadTimeOut. Otherwise they wait - * forever for new work. - */ - private volatile long keepAliveTime; - - /** - * If false (default), core threads stay alive even when idle. - * If true, core threads use keepAliveTime to time out waiting - * for work. - */ - private volatile boolean allowCoreThreadTimeOut; - - /** - * Core pool size is the minimum number of workers to keep alive - * (and not allow to time out etc) unless allowCoreThreadTimeOut - * is set, in which case the minimum is zero. - */ - private volatile int corePoolSize; - - /** - * Maximum pool size. Note that the actual maximum is internally - * bounded by CAPACITY. - */ - private volatile int maximumPoolSize; - - /** - * The default rejected execution handler - */ - private static final RejectedExecutionHandler defaultHandler = - new AbortPolicy(); - - /** - * Permission required for callers of shutdown and shutdownNow. - * We additionally require (see checkShutdownAccess) that callers - * have permission to actually interrupt threads in the worker set - * (as governed by Thread.interrupt, which relies on - * ThreadGroup.checkAccess, which in turn relies on - * SecurityManager.checkAccess). Shutdowns are attempted only if - * these checks pass. - * - * All actual invocations of Thread.interrupt (see - * interruptIdleWorkers and interruptWorkers) ignore - * SecurityExceptions, meaning that the attempted interrupts - * silently fail. In the case of shutdown, they should not fail - * unless the SecurityManager has inconsistent policies, sometimes - * allowing access to a thread and sometimes not. In such cases, - * failure to actually interrupt threads may disable or delay full - * termination. Other uses of interruptIdleWorkers are advisory, - * and failure to actually interrupt will merely delay response to - * configuration changes so is not handled exceptionally. - */ - private static final RuntimePermission shutdownPerm = - new RuntimePermission("modifyThread"); - - /** - * Class Worker mainly maintains interrupt control state for - * threads running tasks, along with other minor bookkeeping. This - * class opportunistically extends ReentrantLock to simplify - * acquiring and releasing a lock surrounding each task execution. - * This protects against interrupts that are intended to wake up a - * worker thread waiting for a task from instead interrupting a - * task being run. - */ - public final class Worker extends ReentrantLock implements Runnable { - /** - * This class will never be serialized, but we provide a - * serialVersionUID to suppress a javac warning. - */ - private static final long serialVersionUID = 6138294804551838833L; - - /** Thread this worker is running in. Null if factory fails. */ - public final Thread thread; - /** Initial task to run. Possibly null. */ - Runnable firstTask; - /** Per-thread task counter */ - volatile long completedTasks; - - /** - * Creates with given first task and thread from ThreadFactory. - * @param firstTask the first task (null if none) - */ - Worker(Runnable firstTask) { - this.firstTask = firstTask; - this.thread = getThreadFactory().newThread(this); - } - - /** Delegates main run loop to outer runWorker */ - public void run() { - runWorker(this); - } - } - - /* - * Methods for setting control state - */ - - /** - * Transitions runState to given target, or leaves it alone if - * already at least the given target. - * - * @param targetState the desired state, either SHUTDOWN or STOP - * (but not TIDYING or TERMINATED -- use tryTerminate for that) - */ - private void advanceRunState(int targetState) { - for (;;) { - int c = ctl.get(); - if (runStateAtLeast(c, targetState) || - ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) - break; - } - } - - /** - * Transitions to TERMINATED state if either (SHUTDOWN and pool - * and queue empty) or (STOP and pool empty). If otherwise - * eligible to terminate but workerCount is nonzero, interrupts an - * idle worker to ensure that shutdown signals propagate. This - * method must be called following any action that might make - * termination possible -- reducing worker count or removing tasks - * from the queue during shutdown. The method is non-private to - * allow access from ScheduledThreadPoolExecutor. - */ - final void tryTerminate() { - for (;;) { - int c = ctl.get(); - if (isRunning(c) || - runStateAtLeast(c, TIDYING) || - (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) - return; - if (workerCountOf(c) != 0) { // Eligible to terminate - interruptIdleWorkers(ONLY_ONE); - return; - } - - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { - try { - terminated(); - } finally { - ctl.set(ctlOf(TERMINATED, 0)); - termination.signalAll(); - } - return; - } - } finally { - mainLock.unlock(); - } - // else retry on failed CAS - } - } - - /* - * Methods for controlling interrupts to worker threads. - */ - - /** - * If there is a security manager, makes sure caller has - * permission to shut down threads in general (see shutdownPerm). - * If this passes, additionally makes sure the caller is allowed - * to interrupt each worker thread. This might not be true even if - * first check passed, if the SecurityManager treats some threads - * specially. - */ - private void checkShutdownAccess() { - SecurityManager security = System.getSecurityManager(); - if (security != null) { - security.checkPermission(shutdownPerm); - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - for (Iterator itr = workers.iterator(); itr.hasNext();) { - Worker w = (Worker)itr.next(); - security.checkAccess(w.thread); - } - } finally { - mainLock.unlock(); - } - } - } - - /** - * Interrupts all threads, even if active. Ignores SecurityExceptions - * (in which case some threads may remain uninterrupted). - */ - private void interruptWorkers() { - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - for (Iterator itr = workers.iterator(); itr.hasNext();) { - Worker w = (Worker)itr.next(); - try { - w.thread.interrupt(); - } catch (SecurityException ignore) { - } - } - } finally { - mainLock.unlock(); - } - } - - /** - * Interrupts threads that might be waiting for tasks (as - * indicated by not being locked) so they can check for - * termination or configuration changes. Ignores - * SecurityExceptions (in which case some threads may remain - * uninterrupted). - * - * @param onlyOne If true, interrupt at most one worker. This is - * called only from tryTerminate when termination is otherwise - * enabled but there are still other workers. In this case, at - * most one waiting worker is interrupted to propagate shutdown - * signals in case all threads are currently waiting. - * Interrupting any arbitrary thread ensures that newly arriving - * workers since shutdown began will also eventually exit. - * To guarantee eventual termination, it suffices to always - * interrupt only one idle worker, but shutdown() interrupts all - * idle workers so that redundant workers exit promptly, not - * waiting for a straggler task to finish. - */ - private void interruptIdleWorkers(boolean onlyOne) { - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - Iterator it = workers.iterator(); - while (it.hasNext()) { - Worker w = (Worker)it.next(); - Thread t = w.thread; - if (!t.isInterrupted() && w.tryLock()) { - try { - t.interrupt(); - } catch (SecurityException ignore) { - } finally { - w.unlock(); - } - } - if (onlyOne) - break; - } - } finally { - mainLock.unlock(); - } - } - - /** - * Common form of interruptIdleWorkers, to avoid having to - * remember what the boolean argument means. - */ - private void interruptIdleWorkers() { - interruptIdleWorkers(false); - } - - private static final boolean ONLY_ONE = true; - - /** - * Ensures that unless the pool is stopping, the current thread - * does not have its interrupt set. This requires a double-check - * of state in case the interrupt was cleared concurrently with a - * shutdownNow -- if so, the interrupt is re-enabled. - */ - private void clearInterruptsForTaskRun() { - if (runStateLessThan(ctl.get(), STOP) && - Thread.interrupted() && - runStateAtLeast(ctl.get(), STOP)) - Thread.currentThread().interrupt(); - } - - /* - * Misc utilities, most of which are also exported to - * ScheduledThreadPoolExecutor - */ - - /** - * Invokes the rejected execution handler for the given command. - * Package-protected for use by ScheduledThreadPoolExecutor. - */ - final void reject(Runnable command) { - handler.rejectedExecution(command, this); - } - - /** - * Performs any further cleanup following run state transition on - * invocation of shutdown. A no-op here, but used by - * ScheduledThreadPoolExecutor to cancel delayed tasks. - */ - void onShutdown() { - } - - /** - * State check needed by ScheduledThreadPoolExecutor to - * enable running tasks during shutdown. - * - * @param shutdownOK true if should return true if SHUTDOWN - */ - final boolean isRunningOrShutdown(boolean shutdownOK) { - int rs = runStateOf(ctl.get()); - return rs == RUNNING || (rs == SHUTDOWN && shutdownOK); - } - - /** - * Drains the task queue into a new list, normally using - * drainTo. But if the queue is a DelayQueue or any other kind of - * queue for which poll or drainTo may fail to remove some - * elements, it deletes them one by one. - */ - private List drainQueue() { - BlockingQueue q = workQueue; - List<Runnable> taskList = new ArrayList<Runnable>(); - q.drainTo(taskList); - if (!q.isEmpty()) { - Runnable[] arr = (Runnable[])q.toArray(new Runnable[0]); - for (int i=0; i<arr.length; i++) { - Runnable r = arr[i]; - if (q.remove(r)) - taskList.add(r); - } - } - return taskList; - } - - /* - * Methods for creating, running and cleaning up after workers - */ - - /** - * Checks if a new worker can be added with respect to current - * pool state and the given bound (either core or maximum). If so, - * the worker count is adjusted accordingly, and, if possible, a - * new worker is created and started running firstTask as its - * first task. This method returns false if the pool is stopped or - * eligible to shut down. It also returns false if the thread - * factory fails to create a thread when asked, which requires a - * backout of workerCount, and a recheck for termination, in case - * the existence of this worker was holding up termination. - * - * @param firstTask the task the new thread should run first (or - * null if none). Workers are created with an initial first task - * (in method execute()) to bypass queuing when there are fewer - * than corePoolSize threads (in which case we always start one), - * or when the queue is full (in which case we must bypass queue). - * Initially idle threads are usually created via - * prestartCoreThread or to replace other dying workers. - * - * @param core if true use corePoolSize as bound, else - * maximumPoolSize. (A boolean indicator is used here rather than a - * value to ensure reads of fresh values after checking other pool - * state). - * @return true if successful - */ - private boolean addWorker(Runnable firstTask, boolean core) { - retry: - for (;;) { - int c = ctl.get(); - int rs = runStateOf(c); - - // Check if queue empty only if necessary. - if (rs >= SHUTDOWN && - ! (rs == SHUTDOWN && - firstTask == null && - ! workQueue.isEmpty())) - return false; - - for (;;) { - int wc = workerCountOf(c); - if (wc >= CAPACITY || - wc >= (core ? corePoolSize : maximumPoolSize)) - return false; - if (compareAndIncrementWorkerCount(c)) - break retry; - c = ctl.get(); // Re-read ctl - if (runStateOf(c) != rs) - continue retry; - // else CAS failed due to workerCount change; retry inner loop - } - } - - Worker w = new Worker(firstTask); - Thread t = w.thread; - - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - // Recheck while holding lock. - // Back out on ThreadFactory failure or if - // shut down before lock acquired. - int c = ctl.get(); - int rs = runStateOf(c); - - if (t == null || - (rs >= SHUTDOWN && - ! (rs == SHUTDOWN && - firstTask == null))) { - decrementWorkerCount(); - tryTerminate(); - return false; - } - - workers.add(w); - - int s = workers.size(); - if (s > largestPoolSize) - largestPoolSize = s; - } finally { - mainLock.unlock(); - } - - t.start(); - // It is possible (but unlikely) for a thread to have been - // added to workers, but not yet started, during transition to - // STOP, which could result in a rare missed interrupt, - // because Thread.interrupt is not guaranteed to have any effect - // on a non-yet-started Thread (see Thread#interrupt). - if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) - t.interrupt(); - - return true; - } - - /** - * Performs cleanup and bookkeeping for a dying worker. Called - * only from worker threads. Unless completedAbruptly is set, - * assumes that workerCount has already been adjusted to account - * for exit. This method removes thread from worker set, and - * possibly terminates the pool or replaces the worker if either - * it exited due to user task exception or if fewer than - * corePoolSize workers are running or queue is non-empty but - * there are no workers. - * - * @param w the worker - * @param completedAbruptly if the worker died due to user exception - */ - private void processWorkerExit(Worker w, boolean completedAbruptly) { - if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted - decrementWorkerCount(); - - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - completedTaskCount += w.completedTasks; - workers.remove(w); - } finally { - mainLock.unlock(); - } - - tryTerminate(); - - int c = ctl.get(); - if (runStateLessThan(c, STOP)) { - if (!completedAbruptly) { - int min = allowCoreThreadTimeOut ? 0 : corePoolSize; - if (min == 0 && ! workQueue.isEmpty()) - min = 1; - if (workerCountOf(c) >= min) - return; // replacement not needed - } - addWorker(null, false); - } - } - - /** - * Performs blocking or timed wait for a task, depending on - * current configuration settings, or returns null if this worker - * must exit because of any of: - * 1. There are more than maximumPoolSize workers (due to - * a call to setMaximumPoolSize). - * 2. The pool is stopped. - * 3. The pool is shutdown and the queue is empty. - * 4. This worker timed out waiting for a task, and timed-out - * workers are subject to termination (that is, - * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) - * both before and after the timed wait. - * - * @return task, or null if the worker must exit, in which case - * workerCount is decremented - */ - private Runnable getTask() { - boolean timedOut = false; // Did the last poll() time out? - - retry: - for (;;) { - int c = ctl.get(); - int rs = runStateOf(c); - - // Check if queue empty only if necessary. - if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { - decrementWorkerCount(); - return null; - } - - boolean timed; // Are workers subject to culling? - - for (;;) { - int wc = workerCountOf(c); - timed = allowCoreThreadTimeOut || wc > corePoolSize; - - if (wc <= maximumPoolSize && ! (timedOut && timed)) - break; - if (compareAndDecrementWorkerCount(c)) - return null; - c = ctl.get(); // Re-read ctl - if (runStateOf(c) != rs) - continue retry; - // else CAS failed due to workerCount change; retry inner loop - } - - try { - Runnable r = timed ? - (Runnable)workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : - (Runnable)workQueue.take(); - if (r != null) - return r; - timedOut = true; - } catch (InterruptedException retry) { - timedOut = false; - } - } - } - - /** - * Main worker run loop. Repeatedly gets tasks from queue and - * executes them, while coping with a number of issues: - * - * 1. We may start out with an initial task, in which case we - * don't need to get the first one. Otherwise, as long as pool is - * running, we get tasks from getTask. If it returns null then the - * worker exits due to changed pool state or configuration - * parameters. Other exits result from exception throws in - * external code, in which case completedAbruptly holds, which - * usually leads processWorkerExit to replace this thread. - * - * 2. Before running any task, the lock is acquired to prevent - * other pool interrupts while the task is executing, and - * clearInterruptsForTaskRun called to ensure that unless pool is - * stopping, this thread does not have its interrupt set. - * - * 3. Each task run is preceded by a call to beforeExecute, which - * might throw an exception, in which case we cause thread to die - * (breaking loop with completedAbruptly true) without processing - * the task. - * - * 4. Assuming beforeExecute completes normally, we run the task, - * gathering any of its thrown exceptions to send to - * afterExecute. We separately handle RuntimeException, Error - * (both of which the specs guarantee that we trap) and arbitrary - * Throwables. Because we cannot rethrow Throwables within - * Runnable.run, we wrap them within Errors on the way out (to the - * thread's UncaughtExceptionHandler). Any thrown exception also - * conservatively causes thread to die. - * - * 5. After task.run completes, we call afterExecute, which may - * also throw an exception, which will also cause thread to - * die. According to JLS Sec 14.20, this exception is the one that - * will be in effect even if task.run throws. - * - * The net effect of the exception mechanics is that afterExecute - * and the thread's UncaughtExceptionHandler have as accurate - * information as we can provide about any problems encountered by - * user code. - * - * @param w the worker - */ - final void runWorker(Worker w) { - Runnable task = w.firstTask; - w.firstTask = null; - boolean completedAbruptly = true; - try { - while (task != null || (task = getTask()) != null) { - w.lock(); - clearInterruptsForTaskRun(); - try { - beforeExecute(w.thread, task); - Throwable thrown = null; - try { - task.run(); - } catch (RuntimeException x) { - thrown = x; throw x; - } catch (Error x) { - thrown = x; throw x; - } catch (Throwable x) { - thrown = x; throw new Error(x); - } finally { - afterExecute(task, thrown); - } - } finally { - task = null; - w.completedTasks++; - w.unlock(); - } - } - completedAbruptly = false; - } finally { - processWorkerExit(w, completedAbruptly); - } - } - - // Public constructors and methods - - /** - * Creates a new {@code ThreadPoolExecutor} with the given initial - * parameters and default thread factory and rejected execution handler. - * It may be more convenient to use one of the {@link Executors} factory - * methods instead of this general purpose constructor. - * - * @param corePoolSize the number of threads to keep in the pool, even - * if they are idle, unless {@code allowCoreThreadTimeOut} is set - * @param maximumPoolSize the maximum number of threads to allow in the - * pool - * @param keepAliveTime when the number of threads is greater than - * the core, this is the maximum time that excess idle threads - * will wait for new tasks before terminating. - * @param unit the time unit for the {@code keepAliveTime} argument - * @param workQueue the queue to use for holding tasks before they are - * executed. This queue will hold only the {@code Runnable} - * tasks submitted by the {@code execute} method. - * @throws IllegalArgumentException if one of the following holds:<br> - * {@code corePoolSize < 0}<br> - * {@code keepAliveTime < 0}<br> - * {@code maximumPoolSize <= 0}<br> - * {@code maximumPoolSize < corePoolSize} - * @throws NullPointerException if {@code workQueue} is null - */ - public ThreadPoolExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - BlockingQueue workQueue) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, - Executors.defaultThreadFactory(), defaultHandler); - } - - /** - * Creates a new {@code ThreadPoolExecutor} with the given initial - * parameters and default rejected execution handler. - * - * @param corePoolSize the number of threads to keep in the pool, even - * if they are idle, unless {@code allowCoreThreadTimeOut} is set - * @param maximumPoolSize the maximum number of threads to allow in the - * pool - * @param keepAliveTime when the number of threads is greater than - * the core, this is the maximum time that excess idle threads - * will wait for new tasks before terminating. - * @param unit the time unit for the {@code keepAliveTime} argument - * @param workQueue the queue to use for holding tasks before they are - * executed. This queue will hold only the {@code Runnable} - * tasks submitted by the {@code execute} method. - * @param threadFactory the factory to use when the executor - * creates a new thread - * @throws IllegalArgumentException if one of the following holds:<br> - * {@code corePoolSize < 0}<br> - * {@code keepAliveTime < 0}<br> - * {@code maximumPoolSize <= 0}<br> - * {@code maximumPoolSize < corePoolSize} - * @throws NullPointerException if {@code workQueue} - * or {@code threadFactory} is null - */ - public ThreadPoolExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - BlockingQueue workQueue, - ThreadFactory threadFactory) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, - threadFactory, defaultHandler); - } - - /** - * Creates a new {@code ThreadPoolExecutor} with the given initial - * parameters and default thread factory. - * - * @param corePoolSize the number of threads to keep in the pool, even - * if they are idle, unless {@code allowCoreThreadTimeOut} is set - * @param maximumPoolSize the maximum number of threads to allow in the - * pool - * @param keepAliveTime when the number of threads is greater than - * the core, this is the maximum time that excess idle threads - * will wait for new tasks before terminating. - * @param unit the time unit for the {@code keepAliveTime} argument - * @param workQueue the queue to use for holding tasks before they are - * executed. This queue will hold only the {@code Runnable} - * tasks submitted by the {@code execute} method. - * @param handler the handler to use when execution is blocked - * because the thread bounds and queue capacities are reached - * @throws IllegalArgumentException if one of the following holds:<br> - * {@code corePoolSize < 0}<br> - * {@code keepAliveTime < 0}<br> - * {@code maximumPoolSize <= 0}<br> - * {@code maximumPoolSize < corePoolSize} - * @throws NullPointerException if {@code workQueue} - * or {@code handler} is null - */ - public ThreadPoolExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - BlockingQueue workQueue, - RejectedExecutionHandler handler) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, - Executors.defaultThreadFactory(), handler); - } - - /** - * Creates a new {@code ThreadPoolExecutor} with the given initial - * parameters. - * - * @param corePoolSize the number of threads to keep in the pool, even - * if they are idle, unless {@code allowCoreThreadTimeOut} is set - * @param maximumPoolSize the maximum number of threads to allow in the - * pool - * @param keepAliveTime when the number of threads is greater than - * the core, this is the maximum time that excess idle threads - * will wait for new tasks before terminating. - * @param unit the time unit for the {@code keepAliveTime} argument - * @param workQueue the queue to use for holding tasks before they are - * executed. This queue will hold only the {@code Runnable} - * tasks submitted by the {@code execute} method. - * @param threadFactory the factory to use when the executor - * creates a new thread - * @param handler the handler to use when execution is blocked - * because the thread bounds and queue capacities are reached - * @throws IllegalArgumentException if one of the following holds:<br> - * {@code corePoolSize < 0}<br> - * {@code keepAliveTime < 0}<br> - * {@code maximumPoolSize <= 0}<br> - * {@code maximumPoolSize < corePoolSize} - * @throws NullPointerException if {@code workQueue} - * or {@code threadFactory} or {@code handler} is null - */ - public ThreadPoolExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - BlockingQueue workQueue, - ThreadFactory threadFactory, - RejectedExecutionHandler handler) { - if (corePoolSize < 0 || - maximumPoolSize <= 0 || - maximumPoolSize < corePoolSize || - keepAliveTime < 0) - throw new IllegalArgumentException(); - if (workQueue == null || threadFactory == null || handler == null) - throw new NullPointerException(); - this.corePoolSize = corePoolSize; - this.maximumPoolSize = maximumPoolSize; - this.workQueue = workQueue; - this.keepAliveTime = unit.toNanos(keepAliveTime); - this.threadFactory = threadFactory; - this.handler = handler; - } - - /** - * Executes the given task sometime in the future. The task - * may execute in a new thread or in an existing pooled thread. - * - * If the task cannot be submitted for execution, either because this - * executor has been shutdown or because its capacity has been reached, - * the task is handled by the current {@code RejectedExecutionHandler}. - * - * @param command the task to execute - * @throws RejectedExecutionException at discretion of - * {@code RejectedExecutionHandler}, if the task - * cannot be accepted for execution - * @throws NullPointerException if {@code command} is null - */ - public void execute(Runnable command) { - if (command == null) - throw new NullPointerException(); - /* - * Proceed in 3 steps: - * - * 1. If fewer than corePoolSize threads are running, try to - * start a new thread with the given command as its first - * task. The call to addWorker atomically checks runState and - * workerCount, and so prevents false alarms that would add - * threads when it shouldn't, by returning false. - * - * 2. If a task can be successfully queued, then we still need - * to double-check whether we should have added a thread - * (because existing ones died since last checking) or that - * the pool shut down since entry into this method. So we - * recheck state and if necessary roll back the enqueuing if - * stopped, or start a new thread if there are none. - * - * 3. If we cannot queue task, then we try to add a new - * thread. If it fails, we know we are shut down or saturated - * and so reject the task. - */ - int c = ctl.get(); - if (workerCountOf(c) < corePoolSize) { - if (addWorker(command, true)) - return; - c = ctl.get(); - } - if (isRunning(c) && workQueue.offer(command)) { - int recheck = ctl.get(); - if (! isRunning(recheck) && remove(command)) - reject(command); - else if (workerCountOf(recheck) == 0) - addWorker(null, false); - } - else if (!addWorker(command, false)) - reject(command); - } - - /** - * Initiates an orderly shutdown in which previously submitted - * tasks are executed, but no new tasks will be accepted. - * Invocation has no additional effect if already shut down. - * - * @throws SecurityException {@inheritDoc} - */ - public void shutdown() { - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - checkShutdownAccess(); - advanceRunState(SHUTDOWN); - interruptIdleWorkers(); - onShutdown(); // hook for ScheduledThreadPoolExecutor - } finally { - mainLock.unlock(); - } - tryTerminate(); - } - - /** - * Attempts to stop all actively executing tasks, halts the - * processing of waiting tasks, and returns a list of the tasks - * that were awaiting execution. These tasks are drained (removed) - * from the task queue upon return from this method. - * - * <p>There are no guarantees beyond best-effort attempts to stop - * processing actively executing tasks. This implementation - * cancels tasks via {@link Thread#interrupt}, so any task that - * fails to respond to interrupts may never terminate. - * - * @throws SecurityException {@inheritDoc} - */ - public List shutdownNow() { - List tasks; - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - checkShutdownAccess(); - advanceRunState(STOP); - interruptWorkers(); - tasks = drainQueue(); - } finally { - mainLock.unlock(); - } - tryTerminate(); - return tasks; - } - - public boolean isShutdown() { - return ! isRunning(ctl.get()); - } - - /** - * Returns true if this executor is in the process of terminating - * after {@link #shutdown} or {@link #shutdownNow} but has not - * completely terminated. This method may be useful for - * debugging. A return of {@code true} reported a sufficient - * period after shutdown may indicate that submitted tasks have - * ignored or suppressed interruption, causing this executor not - * to properly terminate. - * - * @return true if terminating but not yet terminated - */ - public boolean isTerminating() { - int c = ctl.get(); - return ! isRunning(c) && runStateLessThan(c, TERMINATED); - } - - public boolean isTerminated() { - return runStateAtLeast(ctl.get(), TERMINATED); - } - - public boolean awaitTermination(long timeout, TimeUnit unit) - throws InterruptedException { - long nanos = unit.toNanos(timeout); - long deadline = Utils.nanoTime() + nanos; - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - if (runStateAtLeast(ctl.get(), TERMINATED)) - return true; - while (nanos > 0) { - termination.await(nanos, TimeUnit.NANOSECONDS); - if (runStateAtLeast(ctl.get(), TERMINATED)) - return true; - nanos = deadline - Utils.nanoTime(); - } - return false; - } finally { - mainLock.unlock(); - } - } - - /** - * Invokes {@code shutdown} when this executor is no longer - * referenced and it has no threads. - */ - protected void finalize() { - shutdown(); - } - - /** - * Sets the thread factory used to create new threads. - * - * @param threadFactory the new thread factory - * @throws NullPointerException if threadFactory is null - * @see #getThreadFactory - */ - public void setThreadFactory(ThreadFactory threadFactory) { - if (threadFactory == null) - throw new NullPointerException(); - this.threadFactory = threadFactory; - } - - /** - * Returns the thread factory used to create new threads. - * - * @return the current thread factory - * @see #setThreadFactory - */ - public ThreadFactory getThreadFactory() { - return threadFactory; - } - - /** - * Sets a new handler for unexecutable tasks. - * - * @param handler the new handler - * @throws NullPointerException if handler is null - * @see #getRejectedExecutionHandler - */ - public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { - if (handler == null) - throw new NullPointerException(); - this.handler = handler; - } - - /** - * Returns the current handler for unexecutable tasks. - * - * @return the current handler - * @see #setRejectedExecutionHandler - */ - public RejectedExecutionHandler getRejectedExecutionHandler() { - return handler; - } - - /** - * Sets the core number of threads. This overrides any value set - * in the constructor. If the new value is smaller than the - * current value, excess existing threads will be terminated when - * they next become idle. If larger, new threads will, if needed, - * be started to execute any queued tasks. - * - * @param corePoolSize the new core size - * @throws IllegalArgumentException if {@code corePoolSize < 0} - * @see #getCorePoolSize - */ - public void setCorePoolSize(int corePoolSize) { - if (corePoolSize < 0) - throw new IllegalArgumentException(); - int delta = corePoolSize - this.corePoolSize; - this.corePoolSize = corePoolSize; - if (workerCountOf(ctl.get()) > corePoolSize) - interruptIdleWorkers(); - else if (delta > 0) { - // We don't really know how many new threads are "needed". - // As a heuristic, prestart enough new workers (up to new - // core size) to handle the current number of tasks in - // queue, but stop if queue becomes empty while doing so. - int k = Math.min(delta, workQueue.size()); - while (k-- > 0 && addWorker(null, true)) { - if (workQueue.isEmpty()) - break; - } - } - } - - /** - * Returns the core number of threads. - * - * @return the core number of threads - * @see #setCorePoolSize - */ - public int getCorePoolSize() { - return corePoolSize; - } - - /** - * Starts a core thread, causing it to idly wait for work. This - * overrides the default policy of starting core threads only when - * new tasks are executed. This method will return {@code false} - * if all core threads have already been started. - * - * @return {@code true} if a thread was started - */ - public boolean prestartCoreThread() { - return workerCountOf(ctl.get()) < corePoolSize && - addWorker(null, true); - } - - /** - * Starts all core threads, causing them to idly wait for work. This - * overrides the default policy of starting core threads only when - * new tasks are executed. - * - * @return the number of threads started - */ - public int prestartAllCoreThreads() { - int n = 0; - while (addWorker(null, true)) - ++n; - return n; - } - - /** - * Returns true if this pool allows core threads to time out and - * terminate if no tasks arrive within the keepAlive time, being - * replaced if needed when new tasks arrive. When true, the same - * keep-alive policy applying to non-core threads applies also to - * core threads. When false (the default), core threads are never - * terminated due to lack of incoming tasks. - * - * @return {@code true} if core threads are allowed to time out, - * else {@code false} - * - * @since 1.6 - */ - public boolean allowsCoreThreadTimeOut() { - return allowCoreThreadTimeOut; - } - - /** - * Sets the policy governing whether core threads may time out and - * terminate if no tasks arrive within the keep-alive time, being - * replaced if needed when new tasks arrive. When false, core - * threads are never terminated due to lack of incoming - * tasks. When true, the same keep-alive policy applying to - * non-core threads applies also to core threads. To avoid - * continual thread replacement, the keep-alive time must be - * greater than zero when setting {@code true}. This method - * should in general be called before the pool is actively used. - * - * @param value {@code true} if should time out, else {@code false} - * @throws IllegalArgumentException if value is {@code true} - * and the current keep-alive time is not greater than zero - * - * @since 1.6 - */ - public void allowCoreThreadTimeOut(boolean value) { - if (value && keepAliveTime <= 0) - throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); - if (value != allowCoreThreadTimeOut) { - allowCoreThreadTimeOut = value; - if (value) - interruptIdleWorkers(); - } - } - - /** - * Sets the maximum allowed number of threads. This overrides any - * value set in the constructor. If the new value is smaller than - * the current value, excess existing threads will be - * terminated when they next become idle. - * - * @param maximumPoolSize the new maximum - * @throws IllegalArgumentException if the new maximum is - * less than or equal to zero, or - * less than the {@linkplain #getCorePoolSize core pool size} - * @see #getMaximumPoolSize - */ - public void setMaximumPoolSize(int maximumPoolSize) { - if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) - throw new IllegalArgumentException(); - this.maximumPoolSize = maximumPoolSize; - if (workerCountOf(ctl.get()) > maximumPoolSize) - interruptIdleWorkers(); - } - - /** - * Returns the maximum allowed number of threads. - * - * @return the maximum allowed number of threads - * @see #setMaximumPoolSize - */ - public int getMaximumPoolSize() { - return maximumPoolSize; - } - - /** - * Sets the time limit for which threads may remain idle before - * being terminated. If there are more than the core number of - * threads currently in the pool, after waiting this amount of - * time without processing a task, excess threads will be - * terminated. This overrides any value set in the constructor. - * - * @param time the time to wait. A time value of zero will cause - * excess threads to terminate immediately after executing tasks. - * @param unit the time unit of the {@code time} argument - * @throws IllegalArgumentException if {@code time} less than zero or - * if {@code time} is zero and {@code allowsCoreThreadTimeOut} - * @see #getKeepAliveTime - */ - public void setKeepAliveTime(long time, TimeUnit unit) { - if (time < 0) - throw new IllegalArgumentException(); - if (time == 0 && allowsCoreThreadTimeOut()) - throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); - long keepAliveTime = unit.toNanos(time); - long delta = keepAliveTime - this.keepAliveTime; - this.keepAliveTime = keepAliveTime; - if (delta < 0) - interruptIdleWorkers(); - } - - /** - * Returns the thread keep-alive time, which is the amount of time - * that threads in excess of the core pool size may remain - * idle before being terminated. - * - * @param unit the desired time unit of the result - * @return the time limit - * @see #setKeepAliveTime - */ - public long getKeepAliveTime(TimeUnit unit) { - return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); - } - - /* User-level queue utilities */ - - /** - * Returns the task queue used by this executor. Access to the - * task queue is intended primarily for debugging and monitoring. - * This queue may be in active use. Retrieving the task queue - * does not prevent queued tasks from executing. - * - * @return the task queue - */ - public BlockingQueue getQueue() { - return workQueue; - } - - /** - * Removes this task from the executor's internal queue if it is - * present, thus causing it not to be run if it has not already - * started. - * - * <p> This method may be useful as one part of a cancellation - * scheme. It may fail to remove tasks that have been converted - * into other forms before being placed on the internal queue. For - * example, a task entered using {@code submit} might be - * converted into a form that maintains {@code Future} status. - * However, in such cases, method {@link #purge} may be used to - * remove those Futures that have been cancelled. - * - * @param task the task to remove - * @return true if the task was removed - */ - public boolean remove(Runnable task) { - boolean removed = workQueue.remove(task); - tryTerminate(); // In case SHUTDOWN and now empty - return removed; - } - - /** - * Tries to remove from the work queue all {@link Future} - * tasks that have been cancelled. This method can be useful as a - * storage reclamation operation, that has no other impact on - * functionality. Cancelled tasks are never executed, but may - * accumulate in work queues until worker threads can actively - * remove them. Invoking this method instead tries to remove them now. - * However, this method may fail to remove tasks in - * the presence of interference by other threads. - */ - public void purge() { - final BlockingQueue q = workQueue; - try { - Iterator it = q.iterator(); - while (it.hasNext()) { - Runnable r = (Runnable)it.next(); - if (r instanceof Future && ((Future)r).isCancelled()) - it.remove(); - } - } catch (ConcurrentModificationException fallThrough) { - // Take slow path if we encounter interference during traversal. - // Make copy for traversal and call remove for cancelled entries. - // The slow path is more likely to be O(N*N). - Object[] arr = q.toArray(); - for (int i=0; i<arr.length; i++) { - Object r = arr[i]; - if (r instanceof Future && ((Future)r).isCancelled()) - q.remove(r); - } - } - - tryTerminate(); // In case SHUTDOWN and now empty - } - - /* Statistics */ - - /** - * Returns the current number of threads in the pool. - * - * @return the number of threads - */ - public int getPoolSize() { - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - // Remove rare and surprising possibility of - // isTerminated() && getPoolSize() > 0 - return runStateAtLeast(ctl.get(), TIDYING) ? 0 - : workers.size(); - } finally { - mainLock.unlock(); - } - } - - /** - * Returns the approximate number of threads that are actively - * executing tasks. - * - * @return the number of threads - */ - public int getActiveCount() { - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - int n = 0; - for (Iterator itr = workers.iterator(); itr.hasNext();) { - Worker w = (Worker)itr.next(); - if (w.isLocked()) - ++n; - } - return n; - } finally { - mainLock.unlock(); - } - } - - /** - * Returns the largest number of threads that have ever - * simultaneously been in the pool. - * - * @return the number of threads - */ - public int getLargestPoolSize() { - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - return largestPoolSize; - } finally { - mainLock.unlock(); - } - } - - /** - * Returns the approximate total number of tasks that have ever been - * scheduled for execution. Because the states of tasks and - * threads may change dynamically during computation, the returned - * value is only an approximation. - * - * @return the number of tasks - */ - public long getTaskCount() { - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - long n = completedTaskCount; - for (Iterator itr = workers.iterator(); itr.hasNext();) { - Worker w = (Worker)itr.next(); - n += w.completedTasks; - if (w.isLocked()) - ++n; - } - return n + workQueue.size(); - } finally { - mainLock.unlock(); - } - } - - /** - * Returns the approximate total number of tasks that have - * completed execution. Because the states of tasks and threads - * may change dynamically during computation, the returned value - * is only an approximation, but one that does not ever decrease - * across successive calls. - * - * @return the number of tasks - */ - public long getCompletedTaskCount() { - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - long n = completedTaskCount; - for (Iterator itr = workers.iterator(); itr.hasNext();) { - Worker w = (Worker)itr.next(); - n += w.completedTasks; - } - return n; - } finally { - mainLock.unlock(); - } - } - - /* Extension hooks */ - - /** - * Method invoked prior to executing the given Runnable in the - * given thread. This method is invoked by thread {@code t} that - * will execute task {@code r}, and may be used to re-initialize - * ThreadLocals, or to perform logging. - * - * <p>This implementation does nothing, but may be customized in - * subclasses. Note: To properly nest multiple overridings, subclasses - * should generally invoke {@code super.beforeExecute} at the end of - * this method. - * - * @param t the thread that will run task {@code r} - * @param r the task that will be executed - */ - protected void beforeExecute(Thread t, Runnable r) { } - - /** - * Method invoked upon completion of execution of the given Runnable. - * This method is invoked by the thread that executed the task. If - * non-null, the Throwable is the uncaught {@code RuntimeException} - * or {@code Error} that caused execution to terminate abruptly. - * - * <p>This implementation does nothing, but may be customized in - * subclasses. Note: To properly nest multiple overridings, subclasses - * should generally invoke {@code super.afterExecute} at the - * beginning of this method. - * - * <p><b>Note:</b> When actions are enclosed in tasks (such as - * {@link FutureTask}) either explicitly or via methods such as - * {@code submit}, these task objects catch and maintain - * computational exceptions, and so they do not cause abrupt - * termination, and the internal exceptions are <em>not</em> - * passed to this method. If you would like to trap both kinds of - * failures in this method, you can further probe for such cases, - * as in this sample subclass that prints either the direct cause - * or the underlying exception if a task has been aborted: - * - * <pre> {@code - * class ExtendedExecutor extends ThreadPoolExecutor { - * // ... - * protected void afterExecute(Runnable r, Throwable t) { - * super.afterExecute(r, t); - * if (t == null && r instanceof Future<?>) { - * try { - * Object result = ((Future<?>) r).get(); - * } catch (CancellationException ce) { - * t = ce; - * } catch (ExecutionException ee) { - * t = ee.getCause(); - * } catch (InterruptedException ie) { - * Thread.currentThread().interrupt(); // ignore/reset - * } - * } - * if (t != null) - * System.out.println(t); - * } - * }}</pre> - * - * @param r the runnable that has completed - * @param t the exception that caused termination, or null if - * execution completed normally - */ - protected void afterExecute(Runnable r, Throwable t) { } - - /** - * Method invoked when the Executor has terminated. Default - * implementation does nothing. Note: To properly nest multiple - * overridings, subclasses should generally invoke - * {@code super.terminated} within this method. - */ - protected void terminated() { } - - /* Predefined RejectedExecutionHandlers */ - - /** - * A handler for rejected tasks that runs the rejected task - * directly in the calling thread of the {@code execute} method, - * unless the executor has been shut down, in which case the task - * is discarded. - */ - public static class CallerRunsPolicy implements RejectedExecutionHandler { - /** - * Creates a {@code CallerRunsPolicy}. - */ - public CallerRunsPolicy() { } - - /** - * Executes task r in the caller's thread, unless the executor - * has been shut down, in which case the task is discarded. - * - * @param r the runnable task requested to be executed - * @param e the executor attempting to execute this task - */ - public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { - if (!e.isShutdown()) { - r.run(); - } - } - } - - /** - * A handler for rejected tasks that throws a - * {@code RejectedExecutionException}. - */ - public static class AbortPolicy implements RejectedExecutionHandler { - /** - * Creates an {@code AbortPolicy}. - */ - public AbortPolicy() { } - - /** - * Always throws RejectedExecutionException. - * - * @param r the runnable task requested to be executed - * @param e the executor attempting to execute this task - * @throws RejectedExecutionException always. - */ - public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { - throw new RejectedExecutionException(); - } - } - - /** - * A handler for rejected tasks that silently discards the - * rejected task. - */ - public static class DiscardPolicy implements RejectedExecutionHandler { - /** - * Creates a {@code DiscardPolicy}. - */ - public DiscardPolicy() { } - - /** - * Does nothing, which has the effect of discarding task r. - * - * @param r the runnable task requested to be executed - * @param e the executor attempting to execute this task - */ - public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { - } - } - - /** - * A handler for rejected tasks that discards the oldest unhandled - * request and then retries {@code execute}, unless the executor - * is shut down, in which case the task is discarded. - */ - public static class DiscardOldestPolicy implements RejectedExecutionHandler { - /** - * Creates a {@code DiscardOldestPolicy} for the given executor. - */ - public DiscardOldestPolicy() { } - - /** - * Obtains and ignores the next task that the executor - * would otherwise execute, if one is immediately available, - * and then retries execution of task r, unless the executor - * is shut down, in which case task r is instead discarded. - * - * @param r the runnable task requested to be executed - * @param e the executor attempting to execute this task - */ - public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { - if (!e.isShutdown()) { - e.getQueue().poll(); - e.execute(r); - } - } - } -} |