summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-10-27 09:43:44 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-10-27 09:43:44 +0000
commit42a42ac0c346d48594958ac80d5eb814b8aa0c39 (patch)
treeb4e34bd91d434376236c18d4d1528f46559d5704
parentf9551d0c2f92ded7ef574121d89cda742e407f2d (diff)
downloadscala-42a42ac0c346d48594958ac80d5eb814b8aa0c39.tar.gz
scala-42a42ac0c346d48594958ac80d5eb814b8aa0c39.tar.bz2
scala-42a42ac0c346d48594958ac80d5eb814b8aa0c39.zip
Removed old fork-join library. Fixed build.
-rw-r--r--src/actors/scala/actors/FJTask.java531
-rw-r--r--src/actors/scala/actors/FJTaskRunner.java996
-rw-r--r--src/actors/scala/actors/FJTaskRunnerGroup.java714
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala133
-rw-r--r--src/actors/scala/actors/IFJTaskRunnerGroup.java18
5 files changed, 0 insertions, 2392 deletions
diff --git a/src/actors/scala/actors/FJTask.java b/src/actors/scala/actors/FJTask.java
deleted file mode 100644
index 6398f1400e..0000000000
--- a/src/actors/scala/actors/FJTask.java
+++ /dev/null
@@ -1,531 +0,0 @@
-/*
- File: Task.java
-
- Originally written by Doug Lea and released into the public domain.
- This may be used for any purposes whatsoever without acknowledgment.
- Thanks for the assistance and support of Sun Microsystems Labs,
- and everyone contributing, testing, and using this code.
-
- History:
- Date Who What
- 7Jan1999 dl first release
- 14jan1999 dl simplify start() semantics;
- improve documentation
- 18Jan1999 dl Eliminate useless time-based waits.
- 7Mar1999 dl Add reset method,
- add array-based composite operations
- 27Apr1999 dl Rename
-*/
-
-package scala.actors;
-
-
-/**
- * Abstract base class for Fork/Join Tasks.
- *
- * <p>
- * FJTasks are lightweight, stripped-down analogs of Threads.
- * Many FJTasks share the same pool of Java threads. This is
- * supported by the FJTaskRunnerGroup and FJTaskRunner classes, that
- * mainly contain
- * methods called only internally by FJTasks.
- * FJTasks support versions of the most common methods found in class Thread,
- * including start(), yield() and join(). However, they
- * don't support priorities, ThreadGroups or other bookkeeping
- * or control methods of class Thread.
- * <p>
- * FJTasks should normally be defined by subclassing and adding a run() method.
- * Alternatively, static inner class <code>Wrap(Runnable r)</code>
- * can be used to
- * wrap an existing Runnable object in a FJTask.
- * <p>
- * <code>FJTaskRunnerGroup.execute(FJTask)</code> can be used to
- * initiate a FJTask from a non-FJTask thread.
- * And <code>FJTaskRunnerGroup.invoke(FJTask)</code> can be used to initiate
- * a FJTask and then wait for it to complete before returning.
- * These are the only entry-points from normal threads to FJTasks.
- * Most FJTask methods themselves may only be called from within running FJTasks.
- * They throw ClassCastExceptions if they are not,
- * reflecting the fact that these methods
- * can only be executed using FJTaskRunner threads, not generic
- * java.lang.Threads.
- * <p>
- * There are three different ways to run a FJTask,
- * with different scheduling semantics:
- * <ul>
- * <li> FJTask.start() (as well as FJTaskRunnerGroup.execute(FJTask))
- * behaves pretty much like Thread.start(). It enqueues a task to be
- * run the next time any FJTaskRunner thread is otherwise idle.
- * It maintains standard FIFO ordering with respect to
- * the group of worker threads.
- * <li> FJTask.fork() (as well as the two-task spawning method,
- * coInvoke(task1, task2), and the array version
- * coInvoke(FJTask[] tasks)) starts a task
- * that will be executed in
- * procedure-call-like LIFO order if executed by the
- * same worker thread as the one that created it, but is FIFO
- * with respect to other tasks if it is run by
- * other worker threads. That is, earlier-forked
- * tasks are preferred to later-forked tasks by other idle workers.
- * Fork() is noticeably faster than start(), but can only be
- * used when these scheduling semantics are acceptable.
- * <li> FJTask.invoke(FJTask) just executes the run method
- * of one task from within another. It is the analog of a
- * direct call.
- * </ul>
- * <p>
- * The main economies of FJTasks stem from the fact that
- * FJTasks do not support blocking operations of any kind.
- * FJTasks should just run to completion without
- * issuing waits or performing blocking IO.
- * There are several styles for creating the run methods that
- * execute as tasks, including
- * event-style methods, and pure computational methods.
- * Generally, the best kinds of FJTasks are those that in turn
- * generate other FJTasks.
- * <p>
- * There is nothing actually
- * preventing you from blocking within a FJTask, and very short waits/blocks are
- * completely well behaved. But FJTasks are not designed
- * to support arbitrary synchronization
- * since there is no way to suspend and resume individual tasks
- * once they have begun executing. FJTasks should also be finite
- * in duration -- they should not contain infinite loops.
- * FJTasks that might need to perform a blocking
- * action, or hold locks for extended periods, or
- * loop forever can instead create normal
- * java Thread objects that will do so. FJTasks are just not
- * designed to support these things.
- * FJTasks may however yield() control to allow their FJTaskRunner threads
- * to run other tasks,
- * and may wait for other dependent tasks via join(). These
- * are the only coordination mechanisms supported by FJTasks.
- * <p>
- * FJTasks, and the FJTaskRunners that execute them are not
- * intrinsically robust with respect to exceptions.
- * A FJTask that aborts via an exception does not automatically
- * have its completion flag (isDone) set.
- * As with ordinary Threads, an uncaught exception will normally cause
- * its FJTaskRunner thread to die, which in turn may sometimes
- * cause other computations being performed to hang or abort.
- * You can of course
- * do better by trapping exceptions inside the run methods of FJTasks.
- * <p>
- * The overhead differences between FJTasks and Threads are substantial,
- * especially when using fork() or coInvoke().
- * FJTasks can be two or three orders of magnitude faster than Threads,
- * at least when run on JVMs with high-performance garbage collection
- * (every FJTask quickly becomes garbage) and good native thread support.
- * <p>
- * Given these overhead savings, you might be tempted to use FJTasks for
- * everything you would use a normal Thread to do. Don't. Java Threads
- * remain better for general purpose thread-based programming. Remember
- * that FJTasks cannot be used for designs involving arbitrary blocking
- * synchronization or I/O. Extending FJTasks to support such capabilities
- * would amount to re-inventing the Thread class, and would make them
- * less optimal in the contexts that they were designed for.
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
- * <p>
- * @see FJTaskRunner
- * @see FJTaskRunnerGroup
- **/
-
-public abstract class FJTask implements Runnable {
-
- /**
- * The only status information associated with FJTasks is whether
- * the they are considered to have completed.
- * It is set true automatically within
- * FJTaskRunner methods upon completion
- * of the run method, or manually via cancel.
- **/
-
- private volatile boolean done; // = false;
-
- /**
- * Return the FJTaskRunner thread running the current FJTask.
- * Most FJTask methods are just relays to their current
- * FJTaskRunners, that perform the indicated actions.
- * @exception ClassCastException if caller thread is not a
- * running FJTask.
- **/
-
- public static FJTaskRunner getFJTaskRunner() {
- return (FJTaskRunner)(Thread.currentThread());
- }
-
- /**
- * Return the FJTaskRunnerGroup of the thread running the current FJTask.
- * @exception ClassCastException if caller thread is not a
- * running FJTask.
- **/
- public static IFJTaskRunnerGroup getFJTaskRunnerGroup() {
- return getFJTaskRunner().getGroup();
- }
-
-
- /**
- * Return true if current task has terminated or been cancelled.
- * The method is a simple analog of the Thread.isAlive()
- * method. However, it reports true only when the task has terminated
- * or has been cancelled. It does not distinguish these two cases.
- * And there is no way to determine whether a FJTask has been started
- * or is currently executing.
- **/
-
- public final boolean isDone() { return done; }
-
- /**
- * Indicate termination. Intended only to be called by FJTaskRunner.
- * FJTasks themselves should use (non-final) method
- * cancel() to suppress execution.
- **/
-
- protected final void setDone() { done = true; }
-
- /**
- * Set the termination status of this task. This simple-minded
- * analog of Thread.interrupt
- * causes the task not to execute if it has not already been started.
- * Cancelling a running FJTask
- * has no effect unless the run method itself uses isDone()
- * to probe cancellation and take appropriate action.
- * Individual run() methods may sense status and
- * act accordingly, normally by returning early.
- **/
-
- public void cancel() { setDone(); }
-
-
- /**
- * Clear the termination status of this task.
- * This method is intended to be used
- * only as a means to allow task objects to be recycled. It should
- * be called only when you are sure that the previous
- * execution of this task has terminated and, if applicable, has
- * been joined by all other waiting tasks. Usage in any other
- * context is a very bad idea.
- **/
-
- public void reset() { done = false; }
-
-
- /**
- * Execute this task. This method merely places the task in a
- * group-wide scheduling queue.
- * It will be run
- * the next time any TaskRunner thread is otherwise idle.
- * This scheduling maintains FIFO ordering of started tasks
- * with respect to
- * the group of worker threads.
- * @exception ClassCastException if caller thread is not
- * running in a FJTaskRunner thread.
- **/
-
- public void start() { getFJTaskRunnerGroup().executeTask(this); }
-
-
- /**
- * Arrange for execution of a strictly dependent task.
- * The task that will be executed in
- * procedure-call-like LIFO order if executed by the
- * same worker thread, but is FIFO with respect to other tasks
- * forked by this thread when taken by other worker threads.
- * That is, earlier-forked
- * tasks are preferred to later-forked tasks by other idle workers.
- * <p>
- * Fork() is noticeably
- * faster than start(). However, it may only
- * be used for strictly dependent tasks -- generally, those that
- * could logically be issued as straight method calls without
- * changing the logic of the program.
- * The method is optimized for use in parallel fork/join designs
- * in which the thread that issues one or more forks
- * cannot continue until at least some of the forked
- * threads terminate and are joined.
- * @exception ClassCastException if caller thread is not
- * running in a FJTaskRunner thread.
- **/
-
- public void fork() { getFJTaskRunner().push(this); }
-
- /**
- * Allow the current underlying FJTaskRunner thread to process other tasks.
- * <p>
- * Spinloops based on yield() are well behaved so long
- * as the event or condition being waited for is produced via another
- * FJTask. Additionally, you must never hold a lock
- * while performing a yield or join. (This is because
- * multiple FJTasks can be run by the same Thread during
- * a yield. Since java locks are held per-thread, the lock would not
- * maintain the conceptual exclusion you have in mind.)
- * <p>
- * Otherwise, spinloops using
- * yield are the main construction of choice when a task must wait
- * for a condition that it is sure will eventually occur because it
- * is being produced by some other FJTask. The most common
- * such condition is built-in: join() repeatedly yields until a task
- * has terminated after producing some needed results. You can also
- * use yield to wait for callbacks from other FJTasks, to wait for
- * status flags to be set, and so on. However, in all these cases,
- * you should be confident that the condition being waited for will
- * occur, essentially always because it is produced by
- * a FJTask generated by the current task, or one of its subtasks.
- *
- * @exception ClassCastException if caller thread is not
- * running in a FJTaskRunner thread.
- **/
-
- public static void yield() { getFJTaskRunner().taskYield(); }
-
- /**
- * Yield until this task isDone.
- * Equivalent to <code>while(!isDone()) yield(); </code>
- * @exception ClassCastException if caller thread is not
- * running in a FJTaskRunner thread.
- **/
-
- public void join() { getFJTaskRunner().taskJoin(this); }
-
- /**
- * Immediately execute task t by calling its run method. Has no
- * effect if t has already been run or has been cancelled.
- * It is equivalent to calling t.run except that it
- * deals with completion status, so should always be used
- * instead of directly calling run.
- * The method can be useful
- * when a computation has been packaged as a FJTask, but you just need to
- * directly execute its body from within some other task.
- **/
-
- public static void invoke(FJTask t) {
- if (!t.isDone()) {
- t.run();
- t.setDone();
- }
- }
-
- /**
- * Fork both tasks and then wait for their completion. It behaves as:
- * <pre>
- * task1.fork(); task2.fork(); task2.join(); task1.join();
- * </pre>
- * As a simple classic example, here is
- * a class that computes the Fibonacci function:
- * <pre>
- * public class Fib extends FJTask {
- *
- * // Computes fibonacci(n) = fibonacci(n-1) + fibonacci(n-2); for n> 1
- * // fibonacci(0) = 0;
- * // fibonacci(1) = 1.
- *
- * // Value to compute fibonacci function for.
- * // It is replaced with the answer when computed.
- * private volatile int number;
- *
- * public Fib(int n) { number = n; }
- *
- * public int getAnswer() {
- * if (!isDone()) throw new Error("Not yet computed");
- * return number;
- * }
- *
- * public void run() {
- * int n = number;
- * if (n > 1) {
- * Fib f1 = new Fib(n - 1);
- * Fib f2 = new Fib(n - 2);
- *
- * coInvoke(f1, f2); // run these in parallel
- *
- * // we know f1 and f2 are computed, so just directly access numbers
- * number = f1.number + f2.number;
- * }
- * }
- *
- * public static void main(String[] args) { // sample driver
- * try {
- * int groupSize = 2; // 2 worker threads
- * int num = 35; // compute fib(35)
- * FJTaskRunnerGroup group = new FJTaskRunnerGroup(groupSize);
- * Fib f = new Fib(num);
- * group.invoke(f);
- * int result = f.getAnswer();
- * System.out.println(" Answer: " + result);
- * }
- * catch (InterruptedException ex) {
- * System.out.println("Interrupted");
- * }
- * }
- * }
- * </pre>
- *
- * @exception ClassCastException if caller thread is not
- * running in a FJTaskRunner thread.
- **/
-
- public static void coInvoke(FJTask task1, FJTask task2) {
- getFJTaskRunner().coInvoke(task1, task2);
- }
-
-
- /**
- * Fork all tasks in array, and await their completion.
- * Behaviorally equivalent to:
- * <pre>
- * for (int i = 0; i &lt; tasks.length; ++i) tasks[i].fork();
- * for (int i = 0; i &lt; tasks.length; ++i) tasks[i].join();
- * </pre>
- **/
-
- public static void coInvoke(FJTask[] tasks) {
- getFJTaskRunner().coInvoke(tasks);
- }
-
- /**
- * A FJTask that holds a Runnable r, and calls r.run when executed.
- * The class is a simple utilty to allow arbitrary Runnables
- * to be used as FJTasks.
- **/
-
- public static class Wrap extends FJTask {
- protected final Runnable runnable;
- public Wrap(Runnable r) { runnable = r; }
- public void run() { runnable.run(); }
- }
-
-
- /**
- * A <code>new Seq</code>, when executed,
- * invokes each task provided in the constructor, in order.
- * The class is a simple utility
- * that makes it easier to create composite FJTasks.
- **/
- public static class Seq extends FJTask {
- protected final FJTask[] tasks;
-
- /**
- * Construct a Seq that, when executed, will process each of the
- * tasks in the tasks array in order
- **/
- public Seq(FJTask[] tasks) {
- this.tasks = tasks;
- }
-
- /**
- * Two-task constructor, for compatibility with previous release.
- **/
- public Seq(FJTask task1, FJTask task2) {
- this.tasks = new FJTask[] { task1, task2 };
- }
-
- public void run() {
- for (int i = 0; i < tasks.length; ++i) FJTask.invoke(tasks[i]);
- }
- }
-
- /**
- * Construct and return a FJTask object that, when executed, will
- * invoke the tasks in the tasks array in array order
- **/
-
- public static FJTask seq(FJTask[] tasks) {
- return new Seq(tasks);
- }
-
- /**
- * A <code>new Par</code>, when executed,
- * runs the tasks provided in the constructor in parallel using
- * coInvoke(tasks).
- * The class is a simple utility
- * that makes it easier to create composite FJTasks.
- **/
- public static class Par extends FJTask {
- protected final FJTask[] tasks;
-
- /**
- * Construct a Seq that, when executed, will process each of the
- * tasks in the tasks array in parallel
- **/
- public Par(FJTask[] tasks) {
- this.tasks = tasks;
- }
-
- /**
- * Two-task constructor, for compatibility with previous release.
- **/
- public Par(FJTask task1, FJTask task2) {
- this.tasks = new FJTask[] { task1, task2 };
- }
-
-
- public void run() {
- FJTask.coInvoke(tasks);
- }
- }
-
-
- /**
- * Construct and return a FJTask object that, when executed, will
- * invoke the tasks in the tasks array in parallel using coInvoke
- **/
- public static FJTask par(FJTask[] tasks) {
- return new Par(tasks);
- }
-
- /**
- * A <code>new Seq2(task1, task2)</code>, when executed,
- * invokes task1 and then task2, in order.
- * The class is a simple utility
- * that makes it easier to create composite Tasks.
- **/
- public static class Seq2 extends FJTask {
- protected final FJTask fst;
- protected final FJTask snd;
- public Seq2(FJTask task1, FJTask task2) {
- fst = task1;
- snd = task2;
- }
- public void run() {
- FJTask.invoke(fst);
- FJTask.invoke(snd);
- }
- }
-
- /**
- * Construct and return a FJTask object that, when executed, will
- * invoke task1 and task2, in order
- **/
-
- public static FJTask seq(FJTask task1, FJTask task2) {
- return new Seq2(task1, task2);
- }
-
- /**
- * A <code>new Par(task1, task2)</code>, when executed,
- * runs task1 and task2 in parallel using coInvoke(task1, task2).
- * The class is a simple utility
- * that makes it easier to create composite Tasks.
- **/
- public static class Par2 extends FJTask {
- protected final FJTask fst;
- protected final FJTask snd;
- public Par2(FJTask task1, FJTask task2) {
- fst = task1;
- snd = task2;
- }
- public void run() {
- FJTask.coInvoke(fst, snd);
- }
- }
-
-
- /**
- * Construct and return a FJTask object that, when executed, will
- * invoke task1 and task2, in parallel
- **/
- public static FJTask par(FJTask task1, FJTask task2) {
- return new Par2(task1, task2);
- }
-
-}
diff --git a/src/actors/scala/actors/FJTaskRunner.java b/src/actors/scala/actors/FJTaskRunner.java
deleted file mode 100644
index 73bc9ff1dd..0000000000
--- a/src/actors/scala/actors/FJTaskRunner.java
+++ /dev/null
@@ -1,996 +0,0 @@
-/*
- File: FJTaskRunner.java
-
- Originally written by Doug Lea and released into the public domain.
- This may be used for any purposes whatsoever without acknowledgment.
- Thanks for the assistance and support of Sun Microsystems Labs,
- and everyone contributing, testing, and using this code.
-
- History:
- Date Who What
- 7Jan1999 dl First public release
- 13Jan1999 dl correct a stat counter update;
- ensure inactive status on run termination;
- misc minor cleaup
- 14Jan1999 dl Use random starting point in scan;
- variable renamings.
- 18Jan1999 dl Runloop allowed to die on task exception;
- remove useless timed join
- 22Jan1999 dl Rework scan to allow use of priorities.
- 6Feb1999 dl Documentation updates.
- 7Mar1999 dl Add array-based coInvoke
- 31Mar1999 dl Revise scan to remove need for NullTasks
- 27Apr1999 dl Renamed
- 23oct1999 dl Earlier detect of interrupt in scanWhileIdling
- 24nov1999 dl Now works on JVMs that do not properly
- implement read-after-write of 2 volatiles.
-*/
-
-package scala.actors;
-
-import java.util.Random;
-
-/**
- * Specialized Thread subclass for running FJTasks.
- * <p>
- * Each FJTaskRunner keeps FJTasks in a double-ended queue (DEQ).
- * Double-ended queues support stack-based operations
- * push and pop, as well as queue-based operations put and take.
- * Normally, threads run their own tasks. But they
- * may also steal tasks from each others DEQs.
- * <p>
- * The algorithms are minor variants of those used
- * in <A href="http://supertech.lcs.mit.edu/cilk/"> Cilk</A> and
- * <A href="http://www.cs.utexas.edu/users/hood/"> Hood</A>, and
- * to a lesser extent
- * <A href="http://www.cs.uga.edu/~dkl/filaments/dist.html"> Filaments</A>,
- * but are adapted to work in Java.
- * <p>
- * The two most important capabilities are:
- * <ul>
- * <li> Fork a FJTask:
- * <pre>
- * Push task onto DEQ
- * </pre>
- * <li> Get a task to run (for example within taskYield)
- * <pre>
- * If DEQ is not empty,
- * Pop a task and run it.
- * Else if any other DEQ is not empty,
- * Take ("steal") a task from it and run it.
- * Else if the entry queue for our group is not empty,
- * Take a task from it and run it.
- * Else if current thread is otherwise idling
- * If all threads are idling
- * Wait for a task to be put on group entry queue
- * Else
- * Yield or Sleep for a while, and then retry
- * </pre>
- * </ul>
- * The push, pop, and put are designed to only ever called by the
- * current thread, and take (steal) is only ever called by
- * other threads.
- * All other operations are composites and variants of these,
- * plus a few miscellaneous bookkeeping methods.
- * <p>
- * Implementations of the underlying representations and operations
- * are geared for use on JVMs operating on multiple CPUs (although
- * they should of course work fine on single CPUs as well).
- * <p>
- * A possible snapshot of a FJTaskRunner's DEQ is:
- * <pre>
- * 0 1 2 3 4 5 6 ...
- * +-----+-----+-----+-----+-----+-----+-----+--
- * | | t | t | t | t | | | ... deq array
- * +-----+-----+-----+-----+-----+-----+-----+--
- * ^ ^
- * base top
- * (incremented (incremented
- * on take, on push
- * decremented decremented
- * on put) on pop)
- * </pre>
- * <p>
- * FJTasks are held in elements of the DEQ.
- * They are maintained in a bounded array that
- * works similarly to a circular bounded buffer. To ensure
- * visibility of stolen FJTasks across threads, the array elements
- * must be <code>volatile</code>.
- * Using volatile rather than synchronizing suffices here since
- * each task accessed by a thread is either one that it
- * created or one that has never seen before. Thus we cannot
- * encounter any staleness problems executing run methods,
- * although FJTask programmers must be still sure to either synch or use
- * volatile for shared data within their run methods.
- * <p>
- * However, since there is no way
- * to declare an array of volatiles in Java, the DEQ elements actually
- * hold VolatileTaskRef objects, each of which in turn holds a
- * volatile reference to a FJTask.
- * Even with the double-indirection overhead of
- * volatile refs, using an array for the DEQ works out
- * better than linking them since fewer shared
- * memory locations need to be
- * touched or modified by the threads while using the DEQ.
- * Further, the double indirection may alleviate cache-line
- * sharing effects (which cannot otherwise be directly dealt with in Java).
- * <p>
- * The indices for the <code>base</code> and <code>top</code> of the DEQ
- * are declared as volatile. The main contention point with
- * multiple FJTaskRunner threads occurs when one thread is trying
- * to pop its own stack while another is trying to steal from it.
- * This is handled via a specialization of Dekker's algorithm,
- * in which the popping thread pre-decrements <code>top</code>,
- * and then checks it against <code>base</code>.
- * To be conservative in the face of JVMs that only partially
- * honor the specification for volatile, the pop proceeds
- * without synchronization only if there are apparently enough
- * items for both a simultaneous pop and take to succeed.
- * It otherwise enters a
- * synchronized lock to check if the DEQ is actually empty,
- * if so failing. The stealing thread
- * does almost the opposite, but is set up to be less likely
- * to win in cases of contention: Steals always run under synchronized
- * locks in order to avoid conflicts with other ongoing steals.
- * They pre-increment <code>base</code>, and then check against
- * <code>top</code>. They back out (resetting the base index
- * and failing to steal) if the
- * DEQ is empty or is about to become empty by an ongoing pop.
- * <p>
- * A push operation can normally run concurrently with a steal.
- * A push enters a synch lock only if the DEQ appears full so must
- * either be resized or have indices adjusted due to wrap-around
- * of the bounded DEQ. The put operation always requires synchronization.
- * <p>
- * When a FJTaskRunner thread has no tasks of its own to run,
- * it tries to be a good citizen.
- * Threads run at lower priority while scanning for work.
- * <p>
- * If the task is currently waiting
- * via yield, the thread alternates scans (starting at a randomly
- * chosen victim) with Thread.yields. This is
- * well-behaved so long as the JVM handles Thread.yield in a
- * sensible fashion. (It need not. Thread.yield is so underspecified
- * that it is legal for a JVM to treat it as a no-op.) This also
- * keeps things well-behaved even if we are running on a uniprocessor
- * JVM using a simple cooperative threading model.
- * <p>
- * If a thread needing work is
- * is otherwise idle (which occurs only in the main runloop), and
- * there are no available tasks to steal or poll, it
- * instead enters into a sleep-based (actually timed wait(msec))
- * phase in which it progressively sleeps for longer durations
- * (up to a maximum of FJTaskRunnerGroup.MAX_SLEEP_TIME,
- * currently 100ms) between scans.
- * If all threads in the group
- * are idling, they further progress to a hard wait phase, suspending
- * until a new task is entered into the FJTaskRunnerGroup entry queue.
- * A sleeping FJTaskRunner thread may be awakened by a new
- * task being put into the group entry queue or by another FJTaskRunner
- * becoming active, but not merely by some DEQ becoming non-empty.
- * Thus the MAX_SLEEP_TIME provides a bound for sleep durations
- * in cases where all but one worker thread start sleeping
- * even though there will eventually be work produced
- * by a thread that is taking a long time to place tasks in DEQ.
- * These sleep mechanics are handled in the FJTaskRunnerGroup class.
- * <p>
- * Composite operations such as taskJoin include heavy
- * manual inlining of the most time-critical operations
- * (mainly FJTask.invoke).
- * This opens up a few opportunities for further hand-optimizations.
- * Until Java compilers get a lot smarter, these tweaks
- * improve performance significantly enough for task-intensive
- * programs to be worth the poorer maintainability and code duplication.
- * <p>
- * Because they are so fragile and performance-sensitive, nearly
- * all methods are declared as final. However, nearly all fields
- * and methods are also declared as protected, so it is possible,
- * with much care, to extend functionality in subclasses. (Normally
- * you would also need to subclass FJTaskRunnerGroup.)
- * <p>
- * None of the normal java.lang.Thread class methods should ever be called
- * on FJTaskRunners. For this reason, it might have been nicer to
- * declare FJTaskRunner as a Runnable to run within a Thread. However,
- * this would have complicated many minor logistics. And since
- * no FJTaskRunner methods should normally be called from outside the
- * FJTask and FJTaskRunnerGroup classes either, this decision doesn't impact
- * usage.
- * <p>
- * You might think that layering this kind of framework on top of
- * Java threads, which are already several levels removed from raw CPU
- * scheduling on most systems, would lead to very poor performance.
- * But on the platforms
- * tested, the performance is quite good.
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
- * @see FJTask
- * @see FJTaskRunnerGroup
- **/
-
-public class FJTaskRunner extends Thread {
-
- /** The group of which this FJTaskRunner is a member **/
- protected final IFJTaskRunnerGroup group;
-
- /**
- * Constructor called only during FJTaskRunnerGroup initialization
- **/
-
- /*protected*/ public FJTaskRunner(IFJTaskRunnerGroup g) {
- group = g;
- victimRNG = new Random(System.identityHashCode(this));
- runPriority = getPriority();
- setDaemon(true);
- }
-
- /**
- * Return the FJTaskRunnerGroup of which this thread is a member
- **/
-
- protected final IFJTaskRunnerGroup getGroup() { return group; }
-
-
- /* ------------ DEQ Representation ------------------- */
-
-
- /**
- * FJTasks are held in an array-based DEQ with INITIAL_CAPACITY
- * elements. The DEQ is grown if necessary, but default value is
- * normally much more than sufficient unless there are
- * user programming errors or questionable operations generating
- * large numbers of Tasks without running them.
- * Capacities must be a power of two.
- **/
-
- protected static final int INITIAL_CAPACITY = 4096;
-
- /**
- * The maximum supported DEQ capacity.
- * When exceeded, FJTaskRunner operations throw Errors
- **/
-
- protected static final int MAX_CAPACITY = 1 << 30;
-
- /**
- * An object holding a single volatile reference to a FJTask.
- **/
-
- protected final static class VolatileTaskRef {
- /** The reference **/
- protected volatile FJTask ref;
-
- /** Set the reference **/
- protected final void put(FJTask r) { ref = r; }
- /** Return the reference **/
- protected final FJTask get() { return ref; }
- /** Return the reference and clear it **/
- protected final FJTask take() { FJTask r = ref; ref = null; return r; }
-
- /**
- * Initialization utility for constructing arrays.
- * Make an array of given capacity and fill it with
- * VolatileTaskRefs.
- **/
- protected static VolatileTaskRef[] newArray(int cap) {
- VolatileTaskRef[] a = new VolatileTaskRef[cap];
- for (int k = 0; k < cap; k++) a[k] = new VolatileTaskRef();
- return a;
- }
-
- }
-
- /**
- * The DEQ array.
- **/
-
- protected VolatileTaskRef[] deq = VolatileTaskRef.newArray(INITIAL_CAPACITY);
-
- /** Current size of the task DEQ **/
- protected int deqSize() { return deq.length; }
-
- /**
- * Current top of DEQ. Generally acts just like a stack pointer in an
- * array-based stack, except that it circularly wraps around the
- * array, as in an array-based queue. The value is NOT
- * always kept within <code>0 ... deq.length</code> though.
- * The current top element is always at <code>top & (deq.length-1)</code>.
- * To avoid integer overflow, top is reset down
- * within bounds whenever it is noticed to be out out bounds;
- * at worst when it is at <code>2 * deq.length</code>.
- **/
- protected volatile int top = 0;
-
-
- /**
- * Current base of DEQ. Acts like a take-pointer in an
- * array-based bounded queue. Same bounds and usage as top.
- **/
-
- protected volatile int base = 0;
-
-
- /**
- * An extra object to synchronize on in order to
- * achieve a memory barrier.
- **/
-
- protected final Object barrier = new Object();
-
- /* ------------ Other BookKeeping ------------------- */
-
- /**
- * Record whether current thread may be processing a task
- * (i.e., has been started and is not in an idle wait).
- * Accessed, under synch, ONLY by FJTaskRunnerGroup, but the field is
- * stored here for simplicity.
- **/
-
- /*protected*/ public boolean active = false;
-
- /** Random starting point generator for scan() **/
- protected final Random victimRNG;
-
-
- /** Priority to use while scanning for work **/
- protected int scanPriority = Thread.MIN_PRIORITY + 1;
-
- /** Priority to use while running tasks **/
- protected int runPriority;
-
- /**
- * Set the priority to use while scanning.
- * We do not bother synchronizing access, since
- * by the time the value is needed, both this FJTaskRunner
- * and its FJTaskRunnerGroup will
- * necessarily have performed enough synchronization
- * to avoid staleness problems of any consequence.
- **/
- protected void setScanPriority(int pri) { scanPriority = pri; }
-
-
- /**
- * Set the priority to use while running tasks.
- * Same usage and rationale as setScanPriority.
- **/
- protected void setRunPriority(int pri) { runPriority = pri; }
-
- /**
- * Compile-time constant for statistics gathering.
- * Even when set, reported values may not be accurate
- * since all are read and written without synchronization.
- **/
-
-
-
- static final boolean COLLECT_STATS = true;
- // static final boolean COLLECT_STATS = false;
-
-
- // for stat collection
-
- /** Total number of tasks run **/
- protected int runs = 0;
-
- /** Total number of queues scanned for work **/
- protected int scans = 0;
-
- /** Total number of tasks obtained via scan **/
- protected int steals = 0;
-
-
-
- /* -------- Suspending -------- */
- protected boolean suspending = false;
-
- synchronized void setSuspending(boolean susp) {
- suspending = susp;
- }
-
- /* ------------ DEQ operations ------------------- */
-
-
- /**
- * Push a task onto DEQ.
- * Called ONLY by current thread.
- **/
-
- /*protected*/ public final void push(final FJTask r) {
- int t = top;
-
- /*
- This test catches both overflows and index wraps. It doesn't
- really matter if base value is in the midst of changing in take.
- As long as deq length is < 2^30, we are guaranteed to catch wrap in
- time since base can only be incremented at most length times
- between pushes (or puts).
- */
-
- if (t < (base & (deq.length-1)) + deq.length) {
-
- deq[t & (deq.length-1)].put(r);
- top = t + 1;
- }
-
- else // isolate slow case to increase chances push is inlined
- slowPush(r); // check overflow and retry
- }
-
-
- /**
- * Handle slow case for push
- **/
-
- protected synchronized void slowPush(final FJTask r) {
- checkOverflow();
- push(r); // just recurse -- this one is sure to succeed.
- }
-
-
- /**
- * Enqueue task at base of DEQ.
- * Called ONLY by current thread.
- * This method is currently not called from class FJTask. It could be used
- * as a faster way to do FJTask.start, but most users would
- * find the semantics too confusing and unpredictable.
- **/
-
- protected final synchronized void put(final FJTask r) {
- for (;;) {
- int b = base - 1;
- if (top < b + deq.length) {
-
- int newBase = b & (deq.length-1);
- deq[newBase].put(r);
- base = newBase;
-
- if (b != newBase) { // Adjust for index underflow
- int newTop = top & (deq.length-1);
- if (newTop < newBase) newTop += deq.length;
- top = newTop;
- }
- return;
- }
- else {
- checkOverflow();
- // ... and retry
- }
- }
- }
-
- /**
- * Return a popped task, or null if DEQ is empty.
- * Called ONLY by current thread.
- * <p>
- * This is not usually called directly but is
- * instead inlined in callers. This version differs from the
- * cilk algorithm in that pop does not fully back down and
- * retry in the case of potential conflict with take. It simply
- * rechecks under synch lock. This gives a preference
- * for threads to run their own tasks, which seems to
- * reduce flailing a bit when there are few tasks to run.
- **/
-
- protected final FJTask pop() {
- /*
- Decrement top, to force a contending take to back down.
- */
-
- int t = --top;
-
- /*
- To avoid problems with JVMs that do not properly implement
- read-after-write of a pair of volatiles, we conservatively
- grab without lock only if the DEQ appears to have at least two
- elements, thus guaranteeing that both a pop and take will succeed,
- even if the pre-increment in take is not seen by current thread.
- Otherwise we recheck under synch.
- */
-
- if (base + 1 < t)
- return deq[t & (deq.length-1)].take();
- else
- return confirmPop(t);
-
- }
-
-
- /**
- * Check under synch lock if DEQ is really empty when doing pop.
- * Return task if not empty, else null.
- **/
-
- protected final synchronized FJTask confirmPop(int provisionalTop) {
- if (base <= provisionalTop)
- return deq[provisionalTop & (deq.length-1)].take();
- else { // was empty
- /*
- Reset DEQ indices to zero whenever it is empty.
- This both avoids unnecessary calls to checkOverflow
- in push, and helps keep the DEQ from accumulating garbage
- */
-
- top = base = 0;
- return null;
- }
- }
-
-
- /**
- * Take a task from the base of the DEQ.
- * Always called by other threads via scan()
- **/
-
-
- protected final synchronized FJTask take() {
-
- /*
- Increment base in order to suppress a contending pop
- */
-
- int b = base++;
-
- if (b < top)
- return confirmTake(b);
- else {
- // back out
- base = b;
- return null;
- }
- }
-
-
- /**
- * double-check a potential take
- **/
-
- protected FJTask confirmTake(int oldBase) {
-
- /*
- Use a second (guaranteed uncontended) synch
- to serve as a barrier in case JVM does not
- properly process read-after-write of 2 volatiles
- */
-
- synchronized(barrier) {
- if (oldBase < top) {
- /*
- We cannot call deq[oldBase].take here because of possible races when
- nulling out versus concurrent push operations. Resulting
- accumulated garbage is swept out periodically in
- checkOverflow, or more typically, just by keeping indices
- zero-based when found to be empty in pop, which keeps active
- region small and constantly overwritten.
- */
-
- return deq[oldBase & (deq.length-1)].get();
- }
- else {
- base = oldBase;
- return null;
- }
- }
- }
-
-
- /**
- * Adjust top and base, and grow DEQ if necessary.
- * Called only while DEQ synch lock being held.
- * We don't expect this to be called very often. In most
- * programs using FJTasks, it is never called.
- **/
-
- protected void checkOverflow() {
- int t = top;
- int b = base;
-
- if (t - b < deq.length-1) { // check if just need an index reset
-
- int newBase = b & (deq.length-1);
- int newTop = top & (deq.length-1);
- if (newTop < newBase) newTop += deq.length;
- top = newTop;
- base = newBase;
-
- /*
- Null out refs to stolen tasks.
- This is the only time we can safely do it.
- */
-
- int i = newBase;
- while (i != newTop && deq[i].ref != null) {
- deq[i].ref = null;
- i = (i - 1) & (deq.length-1);
- }
-
- }
- else { // grow by doubling array
-
- int newTop = t - b;
- int oldcap = deq.length;
- int newcap = oldcap * 2;
-
- if (newcap >= MAX_CAPACITY)
- throw new Error("FJTask queue maximum capacity exceeded");
-
- VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap];
-
- // copy in bottom half of new deq with refs from old deq
- for (int j = 0; j < oldcap; ++j) newdeq[j] = deq[b++ & (oldcap-1)];
-
- // fill top half of new deq with new refs
- for (int j = oldcap; j < newcap; ++j) newdeq[j] = new VolatileTaskRef();
-
- deq = newdeq;
- base = 0;
- top = newTop;
- }
- }
-
-
- /* ------------ Scheduling ------------------- */
-
-
- /**
- * Do all but the pop() part of yield or join, by
- * traversing all DEQs in our group looking for a task to
- * steal. If none, it checks the entry queue.
- * <p>
- * Since there are no good, portable alternatives,
- * we rely here on a mixture of Thread.yield and priorities
- * to reduce wasted spinning, even though these are
- * not well defined. We are hoping here that the JVM
- * does something sensible.
- * @param waitingFor if non-null, the current task being joined
- **/
-
- protected void scan(final FJTask waitingFor) {
-
- FJTask task = null;
-
- // to delay lowering priority until first failure to steal
- boolean lowered = false;
-
- /*
- Circularly traverse from a random start index.
-
- This differs slightly from cilk version that uses a random index
- for each attempted steal.
- Exhaustive scanning might impede analytic tractablity of
- the scheduling policy, but makes it much easier to deal with
- startup and shutdown.
- */
-
- FJTaskRunner[] ts = group.getArray();
- int idx = victimRNG.nextInt(ts.length);
-
- for (int i = 0; i < ts.length; ++i) {
-
- FJTaskRunner t = ts[idx];
- if (++idx >= ts.length) idx = 0; // circularly traverse
-
- if (t != null && t != this) {
-
- if (waitingFor != null && waitingFor.isDone()) {
- break;
- }
- else {
- if (COLLECT_STATS) ++scans;
- task = t.take();
- if (task != null) {
- if (COLLECT_STATS) ++steals;
- break;
- }
- else if (isInterrupted()) {
- break;
- }
- else if (!lowered) { // if this is first fail, lower priority
- lowered = true;
- setPriority(scanPriority);
- }
- else { // otherwise we are at low priority; just yield
- yield();
- }
- }
- }
-
- }
-
- if (task == null) {
- if (COLLECT_STATS) ++scans;
- task = group.pollEntryQueue();
- if (COLLECT_STATS) if (task != null) ++steals;
- }
-
- if (lowered) setPriority(runPriority);
-
- if (task != null && !task.isDone()) {
- if (COLLECT_STATS) ++runs;
- task.run();
- task.setDone();
- }
-
- }
-
- /**
- * Same as scan, but called when current thread is idling.
- * It repeatedly scans other threads for tasks,
- * sleeping while none are available.
- * <p>
- * This differs from scan mainly in that
- * since there is no reason to return to recheck any
- * condition, we iterate until a task is found, backing
- * off via sleeps if necessary.
- **/
-
- protected void scanWhileIdling() {
- FJTask task = null;
-
- boolean lowered = false;
- long iters = 0;
-
- FJTaskRunner[] ts = group.getArray();
- int idx = victimRNG.nextInt(ts.length);
-
- do {
- for (int i = 0; i < ts.length; ++i) {
-
- FJTaskRunner t = ts[idx];
- if (++idx >= ts.length) idx = 0; // circularly traverse
-
- if (t != null && t != this) {
- if (COLLECT_STATS) ++scans;
-
- task = t.take();
- if (task != null) {
- if (COLLECT_STATS) ++steals;
- if (lowered) setPriority(runPriority);
- group.setActive(this);
- break;
- }
- }
- }
-
- if (task == null) {
- if (isInterrupted())
- return;
-
- if (COLLECT_STATS) ++scans;
- task = group.pollEntryQueue();
-
- if (task != null) {
- if (COLLECT_STATS) ++steals;
- if (lowered) setPriority(runPriority);
- group.setActive(this);
- }
- else {
- ++iters;
- // Check here for yield vs sleep to avoid entering group synch lock
- if (iters >= /*group.SCANS_PER_SLEEP*/ 15) {
- group.checkActive(this, iters);
- if (isInterrupted())
- return;
- }
- else if (!lowered) {
- lowered = true;
- setPriority(scanPriority);
- }
- else {
- yield();
- }
- }
- }
- } while (task == null);
-
-
- if (!task.isDone()) {
- if (COLLECT_STATS) ++runs;
- task.run();
- task.setDone();
- }
-
- }
-
- /* ------------ composite operations ------------------- */
-
-
- /**
- * Main runloop
- **/
-
- public void run() {
- try{
- while (!interrupted()) {
- FJTask task = pop();
- if (task != null) {
- if (!task.isDone()) {
- // inline FJTask.invoke
- if (COLLECT_STATS) ++runs;
- task.run();
- task.setDone();
- }
- }
- else
- scanWhileIdling();
- }
- // check for suspending
- if (suspending) {
- synchronized(this) {
- // move all local tasks to group-wide entry queue
- for (int i = 0; i < deq.length; ++i) {
- synchronized(group) {
- try {
- FJTask task = (FJTask)deq[i].take();
- if (task != null)
- group.getEntryQueue().put(task);
- } catch (InterruptedException ie) {
- System.err.println("Suspend: when transferring task to entryQueue: "+ie);
- }
- }
- }
- }
- }
- }
- finally {
- group.setInactive(this);
- }
- }
-
- /**
- * Execute a task in this thread. Generally called when current task
- * cannot otherwise continue.
- **/
-
-
- protected final void taskYield() {
- FJTask task = pop();
- if (task != null) {
- if (!task.isDone()) {
- if (COLLECT_STATS) ++runs;
- task.run();
- task.setDone();
- }
- }
- else
- scan(null);
- }
-
-
- /**
- * Process tasks until w is done.
- * Equivalent to <code>while(!w.isDone()) taskYield(); </code>
- **/
-
- protected final void taskJoin(final FJTask w) {
-
- while (!w.isDone()) {
-
- FJTask task = pop();
- if (task != null) {
- if (!task.isDone()) {
- if (COLLECT_STATS) ++runs;
- task.run();
- task.setDone();
- if (task == w) return; // fast exit if we just ran w
- }
- }
- else
- scan(w);
- }
- }
-
- /**
- * A specialized expansion of
- * <code> w.fork(); invoke(v); w.join(); </code>
- **/
-
-
- protected final void coInvoke(final FJTask w, final FJTask v) {
-
- // inline push
-
- int t = top;
- if (t < (base & (deq.length-1)) + deq.length) {
-
- deq[t & (deq.length-1)].put(w);
- top = t + 1;
-
- // inline invoke
-
- if (!v.isDone()) {
- if (COLLECT_STATS) ++runs;
- v.run();
- v.setDone();
- }
-
- // inline taskJoin
-
- while (!w.isDone()) {
- FJTask task = pop();
- if (task != null) {
- if (!task.isDone()) {
- if (COLLECT_STATS) ++runs;
- task.run();
- task.setDone();
- if (task == w) return; // fast exit if we just ran w
- }
- }
- else
- scan(w);
- }
- }
-
- else // handle non-inlinable cases
- slowCoInvoke(w, v);
- }
-
-
- /**
- * Backup to handle noninlinable cases of coInvoke
- **/
-
- protected void slowCoInvoke(final FJTask w, final FJTask v) {
- push(w); // let push deal with overflow
- FJTask.invoke(v);
- taskJoin(w);
- }
-
-
- /**
- * Array-based version of coInvoke
- **/
-
- protected final void coInvoke(FJTask[] tasks) {
- int nforks = tasks.length - 1;
-
- // inline bulk push of all but one task
-
- int t = top;
-
- if (nforks >= 0 && t + nforks < (base & (deq.length-1)) + deq.length) {
- for (int i = 0; i < nforks; ++i) {
- deq[t++ & (deq.length-1)].put(tasks[i]);
- top = t;
- }
-
- // inline invoke of one task
- FJTask v = tasks[nforks];
- if (!v.isDone()) {
- if (COLLECT_STATS) ++runs;
- v.run();
- v.setDone();
- }
-
- // inline taskJoins
-
- for (int i = 0; i < nforks; ++i) {
- FJTask w = tasks[i];
- while (!w.isDone()) {
-
- FJTask task = pop();
- if (task != null) {
- if (!task.isDone()) {
- if (COLLECT_STATS) ++runs;
- task.run();
- task.setDone();
- }
- }
- else
- scan(w);
- }
- }
- }
-
- else // handle non-inlinable cases
- slowCoInvoke(tasks);
- }
-
- /**
- * Backup to handle atypical or noninlinable cases of coInvoke
- **/
-
- protected void slowCoInvoke(FJTask[] tasks) {
- for (int i = 0; i < tasks.length; ++i) push(tasks[i]);
- for (int i = 0; i < tasks.length; ++i) taskJoin(tasks[i]);
- }
-
-}
-
diff --git a/src/actors/scala/actors/FJTaskRunnerGroup.java b/src/actors/scala/actors/FJTaskRunnerGroup.java
deleted file mode 100644
index d1f864c54a..0000000000
--- a/src/actors/scala/actors/FJTaskRunnerGroup.java
+++ /dev/null
@@ -1,714 +0,0 @@
-/*
- File: FJTaskRunnerGroup.java
-
- Originally written by Doug Lea and released into the public domain.
- This may be used for any purposes whatsoever without acknowledgment.
- Thanks for the assistance and support of Sun Microsystems Labs,
- and everyone contributing, testing, and using this code.
-
- History:
- Date Who What
- 7Jan1999 dl First public release
- 12Jan1999 dl made getActiveCount public; misc minor cleanup.
- 14Jan1999 dl Added executeTask
- 20Jan1999 dl Allow use of priorities; reformat stats
- 6Feb1999 dl Lazy thread starts
- 27Apr1999 dl Renamed
-*/
-
-package scala.actors;
-
-/**
- * A stripped down analog of a ThreadGroup used for
- * establishing and managing FJTaskRunner threads.
- * ThreadRunnerGroups serve as the control boundary separating
- * the general world of normal threads from the specialized world
- * of FJTasks.
- * <p>
- * By intent, this class does not subclass java.lang.ThreadGroup, and
- * does not support most methods found in ThreadGroups, since they
- * would make no sense for FJTaskRunner threads. In fact, the class
- * does not deal with ThreadGroups at all. If you want to restrict
- * a FJTaskRunnerGroup to a particular ThreadGroup, you can create
- * it from within that ThreadGroup.
- * <p>
- * The main contextual parameter for a FJTaskRunnerGroup is
- * the group size, established in the constructor.
- * Groups must be of a fixed size.
- * There is no way to dynamically increase or decrease the number
- * of threads in an existing group.
- * <p>
- * In general, the group size should be equal to the number
- * of CPUs on the system. (Unfortunately, there is no portable
- * means of automatically detecting the number of CPUs on a JVM, so there is
- * no good way to automate defaults.) In principle, when
- * FJTasks are used for computation-intensive tasks, having only
- * as many threads as CPUs should minimize bookkeeping overhead
- * and contention, and so maximize throughput. However, because
- * FJTaskRunners lie atop Java threads, and in turn operating system
- * thread support and scheduling policies,
- * it is very possible that using more threads
- * than CPUs will improve overall throughput even though it adds
- * to overhead. This will always be so if FJTasks are I/O bound.
- * So it may pay to experiment a bit when tuning on particular platforms.
- * You can also use <code>setRunPriorities</code> to either
- * increase or decrease the priorities of active threads, which
- * may interact with group size choice.
- * <p>
- * In any case, overestimating group sizes never
- * seriously degrades performance (at least within reasonable bounds).
- * You can also use a value
- * less than the number of CPUs in order to reserve processing
- * for unrelated threads.
- * <p>
- * There are two general styles for using a FJTaskRunnerGroup.
- * You can create one group per entire program execution, for example
- * as a static singleton, and use it for all parallel tasks:
- * <pre>
- * class Tasks {
- * static FJTaskRunnerGroup group;
- * public void initialize(int groupsize) {
- * group = new FJTaskRunnerGroup(groupSize);
- * }
- * // ...
- * }
- * </pre>
- * Alternatively, you can make new groups on the fly and use them only for
- * particular task sets. This is more flexible,,
- * and leads to more controllable and deterministic execution patterns,
- * but it encounters greater overhead on startup. Also, to reclaim
- * system resources, you should
- * call <code>FJTaskRunnerGroup.interruptAll</code> when you are done
- * using one-shot groups. Otherwise, because FJTaskRunners set
- * <code>Thread.isDaemon</code>
- * status, they will not normally be reclaimed until program termination.
- * <p>
- * The main supported methods are <code>execute</code>,
- * which starts a task processed by FJTaskRunner threads,
- * and <code>invoke</code>, which starts one and waits for completion.
- * For example, you might extend the above <code>FJTasks</code>
- * class to support a task-based computation, say, the
- * <code>Fib</code> class from the <code>FJTask</code> documentation:
- * <pre>
- * class Tasks { // continued
- * // ...
- * static int fib(int n) {
- * try {
- * Fib f = new Fib(n);
- * group.invoke(f);
- * return f.getAnswer();
- * }
- * catch (InterruptedException ex) {
- * throw new Error("Interrupted during computation");
- * }
- * }
- * }
- * </pre>
- * <p>
- * Method <code>stats()</code> can be used to monitor performance.
- * Both FJTaskRunnerGroup and FJTaskRunner may be compiled with
- * the compile-time constant COLLECT_STATS set to false. In this
- * case, various simple counts reported in stats() are not collected.
- * On platforms tested,
- * this leads to such a tiny performance improvement that there is
- * very little motivation to bother.
- *
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
- * <p>
- * @see FJTask
- * @see FJTaskRunner
- **/
-
-public class FJTaskRunnerGroup implements IFJTaskRunnerGroup {
-
- /** The threads in this group **/
- /*protected*/ /*final*/ FJTaskRunner[] threads;
-
- /** Group-wide queue for tasks entered via execute() **/
- /*protected*/ final LinkedQueue entryQueue = new LinkedQueue();
-
- public LinkedQueue getEntryQueue() {
- return entryQueue;
- }
-
- /** Number of threads that are not waiting for work **/
- protected int activeCount = 0;
-
- /** Number of threads that have been started. Used to avoid
- unecessary contention during startup of task sets.
- **/
- protected int nstarted = 0;
-
- /**
- * Compile-time constant. If true, various counts of
- * runs, waits, etc., are maintained. These are NOT
- * updated with synchronization, so statistics reports
- * might not be accurate.
- **/
-
- static final boolean COLLECT_STATS = true;
- // static final boolean COLLECT_STATS = false;
-
- // for stats
-
- /** The time at which this ThreadRunnerGroup was constructed **/
- long initTime = 0;
-
- /** Total number of executes or invokes **/
- int entries = 0;
-
- static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY+1;
-
- /* -------- Suspending -------- */
-
- LinkedQueue snapshot() throws InterruptedException {
- synchronized (this) {
- for (int i = 0; i < threads.length; ++i) {
- FJTaskRunner t = threads[i];
- // set flag in all task runners to suspend
- t.setSuspending(true);
- // interrupt all task runners
- // assume: current thread not in threads (scheduler)
- t.interrupt();
- }
- }
-
- // wait until all of them have terminated
- for (int i = 0; i < threads.length; ++i) {
- Thread t = threads[i];
- t.join();
- };
-
- return entryQueue;
- }
-
- /**
- * Create a FJTaskRunnerGroup with the indicated number
- * of FJTaskRunner threads. Normally, the best size to use is
- * the number of CPUs on the system.
- * <p>
- * The threads in a FJTaskRunnerGroup are created with their
- * isDaemon status set, so do not normally need to be
- * shut down manually upon program termination.
- **/
-
- public FJTaskRunnerGroup(int groupSize) {
- //System.out.println("Creating FJTaskRunnerGroup of size "+groupSize);
- threads = new FJTaskRunner[groupSize];
- initializeThreads();
- initTime = System.currentTimeMillis();
- }
-
-
- public boolean existsTask() {
- FJTask task = null;
- /*
- Circularly traverse from a random start index.
-
- This differs slightly from cilk version that uses a random index
- for each attempted steal.
- Exhaustive scanning might impede analytic tractablity of
- the scheduling policy, but makes it much easier to deal with
- startup and shutdown.
- */
-
- FJTaskRunner[] ts = threads;
- int idx = /*victimRNG.nextInt(ts.length)*/ 0;
-
- for (int i = 0; i < ts.length; ++i) {
- FJTaskRunner t = ts[idx];
- if (++idx >= ts.length) idx = 0; // circularly traverse
-
- if (t != null) {
- task = t.take();
- if (task != null) {
- break;
- }
- }
- }
-
- if (task == null) {
- task = pollEntryQueue();
- }
-
- if (task != null && !task.isDone()) {
- boolean quit = false;
- while (!quit) {
- quit = true;
- try {
- // put task back into entry queue and return true
- entryQueue.put(task);
- } catch (InterruptedException ie) {
- quit = false;
- }
- }
- return true;
- }
- else return false;
- }
-
- public boolean checkPoolSize() {
- // if there is a task in the entryQueue or any of the
- // worker queues, increase thread pool size by one
- if (!entryQueue.isEmpty() ||
- existsTask()) {
- int newsize = threads.length + 1;
- FJTaskRunner[] newar = new FJTaskRunner[newsize];
- System.arraycopy(threads, 0, newar, 0, newsize-1);
- synchronized(this) {
- threads = newar;
- FJTaskRunner t = new FJTaskRunner(this);
- threads[newsize-1] = t;
- setActive(t);
- }
- return true;
- }
- else return false;
- }
-
-
- /**
- * Arrange for execution of the given task
- * by placing it in a work queue. If the argument
- * is not of type FJTask, it is embedded in a FJTask via
- * <code>FJTask.Wrap</code>.
- * @exception InterruptedException if current Thread is
- * currently interrupted
- **/
-
- public void execute(Runnable r) throws InterruptedException {
- if (r instanceof FJTask) {
- entryQueue.put((FJTask)r);
- }
- else {
- entryQueue.put(new FJTask.Wrap(r));
- }
- signalNewTask();
- }
-
-
- /**
- * Specialized form of execute called only from within FJTasks
- **/
- public void executeTask(FJTask t) {
- try {
- entryQueue.put(t);
- signalNewTask();
- }
- catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
-
-
- /**
- * Start a task and wait it out. Returns when the task completes.
- * @exception InterruptedException if current Thread is
- * interrupted before completion of the task.
- **/
-
- public void invoke(Runnable r) throws InterruptedException {
- InvokableFJTask w = new InvokableFJTask(r);
- entryQueue.put(w);
- signalNewTask();
- w.awaitTermination();
- }
-
-
- /**
- * Try to shut down all FJTaskRunner threads in this group
- * by interrupting them all. This method is designed
- * to be used during cleanup when it is somehow known
- * that all threads are idle.
- * FJTaskRunners only
- * check for interruption when they are not otherwise
- * processing a task (and its generated subtasks,
- * if any), so if any threads are active, shutdown may
- * take a while, and may lead to unpredictable
- * task processing.
- **/
-
- public void interruptAll() {
- // paranoically interrupt current thread last if in group.
- Thread current = Thread.currentThread();
- boolean stopCurrent = false;
-
- for (int i = 0; i < threads.length; ++i) {
- Thread t = threads[i];
- if (t == current)
- stopCurrent = true;
- else
- t.interrupt();
- }
- if (stopCurrent)
- current.interrupt();
- }
-
-
- /**
- * Set the priority to use while a FJTaskRunner is
- * polling for new tasks to perform. Default
- * is currently Thread.MIN_PRIORITY+1. The value
- * set may not go into effect immediately, but
- * will be used at least the next time a thread scans for work.
- **/
- public synchronized void setScanPriorities(int pri) {
- for (int i = 0; i < threads.length; ++i) {
- FJTaskRunner t = threads[i];
- t.setScanPriority(pri);
- if (!t.active) t.setPriority(pri);
- }
- }
-
-
- /**
- * Set the priority to use while a FJTaskRunner is
- * actively running tasks. Default
- * is the priority that was in effect by the thread that
- * constructed this FJTaskRunnerGroup. Setting this value
- * while threads are running may momentarily result in
- * them running at this priority even when idly waiting for work.
- **/
- public synchronized void setRunPriorities(int pri) {
- for (int i = 0; i < threads.length; ++i) {
- FJTaskRunner t = threads[i];
- t.setRunPriority(pri);
- if (t.active) t.setPriority(pri);
- }
- }
-
-
-
- /** Return the number of FJTaskRunner threads in this group **/
-
- public int size() { return threads.length; }
-
-
- /**
- * Return the number of threads that are not idly waiting for work.
- * Beware that even active threads might not be doing any useful
- * work, but just spinning waiting for other dependent tasks.
- * Also, since this is just a snapshot value, some tasks
- * may be in the process of becoming idle.
- **/
- public synchronized int getActiveCount() { return activeCount; }
-
- /**
- * Prints various snapshot statistics to System.out.
- * <ul>
- * <li> For each FJTaskRunner thread (labeled as T<em>n</em>, for
- * <em>n</em> from zero to group size - 1):
- * <ul>
- * <li> A star "*" is printed if the thread is currently active;
- * that is, not sleeping while waiting for work. Because
- * threads gradually enter sleep modes, an active thread
- * may in fact be about to sleep (or wake up).
- * <li> <em>Q Cap</em> The current capacity of its task queue.
- * <li> <em>Run</em> The total number of tasks that have been run.
- * <li> <em>New</em> The number of these tasks that were
- * taken from either the entry queue or from other
- * thread queues; that is, the number of tasks run
- * that were <em>not</em> forked by the thread itself.
- * <li> <em>Scan</em> The number of times other task
- * queues or the entry queue were polled for tasks.
- * </ul>
- * <li> <em>Execute</em> The total number of tasks entered
- * (but not necessarily yet run) via execute or invoke.
- * <li> <em>Time</em> Time in seconds since construction of this
- * FJTaskRunnerGroup.
- * <li> <em>Rate</em> The total number of tasks processed
- * per second across all threads. This
- * may be useful as a simple throughput indicator
- * if all processed tasks take approximately the
- * same time to run.
- * </ul>
- * <p>
- * Cautions: Some statistics are updated and gathered
- * without synchronization,
- * so may not be accurate. However, reported counts may be considered
- * as lower bounds of actual values.
- * Some values may be zero if classes are compiled
- * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup
- * classes can be independently compiled with different values of
- * COLLECT_STATS.) Also, the counts are maintained as ints so could
- * overflow in exceptionally long-lived applications.
- * <p>
- * These statistics can be useful when tuning algorithms or diagnosing
- * problems. For example:
- * <ul>
- * <li> High numbers of scans may mean that there is insufficient
- * parallelism to keep threads busy. However, high scan rates
- * are expected if the number
- * of Executes is also high or there is a lot of global
- * synchronization in the application, and the system is not otherwise
- * busy. Threads may scan
- * for work hundreds of times upon startup, shutdown, and
- * global synch points of task sets.
- * <li> Large imbalances in tasks run across different threads might
- * just reflect contention with unrelated threads on a system
- * (possibly including JVM threads such as GC), but may also
- * indicate some systematic bias in how you generate tasks.
- * <li> Large task queue capacities may mean that too many tasks are being
- * generated before they can be run.
- * Capacities are reported rather than current numbers of tasks
- * in queues because they are better indicators of the existence
- * of these kinds of possibly-transient problems.
- * Queue capacities are
- * resized on demand from their initial value of 4096 elements,
- * which is much more than sufficient for the kinds of
- * applications that this framework is intended to best support.
- * </ul>
- **/
-
- public void stats() {
- long time = System.currentTimeMillis() - initTime;
- double secs = ((double)time) / 1000.0;
- long totalRuns = 0;
- long totalScans = 0;
- long totalSteals = 0;
-
- System.out.print("Thread" +
- "\tQ Cap" +
- "\tScans" +
- "\tNew" +
- "\tRuns" +
- "\n");
-
- for (int i = 0; i < threads.length; ++i) {
- FJTaskRunner t = threads[i];
- int truns = t.runs;
- totalRuns += truns;
-
- int tscans = t.scans;
- totalScans += tscans;
-
- int tsteals = t.steals;
- totalSteals += tsteals;
-
- String star = (getActive(t))? "*" : " ";
-
-
- System.out.print("T" + i + star +
- "\t" + t.deqSize() +
- "\t" + tscans +
- "\t" + tsteals +
- "\t" + truns +
- "\n");
- }
-
- System.out.print("Total" +
- "\t " +
- "\t" + totalScans +
- "\t" + totalSteals +
- "\t" + totalRuns +
- "\n");
-
- System.out.print("Execute: " + entries);
-
- System.out.print("\tTime: " + secs);
-
- long rps = 0;
- if (secs != 0) rps = Math.round((double)(totalRuns) / secs);
-
- System.out.println("\tRate: " + rps);
- }
-
-
- /* ------------ Methods called only by FJTaskRunners ------------- */
-
-
- /**
- * Return the array of threads in this group.
- * Called only by FJTaskRunner.scan().
- **/
-
- public FJTaskRunner[] getArray() { return threads; }
-
-
- /**
- * Return a task from entry queue, or null if empty.
- * Called only by FJTaskRunner.scan().
- **/
-
- public FJTask pollEntryQueue() {
- try {
- FJTask t = (FJTask)(entryQueue.poll(0));
- return t;
- }
- catch(InterruptedException ex) { // ignore interrupts
- Thread.currentThread().interrupt();
- return null;
- }
- }
-
-
- /**
- * Return active status of t.
- * Per-thread active status can only be accessed and
- * modified via synchronized method here in the group class.
- **/
-
- protected synchronized boolean getActive(FJTaskRunner t) {
- return t.active;
- }
-
-
- /**
- * Set active status of thread t to true, and notify others
- * that might be waiting for work.
- **/
-
- public synchronized void setActive(FJTaskRunner t) {
- if (!t.active) {
- t.active = true;
- ++activeCount;
- if (nstarted < threads.length)
- threads[nstarted++].start();
- else
- notifyAll();
- }
- }
-
- /**
- * Set active status of thread t to false.
- **/
-
- public synchronized void setInactive(FJTaskRunner t) {
- if (t.active) {
- t.active = false;
- --activeCount;
- }
- }
-
- /**
- * The number of times to scan other threads for tasks
- * before transitioning to a mode where scans are
- * interleaved with sleeps (actually timed waits).
- * Upon transition, sleeps are for duration of
- * scans / SCANS_PER_SLEEP milliseconds.
- * <p>
- * This is not treated as a user-tunable parameter because
- * good values do not appear to vary much across JVMs or
- * applications. Its main role is to help avoid some
- * useless spinning and contention during task startup.
- **/
- static final long SCANS_PER_SLEEP = 15;
-
- /**
- * The maximum time (in msecs) to sleep when a thread is idle,
- * yet others are not, so may eventually generate work that
- * the current thread can steal. This value reflects the maximum time
- * that a thread may sleep when it possibly should not, because there
- * are other active threads that might generate work. In practice,
- * designs in which some threads become stalled because others
- * are running yet not generating tasks are not likely to work
- * well in this framework anyway, so the exact value does not matter
- * too much. However, keeping it in the sub-second range does
- * help smooth out startup and shutdown effects.
- **/
-
- static final long MAX_SLEEP_TIME = 100;
-
- /**
- * Set active status of thread t to false, and
- * then wait until: (a) there is a task in the entry
- * queue, or (b) other threads are active, or (c) the current
- * thread is interrupted. Upon return, it
- * is not certain that there will be work available.
- * The thread must itself check.
- * <p>
- * The main underlying reason
- * for these mechanics is that threads do not
- * signal each other when they add elements to their queues.
- * (This would add to task overhead, reduce locality.
- * and increase contention.)
- * So we must rely on a tamed form of polling. However, tasks
- * inserted into the entry queue do result in signals, so
- * tasks can wait on these if all of them are otherwise idle.
- **/
-
- public synchronized void checkActive(FJTaskRunner t, long scans) {
-
- setInactive(t);
-
- try {
- // if nothing available, do a hard wait
- if (activeCount == 0 && entryQueue.peek() == null) {
- wait();
- }
- else {
- // If there is possibly some work,
- // sleep for a while before rechecking
-
- long msecs = scans / SCANS_PER_SLEEP;
- if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME;
- int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep
- wait(msecs, nsecs);
- }
- }
- catch (InterruptedException ex) {
- notify(); // avoid lost notifies on interrupts
- Thread.currentThread().interrupt();
- }
- }
-
- /* ------------ Utility methods ------------- */
-
- /**
- * Start or wake up any threads waiting for work
- **/
-
- protected synchronized void signalNewTask() {
- if (COLLECT_STATS) ++entries;
- if (nstarted < threads.length)
- threads[nstarted++].start();
- else
- notify();
- }
-
- /**
- * Create all FJTaskRunner threads in this group.
- **/
-
- protected void initializeThreads() {
- for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this);
- }
-
-
-
-
- /**
- * Wrap wait/notify mechanics around a task so that
- * invoke() can wait it out
- **/
- protected static final class InvokableFJTask extends FJTask {
- protected final Runnable wrapped;
- protected boolean terminated = false;
-
- protected InvokableFJTask(Runnable r) { wrapped = r; }
-
- public void run() {
- try {
- if (wrapped instanceof FJTask)
- FJTask.invoke((FJTask)(wrapped));
- else
- wrapped.run();
- }
- finally {
- setTerminated();
- }
- }
-
- protected synchronized void setTerminated() {
- terminated = true;
- notifyAll();
- }
-
- protected synchronized void awaitTermination() throws InterruptedException {
- while (!terminated) wait();
- }
- }
-
-
-}
-
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
deleted file mode 100644
index cf26b33916..0000000000
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-// $Id$
-
-package scala.actors
-
-import compat.Platform
-
-import java.lang.{Runnable, Thread, InterruptedException, System, Runtime}
-import java.lang.Thread.State
-
-import scala.collection.Set
-import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet}
-
-import scheduler.{ThreadPoolConfig, QuitException}
-
-/**
- * FJTaskScheduler2
- *
- * @version 0.9.18
- * @author Philipp Haller
- */
-class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean) extends Thread with IScheduler with ActorGC {
- setDaemon(daemon)
-
- /** Default constructor creates a non-daemon thread. */
- def this() =
- this(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, false)
-
- def this(daemon: Boolean) =
- this(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, daemon)
-
- var printStats = false
-
- private var coreSize = initCoreSize
-
- private val executor =
- new FJTaskRunnerGroup(coreSize)
-
- private var terminating = false
- private var suspending = false
-
- private var submittedTasks = 0
-
- private val CHECK_FREQ = 100
-
- private def allWorkersBlocked: Boolean =
- executor.threads.forall(t => {
- val s = t.getState()
- s == State.BLOCKED || s == State.WAITING || s == State.TIMED_WAITING
- })
-
- override def run() {
- try {
- while (!terminating) {
- this.synchronized {
- try {
- wait(CHECK_FREQ)
- } catch {
- case _: InterruptedException =>
- if (terminating) throw new QuitException
- }
-
- if (!suspending) {
-
- gc()
-
- // check if we need more threads
- if (coreSize < maxSize
- && allWorkersBlocked
- && executor.checkPoolSize()) {
- //Debug.info(this+": increasing thread pool size")
- coreSize += 1
- }
- else {
- if (allTerminated) {
- // if all worker threads idle terminate
- if (executor.getActiveCount() == 0) {
- Debug.info(this+": initiating shutdown...")
-
- // Note that we don't have to shutdown
- // the FJTaskRunnerGroup since there is
- // no separate thread associated with it,
- // and FJTaskRunner threads have daemon status.
- throw new QuitException
- }
- }
- }
- }
- } // sync
-
- } // while (!terminating)
- } catch {
- case _: QuitException =>
- // allow thread to exit
- if (printStats) executor.stats()
- }
- }
-
- /**
- * @param task the task to be executed
- */
- def execute(task: Runnable): Unit =
- executor execute task
-
- def execute(fun: => Unit): Unit =
- executor.execute(new Runnable {
- def run() { fun }
- })
-
- /** Shuts down all idle worker threads.
- */
- def shutdown(): Unit = synchronized {
- terminating = true
- }
-
- def snapshot(): LinkedQueue = {
- suspending = true
- executor.snapshot()
- }
-
- def isActive = !terminating && !suspending
-
- def managedBlock(blocker: scala.concurrent.ManagedBlocker) {
- blocker.block()
- }
-}
diff --git a/src/actors/scala/actors/IFJTaskRunnerGroup.java b/src/actors/scala/actors/IFJTaskRunnerGroup.java
deleted file mode 100644
index dbbb325710..0000000000
--- a/src/actors/scala/actors/IFJTaskRunnerGroup.java
+++ /dev/null
@@ -1,18 +0,0 @@
-
-package scala.actors;
-
-/**
- * IFJTaskRunnerGroup
- *
- * @version 0.9.8
- * @author Philipp Haller
- */
-interface IFJTaskRunnerGroup {
- public void executeTask(FJTask t);
- public FJTaskRunner[] getArray();
- public FJTask pollEntryQueue();
- public void setActive(FJTaskRunner t);
- public void checkActive(FJTaskRunner t, long scans);
- public void setInactive(FJTaskRunner t);
- public LinkedQueue getEntryQueue();
-}