diff options
author | Philipp Haller <hallerp@gmail.com> | 2009-10-27 09:43:44 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2009-10-27 09:43:44 +0000 |
commit | 42a42ac0c346d48594958ac80d5eb814b8aa0c39 (patch) | |
tree | b4e34bd91d434376236c18d4d1528f46559d5704 /src | |
parent | f9551d0c2f92ded7ef574121d89cda742e407f2d (diff) | |
download | scala-42a42ac0c346d48594958ac80d5eb814b8aa0c39.tar.gz scala-42a42ac0c346d48594958ac80d5eb814b8aa0c39.tar.bz2 scala-42a42ac0c346d48594958ac80d5eb814b8aa0c39.zip |
Removed old fork-join library. Fixed build.
Diffstat (limited to 'src')
-rw-r--r-- | src/actors/scala/actors/FJTask.java | 531 | ||||
-rw-r--r-- | src/actors/scala/actors/FJTaskRunner.java | 996 | ||||
-rw-r--r-- | src/actors/scala/actors/FJTaskRunnerGroup.java | 714 | ||||
-rw-r--r-- | src/actors/scala/actors/FJTaskScheduler2.scala | 133 | ||||
-rw-r--r-- | src/actors/scala/actors/IFJTaskRunnerGroup.java | 18 |
5 files changed, 0 insertions, 2392 deletions
diff --git a/src/actors/scala/actors/FJTask.java b/src/actors/scala/actors/FJTask.java deleted file mode 100644 index 6398f1400e..0000000000 --- a/src/actors/scala/actors/FJTask.java +++ /dev/null @@ -1,531 +0,0 @@ -/* - File: Task.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 7Jan1999 dl first release - 14jan1999 dl simplify start() semantics; - improve documentation - 18Jan1999 dl Eliminate useless time-based waits. - 7Mar1999 dl Add reset method, - add array-based composite operations - 27Apr1999 dl Rename -*/ - -package scala.actors; - - -/** - * Abstract base class for Fork/Join Tasks. - * - * <p> - * FJTasks are lightweight, stripped-down analogs of Threads. - * Many FJTasks share the same pool of Java threads. This is - * supported by the FJTaskRunnerGroup and FJTaskRunner classes, that - * mainly contain - * methods called only internally by FJTasks. - * FJTasks support versions of the most common methods found in class Thread, - * including start(), yield() and join(). However, they - * don't support priorities, ThreadGroups or other bookkeeping - * or control methods of class Thread. - * <p> - * FJTasks should normally be defined by subclassing and adding a run() method. - * Alternatively, static inner class <code>Wrap(Runnable r)</code> - * can be used to - * wrap an existing Runnable object in a FJTask. - * <p> - * <code>FJTaskRunnerGroup.execute(FJTask)</code> can be used to - * initiate a FJTask from a non-FJTask thread. - * And <code>FJTaskRunnerGroup.invoke(FJTask)</code> can be used to initiate - * a FJTask and then wait for it to complete before returning. - * These are the only entry-points from normal threads to FJTasks. - * Most FJTask methods themselves may only be called from within running FJTasks. - * They throw ClassCastExceptions if they are not, - * reflecting the fact that these methods - * can only be executed using FJTaskRunner threads, not generic - * java.lang.Threads. - * <p> - * There are three different ways to run a FJTask, - * with different scheduling semantics: - * <ul> - * <li> FJTask.start() (as well as FJTaskRunnerGroup.execute(FJTask)) - * behaves pretty much like Thread.start(). It enqueues a task to be - * run the next time any FJTaskRunner thread is otherwise idle. - * It maintains standard FIFO ordering with respect to - * the group of worker threads. - * <li> FJTask.fork() (as well as the two-task spawning method, - * coInvoke(task1, task2), and the array version - * coInvoke(FJTask[] tasks)) starts a task - * that will be executed in - * procedure-call-like LIFO order if executed by the - * same worker thread as the one that created it, but is FIFO - * with respect to other tasks if it is run by - * other worker threads. That is, earlier-forked - * tasks are preferred to later-forked tasks by other idle workers. - * Fork() is noticeably faster than start(), but can only be - * used when these scheduling semantics are acceptable. - * <li> FJTask.invoke(FJTask) just executes the run method - * of one task from within another. It is the analog of a - * direct call. - * </ul> - * <p> - * The main economies of FJTasks stem from the fact that - * FJTasks do not support blocking operations of any kind. - * FJTasks should just run to completion without - * issuing waits or performing blocking IO. - * There are several styles for creating the run methods that - * execute as tasks, including - * event-style methods, and pure computational methods. - * Generally, the best kinds of FJTasks are those that in turn - * generate other FJTasks. - * <p> - * There is nothing actually - * preventing you from blocking within a FJTask, and very short waits/blocks are - * completely well behaved. But FJTasks are not designed - * to support arbitrary synchronization - * since there is no way to suspend and resume individual tasks - * once they have begun executing. FJTasks should also be finite - * in duration -- they should not contain infinite loops. - * FJTasks that might need to perform a blocking - * action, or hold locks for extended periods, or - * loop forever can instead create normal - * java Thread objects that will do so. FJTasks are just not - * designed to support these things. - * FJTasks may however yield() control to allow their FJTaskRunner threads - * to run other tasks, - * and may wait for other dependent tasks via join(). These - * are the only coordination mechanisms supported by FJTasks. - * <p> - * FJTasks, and the FJTaskRunners that execute them are not - * intrinsically robust with respect to exceptions. - * A FJTask that aborts via an exception does not automatically - * have its completion flag (isDone) set. - * As with ordinary Threads, an uncaught exception will normally cause - * its FJTaskRunner thread to die, which in turn may sometimes - * cause other computations being performed to hang or abort. - * You can of course - * do better by trapping exceptions inside the run methods of FJTasks. - * <p> - * The overhead differences between FJTasks and Threads are substantial, - * especially when using fork() or coInvoke(). - * FJTasks can be two or three orders of magnitude faster than Threads, - * at least when run on JVMs with high-performance garbage collection - * (every FJTask quickly becomes garbage) and good native thread support. - * <p> - * Given these overhead savings, you might be tempted to use FJTasks for - * everything you would use a normal Thread to do. Don't. Java Threads - * remain better for general purpose thread-based programming. Remember - * that FJTasks cannot be used for designs involving arbitrary blocking - * synchronization or I/O. Extending FJTasks to support such capabilities - * would amount to re-inventing the Thread class, and would make them - * less optimal in the contexts that they were designed for. - * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] - * <p> - * @see FJTaskRunner - * @see FJTaskRunnerGroup - **/ - -public abstract class FJTask implements Runnable { - - /** - * The only status information associated with FJTasks is whether - * the they are considered to have completed. - * It is set true automatically within - * FJTaskRunner methods upon completion - * of the run method, or manually via cancel. - **/ - - private volatile boolean done; // = false; - - /** - * Return the FJTaskRunner thread running the current FJTask. - * Most FJTask methods are just relays to their current - * FJTaskRunners, that perform the indicated actions. - * @exception ClassCastException if caller thread is not a - * running FJTask. - **/ - - public static FJTaskRunner getFJTaskRunner() { - return (FJTaskRunner)(Thread.currentThread()); - } - - /** - * Return the FJTaskRunnerGroup of the thread running the current FJTask. - * @exception ClassCastException if caller thread is not a - * running FJTask. - **/ - public static IFJTaskRunnerGroup getFJTaskRunnerGroup() { - return getFJTaskRunner().getGroup(); - } - - - /** - * Return true if current task has terminated or been cancelled. - * The method is a simple analog of the Thread.isAlive() - * method. However, it reports true only when the task has terminated - * or has been cancelled. It does not distinguish these two cases. - * And there is no way to determine whether a FJTask has been started - * or is currently executing. - **/ - - public final boolean isDone() { return done; } - - /** - * Indicate termination. Intended only to be called by FJTaskRunner. - * FJTasks themselves should use (non-final) method - * cancel() to suppress execution. - **/ - - protected final void setDone() { done = true; } - - /** - * Set the termination status of this task. This simple-minded - * analog of Thread.interrupt - * causes the task not to execute if it has not already been started. - * Cancelling a running FJTask - * has no effect unless the run method itself uses isDone() - * to probe cancellation and take appropriate action. - * Individual run() methods may sense status and - * act accordingly, normally by returning early. - **/ - - public void cancel() { setDone(); } - - - /** - * Clear the termination status of this task. - * This method is intended to be used - * only as a means to allow task objects to be recycled. It should - * be called only when you are sure that the previous - * execution of this task has terminated and, if applicable, has - * been joined by all other waiting tasks. Usage in any other - * context is a very bad idea. - **/ - - public void reset() { done = false; } - - - /** - * Execute this task. This method merely places the task in a - * group-wide scheduling queue. - * It will be run - * the next time any TaskRunner thread is otherwise idle. - * This scheduling maintains FIFO ordering of started tasks - * with respect to - * the group of worker threads. - * @exception ClassCastException if caller thread is not - * running in a FJTaskRunner thread. - **/ - - public void start() { getFJTaskRunnerGroup().executeTask(this); } - - - /** - * Arrange for execution of a strictly dependent task. - * The task that will be executed in - * procedure-call-like LIFO order if executed by the - * same worker thread, but is FIFO with respect to other tasks - * forked by this thread when taken by other worker threads. - * That is, earlier-forked - * tasks are preferred to later-forked tasks by other idle workers. - * <p> - * Fork() is noticeably - * faster than start(). However, it may only - * be used for strictly dependent tasks -- generally, those that - * could logically be issued as straight method calls without - * changing the logic of the program. - * The method is optimized for use in parallel fork/join designs - * in which the thread that issues one or more forks - * cannot continue until at least some of the forked - * threads terminate and are joined. - * @exception ClassCastException if caller thread is not - * running in a FJTaskRunner thread. - **/ - - public void fork() { getFJTaskRunner().push(this); } - - /** - * Allow the current underlying FJTaskRunner thread to process other tasks. - * <p> - * Spinloops based on yield() are well behaved so long - * as the event or condition being waited for is produced via another - * FJTask. Additionally, you must never hold a lock - * while performing a yield or join. (This is because - * multiple FJTasks can be run by the same Thread during - * a yield. Since java locks are held per-thread, the lock would not - * maintain the conceptual exclusion you have in mind.) - * <p> - * Otherwise, spinloops using - * yield are the main construction of choice when a task must wait - * for a condition that it is sure will eventually occur because it - * is being produced by some other FJTask. The most common - * such condition is built-in: join() repeatedly yields until a task - * has terminated after producing some needed results. You can also - * use yield to wait for callbacks from other FJTasks, to wait for - * status flags to be set, and so on. However, in all these cases, - * you should be confident that the condition being waited for will - * occur, essentially always because it is produced by - * a FJTask generated by the current task, or one of its subtasks. - * - * @exception ClassCastException if caller thread is not - * running in a FJTaskRunner thread. - **/ - - public static void yield() { getFJTaskRunner().taskYield(); } - - /** - * Yield until this task isDone. - * Equivalent to <code>while(!isDone()) yield(); </code> - * @exception ClassCastException if caller thread is not - * running in a FJTaskRunner thread. - **/ - - public void join() { getFJTaskRunner().taskJoin(this); } - - /** - * Immediately execute task t by calling its run method. Has no - * effect if t has already been run or has been cancelled. - * It is equivalent to calling t.run except that it - * deals with completion status, so should always be used - * instead of directly calling run. - * The method can be useful - * when a computation has been packaged as a FJTask, but you just need to - * directly execute its body from within some other task. - **/ - - public static void invoke(FJTask t) { - if (!t.isDone()) { - t.run(); - t.setDone(); - } - } - - /** - * Fork both tasks and then wait for their completion. It behaves as: - * <pre> - * task1.fork(); task2.fork(); task2.join(); task1.join(); - * </pre> - * As a simple classic example, here is - * a class that computes the Fibonacci function: - * <pre> - * public class Fib extends FJTask { - * - * // Computes fibonacci(n) = fibonacci(n-1) + fibonacci(n-2); for n> 1 - * // fibonacci(0) = 0; - * // fibonacci(1) = 1. - * - * // Value to compute fibonacci function for. - * // It is replaced with the answer when computed. - * private volatile int number; - * - * public Fib(int n) { number = n; } - * - * public int getAnswer() { - * if (!isDone()) throw new Error("Not yet computed"); - * return number; - * } - * - * public void run() { - * int n = number; - * if (n > 1) { - * Fib f1 = new Fib(n - 1); - * Fib f2 = new Fib(n - 2); - * - * coInvoke(f1, f2); // run these in parallel - * - * // we know f1 and f2 are computed, so just directly access numbers - * number = f1.number + f2.number; - * } - * } - * - * public static void main(String[] args) { // sample driver - * try { - * int groupSize = 2; // 2 worker threads - * int num = 35; // compute fib(35) - * FJTaskRunnerGroup group = new FJTaskRunnerGroup(groupSize); - * Fib f = new Fib(num); - * group.invoke(f); - * int result = f.getAnswer(); - * System.out.println(" Answer: " + result); - * } - * catch (InterruptedException ex) { - * System.out.println("Interrupted"); - * } - * } - * } - * </pre> - * - * @exception ClassCastException if caller thread is not - * running in a FJTaskRunner thread. - **/ - - public static void coInvoke(FJTask task1, FJTask task2) { - getFJTaskRunner().coInvoke(task1, task2); - } - - - /** - * Fork all tasks in array, and await their completion. - * Behaviorally equivalent to: - * <pre> - * for (int i = 0; i < tasks.length; ++i) tasks[i].fork(); - * for (int i = 0; i < tasks.length; ++i) tasks[i].join(); - * </pre> - **/ - - public static void coInvoke(FJTask[] tasks) { - getFJTaskRunner().coInvoke(tasks); - } - - /** - * A FJTask that holds a Runnable r, and calls r.run when executed. - * The class is a simple utilty to allow arbitrary Runnables - * to be used as FJTasks. - **/ - - public static class Wrap extends FJTask { - protected final Runnable runnable; - public Wrap(Runnable r) { runnable = r; } - public void run() { runnable.run(); } - } - - - /** - * A <code>new Seq</code>, when executed, - * invokes each task provided in the constructor, in order. - * The class is a simple utility - * that makes it easier to create composite FJTasks. - **/ - public static class Seq extends FJTask { - protected final FJTask[] tasks; - - /** - * Construct a Seq that, when executed, will process each of the - * tasks in the tasks array in order - **/ - public Seq(FJTask[] tasks) { - this.tasks = tasks; - } - - /** - * Two-task constructor, for compatibility with previous release. - **/ - public Seq(FJTask task1, FJTask task2) { - this.tasks = new FJTask[] { task1, task2 }; - } - - public void run() { - for (int i = 0; i < tasks.length; ++i) FJTask.invoke(tasks[i]); - } - } - - /** - * Construct and return a FJTask object that, when executed, will - * invoke the tasks in the tasks array in array order - **/ - - public static FJTask seq(FJTask[] tasks) { - return new Seq(tasks); - } - - /** - * A <code>new Par</code>, when executed, - * runs the tasks provided in the constructor in parallel using - * coInvoke(tasks). - * The class is a simple utility - * that makes it easier to create composite FJTasks. - **/ - public static class Par extends FJTask { - protected final FJTask[] tasks; - - /** - * Construct a Seq that, when executed, will process each of the - * tasks in the tasks array in parallel - **/ - public Par(FJTask[] tasks) { - this.tasks = tasks; - } - - /** - * Two-task constructor, for compatibility with previous release. - **/ - public Par(FJTask task1, FJTask task2) { - this.tasks = new FJTask[] { task1, task2 }; - } - - - public void run() { - FJTask.coInvoke(tasks); - } - } - - - /** - * Construct and return a FJTask object that, when executed, will - * invoke the tasks in the tasks array in parallel using coInvoke - **/ - public static FJTask par(FJTask[] tasks) { - return new Par(tasks); - } - - /** - * A <code>new Seq2(task1, task2)</code>, when executed, - * invokes task1 and then task2, in order. - * The class is a simple utility - * that makes it easier to create composite Tasks. - **/ - public static class Seq2 extends FJTask { - protected final FJTask fst; - protected final FJTask snd; - public Seq2(FJTask task1, FJTask task2) { - fst = task1; - snd = task2; - } - public void run() { - FJTask.invoke(fst); - FJTask.invoke(snd); - } - } - - /** - * Construct and return a FJTask object that, when executed, will - * invoke task1 and task2, in order - **/ - - public static FJTask seq(FJTask task1, FJTask task2) { - return new Seq2(task1, task2); - } - - /** - * A <code>new Par(task1, task2)</code>, when executed, - * runs task1 and task2 in parallel using coInvoke(task1, task2). - * The class is a simple utility - * that makes it easier to create composite Tasks. - **/ - public static class Par2 extends FJTask { - protected final FJTask fst; - protected final FJTask snd; - public Par2(FJTask task1, FJTask task2) { - fst = task1; - snd = task2; - } - public void run() { - FJTask.coInvoke(fst, snd); - } - } - - - /** - * Construct and return a FJTask object that, when executed, will - * invoke task1 and task2, in parallel - **/ - public static FJTask par(FJTask task1, FJTask task2) { - return new Par2(task1, task2); - } - -} diff --git a/src/actors/scala/actors/FJTaskRunner.java b/src/actors/scala/actors/FJTaskRunner.java deleted file mode 100644 index 73bc9ff1dd..0000000000 --- a/src/actors/scala/actors/FJTaskRunner.java +++ /dev/null @@ -1,996 +0,0 @@ -/* - File: FJTaskRunner.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 7Jan1999 dl First public release - 13Jan1999 dl correct a stat counter update; - ensure inactive status on run termination; - misc minor cleaup - 14Jan1999 dl Use random starting point in scan; - variable renamings. - 18Jan1999 dl Runloop allowed to die on task exception; - remove useless timed join - 22Jan1999 dl Rework scan to allow use of priorities. - 6Feb1999 dl Documentation updates. - 7Mar1999 dl Add array-based coInvoke - 31Mar1999 dl Revise scan to remove need for NullTasks - 27Apr1999 dl Renamed - 23oct1999 dl Earlier detect of interrupt in scanWhileIdling - 24nov1999 dl Now works on JVMs that do not properly - implement read-after-write of 2 volatiles. -*/ - -package scala.actors; - -import java.util.Random; - -/** - * Specialized Thread subclass for running FJTasks. - * <p> - * Each FJTaskRunner keeps FJTasks in a double-ended queue (DEQ). - * Double-ended queues support stack-based operations - * push and pop, as well as queue-based operations put and take. - * Normally, threads run their own tasks. But they - * may also steal tasks from each others DEQs. - * <p> - * The algorithms are minor variants of those used - * in <A href="http://supertech.lcs.mit.edu/cilk/"> Cilk</A> and - * <A href="http://www.cs.utexas.edu/users/hood/"> Hood</A>, and - * to a lesser extent - * <A href="http://www.cs.uga.edu/~dkl/filaments/dist.html"> Filaments</A>, - * but are adapted to work in Java. - * <p> - * The two most important capabilities are: - * <ul> - * <li> Fork a FJTask: - * <pre> - * Push task onto DEQ - * </pre> - * <li> Get a task to run (for example within taskYield) - * <pre> - * If DEQ is not empty, - * Pop a task and run it. - * Else if any other DEQ is not empty, - * Take ("steal") a task from it and run it. - * Else if the entry queue for our group is not empty, - * Take a task from it and run it. - * Else if current thread is otherwise idling - * If all threads are idling - * Wait for a task to be put on group entry queue - * Else - * Yield or Sleep for a while, and then retry - * </pre> - * </ul> - * The push, pop, and put are designed to only ever called by the - * current thread, and take (steal) is only ever called by - * other threads. - * All other operations are composites and variants of these, - * plus a few miscellaneous bookkeeping methods. - * <p> - * Implementations of the underlying representations and operations - * are geared for use on JVMs operating on multiple CPUs (although - * they should of course work fine on single CPUs as well). - * <p> - * A possible snapshot of a FJTaskRunner's DEQ is: - * <pre> - * 0 1 2 3 4 5 6 ... - * +-----+-----+-----+-----+-----+-----+-----+-- - * | | t | t | t | t | | | ... deq array - * +-----+-----+-----+-----+-----+-----+-----+-- - * ^ ^ - * base top - * (incremented (incremented - * on take, on push - * decremented decremented - * on put) on pop) - * </pre> - * <p> - * FJTasks are held in elements of the DEQ. - * They are maintained in a bounded array that - * works similarly to a circular bounded buffer. To ensure - * visibility of stolen FJTasks across threads, the array elements - * must be <code>volatile</code>. - * Using volatile rather than synchronizing suffices here since - * each task accessed by a thread is either one that it - * created or one that has never seen before. Thus we cannot - * encounter any staleness problems executing run methods, - * although FJTask programmers must be still sure to either synch or use - * volatile for shared data within their run methods. - * <p> - * However, since there is no way - * to declare an array of volatiles in Java, the DEQ elements actually - * hold VolatileTaskRef objects, each of which in turn holds a - * volatile reference to a FJTask. - * Even with the double-indirection overhead of - * volatile refs, using an array for the DEQ works out - * better than linking them since fewer shared - * memory locations need to be - * touched or modified by the threads while using the DEQ. - * Further, the double indirection may alleviate cache-line - * sharing effects (which cannot otherwise be directly dealt with in Java). - * <p> - * The indices for the <code>base</code> and <code>top</code> of the DEQ - * are declared as volatile. The main contention point with - * multiple FJTaskRunner threads occurs when one thread is trying - * to pop its own stack while another is trying to steal from it. - * This is handled via a specialization of Dekker's algorithm, - * in which the popping thread pre-decrements <code>top</code>, - * and then checks it against <code>base</code>. - * To be conservative in the face of JVMs that only partially - * honor the specification for volatile, the pop proceeds - * without synchronization only if there are apparently enough - * items for both a simultaneous pop and take to succeed. - * It otherwise enters a - * synchronized lock to check if the DEQ is actually empty, - * if so failing. The stealing thread - * does almost the opposite, but is set up to be less likely - * to win in cases of contention: Steals always run under synchronized - * locks in order to avoid conflicts with other ongoing steals. - * They pre-increment <code>base</code>, and then check against - * <code>top</code>. They back out (resetting the base index - * and failing to steal) if the - * DEQ is empty or is about to become empty by an ongoing pop. - * <p> - * A push operation can normally run concurrently with a steal. - * A push enters a synch lock only if the DEQ appears full so must - * either be resized or have indices adjusted due to wrap-around - * of the bounded DEQ. The put operation always requires synchronization. - * <p> - * When a FJTaskRunner thread has no tasks of its own to run, - * it tries to be a good citizen. - * Threads run at lower priority while scanning for work. - * <p> - * If the task is currently waiting - * via yield, the thread alternates scans (starting at a randomly - * chosen victim) with Thread.yields. This is - * well-behaved so long as the JVM handles Thread.yield in a - * sensible fashion. (It need not. Thread.yield is so underspecified - * that it is legal for a JVM to treat it as a no-op.) This also - * keeps things well-behaved even if we are running on a uniprocessor - * JVM using a simple cooperative threading model. - * <p> - * If a thread needing work is - * is otherwise idle (which occurs only in the main runloop), and - * there are no available tasks to steal or poll, it - * instead enters into a sleep-based (actually timed wait(msec)) - * phase in which it progressively sleeps for longer durations - * (up to a maximum of FJTaskRunnerGroup.MAX_SLEEP_TIME, - * currently 100ms) between scans. - * If all threads in the group - * are idling, they further progress to a hard wait phase, suspending - * until a new task is entered into the FJTaskRunnerGroup entry queue. - * A sleeping FJTaskRunner thread may be awakened by a new - * task being put into the group entry queue or by another FJTaskRunner - * becoming active, but not merely by some DEQ becoming non-empty. - * Thus the MAX_SLEEP_TIME provides a bound for sleep durations - * in cases where all but one worker thread start sleeping - * even though there will eventually be work produced - * by a thread that is taking a long time to place tasks in DEQ. - * These sleep mechanics are handled in the FJTaskRunnerGroup class. - * <p> - * Composite operations such as taskJoin include heavy - * manual inlining of the most time-critical operations - * (mainly FJTask.invoke). - * This opens up a few opportunities for further hand-optimizations. - * Until Java compilers get a lot smarter, these tweaks - * improve performance significantly enough for task-intensive - * programs to be worth the poorer maintainability and code duplication. - * <p> - * Because they are so fragile and performance-sensitive, nearly - * all methods are declared as final. However, nearly all fields - * and methods are also declared as protected, so it is possible, - * with much care, to extend functionality in subclasses. (Normally - * you would also need to subclass FJTaskRunnerGroup.) - * <p> - * None of the normal java.lang.Thread class methods should ever be called - * on FJTaskRunners. For this reason, it might have been nicer to - * declare FJTaskRunner as a Runnable to run within a Thread. However, - * this would have complicated many minor logistics. And since - * no FJTaskRunner methods should normally be called from outside the - * FJTask and FJTaskRunnerGroup classes either, this decision doesn't impact - * usage. - * <p> - * You might think that layering this kind of framework on top of - * Java threads, which are already several levels removed from raw CPU - * scheduling on most systems, would lead to very poor performance. - * But on the platforms - * tested, the performance is quite good. - * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] - * @see FJTask - * @see FJTaskRunnerGroup - **/ - -public class FJTaskRunner extends Thread { - - /** The group of which this FJTaskRunner is a member **/ - protected final IFJTaskRunnerGroup group; - - /** - * Constructor called only during FJTaskRunnerGroup initialization - **/ - - /*protected*/ public FJTaskRunner(IFJTaskRunnerGroup g) { - group = g; - victimRNG = new Random(System.identityHashCode(this)); - runPriority = getPriority(); - setDaemon(true); - } - - /** - * Return the FJTaskRunnerGroup of which this thread is a member - **/ - - protected final IFJTaskRunnerGroup getGroup() { return group; } - - - /* ------------ DEQ Representation ------------------- */ - - - /** - * FJTasks are held in an array-based DEQ with INITIAL_CAPACITY - * elements. The DEQ is grown if necessary, but default value is - * normally much more than sufficient unless there are - * user programming errors or questionable operations generating - * large numbers of Tasks without running them. - * Capacities must be a power of two. - **/ - - protected static final int INITIAL_CAPACITY = 4096; - - /** - * The maximum supported DEQ capacity. - * When exceeded, FJTaskRunner operations throw Errors - **/ - - protected static final int MAX_CAPACITY = 1 << 30; - - /** - * An object holding a single volatile reference to a FJTask. - **/ - - protected final static class VolatileTaskRef { - /** The reference **/ - protected volatile FJTask ref; - - /** Set the reference **/ - protected final void put(FJTask r) { ref = r; } - /** Return the reference **/ - protected final FJTask get() { return ref; } - /** Return the reference and clear it **/ - protected final FJTask take() { FJTask r = ref; ref = null; return r; } - - /** - * Initialization utility for constructing arrays. - * Make an array of given capacity and fill it with - * VolatileTaskRefs. - **/ - protected static VolatileTaskRef[] newArray(int cap) { - VolatileTaskRef[] a = new VolatileTaskRef[cap]; - for (int k = 0; k < cap; k++) a[k] = new VolatileTaskRef(); - return a; - } - - } - - /** - * The DEQ array. - **/ - - protected VolatileTaskRef[] deq = VolatileTaskRef.newArray(INITIAL_CAPACITY); - - /** Current size of the task DEQ **/ - protected int deqSize() { return deq.length; } - - /** - * Current top of DEQ. Generally acts just like a stack pointer in an - * array-based stack, except that it circularly wraps around the - * array, as in an array-based queue. The value is NOT - * always kept within <code>0 ... deq.length</code> though. - * The current top element is always at <code>top & (deq.length-1)</code>. - * To avoid integer overflow, top is reset down - * within bounds whenever it is noticed to be out out bounds; - * at worst when it is at <code>2 * deq.length</code>. - **/ - protected volatile int top = 0; - - - /** - * Current base of DEQ. Acts like a take-pointer in an - * array-based bounded queue. Same bounds and usage as top. - **/ - - protected volatile int base = 0; - - - /** - * An extra object to synchronize on in order to - * achieve a memory barrier. - **/ - - protected final Object barrier = new Object(); - - /* ------------ Other BookKeeping ------------------- */ - - /** - * Record whether current thread may be processing a task - * (i.e., has been started and is not in an idle wait). - * Accessed, under synch, ONLY by FJTaskRunnerGroup, but the field is - * stored here for simplicity. - **/ - - /*protected*/ public boolean active = false; - - /** Random starting point generator for scan() **/ - protected final Random victimRNG; - - - /** Priority to use while scanning for work **/ - protected int scanPriority = Thread.MIN_PRIORITY + 1; - - /** Priority to use while running tasks **/ - protected int runPriority; - - /** - * Set the priority to use while scanning. - * We do not bother synchronizing access, since - * by the time the value is needed, both this FJTaskRunner - * and its FJTaskRunnerGroup will - * necessarily have performed enough synchronization - * to avoid staleness problems of any consequence. - **/ - protected void setScanPriority(int pri) { scanPriority = pri; } - - - /** - * Set the priority to use while running tasks. - * Same usage and rationale as setScanPriority. - **/ - protected void setRunPriority(int pri) { runPriority = pri; } - - /** - * Compile-time constant for statistics gathering. - * Even when set, reported values may not be accurate - * since all are read and written without synchronization. - **/ - - - - static final boolean COLLECT_STATS = true; - // static final boolean COLLECT_STATS = false; - - - // for stat collection - - /** Total number of tasks run **/ - protected int runs = 0; - - /** Total number of queues scanned for work **/ - protected int scans = 0; - - /** Total number of tasks obtained via scan **/ - protected int steals = 0; - - - - /* -------- Suspending -------- */ - protected boolean suspending = false; - - synchronized void setSuspending(boolean susp) { - suspending = susp; - } - - /* ------------ DEQ operations ------------------- */ - - - /** - * Push a task onto DEQ. - * Called ONLY by current thread. - **/ - - /*protected*/ public final void push(final FJTask r) { - int t = top; - - /* - This test catches both overflows and index wraps. It doesn't - really matter if base value is in the midst of changing in take. - As long as deq length is < 2^30, we are guaranteed to catch wrap in - time since base can only be incremented at most length times - between pushes (or puts). - */ - - if (t < (base & (deq.length-1)) + deq.length) { - - deq[t & (deq.length-1)].put(r); - top = t + 1; - } - - else // isolate slow case to increase chances push is inlined - slowPush(r); // check overflow and retry - } - - - /** - * Handle slow case for push - **/ - - protected synchronized void slowPush(final FJTask r) { - checkOverflow(); - push(r); // just recurse -- this one is sure to succeed. - } - - - /** - * Enqueue task at base of DEQ. - * Called ONLY by current thread. - * This method is currently not called from class FJTask. It could be used - * as a faster way to do FJTask.start, but most users would - * find the semantics too confusing and unpredictable. - **/ - - protected final synchronized void put(final FJTask r) { - for (;;) { - int b = base - 1; - if (top < b + deq.length) { - - int newBase = b & (deq.length-1); - deq[newBase].put(r); - base = newBase; - - if (b != newBase) { // Adjust for index underflow - int newTop = top & (deq.length-1); - if (newTop < newBase) newTop += deq.length; - top = newTop; - } - return; - } - else { - checkOverflow(); - // ... and retry - } - } - } - - /** - * Return a popped task, or null if DEQ is empty. - * Called ONLY by current thread. - * <p> - * This is not usually called directly but is - * instead inlined in callers. This version differs from the - * cilk algorithm in that pop does not fully back down and - * retry in the case of potential conflict with take. It simply - * rechecks under synch lock. This gives a preference - * for threads to run their own tasks, which seems to - * reduce flailing a bit when there are few tasks to run. - **/ - - protected final FJTask pop() { - /* - Decrement top, to force a contending take to back down. - */ - - int t = --top; - - /* - To avoid problems with JVMs that do not properly implement - read-after-write of a pair of volatiles, we conservatively - grab without lock only if the DEQ appears to have at least two - elements, thus guaranteeing that both a pop and take will succeed, - even if the pre-increment in take is not seen by current thread. - Otherwise we recheck under synch. - */ - - if (base + 1 < t) - return deq[t & (deq.length-1)].take(); - else - return confirmPop(t); - - } - - - /** - * Check under synch lock if DEQ is really empty when doing pop. - * Return task if not empty, else null. - **/ - - protected final synchronized FJTask confirmPop(int provisionalTop) { - if (base <= provisionalTop) - return deq[provisionalTop & (deq.length-1)].take(); - else { // was empty - /* - Reset DEQ indices to zero whenever it is empty. - This both avoids unnecessary calls to checkOverflow - in push, and helps keep the DEQ from accumulating garbage - */ - - top = base = 0; - return null; - } - } - - - /** - * Take a task from the base of the DEQ. - * Always called by other threads via scan() - **/ - - - protected final synchronized FJTask take() { - - /* - Increment base in order to suppress a contending pop - */ - - int b = base++; - - if (b < top) - return confirmTake(b); - else { - // back out - base = b; - return null; - } - } - - - /** - * double-check a potential take - **/ - - protected FJTask confirmTake(int oldBase) { - - /* - Use a second (guaranteed uncontended) synch - to serve as a barrier in case JVM does not - properly process read-after-write of 2 volatiles - */ - - synchronized(barrier) { - if (oldBase < top) { - /* - We cannot call deq[oldBase].take here because of possible races when - nulling out versus concurrent push operations. Resulting - accumulated garbage is swept out periodically in - checkOverflow, or more typically, just by keeping indices - zero-based when found to be empty in pop, which keeps active - region small and constantly overwritten. - */ - - return deq[oldBase & (deq.length-1)].get(); - } - else { - base = oldBase; - return null; - } - } - } - - - /** - * Adjust top and base, and grow DEQ if necessary. - * Called only while DEQ synch lock being held. - * We don't expect this to be called very often. In most - * programs using FJTasks, it is never called. - **/ - - protected void checkOverflow() { - int t = top; - int b = base; - - if (t - b < deq.length-1) { // check if just need an index reset - - int newBase = b & (deq.length-1); - int newTop = top & (deq.length-1); - if (newTop < newBase) newTop += deq.length; - top = newTop; - base = newBase; - - /* - Null out refs to stolen tasks. - This is the only time we can safely do it. - */ - - int i = newBase; - while (i != newTop && deq[i].ref != null) { - deq[i].ref = null; - i = (i - 1) & (deq.length-1); - } - - } - else { // grow by doubling array - - int newTop = t - b; - int oldcap = deq.length; - int newcap = oldcap * 2; - - if (newcap >= MAX_CAPACITY) - throw new Error("FJTask queue maximum capacity exceeded"); - - VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap]; - - // copy in bottom half of new deq with refs from old deq - for (int j = 0; j < oldcap; ++j) newdeq[j] = deq[b++ & (oldcap-1)]; - - // fill top half of new deq with new refs - for (int j = oldcap; j < newcap; ++j) newdeq[j] = new VolatileTaskRef(); - - deq = newdeq; - base = 0; - top = newTop; - } - } - - - /* ------------ Scheduling ------------------- */ - - - /** - * Do all but the pop() part of yield or join, by - * traversing all DEQs in our group looking for a task to - * steal. If none, it checks the entry queue. - * <p> - * Since there are no good, portable alternatives, - * we rely here on a mixture of Thread.yield and priorities - * to reduce wasted spinning, even though these are - * not well defined. We are hoping here that the JVM - * does something sensible. - * @param waitingFor if non-null, the current task being joined - **/ - - protected void scan(final FJTask waitingFor) { - - FJTask task = null; - - // to delay lowering priority until first failure to steal - boolean lowered = false; - - /* - Circularly traverse from a random start index. - - This differs slightly from cilk version that uses a random index - for each attempted steal. - Exhaustive scanning might impede analytic tractablity of - the scheduling policy, but makes it much easier to deal with - startup and shutdown. - */ - - FJTaskRunner[] ts = group.getArray(); - int idx = victimRNG.nextInt(ts.length); - - for (int i = 0; i < ts.length; ++i) { - - FJTaskRunner t = ts[idx]; - if (++idx >= ts.length) idx = 0; // circularly traverse - - if (t != null && t != this) { - - if (waitingFor != null && waitingFor.isDone()) { - break; - } - else { - if (COLLECT_STATS) ++scans; - task = t.take(); - if (task != null) { - if (COLLECT_STATS) ++steals; - break; - } - else if (isInterrupted()) { - break; - } - else if (!lowered) { // if this is first fail, lower priority - lowered = true; - setPriority(scanPriority); - } - else { // otherwise we are at low priority; just yield - yield(); - } - } - } - - } - - if (task == null) { - if (COLLECT_STATS) ++scans; - task = group.pollEntryQueue(); - if (COLLECT_STATS) if (task != null) ++steals; - } - - if (lowered) setPriority(runPriority); - - if (task != null && !task.isDone()) { - if (COLLECT_STATS) ++runs; - task.run(); - task.setDone(); - } - - } - - /** - * Same as scan, but called when current thread is idling. - * It repeatedly scans other threads for tasks, - * sleeping while none are available. - * <p> - * This differs from scan mainly in that - * since there is no reason to return to recheck any - * condition, we iterate until a task is found, backing - * off via sleeps if necessary. - **/ - - protected void scanWhileIdling() { - FJTask task = null; - - boolean lowered = false; - long iters = 0; - - FJTaskRunner[] ts = group.getArray(); - int idx = victimRNG.nextInt(ts.length); - - do { - for (int i = 0; i < ts.length; ++i) { - - FJTaskRunner t = ts[idx]; - if (++idx >= ts.length) idx = 0; // circularly traverse - - if (t != null && t != this) { - if (COLLECT_STATS) ++scans; - - task = t.take(); - if (task != null) { - if (COLLECT_STATS) ++steals; - if (lowered) setPriority(runPriority); - group.setActive(this); - break; - } - } - } - - if (task == null) { - if (isInterrupted()) - return; - - if (COLLECT_STATS) ++scans; - task = group.pollEntryQueue(); - - if (task != null) { - if (COLLECT_STATS) ++steals; - if (lowered) setPriority(runPriority); - group.setActive(this); - } - else { - ++iters; - // Check here for yield vs sleep to avoid entering group synch lock - if (iters >= /*group.SCANS_PER_SLEEP*/ 15) { - group.checkActive(this, iters); - if (isInterrupted()) - return; - } - else if (!lowered) { - lowered = true; - setPriority(scanPriority); - } - else { - yield(); - } - } - } - } while (task == null); - - - if (!task.isDone()) { - if (COLLECT_STATS) ++runs; - task.run(); - task.setDone(); - } - - } - - /* ------------ composite operations ------------------- */ - - - /** - * Main runloop - **/ - - public void run() { - try{ - while (!interrupted()) { - FJTask task = pop(); - if (task != null) { - if (!task.isDone()) { - // inline FJTask.invoke - if (COLLECT_STATS) ++runs; - task.run(); - task.setDone(); - } - } - else - scanWhileIdling(); - } - // check for suspending - if (suspending) { - synchronized(this) { - // move all local tasks to group-wide entry queue - for (int i = 0; i < deq.length; ++i) { - synchronized(group) { - try { - FJTask task = (FJTask)deq[i].take(); - if (task != null) - group.getEntryQueue().put(task); - } catch (InterruptedException ie) { - System.err.println("Suspend: when transferring task to entryQueue: "+ie); - } - } - } - } - } - } - finally { - group.setInactive(this); - } - } - - /** - * Execute a task in this thread. Generally called when current task - * cannot otherwise continue. - **/ - - - protected final void taskYield() { - FJTask task = pop(); - if (task != null) { - if (!task.isDone()) { - if (COLLECT_STATS) ++runs; - task.run(); - task.setDone(); - } - } - else - scan(null); - } - - - /** - * Process tasks until w is done. - * Equivalent to <code>while(!w.isDone()) taskYield(); </code> - **/ - - protected final void taskJoin(final FJTask w) { - - while (!w.isDone()) { - - FJTask task = pop(); - if (task != null) { - if (!task.isDone()) { - if (COLLECT_STATS) ++runs; - task.run(); - task.setDone(); - if (task == w) return; // fast exit if we just ran w - } - } - else - scan(w); - } - } - - /** - * A specialized expansion of - * <code> w.fork(); invoke(v); w.join(); </code> - **/ - - - protected final void coInvoke(final FJTask w, final FJTask v) { - - // inline push - - int t = top; - if (t < (base & (deq.length-1)) + deq.length) { - - deq[t & (deq.length-1)].put(w); - top = t + 1; - - // inline invoke - - if (!v.isDone()) { - if (COLLECT_STATS) ++runs; - v.run(); - v.setDone(); - } - - // inline taskJoin - - while (!w.isDone()) { - FJTask task = pop(); - if (task != null) { - if (!task.isDone()) { - if (COLLECT_STATS) ++runs; - task.run(); - task.setDone(); - if (task == w) return; // fast exit if we just ran w - } - } - else - scan(w); - } - } - - else // handle non-inlinable cases - slowCoInvoke(w, v); - } - - - /** - * Backup to handle noninlinable cases of coInvoke - **/ - - protected void slowCoInvoke(final FJTask w, final FJTask v) { - push(w); // let push deal with overflow - FJTask.invoke(v); - taskJoin(w); - } - - - /** - * Array-based version of coInvoke - **/ - - protected final void coInvoke(FJTask[] tasks) { - int nforks = tasks.length - 1; - - // inline bulk push of all but one task - - int t = top; - - if (nforks >= 0 && t + nforks < (base & (deq.length-1)) + deq.length) { - for (int i = 0; i < nforks; ++i) { - deq[t++ & (deq.length-1)].put(tasks[i]); - top = t; - } - - // inline invoke of one task - FJTask v = tasks[nforks]; - if (!v.isDone()) { - if (COLLECT_STATS) ++runs; - v.run(); - v.setDone(); - } - - // inline taskJoins - - for (int i = 0; i < nforks; ++i) { - FJTask w = tasks[i]; - while (!w.isDone()) { - - FJTask task = pop(); - if (task != null) { - if (!task.isDone()) { - if (COLLECT_STATS) ++runs; - task.run(); - task.setDone(); - } - } - else - scan(w); - } - } - } - - else // handle non-inlinable cases - slowCoInvoke(tasks); - } - - /** - * Backup to handle atypical or noninlinable cases of coInvoke - **/ - - protected void slowCoInvoke(FJTask[] tasks) { - for (int i = 0; i < tasks.length; ++i) push(tasks[i]); - for (int i = 0; i < tasks.length; ++i) taskJoin(tasks[i]); - } - -} - diff --git a/src/actors/scala/actors/FJTaskRunnerGroup.java b/src/actors/scala/actors/FJTaskRunnerGroup.java deleted file mode 100644 index d1f864c54a..0000000000 --- a/src/actors/scala/actors/FJTaskRunnerGroup.java +++ /dev/null @@ -1,714 +0,0 @@ -/* - File: FJTaskRunnerGroup.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 7Jan1999 dl First public release - 12Jan1999 dl made getActiveCount public; misc minor cleanup. - 14Jan1999 dl Added executeTask - 20Jan1999 dl Allow use of priorities; reformat stats - 6Feb1999 dl Lazy thread starts - 27Apr1999 dl Renamed -*/ - -package scala.actors; - -/** - * A stripped down analog of a ThreadGroup used for - * establishing and managing FJTaskRunner threads. - * ThreadRunnerGroups serve as the control boundary separating - * the general world of normal threads from the specialized world - * of FJTasks. - * <p> - * By intent, this class does not subclass java.lang.ThreadGroup, and - * does not support most methods found in ThreadGroups, since they - * would make no sense for FJTaskRunner threads. In fact, the class - * does not deal with ThreadGroups at all. If you want to restrict - * a FJTaskRunnerGroup to a particular ThreadGroup, you can create - * it from within that ThreadGroup. - * <p> - * The main contextual parameter for a FJTaskRunnerGroup is - * the group size, established in the constructor. - * Groups must be of a fixed size. - * There is no way to dynamically increase or decrease the number - * of threads in an existing group. - * <p> - * In general, the group size should be equal to the number - * of CPUs on the system. (Unfortunately, there is no portable - * means of automatically detecting the number of CPUs on a JVM, so there is - * no good way to automate defaults.) In principle, when - * FJTasks are used for computation-intensive tasks, having only - * as many threads as CPUs should minimize bookkeeping overhead - * and contention, and so maximize throughput. However, because - * FJTaskRunners lie atop Java threads, and in turn operating system - * thread support and scheduling policies, - * it is very possible that using more threads - * than CPUs will improve overall throughput even though it adds - * to overhead. This will always be so if FJTasks are I/O bound. - * So it may pay to experiment a bit when tuning on particular platforms. - * You can also use <code>setRunPriorities</code> to either - * increase or decrease the priorities of active threads, which - * may interact with group size choice. - * <p> - * In any case, overestimating group sizes never - * seriously degrades performance (at least within reasonable bounds). - * You can also use a value - * less than the number of CPUs in order to reserve processing - * for unrelated threads. - * <p> - * There are two general styles for using a FJTaskRunnerGroup. - * You can create one group per entire program execution, for example - * as a static singleton, and use it for all parallel tasks: - * <pre> - * class Tasks { - * static FJTaskRunnerGroup group; - * public void initialize(int groupsize) { - * group = new FJTaskRunnerGroup(groupSize); - * } - * // ... - * } - * </pre> - * Alternatively, you can make new groups on the fly and use them only for - * particular task sets. This is more flexible,, - * and leads to more controllable and deterministic execution patterns, - * but it encounters greater overhead on startup. Also, to reclaim - * system resources, you should - * call <code>FJTaskRunnerGroup.interruptAll</code> when you are done - * using one-shot groups. Otherwise, because FJTaskRunners set - * <code>Thread.isDaemon</code> - * status, they will not normally be reclaimed until program termination. - * <p> - * The main supported methods are <code>execute</code>, - * which starts a task processed by FJTaskRunner threads, - * and <code>invoke</code>, which starts one and waits for completion. - * For example, you might extend the above <code>FJTasks</code> - * class to support a task-based computation, say, the - * <code>Fib</code> class from the <code>FJTask</code> documentation: - * <pre> - * class Tasks { // continued - * // ... - * static int fib(int n) { - * try { - * Fib f = new Fib(n); - * group.invoke(f); - * return f.getAnswer(); - * } - * catch (InterruptedException ex) { - * throw new Error("Interrupted during computation"); - * } - * } - * } - * </pre> - * <p> - * Method <code>stats()</code> can be used to monitor performance. - * Both FJTaskRunnerGroup and FJTaskRunner may be compiled with - * the compile-time constant COLLECT_STATS set to false. In this - * case, various simple counts reported in stats() are not collected. - * On platforms tested, - * this leads to such a tiny performance improvement that there is - * very little motivation to bother. - * - * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] - * <p> - * @see FJTask - * @see FJTaskRunner - **/ - -public class FJTaskRunnerGroup implements IFJTaskRunnerGroup { - - /** The threads in this group **/ - /*protected*/ /*final*/ FJTaskRunner[] threads; - - /** Group-wide queue for tasks entered via execute() **/ - /*protected*/ final LinkedQueue entryQueue = new LinkedQueue(); - - public LinkedQueue getEntryQueue() { - return entryQueue; - } - - /** Number of threads that are not waiting for work **/ - protected int activeCount = 0; - - /** Number of threads that have been started. Used to avoid - unecessary contention during startup of task sets. - **/ - protected int nstarted = 0; - - /** - * Compile-time constant. If true, various counts of - * runs, waits, etc., are maintained. These are NOT - * updated with synchronization, so statistics reports - * might not be accurate. - **/ - - static final boolean COLLECT_STATS = true; - // static final boolean COLLECT_STATS = false; - - // for stats - - /** The time at which this ThreadRunnerGroup was constructed **/ - long initTime = 0; - - /** Total number of executes or invokes **/ - int entries = 0; - - static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY+1; - - /* -------- Suspending -------- */ - - LinkedQueue snapshot() throws InterruptedException { - synchronized (this) { - for (int i = 0; i < threads.length; ++i) { - FJTaskRunner t = threads[i]; - // set flag in all task runners to suspend - t.setSuspending(true); - // interrupt all task runners - // assume: current thread not in threads (scheduler) - t.interrupt(); - } - } - - // wait until all of them have terminated - for (int i = 0; i < threads.length; ++i) { - Thread t = threads[i]; - t.join(); - }; - - return entryQueue; - } - - /** - * Create a FJTaskRunnerGroup with the indicated number - * of FJTaskRunner threads. Normally, the best size to use is - * the number of CPUs on the system. - * <p> - * The threads in a FJTaskRunnerGroup are created with their - * isDaemon status set, so do not normally need to be - * shut down manually upon program termination. - **/ - - public FJTaskRunnerGroup(int groupSize) { - //System.out.println("Creating FJTaskRunnerGroup of size "+groupSize); - threads = new FJTaskRunner[groupSize]; - initializeThreads(); - initTime = System.currentTimeMillis(); - } - - - public boolean existsTask() { - FJTask task = null; - /* - Circularly traverse from a random start index. - - This differs slightly from cilk version that uses a random index - for each attempted steal. - Exhaustive scanning might impede analytic tractablity of - the scheduling policy, but makes it much easier to deal with - startup and shutdown. - */ - - FJTaskRunner[] ts = threads; - int idx = /*victimRNG.nextInt(ts.length)*/ 0; - - for (int i = 0; i < ts.length; ++i) { - FJTaskRunner t = ts[idx]; - if (++idx >= ts.length) idx = 0; // circularly traverse - - if (t != null) { - task = t.take(); - if (task != null) { - break; - } - } - } - - if (task == null) { - task = pollEntryQueue(); - } - - if (task != null && !task.isDone()) { - boolean quit = false; - while (!quit) { - quit = true; - try { - // put task back into entry queue and return true - entryQueue.put(task); - } catch (InterruptedException ie) { - quit = false; - } - } - return true; - } - else return false; - } - - public boolean checkPoolSize() { - // if there is a task in the entryQueue or any of the - // worker queues, increase thread pool size by one - if (!entryQueue.isEmpty() || - existsTask()) { - int newsize = threads.length + 1; - FJTaskRunner[] newar = new FJTaskRunner[newsize]; - System.arraycopy(threads, 0, newar, 0, newsize-1); - synchronized(this) { - threads = newar; - FJTaskRunner t = new FJTaskRunner(this); - threads[newsize-1] = t; - setActive(t); - } - return true; - } - else return false; - } - - - /** - * Arrange for execution of the given task - * by placing it in a work queue. If the argument - * is not of type FJTask, it is embedded in a FJTask via - * <code>FJTask.Wrap</code>. - * @exception InterruptedException if current Thread is - * currently interrupted - **/ - - public void execute(Runnable r) throws InterruptedException { - if (r instanceof FJTask) { - entryQueue.put((FJTask)r); - } - else { - entryQueue.put(new FJTask.Wrap(r)); - } - signalNewTask(); - } - - - /** - * Specialized form of execute called only from within FJTasks - **/ - public void executeTask(FJTask t) { - try { - entryQueue.put(t); - signalNewTask(); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } - - - /** - * Start a task and wait it out. Returns when the task completes. - * @exception InterruptedException if current Thread is - * interrupted before completion of the task. - **/ - - public void invoke(Runnable r) throws InterruptedException { - InvokableFJTask w = new InvokableFJTask(r); - entryQueue.put(w); - signalNewTask(); - w.awaitTermination(); - } - - - /** - * Try to shut down all FJTaskRunner threads in this group - * by interrupting them all. This method is designed - * to be used during cleanup when it is somehow known - * that all threads are idle. - * FJTaskRunners only - * check for interruption when they are not otherwise - * processing a task (and its generated subtasks, - * if any), so if any threads are active, shutdown may - * take a while, and may lead to unpredictable - * task processing. - **/ - - public void interruptAll() { - // paranoically interrupt current thread last if in group. - Thread current = Thread.currentThread(); - boolean stopCurrent = false; - - for (int i = 0; i < threads.length; ++i) { - Thread t = threads[i]; - if (t == current) - stopCurrent = true; - else - t.interrupt(); - } - if (stopCurrent) - current.interrupt(); - } - - - /** - * Set the priority to use while a FJTaskRunner is - * polling for new tasks to perform. Default - * is currently Thread.MIN_PRIORITY+1. The value - * set may not go into effect immediately, but - * will be used at least the next time a thread scans for work. - **/ - public synchronized void setScanPriorities(int pri) { - for (int i = 0; i < threads.length; ++i) { - FJTaskRunner t = threads[i]; - t.setScanPriority(pri); - if (!t.active) t.setPriority(pri); - } - } - - - /** - * Set the priority to use while a FJTaskRunner is - * actively running tasks. Default - * is the priority that was in effect by the thread that - * constructed this FJTaskRunnerGroup. Setting this value - * while threads are running may momentarily result in - * them running at this priority even when idly waiting for work. - **/ - public synchronized void setRunPriorities(int pri) { - for (int i = 0; i < threads.length; ++i) { - FJTaskRunner t = threads[i]; - t.setRunPriority(pri); - if (t.active) t.setPriority(pri); - } - } - - - - /** Return the number of FJTaskRunner threads in this group **/ - - public int size() { return threads.length; } - - - /** - * Return the number of threads that are not idly waiting for work. - * Beware that even active threads might not be doing any useful - * work, but just spinning waiting for other dependent tasks. - * Also, since this is just a snapshot value, some tasks - * may be in the process of becoming idle. - **/ - public synchronized int getActiveCount() { return activeCount; } - - /** - * Prints various snapshot statistics to System.out. - * <ul> - * <li> For each FJTaskRunner thread (labeled as T<em>n</em>, for - * <em>n</em> from zero to group size - 1): - * <ul> - * <li> A star "*" is printed if the thread is currently active; - * that is, not sleeping while waiting for work. Because - * threads gradually enter sleep modes, an active thread - * may in fact be about to sleep (or wake up). - * <li> <em>Q Cap</em> The current capacity of its task queue. - * <li> <em>Run</em> The total number of tasks that have been run. - * <li> <em>New</em> The number of these tasks that were - * taken from either the entry queue or from other - * thread queues; that is, the number of tasks run - * that were <em>not</em> forked by the thread itself. - * <li> <em>Scan</em> The number of times other task - * queues or the entry queue were polled for tasks. - * </ul> - * <li> <em>Execute</em> The total number of tasks entered - * (but not necessarily yet run) via execute or invoke. - * <li> <em>Time</em> Time in seconds since construction of this - * FJTaskRunnerGroup. - * <li> <em>Rate</em> The total number of tasks processed - * per second across all threads. This - * may be useful as a simple throughput indicator - * if all processed tasks take approximately the - * same time to run. - * </ul> - * <p> - * Cautions: Some statistics are updated and gathered - * without synchronization, - * so may not be accurate. However, reported counts may be considered - * as lower bounds of actual values. - * Some values may be zero if classes are compiled - * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup - * classes can be independently compiled with different values of - * COLLECT_STATS.) Also, the counts are maintained as ints so could - * overflow in exceptionally long-lived applications. - * <p> - * These statistics can be useful when tuning algorithms or diagnosing - * problems. For example: - * <ul> - * <li> High numbers of scans may mean that there is insufficient - * parallelism to keep threads busy. However, high scan rates - * are expected if the number - * of Executes is also high or there is a lot of global - * synchronization in the application, and the system is not otherwise - * busy. Threads may scan - * for work hundreds of times upon startup, shutdown, and - * global synch points of task sets. - * <li> Large imbalances in tasks run across different threads might - * just reflect contention with unrelated threads on a system - * (possibly including JVM threads such as GC), but may also - * indicate some systematic bias in how you generate tasks. - * <li> Large task queue capacities may mean that too many tasks are being - * generated before they can be run. - * Capacities are reported rather than current numbers of tasks - * in queues because they are better indicators of the existence - * of these kinds of possibly-transient problems. - * Queue capacities are - * resized on demand from their initial value of 4096 elements, - * which is much more than sufficient for the kinds of - * applications that this framework is intended to best support. - * </ul> - **/ - - public void stats() { - long time = System.currentTimeMillis() - initTime; - double secs = ((double)time) / 1000.0; - long totalRuns = 0; - long totalScans = 0; - long totalSteals = 0; - - System.out.print("Thread" + - "\tQ Cap" + - "\tScans" + - "\tNew" + - "\tRuns" + - "\n"); - - for (int i = 0; i < threads.length; ++i) { - FJTaskRunner t = threads[i]; - int truns = t.runs; - totalRuns += truns; - - int tscans = t.scans; - totalScans += tscans; - - int tsteals = t.steals; - totalSteals += tsteals; - - String star = (getActive(t))? "*" : " "; - - - System.out.print("T" + i + star + - "\t" + t.deqSize() + - "\t" + tscans + - "\t" + tsteals + - "\t" + truns + - "\n"); - } - - System.out.print("Total" + - "\t " + - "\t" + totalScans + - "\t" + totalSteals + - "\t" + totalRuns + - "\n"); - - System.out.print("Execute: " + entries); - - System.out.print("\tTime: " + secs); - - long rps = 0; - if (secs != 0) rps = Math.round((double)(totalRuns) / secs); - - System.out.println("\tRate: " + rps); - } - - - /* ------------ Methods called only by FJTaskRunners ------------- */ - - - /** - * Return the array of threads in this group. - * Called only by FJTaskRunner.scan(). - **/ - - public FJTaskRunner[] getArray() { return threads; } - - - /** - * Return a task from entry queue, or null if empty. - * Called only by FJTaskRunner.scan(). - **/ - - public FJTask pollEntryQueue() { - try { - FJTask t = (FJTask)(entryQueue.poll(0)); - return t; - } - catch(InterruptedException ex) { // ignore interrupts - Thread.currentThread().interrupt(); - return null; - } - } - - - /** - * Return active status of t. - * Per-thread active status can only be accessed and - * modified via synchronized method here in the group class. - **/ - - protected synchronized boolean getActive(FJTaskRunner t) { - return t.active; - } - - - /** - * Set active status of thread t to true, and notify others - * that might be waiting for work. - **/ - - public synchronized void setActive(FJTaskRunner t) { - if (!t.active) { - t.active = true; - ++activeCount; - if (nstarted < threads.length) - threads[nstarted++].start(); - else - notifyAll(); - } - } - - /** - * Set active status of thread t to false. - **/ - - public synchronized void setInactive(FJTaskRunner t) { - if (t.active) { - t.active = false; - --activeCount; - } - } - - /** - * The number of times to scan other threads for tasks - * before transitioning to a mode where scans are - * interleaved with sleeps (actually timed waits). - * Upon transition, sleeps are for duration of - * scans / SCANS_PER_SLEEP milliseconds. - * <p> - * This is not treated as a user-tunable parameter because - * good values do not appear to vary much across JVMs or - * applications. Its main role is to help avoid some - * useless spinning and contention during task startup. - **/ - static final long SCANS_PER_SLEEP = 15; - - /** - * The maximum time (in msecs) to sleep when a thread is idle, - * yet others are not, so may eventually generate work that - * the current thread can steal. This value reflects the maximum time - * that a thread may sleep when it possibly should not, because there - * are other active threads that might generate work. In practice, - * designs in which some threads become stalled because others - * are running yet not generating tasks are not likely to work - * well in this framework anyway, so the exact value does not matter - * too much. However, keeping it in the sub-second range does - * help smooth out startup and shutdown effects. - **/ - - static final long MAX_SLEEP_TIME = 100; - - /** - * Set active status of thread t to false, and - * then wait until: (a) there is a task in the entry - * queue, or (b) other threads are active, or (c) the current - * thread is interrupted. Upon return, it - * is not certain that there will be work available. - * The thread must itself check. - * <p> - * The main underlying reason - * for these mechanics is that threads do not - * signal each other when they add elements to their queues. - * (This would add to task overhead, reduce locality. - * and increase contention.) - * So we must rely on a tamed form of polling. However, tasks - * inserted into the entry queue do result in signals, so - * tasks can wait on these if all of them are otherwise idle. - **/ - - public synchronized void checkActive(FJTaskRunner t, long scans) { - - setInactive(t); - - try { - // if nothing available, do a hard wait - if (activeCount == 0 && entryQueue.peek() == null) { - wait(); - } - else { - // If there is possibly some work, - // sleep for a while before rechecking - - long msecs = scans / SCANS_PER_SLEEP; - if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME; - int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep - wait(msecs, nsecs); - } - } - catch (InterruptedException ex) { - notify(); // avoid lost notifies on interrupts - Thread.currentThread().interrupt(); - } - } - - /* ------------ Utility methods ------------- */ - - /** - * Start or wake up any threads waiting for work - **/ - - protected synchronized void signalNewTask() { - if (COLLECT_STATS) ++entries; - if (nstarted < threads.length) - threads[nstarted++].start(); - else - notify(); - } - - /** - * Create all FJTaskRunner threads in this group. - **/ - - protected void initializeThreads() { - for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this); - } - - - - - /** - * Wrap wait/notify mechanics around a task so that - * invoke() can wait it out - **/ - protected static final class InvokableFJTask extends FJTask { - protected final Runnable wrapped; - protected boolean terminated = false; - - protected InvokableFJTask(Runnable r) { wrapped = r; } - - public void run() { - try { - if (wrapped instanceof FJTask) - FJTask.invoke((FJTask)(wrapped)); - else - wrapped.run(); - } - finally { - setTerminated(); - } - } - - protected synchronized void setTerminated() { - terminated = true; - notifyAll(); - } - - protected synchronized void awaitTermination() throws InterruptedException { - while (!terminated) wait(); - } - } - - -} - diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala deleted file mode 100644 index cf26b33916..0000000000 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors - -import compat.Platform - -import java.lang.{Runnable, Thread, InterruptedException, System, Runtime} -import java.lang.Thread.State - -import scala.collection.Set -import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet} - -import scheduler.{ThreadPoolConfig, QuitException} - -/** - * FJTaskScheduler2 - * - * @version 0.9.18 - * @author Philipp Haller - */ -class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean) extends Thread with IScheduler with ActorGC { - setDaemon(daemon) - - /** Default constructor creates a non-daemon thread. */ - def this() = - this(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, false) - - def this(daemon: Boolean) = - this(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, daemon) - - var printStats = false - - private var coreSize = initCoreSize - - private val executor = - new FJTaskRunnerGroup(coreSize) - - private var terminating = false - private var suspending = false - - private var submittedTasks = 0 - - private val CHECK_FREQ = 100 - - private def allWorkersBlocked: Boolean = - executor.threads.forall(t => { - val s = t.getState() - s == State.BLOCKED || s == State.WAITING || s == State.TIMED_WAITING - }) - - override def run() { - try { - while (!terminating) { - this.synchronized { - try { - wait(CHECK_FREQ) - } catch { - case _: InterruptedException => - if (terminating) throw new QuitException - } - - if (!suspending) { - - gc() - - // check if we need more threads - if (coreSize < maxSize - && allWorkersBlocked - && executor.checkPoolSize()) { - //Debug.info(this+": increasing thread pool size") - coreSize += 1 - } - else { - if (allTerminated) { - // if all worker threads idle terminate - if (executor.getActiveCount() == 0) { - Debug.info(this+": initiating shutdown...") - - // Note that we don't have to shutdown - // the FJTaskRunnerGroup since there is - // no separate thread associated with it, - // and FJTaskRunner threads have daemon status. - throw new QuitException - } - } - } - } - } // sync - - } // while (!terminating) - } catch { - case _: QuitException => - // allow thread to exit - if (printStats) executor.stats() - } - } - - /** - * @param task the task to be executed - */ - def execute(task: Runnable): Unit = - executor execute task - - def execute(fun: => Unit): Unit = - executor.execute(new Runnable { - def run() { fun } - }) - - /** Shuts down all idle worker threads. - */ - def shutdown(): Unit = synchronized { - terminating = true - } - - def snapshot(): LinkedQueue = { - suspending = true - executor.snapshot() - } - - def isActive = !terminating && !suspending - - def managedBlock(blocker: scala.concurrent.ManagedBlocker) { - blocker.block() - } -} diff --git a/src/actors/scala/actors/IFJTaskRunnerGroup.java b/src/actors/scala/actors/IFJTaskRunnerGroup.java deleted file mode 100644 index dbbb325710..0000000000 --- a/src/actors/scala/actors/IFJTaskRunnerGroup.java +++ /dev/null @@ -1,18 +0,0 @@ - -package scala.actors; - -/** - * IFJTaskRunnerGroup - * - * @version 0.9.8 - * @author Philipp Haller - */ -interface IFJTaskRunnerGroup { - public void executeTask(FJTask t); - public FJTaskRunner[] getArray(); - public FJTask pollEntryQueue(); - public void setActive(FJTaskRunner t); - public void checkActive(FJTaskRunner t, long scans); - public void setInactive(FJTaskRunner t); - public LinkedQueue getEntryQueue(); -} |