summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-02-19 10:45:23 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-02-19 10:45:23 +0000
commitd710f4e615de1b2875c72ba7d4a56c9a893b2a16 (patch)
tree684fc4cc0c62ea47bc11b24f496ffb7f09cc6340 /src/actors
parent2a27ffb80e27685e36c170b096d71efec0c45357 (diff)
downloadscala-d710f4e615de1b2875c72ba7d4a56c9a893b2a16.tar.gz
scala-d710f4e615de1b2875c72ba7d4a56c9a893b2a16.tar.bz2
scala-d710f4e615de1b2875c72ba7d4a56c9a893b2a16.zip
scala.actors: integrated FJ. added futures.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala84
-rw-r--r--src/actors/scala/actors/Channel.scala14
-rw-r--r--src/actors/scala/actors/FJTask.java531
-rw-r--r--src/actors/scala/actors/FJTaskRunner.java974
-rw-r--r--src/actors/scala/actors/FJTaskRunnerGroup.java685
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala144
-rw-r--r--src/actors/scala/actors/Future.scala105
-rw-r--r--src/actors/scala/actors/IFJTaskRunnerGroup.java12
-rw-r--r--src/actors/scala/actors/InputChannel.scala8
-rw-r--r--src/actors/scala/actors/LinkedNode.java25
-rw-r--r--src/actors/scala/actors/LinkedQueue.java185
-rw-r--r--src/actors/scala/actors/Scheduler.scala27
12 files changed, 2758 insertions, 36 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 42b870a383..701bfa6172 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -19,7 +19,7 @@ import compat.Platform
* <code>receive</code>, <code>react</code>, <code>reply</code>,
* etc.
*
- * @version 0.9.2
+ * @version 0.9.4
* @author Philipp Haller
*/
object Actor {
@@ -257,12 +257,15 @@ object Actor {
* implementation of event-based actors.
* </p>
* <p>
- * The main ideas of our approach are explained in the paper<br>
+ * The main ideas of our approach are explained in the papers<br>
* <b>Event-Based Programming without Inversion of Control</b>,
- * Philipp Haller, Martin Odersky <i>Proc. JMLC 2006</i>
+ * Philipp Haller and Martin Odersky, <i>Proc. JMLC 2006</i>
+ * <br><br>
+ * <b>Actors that Unify Threads and Events</b>,
+ * Philipp Haller and Martin Odersky, <i>LAMP-REPORT-2007-001, EPFL</i>
* </p>
*
- * @version 0.9.2
+ * @version 0.9.4
* @author Philipp Haller
*/
trait Actor extends OutputChannel[Any] {
@@ -328,20 +331,28 @@ trait Actor extends OutputChannel[Any] {
tick()
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
if (null eq qel) {
- waitingFor = f.isDefinedAt
- isSuspended = true
- received = None
- suspendActorFor(msec)
- if (received.isEmpty) {
- if (f.isDefinedAt(TIMEOUT)) {
- waitingFor = waitingForNone
- isSuspended = false
- val result = f(TIMEOUT)
- return result
- }
+ if (msec == 0) {
+ if (f.isDefinedAt(TIMEOUT))
+ return f(TIMEOUT)
else
error("unhandled timeout")
}
+ else {
+ waitingFor = f.isDefinedAt
+ isSuspended = true
+ received = None
+ suspendActorFor(msec)
+ if (received.isEmpty) {
+ if (f.isDefinedAt(TIMEOUT)) {
+ waitingFor = waitingForNone
+ isSuspended = false
+ val result = f(TIMEOUT)
+ return result
+ }
+ else
+ error("unhandled timeout")
+ }
+ }
} else {
received = Some(qel.msg)
sessions = qel.session :: sessions
@@ -432,6 +443,44 @@ trait Actor extends OutputChannel[Any] {
}
}
+ def !!(msg: Any): Future[Any] = {
+ val ftch = new Channel[Any](Actor.self)
+ send(msg, ftch)
+ new Future[Any](ftch) {
+ def apply() =
+ if (isSet) value.get
+ else ch.receive {
+ case any => value = Some(any); any
+ }
+ def isSet = value match {
+ case None => ch.receiveWithin(0) {
+ case TIMEOUT => false
+ case any => value = Some(any); true
+ }
+ case Some(_) => true
+ }
+ }
+ }
+
+ def !![a](msg: Any, f: PartialFunction[Any, a]): Future[a] = {
+ val ftch = new Channel[Any](Actor.self)
+ send(msg, ftch)
+ new Future[a](ftch) {
+ def apply() =
+ if (isSet) value.get
+ else ch.receive {
+ case any => value = Some(f(any)); value.get
+ }
+ def isSet = value match {
+ case None => ch.receiveWithin(0) {
+ case TIMEOUT => false
+ case any => value = Some(f(any)); true
+ }
+ case Some(_) => true
+ }
+ }
+ }
+
def reply(msg: Any): Unit = session ! msg
private var rc = new Channel[Any](this)
@@ -450,7 +499,6 @@ trait Actor extends OutputChannel[Any] {
if (sessions.isEmpty) null
else sessions.head.asInstanceOf[Channel[Any]]
-
private[actors] var continuation: PartialFunction[Any, Unit] = null
private[actors] var timeoutPending = false
private[actors] var isDetached = false
@@ -654,7 +702,7 @@ trait Actor extends OutputChannel[Any] {
* }
* </pre>
*
- * @version 0.9.2
+ * @version 0.9.4
* @author Philipp Haller
*/
case object TIMEOUT
@@ -664,7 +712,7 @@ case object TIMEOUT
* <p>This class is used to manage control flow of actor
* executions.</p>
*
- * @version 0.9.2
+ * @version 0.9.4
* @author Philipp Haller
*/
private[actors] class SuspendActorException extends Throwable {
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index fc473d76cc..8031ea297e 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -25,17 +25,17 @@ package scala.actors
* }
* </pre>
*
- * @version 0.9.2
+ * @version 0.9.4
* @author Philipp Haller
*/
-case class ! [a](ch: Channel[a], msg: a)
+case class ! [a](ch: InputChannel[a], msg: a)
/**
* This class provides a means for typed communication among
* actors. Only the actor creating an instance of a
* <code>Channel</code> may receive from it.
*
- * @version 0.9.2
+ * @version 0.9.4
* @author Philipp Haller
*/
class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
@@ -74,10 +74,10 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
* @param f a partial function with message patterns and actions
* @return result of processing the received value
*/
- def receive[R](f: PartialFunction[Any, R]): R = {
+ def receive[R](f: PartialFunction[Msg, R]): R = {
val C = this.asInstanceOf[Channel[Any]]
receiver.receive {
- case C ! msg if (f.isDefinedAt(msg)) => f(msg)
+ case C ! msg if (f.isDefinedAt(msg.asInstanceOf[Msg])) => f(msg.asInstanceOf[Msg])
}
}
@@ -105,10 +105,10 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
*
* @param f a partial function with message patterns and actions
*/
- def react(f: PartialFunction[Any, Unit]): Nothing = {
+ def react(f: PartialFunction[Msg, Unit]): Nothing = {
val C = this.asInstanceOf[Channel[Any]]
receiver.react {
- case C ! msg if (f.isDefinedAt(msg)) => f(msg)
+ case C ! msg if (f.isDefinedAt(msg.asInstanceOf[Msg])) => f(msg.asInstanceOf[Msg])
}
}
diff --git a/src/actors/scala/actors/FJTask.java b/src/actors/scala/actors/FJTask.java
new file mode 100644
index 0000000000..6398f1400e
--- /dev/null
+++ b/src/actors/scala/actors/FJTask.java
@@ -0,0 +1,531 @@
+/*
+ File: Task.java
+
+ Originally written by Doug Lea and released into the public domain.
+ This may be used for any purposes whatsoever without acknowledgment.
+ Thanks for the assistance and support of Sun Microsystems Labs,
+ and everyone contributing, testing, and using this code.
+
+ History:
+ Date Who What
+ 7Jan1999 dl first release
+ 14jan1999 dl simplify start() semantics;
+ improve documentation
+ 18Jan1999 dl Eliminate useless time-based waits.
+ 7Mar1999 dl Add reset method,
+ add array-based composite operations
+ 27Apr1999 dl Rename
+*/
+
+package scala.actors;
+
+
+/**
+ * Abstract base class for Fork/Join Tasks.
+ *
+ * <p>
+ * FJTasks are lightweight, stripped-down analogs of Threads.
+ * Many FJTasks share the same pool of Java threads. This is
+ * supported by the FJTaskRunnerGroup and FJTaskRunner classes, that
+ * mainly contain
+ * methods called only internally by FJTasks.
+ * FJTasks support versions of the most common methods found in class Thread,
+ * including start(), yield() and join(). However, they
+ * don't support priorities, ThreadGroups or other bookkeeping
+ * or control methods of class Thread.
+ * <p>
+ * FJTasks should normally be defined by subclassing and adding a run() method.
+ * Alternatively, static inner class <code>Wrap(Runnable r)</code>
+ * can be used to
+ * wrap an existing Runnable object in a FJTask.
+ * <p>
+ * <code>FJTaskRunnerGroup.execute(FJTask)</code> can be used to
+ * initiate a FJTask from a non-FJTask thread.
+ * And <code>FJTaskRunnerGroup.invoke(FJTask)</code> can be used to initiate
+ * a FJTask and then wait for it to complete before returning.
+ * These are the only entry-points from normal threads to FJTasks.
+ * Most FJTask methods themselves may only be called from within running FJTasks.
+ * They throw ClassCastExceptions if they are not,
+ * reflecting the fact that these methods
+ * can only be executed using FJTaskRunner threads, not generic
+ * java.lang.Threads.
+ * <p>
+ * There are three different ways to run a FJTask,
+ * with different scheduling semantics:
+ * <ul>
+ * <li> FJTask.start() (as well as FJTaskRunnerGroup.execute(FJTask))
+ * behaves pretty much like Thread.start(). It enqueues a task to be
+ * run the next time any FJTaskRunner thread is otherwise idle.
+ * It maintains standard FIFO ordering with respect to
+ * the group of worker threads.
+ * <li> FJTask.fork() (as well as the two-task spawning method,
+ * coInvoke(task1, task2), and the array version
+ * coInvoke(FJTask[] tasks)) starts a task
+ * that will be executed in
+ * procedure-call-like LIFO order if executed by the
+ * same worker thread as the one that created it, but is FIFO
+ * with respect to other tasks if it is run by
+ * other worker threads. That is, earlier-forked
+ * tasks are preferred to later-forked tasks by other idle workers.
+ * Fork() is noticeably faster than start(), but can only be
+ * used when these scheduling semantics are acceptable.
+ * <li> FJTask.invoke(FJTask) just executes the run method
+ * of one task from within another. It is the analog of a
+ * direct call.
+ * </ul>
+ * <p>
+ * The main economies of FJTasks stem from the fact that
+ * FJTasks do not support blocking operations of any kind.
+ * FJTasks should just run to completion without
+ * issuing waits or performing blocking IO.
+ * There are several styles for creating the run methods that
+ * execute as tasks, including
+ * event-style methods, and pure computational methods.
+ * Generally, the best kinds of FJTasks are those that in turn
+ * generate other FJTasks.
+ * <p>
+ * There is nothing actually
+ * preventing you from blocking within a FJTask, and very short waits/blocks are
+ * completely well behaved. But FJTasks are not designed
+ * to support arbitrary synchronization
+ * since there is no way to suspend and resume individual tasks
+ * once they have begun executing. FJTasks should also be finite
+ * in duration -- they should not contain infinite loops.
+ * FJTasks that might need to perform a blocking
+ * action, or hold locks for extended periods, or
+ * loop forever can instead create normal
+ * java Thread objects that will do so. FJTasks are just not
+ * designed to support these things.
+ * FJTasks may however yield() control to allow their FJTaskRunner threads
+ * to run other tasks,
+ * and may wait for other dependent tasks via join(). These
+ * are the only coordination mechanisms supported by FJTasks.
+ * <p>
+ * FJTasks, and the FJTaskRunners that execute them are not
+ * intrinsically robust with respect to exceptions.
+ * A FJTask that aborts via an exception does not automatically
+ * have its completion flag (isDone) set.
+ * As with ordinary Threads, an uncaught exception will normally cause
+ * its FJTaskRunner thread to die, which in turn may sometimes
+ * cause other computations being performed to hang or abort.
+ * You can of course
+ * do better by trapping exceptions inside the run methods of FJTasks.
+ * <p>
+ * The overhead differences between FJTasks and Threads are substantial,
+ * especially when using fork() or coInvoke().
+ * FJTasks can be two or three orders of magnitude faster than Threads,
+ * at least when run on JVMs with high-performance garbage collection
+ * (every FJTask quickly becomes garbage) and good native thread support.
+ * <p>
+ * Given these overhead savings, you might be tempted to use FJTasks for
+ * everything you would use a normal Thread to do. Don't. Java Threads
+ * remain better for general purpose thread-based programming. Remember
+ * that FJTasks cannot be used for designs involving arbitrary blocking
+ * synchronization or I/O. Extending FJTasks to support such capabilities
+ * would amount to re-inventing the Thread class, and would make them
+ * less optimal in the contexts that they were designed for.
+ * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
+ * <p>
+ * @see FJTaskRunner
+ * @see FJTaskRunnerGroup
+ **/
+
+public abstract class FJTask implements Runnable {
+
+ /**
+ * The only status information associated with FJTasks is whether
+ * the they are considered to have completed.
+ * It is set true automatically within
+ * FJTaskRunner methods upon completion
+ * of the run method, or manually via cancel.
+ **/
+
+ private volatile boolean done; // = false;
+
+ /**
+ * Return the FJTaskRunner thread running the current FJTask.
+ * Most FJTask methods are just relays to their current
+ * FJTaskRunners, that perform the indicated actions.
+ * @exception ClassCastException if caller thread is not a
+ * running FJTask.
+ **/
+
+ public static FJTaskRunner getFJTaskRunner() {
+ return (FJTaskRunner)(Thread.currentThread());
+ }
+
+ /**
+ * Return the FJTaskRunnerGroup of the thread running the current FJTask.
+ * @exception ClassCastException if caller thread is not a
+ * running FJTask.
+ **/
+ public static IFJTaskRunnerGroup getFJTaskRunnerGroup() {
+ return getFJTaskRunner().getGroup();
+ }
+
+
+ /**
+ * Return true if current task has terminated or been cancelled.
+ * The method is a simple analog of the Thread.isAlive()
+ * method. However, it reports true only when the task has terminated
+ * or has been cancelled. It does not distinguish these two cases.
+ * And there is no way to determine whether a FJTask has been started
+ * or is currently executing.
+ **/
+
+ public final boolean isDone() { return done; }
+
+ /**
+ * Indicate termination. Intended only to be called by FJTaskRunner.
+ * FJTasks themselves should use (non-final) method
+ * cancel() to suppress execution.
+ **/
+
+ protected final void setDone() { done = true; }
+
+ /**
+ * Set the termination status of this task. This simple-minded
+ * analog of Thread.interrupt
+ * causes the task not to execute if it has not already been started.
+ * Cancelling a running FJTask
+ * has no effect unless the run method itself uses isDone()
+ * to probe cancellation and take appropriate action.
+ * Individual run() methods may sense status and
+ * act accordingly, normally by returning early.
+ **/
+
+ public void cancel() { setDone(); }
+
+
+ /**
+ * Clear the termination status of this task.
+ * This method is intended to be used
+ * only as a means to allow task objects to be recycled. It should
+ * be called only when you are sure that the previous
+ * execution of this task has terminated and, if applicable, has
+ * been joined by all other waiting tasks. Usage in any other
+ * context is a very bad idea.
+ **/
+
+ public void reset() { done = false; }
+
+
+ /**
+ * Execute this task. This method merely places the task in a
+ * group-wide scheduling queue.
+ * It will be run
+ * the next time any TaskRunner thread is otherwise idle.
+ * This scheduling maintains FIFO ordering of started tasks
+ * with respect to
+ * the group of worker threads.
+ * @exception ClassCastException if caller thread is not
+ * running in a FJTaskRunner thread.
+ **/
+
+ public void start() { getFJTaskRunnerGroup().executeTask(this); }
+
+
+ /**
+ * Arrange for execution of a strictly dependent task.
+ * The task that will be executed in
+ * procedure-call-like LIFO order if executed by the
+ * same worker thread, but is FIFO with respect to other tasks
+ * forked by this thread when taken by other worker threads.
+ * That is, earlier-forked
+ * tasks are preferred to later-forked tasks by other idle workers.
+ * <p>
+ * Fork() is noticeably
+ * faster than start(). However, it may only
+ * be used for strictly dependent tasks -- generally, those that
+ * could logically be issued as straight method calls without
+ * changing the logic of the program.
+ * The method is optimized for use in parallel fork/join designs
+ * in which the thread that issues one or more forks
+ * cannot continue until at least some of the forked
+ * threads terminate and are joined.
+ * @exception ClassCastException if caller thread is not
+ * running in a FJTaskRunner thread.
+ **/
+
+ public void fork() { getFJTaskRunner().push(this); }
+
+ /**
+ * Allow the current underlying FJTaskRunner thread to process other tasks.
+ * <p>
+ * Spinloops based on yield() are well behaved so long
+ * as the event or condition being waited for is produced via another
+ * FJTask. Additionally, you must never hold a lock
+ * while performing a yield or join. (This is because
+ * multiple FJTasks can be run by the same Thread during
+ * a yield. Since java locks are held per-thread, the lock would not
+ * maintain the conceptual exclusion you have in mind.)
+ * <p>
+ * Otherwise, spinloops using
+ * yield are the main construction of choice when a task must wait
+ * for a condition that it is sure will eventually occur because it
+ * is being produced by some other FJTask. The most common
+ * such condition is built-in: join() repeatedly yields until a task
+ * has terminated after producing some needed results. You can also
+ * use yield to wait for callbacks from other FJTasks, to wait for
+ * status flags to be set, and so on. However, in all these cases,
+ * you should be confident that the condition being waited for will
+ * occur, essentially always because it is produced by
+ * a FJTask generated by the current task, or one of its subtasks.
+ *
+ * @exception ClassCastException if caller thread is not
+ * running in a FJTaskRunner thread.
+ **/
+
+ public static void yield() { getFJTaskRunner().taskYield(); }
+
+ /**
+ * Yield until this task isDone.
+ * Equivalent to <code>while(!isDone()) yield(); </code>
+ * @exception ClassCastException if caller thread is not
+ * running in a FJTaskRunner thread.
+ **/
+
+ public void join() { getFJTaskRunner().taskJoin(this); }
+
+ /**
+ * Immediately execute task t by calling its run method. Has no
+ * effect if t has already been run or has been cancelled.
+ * It is equivalent to calling t.run except that it
+ * deals with completion status, so should always be used
+ * instead of directly calling run.
+ * The method can be useful
+ * when a computation has been packaged as a FJTask, but you just need to
+ * directly execute its body from within some other task.
+ **/
+
+ public static void invoke(FJTask t) {
+ if (!t.isDone()) {
+ t.run();
+ t.setDone();
+ }
+ }
+
+ /**
+ * Fork both tasks and then wait for their completion. It behaves as:
+ * <pre>
+ * task1.fork(); task2.fork(); task2.join(); task1.join();
+ * </pre>
+ * As a simple classic example, here is
+ * a class that computes the Fibonacci function:
+ * <pre>
+ * public class Fib extends FJTask {
+ *
+ * // Computes fibonacci(n) = fibonacci(n-1) + fibonacci(n-2); for n> 1
+ * // fibonacci(0) = 0;
+ * // fibonacci(1) = 1.
+ *
+ * // Value to compute fibonacci function for.
+ * // It is replaced with the answer when computed.
+ * private volatile int number;
+ *
+ * public Fib(int n) { number = n; }
+ *
+ * public int getAnswer() {
+ * if (!isDone()) throw new Error("Not yet computed");
+ * return number;
+ * }
+ *
+ * public void run() {
+ * int n = number;
+ * if (n > 1) {
+ * Fib f1 = new Fib(n - 1);
+ * Fib f2 = new Fib(n - 2);
+ *
+ * coInvoke(f1, f2); // run these in parallel
+ *
+ * // we know f1 and f2 are computed, so just directly access numbers
+ * number = f1.number + f2.number;
+ * }
+ * }
+ *
+ * public static void main(String[] args) { // sample driver
+ * try {
+ * int groupSize = 2; // 2 worker threads
+ * int num = 35; // compute fib(35)
+ * FJTaskRunnerGroup group = new FJTaskRunnerGroup(groupSize);
+ * Fib f = new Fib(num);
+ * group.invoke(f);
+ * int result = f.getAnswer();
+ * System.out.println(" Answer: " + result);
+ * }
+ * catch (InterruptedException ex) {
+ * System.out.println("Interrupted");
+ * }
+ * }
+ * }
+ * </pre>
+ *
+ * @exception ClassCastException if caller thread is not
+ * running in a FJTaskRunner thread.
+ **/
+
+ public static void coInvoke(FJTask task1, FJTask task2) {
+ getFJTaskRunner().coInvoke(task1, task2);
+ }
+
+
+ /**
+ * Fork all tasks in array, and await their completion.
+ * Behaviorally equivalent to:
+ * <pre>
+ * for (int i = 0; i &lt; tasks.length; ++i) tasks[i].fork();
+ * for (int i = 0; i &lt; tasks.length; ++i) tasks[i].join();
+ * </pre>
+ **/
+
+ public static void coInvoke(FJTask[] tasks) {
+ getFJTaskRunner().coInvoke(tasks);
+ }
+
+ /**
+ * A FJTask that holds a Runnable r, and calls r.run when executed.
+ * The class is a simple utilty to allow arbitrary Runnables
+ * to be used as FJTasks.
+ **/
+
+ public static class Wrap extends FJTask {
+ protected final Runnable runnable;
+ public Wrap(Runnable r) { runnable = r; }
+ public void run() { runnable.run(); }
+ }
+
+
+ /**
+ * A <code>new Seq</code>, when executed,
+ * invokes each task provided in the constructor, in order.
+ * The class is a simple utility
+ * that makes it easier to create composite FJTasks.
+ **/
+ public static class Seq extends FJTask {
+ protected final FJTask[] tasks;
+
+ /**
+ * Construct a Seq that, when executed, will process each of the
+ * tasks in the tasks array in order
+ **/
+ public Seq(FJTask[] tasks) {
+ this.tasks = tasks;
+ }
+
+ /**
+ * Two-task constructor, for compatibility with previous release.
+ **/
+ public Seq(FJTask task1, FJTask task2) {
+ this.tasks = new FJTask[] { task1, task2 };
+ }
+
+ public void run() {
+ for (int i = 0; i < tasks.length; ++i) FJTask.invoke(tasks[i]);
+ }
+ }
+
+ /**
+ * Construct and return a FJTask object that, when executed, will
+ * invoke the tasks in the tasks array in array order
+ **/
+
+ public static FJTask seq(FJTask[] tasks) {
+ return new Seq(tasks);
+ }
+
+ /**
+ * A <code>new Par</code>, when executed,
+ * runs the tasks provided in the constructor in parallel using
+ * coInvoke(tasks).
+ * The class is a simple utility
+ * that makes it easier to create composite FJTasks.
+ **/
+ public static class Par extends FJTask {
+ protected final FJTask[] tasks;
+
+ /**
+ * Construct a Seq that, when executed, will process each of the
+ * tasks in the tasks array in parallel
+ **/
+ public Par(FJTask[] tasks) {
+ this.tasks = tasks;
+ }
+
+ /**
+ * Two-task constructor, for compatibility with previous release.
+ **/
+ public Par(FJTask task1, FJTask task2) {
+ this.tasks = new FJTask[] { task1, task2 };
+ }
+
+
+ public void run() {
+ FJTask.coInvoke(tasks);
+ }
+ }
+
+
+ /**
+ * Construct and return a FJTask object that, when executed, will
+ * invoke the tasks in the tasks array in parallel using coInvoke
+ **/
+ public static FJTask par(FJTask[] tasks) {
+ return new Par(tasks);
+ }
+
+ /**
+ * A <code>new Seq2(task1, task2)</code>, when executed,
+ * invokes task1 and then task2, in order.
+ * The class is a simple utility
+ * that makes it easier to create composite Tasks.
+ **/
+ public static class Seq2 extends FJTask {
+ protected final FJTask fst;
+ protected final FJTask snd;
+ public Seq2(FJTask task1, FJTask task2) {
+ fst = task1;
+ snd = task2;
+ }
+ public void run() {
+ FJTask.invoke(fst);
+ FJTask.invoke(snd);
+ }
+ }
+
+ /**
+ * Construct and return a FJTask object that, when executed, will
+ * invoke task1 and task2, in order
+ **/
+
+ public static FJTask seq(FJTask task1, FJTask task2) {
+ return new Seq2(task1, task2);
+ }
+
+ /**
+ * A <code>new Par(task1, task2)</code>, when executed,
+ * runs task1 and task2 in parallel using coInvoke(task1, task2).
+ * The class is a simple utility
+ * that makes it easier to create composite Tasks.
+ **/
+ public static class Par2 extends FJTask {
+ protected final FJTask fst;
+ protected final FJTask snd;
+ public Par2(FJTask task1, FJTask task2) {
+ fst = task1;
+ snd = task2;
+ }
+ public void run() {
+ FJTask.coInvoke(fst, snd);
+ }
+ }
+
+
+ /**
+ * Construct and return a FJTask object that, when executed, will
+ * invoke task1 and task2, in parallel
+ **/
+ public static FJTask par(FJTask task1, FJTask task2) {
+ return new Par2(task1, task2);
+ }
+
+}
diff --git a/src/actors/scala/actors/FJTaskRunner.java b/src/actors/scala/actors/FJTaskRunner.java
new file mode 100644
index 0000000000..991ff97083
--- /dev/null
+++ b/src/actors/scala/actors/FJTaskRunner.java
@@ -0,0 +1,974 @@
+/*
+ File: FJTaskRunner.java
+
+ Originally written by Doug Lea and released into the public domain.
+ This may be used for any purposes whatsoever without acknowledgment.
+ Thanks for the assistance and support of Sun Microsystems Labs,
+ and everyone contributing, testing, and using this code.
+
+ History:
+ Date Who What
+ 7Jan1999 dl First public release
+ 13Jan1999 dl correct a stat counter update;
+ ensure inactive status on run termination;
+ misc minor cleaup
+ 14Jan1999 dl Use random starting point in scan;
+ variable renamings.
+ 18Jan1999 dl Runloop allowed to die on task exception;
+ remove useless timed join
+ 22Jan1999 dl Rework scan to allow use of priorities.
+ 6Feb1999 dl Documentation updates.
+ 7Mar1999 dl Add array-based coInvoke
+ 31Mar1999 dl Revise scan to remove need for NullTasks
+ 27Apr1999 dl Renamed
+ 23oct1999 dl Earlier detect of interrupt in scanWhileIdling
+ 24nov1999 dl Now works on JVMs that do not properly
+ implement read-after-write of 2 volatiles.
+*/
+
+package scala.actors;
+
+import java.util.Random;
+
+/**
+ * Specialized Thread subclass for running FJTasks.
+ * <p>
+ * Each FJTaskRunner keeps FJTasks in a double-ended queue (DEQ).
+ * Double-ended queues support stack-based operations
+ * push and pop, as well as queue-based operations put and take.
+ * Normally, threads run their own tasks. But they
+ * may also steal tasks from each others DEQs.
+ * <p>
+ * The algorithms are minor variants of those used
+ * in <A href="http://supertech.lcs.mit.edu/cilk/"> Cilk</A> and
+ * <A href="http://www.cs.utexas.edu/users/hood/"> Hood</A>, and
+ * to a lesser extent
+ * <A href="http://www.cs.uga.edu/~dkl/filaments/dist.html"> Filaments</A>,
+ * but are adapted to work in Java.
+ * <p>
+ * The two most important capabilities are:
+ * <ul>
+ * <li> Fork a FJTask:
+ * <pre>
+ * Push task onto DEQ
+ * </pre>
+ * <li> Get a task to run (for example within taskYield)
+ * <pre>
+ * If DEQ is not empty,
+ * Pop a task and run it.
+ * Else if any other DEQ is not empty,
+ * Take ("steal") a task from it and run it.
+ * Else if the entry queue for our group is not empty,
+ * Take a task from it and run it.
+ * Else if current thread is otherwise idling
+ * If all threads are idling
+ * Wait for a task to be put on group entry queue
+ * Else
+ * Yield or Sleep for a while, and then retry
+ * </pre>
+ * </ul>
+ * The push, pop, and put are designed to only ever called by the
+ * current thread, and take (steal) is only ever called by
+ * other threads.
+ * All other operations are composites and variants of these,
+ * plus a few miscellaneous bookkeeping methods.
+ * <p>
+ * Implementations of the underlying representations and operations
+ * are geared for use on JVMs operating on multiple CPUs (although
+ * they should of course work fine on single CPUs as well).
+ * <p>
+ * A possible snapshot of a FJTaskRunner's DEQ is:
+ * <pre>
+ * 0 1 2 3 4 5 6 ...
+ * +-----+-----+-----+-----+-----+-----+-----+--
+ * | | t | t | t | t | | | ... deq array
+ * +-----+-----+-----+-----+-----+-----+-----+--
+ * ^ ^
+ * base top
+ * (incremented (incremented
+ * on take, on push
+ * decremented decremented
+ * on put) on pop)
+ * </pre>
+ * <p>
+ * FJTasks are held in elements of the DEQ.
+ * They are maintained in a bounded array that
+ * works similarly to a circular bounded buffer. To ensure
+ * visibility of stolen FJTasks across threads, the array elements
+ * must be <code>volatile</code>.
+ * Using volatile rather than synchronizing suffices here since
+ * each task accessed by a thread is either one that it
+ * created or one that has never seen before. Thus we cannot
+ * encounter any staleness problems executing run methods,
+ * although FJTask programmers must be still sure to either synch or use
+ * volatile for shared data within their run methods.
+ * <p>
+ * However, since there is no way
+ * to declare an array of volatiles in Java, the DEQ elements actually
+ * hold VolatileTaskRef objects, each of which in turn holds a
+ * volatile reference to a FJTask.
+ * Even with the double-indirection overhead of
+ * volatile refs, using an array for the DEQ works out
+ * better than linking them since fewer shared
+ * memory locations need to be
+ * touched or modified by the threads while using the DEQ.
+ * Further, the double indirection may alleviate cache-line
+ * sharing effects (which cannot otherwise be directly dealt with in Java).
+ * <p>
+ * The indices for the <code>base</code> and <code>top</code> of the DEQ
+ * are declared as volatile. The main contention point with
+ * multiple FJTaskRunner threads occurs when one thread is trying
+ * to pop its own stack while another is trying to steal from it.
+ * This is handled via a specialization of Dekker's algorithm,
+ * in which the popping thread pre-decrements <code>top</code>,
+ * and then checks it against <code>base</code>.
+ * To be conservative in the face of JVMs that only partially
+ * honor the specification for volatile, the pop proceeds
+ * without synchronization only if there are apparently enough
+ * items for both a simultaneous pop and take to succeed.
+ * It otherwise enters a
+ * synchronized lock to check if the DEQ is actually empty,
+ * if so failing. The stealing thread
+ * does almost the opposite, but is set up to be less likely
+ * to win in cases of contention: Steals always run under synchronized
+ * locks in order to avoid conflicts with other ongoing steals.
+ * They pre-increment <code>base</code>, and then check against
+ * <code>top</code>. They back out (resetting the base index
+ * and failing to steal) if the
+ * DEQ is empty or is about to become empty by an ongoing pop.
+ * <p>
+ * A push operation can normally run concurrently with a steal.
+ * A push enters a synch lock only if the DEQ appears full so must
+ * either be resized or have indices adjusted due to wrap-around
+ * of the bounded DEQ. The put operation always requires synchronization.
+ * <p>
+ * When a FJTaskRunner thread has no tasks of its own to run,
+ * it tries to be a good citizen.
+ * Threads run at lower priority while scanning for work.
+ * <p>
+ * If the task is currently waiting
+ * via yield, the thread alternates scans (starting at a randomly
+ * chosen victim) with Thread.yields. This is
+ * well-behaved so long as the JVM handles Thread.yield in a
+ * sensible fashion. (It need not. Thread.yield is so underspecified
+ * that it is legal for a JVM to treat it as a no-op.) This also
+ * keeps things well-behaved even if we are running on a uniprocessor
+ * JVM using a simple cooperative threading model.
+ * <p>
+ * If a thread needing work is
+ * is otherwise idle (which occurs only in the main runloop), and
+ * there are no available tasks to steal or poll, it
+ * instead enters into a sleep-based (actually timed wait(msec))
+ * phase in which it progressively sleeps for longer durations
+ * (up to a maximum of FJTaskRunnerGroup.MAX_SLEEP_TIME,
+ * currently 100ms) between scans.
+ * If all threads in the group
+ * are idling, they further progress to a hard wait phase, suspending
+ * until a new task is entered into the FJTaskRunnerGroup entry queue.
+ * A sleeping FJTaskRunner thread may be awakened by a new
+ * task being put into the group entry queue or by another FJTaskRunner
+ * becoming active, but not merely by some DEQ becoming non-empty.
+ * Thus the MAX_SLEEP_TIME provides a bound for sleep durations
+ * in cases where all but one worker thread start sleeping
+ * even though there will eventually be work produced
+ * by a thread that is taking a long time to place tasks in DEQ.
+ * These sleep mechanics are handled in the FJTaskRunnerGroup class.
+ * <p>
+ * Composite operations such as taskJoin include heavy
+ * manual inlining of the most time-critical operations
+ * (mainly FJTask.invoke).
+ * This opens up a few opportunities for further hand-optimizations.
+ * Until Java compilers get a lot smarter, these tweaks
+ * improve performance significantly enough for task-intensive
+ * programs to be worth the poorer maintainability and code duplication.
+ * <p>
+ * Because they are so fragile and performance-sensitive, nearly
+ * all methods are declared as final. However, nearly all fields
+ * and methods are also declared as protected, so it is possible,
+ * with much care, to extend functionality in subclasses. (Normally
+ * you would also need to subclass FJTaskRunnerGroup.)
+ * <p>
+ * None of the normal java.lang.Thread class methods should ever be called
+ * on FJTaskRunners. For this reason, it might have been nicer to
+ * declare FJTaskRunner as a Runnable to run within a Thread. However,
+ * this would have complicated many minor logistics. And since
+ * no FJTaskRunner methods should normally be called from outside the
+ * FJTask and FJTaskRunnerGroup classes either, this decision doesn't impact
+ * usage.
+ * <p>
+ * You might think that layering this kind of framework on top of
+ * Java threads, which are already several levels removed from raw CPU
+ * scheduling on most systems, would lead to very poor performance.
+ * But on the platforms
+ * tested, the performance is quite good.
+ * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
+ * @see FJTask
+ * @see FJTaskRunnerGroup
+ **/
+
+public class FJTaskRunner extends Thread {
+
+ /** The group of which this FJTaskRunner is a member **/
+ protected final IFJTaskRunnerGroup group;
+
+ /**
+ * Constructor called only during FJTaskRunnerGroup initialization
+ **/
+
+ /*protected*/ public FJTaskRunner(IFJTaskRunnerGroup g) {
+ group = g;
+ victimRNG = new Random(System.identityHashCode(this));
+ runPriority = getPriority();
+ setDaemon(true);
+ }
+
+ /**
+ * Return the FJTaskRunnerGroup of which this thread is a member
+ **/
+
+ protected final IFJTaskRunnerGroup getGroup() { return group; }
+
+
+ /* ------------ DEQ Representation ------------------- */
+
+
+ /**
+ * FJTasks are held in an array-based DEQ with INITIAL_CAPACITY
+ * elements. The DEQ is grown if necessary, but default value is
+ * normally much more than sufficient unless there are
+ * user programming errors or questionable operations generating
+ * large numbers of Tasks without running them.
+ * Capacities must be a power of two.
+ **/
+
+ protected static final int INITIAL_CAPACITY = 4096;
+
+ /**
+ * The maximum supported DEQ capacity.
+ * When exceeded, FJTaskRunner operations throw Errors
+ **/
+
+ protected static final int MAX_CAPACITY = 1 << 30;
+
+ /**
+ * An object holding a single volatile reference to a FJTask.
+ **/
+
+ protected final static class VolatileTaskRef {
+ /** The reference **/
+ protected volatile FJTask ref;
+
+ /** Set the reference **/
+ protected final void put(FJTask r) { ref = r; }
+ /** Return the reference **/
+ protected final FJTask get() { return ref; }
+ /** Return the reference and clear it **/
+ protected final FJTask take() { FJTask r = ref; ref = null; return r; }
+
+ /**
+ * Initialization utility for constructing arrays.
+ * Make an array of given capacity and fill it with
+ * VolatileTaskRefs.
+ **/
+ protected static VolatileTaskRef[] newArray(int cap) {
+ VolatileTaskRef[] a = new VolatileTaskRef[cap];
+ for (int k = 0; k < cap; k++) a[k] = new VolatileTaskRef();
+ return a;
+ }
+
+ }
+
+ /**
+ * The DEQ array.
+ **/
+
+ protected VolatileTaskRef[] deq = VolatileTaskRef.newArray(INITIAL_CAPACITY);
+
+ /** Current size of the task DEQ **/
+ protected int deqSize() { return deq.length; }
+
+ /**
+ * Current top of DEQ. Generally acts just like a stack pointer in an
+ * array-based stack, except that it circularly wraps around the
+ * array, as in an array-based queue. The value is NOT
+ * always kept within <code>0 ... deq.length</code> though.
+ * The current top element is always at <code>top & (deq.length-1)</code>.
+ * To avoid integer overflow, top is reset down
+ * within bounds whenever it is noticed to be out out bounds;
+ * at worst when it is at <code>2 * deq.length</code>.
+ **/
+ protected volatile int top = 0;
+
+
+ /**
+ * Current base of DEQ. Acts like a take-pointer in an
+ * array-based bounded queue. Same bounds and usage as top.
+ **/
+
+ protected volatile int base = 0;
+
+
+ /**
+ * An extra object to synchronize on in order to
+ * achieve a memory barrier.
+ **/
+
+ protected final Object barrier = new Object();
+
+ /* ------------ Other BookKeeping ------------------- */
+
+ /**
+ * Record whether current thread may be processing a task
+ * (i.e., has been started and is not in an idle wait).
+ * Accessed, under synch, ONLY by FJTaskRunnerGroup, but the field is
+ * stored here for simplicity.
+ **/
+
+ /*protected*/ public boolean active = false;
+
+ /** Random starting point generator for scan() **/
+ protected final Random victimRNG;
+
+
+ /** Priority to use while scanning for work **/
+ protected int scanPriority = Thread.MIN_PRIORITY + 1;
+
+ /** Priority to use while running tasks **/
+ protected int runPriority;
+
+ /**
+ * Set the priority to use while scanning.
+ * We do not bother synchronizing access, since
+ * by the time the value is needed, both this FJTaskRunner
+ * and its FJTaskRunnerGroup will
+ * necessarily have performed enough synchronization
+ * to avoid staleness problems of any consequence.
+ **/
+ protected void setScanPriority(int pri) { scanPriority = pri; }
+
+
+ /**
+ * Set the priority to use while running tasks.
+ * Same usage and rationale as setScanPriority.
+ **/
+ protected void setRunPriority(int pri) { runPriority = pri; }
+
+ /**
+ * Compile-time constant for statistics gathering.
+ * Even when set, reported values may not be accurate
+ * since all are read and written without synchronization.
+ **/
+
+
+
+ static final boolean COLLECT_STATS = true;
+ // static final boolean COLLECT_STATS = false;
+
+
+ // for stat collection
+
+ /** Total number of tasks run **/
+ protected int runs = 0;
+
+ /** Total number of queues scanned for work **/
+ protected int scans = 0;
+
+ /** Total number of tasks obtained via scan **/
+ protected int steals = 0;
+
+
+
+
+ /* ------------ DEQ operations ------------------- */
+
+
+ /**
+ * Push a task onto DEQ.
+ * Called ONLY by current thread.
+ **/
+
+ /*protected*/ public final void push(final FJTask r) {
+ int t = top;
+
+ /*
+ This test catches both overflows and index wraps. It doesn't
+ really matter if base value is in the midst of changing in take.
+ As long as deq length is < 2^30, we are guaranteed to catch wrap in
+ time since base can only be incremented at most length times
+ between pushes (or puts).
+ */
+
+ if (t < (base & (deq.length-1)) + deq.length) {
+
+ deq[t & (deq.length-1)].put(r);
+ top = t + 1;
+ }
+
+ else // isolate slow case to increase chances push is inlined
+ slowPush(r); // check overflow and retry
+ }
+
+
+ /**
+ * Handle slow case for push
+ **/
+
+ protected synchronized void slowPush(final FJTask r) {
+ checkOverflow();
+ push(r); // just recurse -- this one is sure to succeed.
+ }
+
+
+ /**
+ * Enqueue task at base of DEQ.
+ * Called ONLY by current thread.
+ * This method is currently not called from class FJTask. It could be used
+ * as a faster way to do FJTask.start, but most users would
+ * find the semantics too confusing and unpredictable.
+ **/
+
+ protected final synchronized void put(final FJTask r) {
+ for (;;) {
+ int b = base - 1;
+ if (top < b + deq.length) {
+
+ int newBase = b & (deq.length-1);
+ deq[newBase].put(r);
+ base = newBase;
+
+ if (b != newBase) { // Adjust for index underflow
+ int newTop = top & (deq.length-1);
+ if (newTop < newBase) newTop += deq.length;
+ top = newTop;
+ }
+ return;
+ }
+ else {
+ checkOverflow();
+ // ... and retry
+ }
+ }
+ }
+
+ /**
+ * Return a popped task, or null if DEQ is empty.
+ * Called ONLY by current thread.
+ * <p>
+ * This is not usually called directly but is
+ * instead inlined in callers. This version differs from the
+ * cilk algorithm in that pop does not fully back down and
+ * retry in the case of potential conflict with take. It simply
+ * rechecks under synch lock. This gives a preference
+ * for threads to run their own tasks, which seems to
+ * reduce flailing a bit when there are few tasks to run.
+ **/
+
+ protected final FJTask pop() {
+ /*
+ Decrement top, to force a contending take to back down.
+ */
+
+ int t = --top;
+
+ /*
+ To avoid problems with JVMs that do not properly implement
+ read-after-write of a pair of volatiles, we conservatively
+ grab without lock only if the DEQ appears to have at least two
+ elements, thus guaranteeing that both a pop and take will succeed,
+ even if the pre-increment in take is not seen by current thread.
+ Otherwise we recheck under synch.
+ */
+
+ if (base + 1 < t)
+ return deq[t & (deq.length-1)].take();
+ else
+ return confirmPop(t);
+
+ }
+
+
+ /**
+ * Check under synch lock if DEQ is really empty when doing pop.
+ * Return task if not empty, else null.
+ **/
+
+ protected final synchronized FJTask confirmPop(int provisionalTop) {
+ if (base <= provisionalTop)
+ return deq[provisionalTop & (deq.length-1)].take();
+ else { // was empty
+ /*
+ Reset DEQ indices to zero whenever it is empty.
+ This both avoids unnecessary calls to checkOverflow
+ in push, and helps keep the DEQ from accumulating garbage
+ */
+
+ top = base = 0;
+ return null;
+ }
+ }
+
+
+ /**
+ * Take a task from the base of the DEQ.
+ * Always called by other threads via scan()
+ **/
+
+
+ protected final synchronized FJTask take() {
+
+ /*
+ Increment base in order to suppress a contending pop
+ */
+
+ int b = base++;
+
+ if (b < top)
+ return confirmTake(b);
+ else {
+ // back out
+ base = b;
+ return null;
+ }
+ }
+
+
+ /**
+ * double-check a potential take
+ **/
+
+ protected FJTask confirmTake(int oldBase) {
+
+ /*
+ Use a second (guaranteed uncontended) synch
+ to serve as a barrier in case JVM does not
+ properly process read-after-write of 2 volatiles
+ */
+
+ synchronized(barrier) {
+ if (oldBase < top) {
+ /*
+ We cannot call deq[oldBase].take here because of possible races when
+ nulling out versus concurrent push operations. Resulting
+ accumulated garbage is swept out periodically in
+ checkOverflow, or more typically, just by keeping indices
+ zero-based when found to be empty in pop, which keeps active
+ region small and constantly overwritten.
+ */
+
+ return deq[oldBase & (deq.length-1)].get();
+ }
+ else {
+ base = oldBase;
+ return null;
+ }
+ }
+ }
+
+
+ /**
+ * Adjust top and base, and grow DEQ if necessary.
+ * Called only while DEQ synch lock being held.
+ * We don't expect this to be called very often. In most
+ * programs using FJTasks, it is never called.
+ **/
+
+ protected void checkOverflow() {
+ int t = top;
+ int b = base;
+
+ if (t - b < deq.length-1) { // check if just need an index reset
+
+ int newBase = b & (deq.length-1);
+ int newTop = top & (deq.length-1);
+ if (newTop < newBase) newTop += deq.length;
+ top = newTop;
+ base = newBase;
+
+ /*
+ Null out refs to stolen tasks.
+ This is the only time we can safely do it.
+ */
+
+ int i = newBase;
+ while (i != newTop && deq[i].ref != null) {
+ deq[i].ref = null;
+ i = (i - 1) & (deq.length-1);
+ }
+
+ }
+ else { // grow by doubling array
+
+ int newTop = t - b;
+ int oldcap = deq.length;
+ int newcap = oldcap * 2;
+
+ if (newcap >= MAX_CAPACITY)
+ throw new Error("FJTask queue maximum capacity exceeded");
+
+ VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap];
+
+ // copy in bottom half of new deq with refs from old deq
+ for (int j = 0; j < oldcap; ++j) newdeq[j] = deq[b++ & (oldcap-1)];
+
+ // fill top half of new deq with new refs
+ for (int j = oldcap; j < newcap; ++j) newdeq[j] = new VolatileTaskRef();
+
+ deq = newdeq;
+ base = 0;
+ top = newTop;
+ }
+ }
+
+
+ /* ------------ Scheduling ------------------- */
+
+
+ /**
+ * Do all but the pop() part of yield or join, by
+ * traversing all DEQs in our group looking for a task to
+ * steal. If none, it checks the entry queue.
+ * <p>
+ * Since there are no good, portable alternatives,
+ * we rely here on a mixture of Thread.yield and priorities
+ * to reduce wasted spinning, even though these are
+ * not well defined. We are hoping here that the JVM
+ * does something sensible.
+ * @param waitingFor if non-null, the current task being joined
+ **/
+
+ protected void scan(final FJTask waitingFor) {
+
+ FJTask task = null;
+
+ // to delay lowering priority until first failure to steal
+ boolean lowered = false;
+
+ /*
+ Circularly traverse from a random start index.
+
+ This differs slightly from cilk version that uses a random index
+ for each attempted steal.
+ Exhaustive scanning might impede analytic tractablity of
+ the scheduling policy, but makes it much easier to deal with
+ startup and shutdown.
+ */
+
+ FJTaskRunner[] ts = group.getArray();
+ int idx = victimRNG.nextInt(ts.length);
+
+ for (int i = 0; i < ts.length; ++i) {
+
+ FJTaskRunner t = ts[idx];
+ if (++idx >= ts.length) idx = 0; // circularly traverse
+
+ if (t != null && t != this) {
+
+ if (waitingFor != null && waitingFor.isDone()) {
+ break;
+ }
+ else {
+ if (COLLECT_STATS) ++scans;
+ task = t.take();
+ if (task != null) {
+ if (COLLECT_STATS) ++steals;
+ break;
+ }
+ else if (isInterrupted()) {
+ break;
+ }
+ else if (!lowered) { // if this is first fail, lower priority
+ lowered = true;
+ setPriority(scanPriority);
+ }
+ else { // otherwise we are at low priority; just yield
+ yield();
+ }
+ }
+ }
+
+ }
+
+ if (task == null) {
+ if (COLLECT_STATS) ++scans;
+ task = group.pollEntryQueue();
+ if (COLLECT_STATS) if (task != null) ++steals;
+ }
+
+ if (lowered) setPriority(runPriority);
+
+ if (task != null && !task.isDone()) {
+ if (COLLECT_STATS) ++runs;
+ task.run();
+ task.setDone();
+ }
+
+ }
+
+ /**
+ * Same as scan, but called when current thread is idling.
+ * It repeatedly scans other threads for tasks,
+ * sleeping while none are available.
+ * <p>
+ * This differs from scan mainly in that
+ * since there is no reason to return to recheck any
+ * condition, we iterate until a task is found, backing
+ * off via sleeps if necessary.
+ **/
+
+ protected void scanWhileIdling() {
+ FJTask task = null;
+
+ boolean lowered = false;
+ long iters = 0;
+
+ FJTaskRunner[] ts = group.getArray();
+ int idx = victimRNG.nextInt(ts.length);
+
+ do {
+ for (int i = 0; i < ts.length; ++i) {
+
+ FJTaskRunner t = ts[idx];
+ if (++idx >= ts.length) idx = 0; // circularly traverse
+
+ if (t != null && t != this) {
+ if (COLLECT_STATS) ++scans;
+
+ task = t.take();
+ if (task != null) {
+ if (COLLECT_STATS) ++steals;
+ if (lowered) setPriority(runPriority);
+ group.setActive(this);
+ break;
+ }
+ }
+ }
+
+ if (task == null) {
+ if (isInterrupted())
+ return;
+
+ if (COLLECT_STATS) ++scans;
+ task = group.pollEntryQueue();
+
+ if (task != null) {
+ if (COLLECT_STATS) ++steals;
+ if (lowered) setPriority(runPriority);
+ group.setActive(this);
+ }
+ else {
+ ++iters;
+ // Check here for yield vs sleep to avoid entering group synch lock
+ if (iters >= /*group.SCANS_PER_SLEEP*/ 15) {
+ group.checkActive(this, iters);
+ if (isInterrupted())
+ return;
+ }
+ else if (!lowered) {
+ lowered = true;
+ setPriority(scanPriority);
+ }
+ else {
+ yield();
+ }
+ }
+ }
+ } while (task == null);
+
+
+ if (!task.isDone()) {
+ if (COLLECT_STATS) ++runs;
+ task.run();
+ task.setDone();
+ }
+
+ }
+
+ /* ------------ composite operations ------------------- */
+
+
+ /**
+ * Main runloop
+ **/
+
+ public void run() {
+ try{
+ while (!interrupted()) {
+
+ FJTask task = pop();
+ if (task != null) {
+ if (!task.isDone()) {
+ // inline FJTask.invoke
+ if (COLLECT_STATS) ++runs;
+ task.run();
+ task.setDone();
+ }
+ }
+ else
+ scanWhileIdling();
+ }
+ }
+ finally {
+ group.setInactive(this);
+ }
+ }
+
+ /**
+ * Execute a task in this thread. Generally called when current task
+ * cannot otherwise continue.
+ **/
+
+
+ protected final void taskYield() {
+ FJTask task = pop();
+ if (task != null) {
+ if (!task.isDone()) {
+ if (COLLECT_STATS) ++runs;
+ task.run();
+ task.setDone();
+ }
+ }
+ else
+ scan(null);
+ }
+
+
+ /**
+ * Process tasks until w is done.
+ * Equivalent to <code>while(!w.isDone()) taskYield(); </code>
+ **/
+
+ protected final void taskJoin(final FJTask w) {
+
+ while (!w.isDone()) {
+
+ FJTask task = pop();
+ if (task != null) {
+ if (!task.isDone()) {
+ if (COLLECT_STATS) ++runs;
+ task.run();
+ task.setDone();
+ if (task == w) return; // fast exit if we just ran w
+ }
+ }
+ else
+ scan(w);
+ }
+ }
+
+ /**
+ * A specialized expansion of
+ * <code> w.fork(); invoke(v); w.join(); </code>
+ **/
+
+
+ protected final void coInvoke(final FJTask w, final FJTask v) {
+
+ // inline push
+
+ int t = top;
+ if (t < (base & (deq.length-1)) + deq.length) {
+
+ deq[t & (deq.length-1)].put(w);
+ top = t + 1;
+
+ // inline invoke
+
+ if (!v.isDone()) {
+ if (COLLECT_STATS) ++runs;
+ v.run();
+ v.setDone();
+ }
+
+ // inline taskJoin
+
+ while (!w.isDone()) {
+ FJTask task = pop();
+ if (task != null) {
+ if (!task.isDone()) {
+ if (COLLECT_STATS) ++runs;
+ task.run();
+ task.setDone();
+ if (task == w) return; // fast exit if we just ran w
+ }
+ }
+ else
+ scan(w);
+ }
+ }
+
+ else // handle non-inlinable cases
+ slowCoInvoke(w, v);
+ }
+
+
+ /**
+ * Backup to handle noninlinable cases of coInvoke
+ **/
+
+ protected void slowCoInvoke(final FJTask w, final FJTask v) {
+ push(w); // let push deal with overflow
+ FJTask.invoke(v);
+ taskJoin(w);
+ }
+
+
+ /**
+ * Array-based version of coInvoke
+ **/
+
+ protected final void coInvoke(FJTask[] tasks) {
+ int nforks = tasks.length - 1;
+
+ // inline bulk push of all but one task
+
+ int t = top;
+
+ if (nforks >= 0 && t + nforks < (base & (deq.length-1)) + deq.length) {
+ for (int i = 0; i < nforks; ++i) {
+ deq[t++ & (deq.length-1)].put(tasks[i]);
+ top = t;
+ }
+
+ // inline invoke of one task
+ FJTask v = tasks[nforks];
+ if (!v.isDone()) {
+ if (COLLECT_STATS) ++runs;
+ v.run();
+ v.setDone();
+ }
+
+ // inline taskJoins
+
+ for (int i = 0; i < nforks; ++i) {
+ FJTask w = tasks[i];
+ while (!w.isDone()) {
+
+ FJTask task = pop();
+ if (task != null) {
+ if (!task.isDone()) {
+ if (COLLECT_STATS) ++runs;
+ task.run();
+ task.setDone();
+ }
+ }
+ else
+ scan(w);
+ }
+ }
+ }
+
+ else // handle non-inlinable cases
+ slowCoInvoke(tasks);
+ }
+
+ /**
+ * Backup to handle atypical or noninlinable cases of coInvoke
+ **/
+
+ protected void slowCoInvoke(FJTask[] tasks) {
+ for (int i = 0; i < tasks.length; ++i) push(tasks[i]);
+ for (int i = 0; i < tasks.length; ++i) taskJoin(tasks[i]);
+ }
+
+}
+
diff --git a/src/actors/scala/actors/FJTaskRunnerGroup.java b/src/actors/scala/actors/FJTaskRunnerGroup.java
new file mode 100644
index 0000000000..27c870453f
--- /dev/null
+++ b/src/actors/scala/actors/FJTaskRunnerGroup.java
@@ -0,0 +1,685 @@
+/*
+ File: FJTaskRunnerGroup.java
+
+ Originally written by Doug Lea and released into the public domain.
+ This may be used for any purposes whatsoever without acknowledgment.
+ Thanks for the assistance and support of Sun Microsystems Labs,
+ and everyone contributing, testing, and using this code.
+
+ History:
+ Date Who What
+ 7Jan1999 dl First public release
+ 12Jan1999 dl made getActiveCount public; misc minor cleanup.
+ 14Jan1999 dl Added executeTask
+ 20Jan1999 dl Allow use of priorities; reformat stats
+ 6Feb1999 dl Lazy thread starts
+ 27Apr1999 dl Renamed
+*/
+
+package scala.actors;
+
+/**
+ * A stripped down analog of a ThreadGroup used for
+ * establishing and managing FJTaskRunner threads.
+ * ThreadRunnerGroups serve as the control boundary separating
+ * the general world of normal threads from the specialized world
+ * of FJTasks.
+ * <p>
+ * By intent, this class does not subclass java.lang.ThreadGroup, and
+ * does not support most methods found in ThreadGroups, since they
+ * would make no sense for FJTaskRunner threads. In fact, the class
+ * does not deal with ThreadGroups at all. If you want to restrict
+ * a FJTaskRunnerGroup to a particular ThreadGroup, you can create
+ * it from within that ThreadGroup.
+ * <p>
+ * The main contextual parameter for a FJTaskRunnerGroup is
+ * the group size, established in the constructor.
+ * Groups must be of a fixed size.
+ * There is no way to dynamically increase or decrease the number
+ * of threads in an existing group.
+ * <p>
+ * In general, the group size should be equal to the number
+ * of CPUs on the system. (Unfortunately, there is no portable
+ * means of automatically detecting the number of CPUs on a JVM, so there is
+ * no good way to automate defaults.) In principle, when
+ * FJTasks are used for computation-intensive tasks, having only
+ * as many threads as CPUs should minimize bookkeeping overhead
+ * and contention, and so maximize throughput. However, because
+ * FJTaskRunners lie atop Java threads, and in turn operating system
+ * thread support and scheduling policies,
+ * it is very possible that using more threads
+ * than CPUs will improve overall throughput even though it adds
+ * to overhead. This will always be so if FJTasks are I/O bound.
+ * So it may pay to experiment a bit when tuning on particular platforms.
+ * You can also use <code>setRunPriorities</code> to either
+ * increase or decrease the priorities of active threads, which
+ * may interact with group size choice.
+ * <p>
+ * In any case, overestimating group sizes never
+ * seriously degrades performance (at least within reasonable bounds).
+ * You can also use a value
+ * less than the number of CPUs in order to reserve processing
+ * for unrelated threads.
+ * <p>
+ * There are two general styles for using a FJTaskRunnerGroup.
+ * You can create one group per entire program execution, for example
+ * as a static singleton, and use it for all parallel tasks:
+ * <pre>
+ * class Tasks {
+ * static FJTaskRunnerGroup group;
+ * public void initialize(int groupsize) {
+ * group = new FJTaskRunnerGroup(groupSize);
+ * }
+ * // ...
+ * }
+ * </pre>
+ * Alternatively, you can make new groups on the fly and use them only for
+ * particular task sets. This is more flexible,,
+ * and leads to more controllable and deterministic execution patterns,
+ * but it encounters greater overhead on startup. Also, to reclaim
+ * system resources, you should
+ * call <code>FJTaskRunnerGroup.interruptAll</code> when you are done
+ * using one-shot groups. Otherwise, because FJTaskRunners set
+ * <code>Thread.isDaemon</code>
+ * status, they will not normally be reclaimed until program termination.
+ * <p>
+ * The main supported methods are <code>execute</code>,
+ * which starts a task processed by FJTaskRunner threads,
+ * and <code>invoke</code>, which starts one and waits for completion.
+ * For example, you might extend the above <code>FJTasks</code>
+ * class to support a task-based computation, say, the
+ * <code>Fib</code> class from the <code>FJTask</code> documentation:
+ * <pre>
+ * class Tasks { // continued
+ * // ...
+ * static int fib(int n) {
+ * try {
+ * Fib f = new Fib(n);
+ * group.invoke(f);
+ * return f.getAnswer();
+ * }
+ * catch (InterruptedException ex) {
+ * throw new Error("Interrupted during computation");
+ * }
+ * }
+ * }
+ * </pre>
+ * <p>
+ * Method <code>stats()</code> can be used to monitor performance.
+ * Both FJTaskRunnerGroup and FJTaskRunner may be compiled with
+ * the compile-time constant COLLECT_STATS set to false. In this
+ * case, various simple counts reported in stats() are not collected.
+ * On platforms tested,
+ * this leads to such a tiny performance improvement that there is
+ * very little motivation to bother.
+ *
+ * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
+ * <p>
+ * @see FJTask
+ * @see FJTaskRunner
+ **/
+
+public class FJTaskRunnerGroup implements IFJTaskRunnerGroup {
+
+ /** The threads in this group **/
+ protected /*final*/ FJTaskRunner[] threads;
+
+ /** Group-wide queue for tasks entered via execute() **/
+ protected final LinkedQueue entryQueue = new LinkedQueue();
+
+ /** Number of threads that are not waiting for work **/
+ protected int activeCount = 0;
+
+ /** Number of threads that have been started. Used to avoid
+ unecessary contention during startup of task sets.
+ **/
+ protected int nstarted = 0;
+
+ /**
+ * Compile-time constant. If true, various counts of
+ * runs, waits, etc., are maintained. These are NOT
+ * updated with synchronization, so statistics reports
+ * might not be accurate.
+ **/
+
+ static final boolean COLLECT_STATS = true;
+ // static final boolean COLLECT_STATS = false;
+
+ // for stats
+
+ /** The time at which this ThreadRunnerGroup was constructed **/
+ long initTime = 0;
+
+ /** Total number of executes or invokes **/
+ int entries = 0;
+
+ static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY+1;
+
+ /**
+ * Create a FJTaskRunnerGroup with the indicated number
+ * of FJTaskRunner threads. Normally, the best size to use is
+ * the number of CPUs on the system.
+ * <p>
+ * The threads in a FJTaskRunnerGroup are created with their
+ * isDaemon status set, so do not normally need to be
+ * shut down manually upon program termination.
+ **/
+
+ public FJTaskRunnerGroup(int groupSize) {
+ //System.out.println("Creating FJTaskRunnerGroup of size "+groupSize);
+ threads = new FJTaskRunner[groupSize];
+ initializeThreads();
+ initTime = System.currentTimeMillis();
+ }
+
+
+ public boolean existsTask() {
+ FJTask task = null;
+ /*
+ Circularly traverse from a random start index.
+
+ This differs slightly from cilk version that uses a random index
+ for each attempted steal.
+ Exhaustive scanning might impede analytic tractablity of
+ the scheduling policy, but makes it much easier to deal with
+ startup and shutdown.
+ */
+
+ FJTaskRunner[] ts = threads;
+ int idx = /*victimRNG.nextInt(ts.length)*/ 0;
+
+ for (int i = 0; i < ts.length; ++i) {
+ FJTaskRunner t = ts[idx];
+ if (++idx >= ts.length) idx = 0; // circularly traverse
+
+ if (t != null) {
+ task = t.take();
+ if (task != null) {
+ break;
+ }
+ }
+ }
+
+ if (task == null) {
+ task = pollEntryQueue();
+ }
+
+ if (task != null && !task.isDone()) {
+ boolean quit = false;
+ while (!quit) {
+ quit = true;
+ try {
+ // put task back into entry queue and return true
+ entryQueue.put(task);
+ } catch (InterruptedException ie) {
+ quit = false;
+ }
+ }
+ return true;
+ }
+ else return false;
+ }
+
+ public boolean checkPoolSize() {
+ // if there is a task in the entryQueue or any of the
+ // worker queues, increase thread pool size by one
+ if (!entryQueue.isEmpty() ||
+ existsTask()) {
+ int newsize = threads.length + 1;
+ FJTaskRunner[] newar = new FJTaskRunner[newsize];
+ System.arraycopy(threads, 0, newar, 0, newsize-1);
+ synchronized(this) {
+ threads = newar;
+ threads[newsize-1] = new FJTaskRunner(this);
+ }
+ return true;
+ }
+ else return false;
+ }
+
+
+ /**
+ * Arrange for execution of the given task
+ * by placing it in a work queue. If the argument
+ * is not of type FJTask, it is embedded in a FJTask via
+ * <code>FJTask.Wrap</code>.
+ * @exception InterruptedException if current Thread is
+ * currently interrupted
+ **/
+
+ public void execute(Runnable r) throws InterruptedException {
+ if (r instanceof FJTask) {
+ entryQueue.put((FJTask)r);
+ }
+ else {
+ entryQueue.put(new FJTask.Wrap(r));
+ }
+ signalNewTask();
+ }
+
+
+ /**
+ * Specialized form of execute called only from within FJTasks
+ **/
+ public void executeTask(FJTask t) {
+ try {
+ entryQueue.put(t);
+ signalNewTask();
+ }
+ catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+
+ /**
+ * Start a task and wait it out. Returns when the task completes.
+ * @exception InterruptedException if current Thread is
+ * interrupted before completion of the task.
+ **/
+
+ public void invoke(Runnable r) throws InterruptedException {
+ InvokableFJTask w = new InvokableFJTask(r);
+ entryQueue.put(w);
+ signalNewTask();
+ w.awaitTermination();
+ }
+
+
+ /**
+ * Try to shut down all FJTaskRunner threads in this group
+ * by interrupting them all. This method is designed
+ * to be used during cleanup when it is somehow known
+ * that all threads are idle.
+ * FJTaskRunners only
+ * check for interruption when they are not otherwise
+ * processing a task (and its generated subtasks,
+ * if any), so if any threads are active, shutdown may
+ * take a while, and may lead to unpredictable
+ * task processing.
+ **/
+
+ public void interruptAll() {
+ // paranoically interrupt current thread last if in group.
+ Thread current = Thread.currentThread();
+ boolean stopCurrent = false;
+
+ for (int i = 0; i < threads.length; ++i) {
+ Thread t = threads[i];
+ if (t == current)
+ stopCurrent = true;
+ else
+ t.interrupt();
+ }
+ if (stopCurrent)
+ current.interrupt();
+ }
+
+
+ /**
+ * Set the priority to use while a FJTaskRunner is
+ * polling for new tasks to perform. Default
+ * is currently Thread.MIN_PRIORITY+1. The value
+ * set may not go into effect immediately, but
+ * will be used at least the next time a thread scans for work.
+ **/
+ public synchronized void setScanPriorities(int pri) {
+ for (int i = 0; i < threads.length; ++i) {
+ FJTaskRunner t = threads[i];
+ t.setScanPriority(pri);
+ if (!t.active) t.setPriority(pri);
+ }
+ }
+
+
+ /**
+ * Set the priority to use while a FJTaskRunner is
+ * actively running tasks. Default
+ * is the priority that was in effect by the thread that
+ * constructed this FJTaskRunnerGroup. Setting this value
+ * while threads are running may momentarily result in
+ * them running at this priority even when idly waiting for work.
+ **/
+ public synchronized void setRunPriorities(int pri) {
+ for (int i = 0; i < threads.length; ++i) {
+ FJTaskRunner t = threads[i];
+ t.setRunPriority(pri);
+ if (t.active) t.setPriority(pri);
+ }
+ }
+
+
+
+ /** Return the number of FJTaskRunner threads in this group **/
+
+ public int size() { return threads.length; }
+
+
+ /**
+ * Return the number of threads that are not idly waiting for work.
+ * Beware that even active threads might not be doing any useful
+ * work, but just spinning waiting for other dependent tasks.
+ * Also, since this is just a snapshot value, some tasks
+ * may be in the process of becoming idle.
+ **/
+ public synchronized int getActiveCount() { return activeCount; }
+
+ /**
+ * Prints various snapshot statistics to System.out.
+ * <ul>
+ * <li> For each FJTaskRunner thread (labeled as T<em>n</em>, for
+ * <em>n</em> from zero to group size - 1):
+ * <ul>
+ * <li> A star "*" is printed if the thread is currently active;
+ * that is, not sleeping while waiting for work. Because
+ * threads gradually enter sleep modes, an active thread
+ * may in fact be about to sleep (or wake up).
+ * <li> <em>Q Cap</em> The current capacity of its task queue.
+ * <li> <em>Run</em> The total number of tasks that have been run.
+ * <li> <em>New</em> The number of these tasks that were
+ * taken from either the entry queue or from other
+ * thread queues; that is, the number of tasks run
+ * that were <em>not</em> forked by the thread itself.
+ * <li> <em>Scan</em> The number of times other task
+ * queues or the entry queue were polled for tasks.
+ * </ul>
+ * <li> <em>Execute</em> The total number of tasks entered
+ * (but not necessarily yet run) via execute or invoke.
+ * <li> <em>Time</em> Time in seconds since construction of this
+ * FJTaskRunnerGroup.
+ * <li> <em>Rate</em> The total number of tasks processed
+ * per second across all threads. This
+ * may be useful as a simple throughput indicator
+ * if all processed tasks take approximately the
+ * same time to run.
+ * </ul>
+ * <p>
+ * Cautions: Some statistics are updated and gathered
+ * without synchronization,
+ * so may not be accurate. However, reported counts may be considered
+ * as lower bounds of actual values.
+ * Some values may be zero if classes are compiled
+ * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup
+ * classes can be independently compiled with different values of
+ * COLLECT_STATS.) Also, the counts are maintained as ints so could
+ * overflow in exceptionally long-lived applications.
+ * <p>
+ * These statistics can be useful when tuning algorithms or diagnosing
+ * problems. For example:
+ * <ul>
+ * <li> High numbers of scans may mean that there is insufficient
+ * parallelism to keep threads busy. However, high scan rates
+ * are expected if the number
+ * of Executes is also high or there is a lot of global
+ * synchronization in the application, and the system is not otherwise
+ * busy. Threads may scan
+ * for work hundreds of times upon startup, shutdown, and
+ * global synch points of task sets.
+ * <li> Large imbalances in tasks run across different threads might
+ * just reflect contention with unrelated threads on a system
+ * (possibly including JVM threads such as GC), but may also
+ * indicate some systematic bias in how you generate tasks.
+ * <li> Large task queue capacities may mean that too many tasks are being
+ * generated before they can be run.
+ * Capacities are reported rather than current numbers of tasks
+ * in queues because they are better indicators of the existence
+ * of these kinds of possibly-transient problems.
+ * Queue capacities are
+ * resized on demand from their initial value of 4096 elements,
+ * which is much more than sufficient for the kinds of
+ * applications that this framework is intended to best support.
+ * </ul>
+ **/
+
+ public void stats() {
+ long time = System.currentTimeMillis() - initTime;
+ double secs = ((double)time) / 1000.0;
+ long totalRuns = 0;
+ long totalScans = 0;
+ long totalSteals = 0;
+
+ System.out.print("Thread" +
+ "\tQ Cap" +
+ "\tScans" +
+ "\tNew" +
+ "\tRuns" +
+ "\n");
+
+ for (int i = 0; i < threads.length; ++i) {
+ FJTaskRunner t = threads[i];
+ int truns = t.runs;
+ totalRuns += truns;
+
+ int tscans = t.scans;
+ totalScans += tscans;
+
+ int tsteals = t.steals;
+ totalSteals += tsteals;
+
+ String star = (getActive(t))? "*" : " ";
+
+
+ System.out.print("T" + i + star +
+ "\t" + t.deqSize() +
+ "\t" + tscans +
+ "\t" + tsteals +
+ "\t" + truns +
+ "\n");
+ }
+
+ System.out.print("Total" +
+ "\t " +
+ "\t" + totalScans +
+ "\t" + totalSteals +
+ "\t" + totalRuns +
+ "\n");
+
+ System.out.print("Execute: " + entries);
+
+ System.out.print("\tTime: " + secs);
+
+ long rps = 0;
+ if (secs != 0) rps = Math.round((double)(totalRuns) / secs);
+
+ System.out.println("\tRate: " + rps);
+ }
+
+
+ /* ------------ Methods called only by FJTaskRunners ------------- */
+
+
+ /**
+ * Return the array of threads in this group.
+ * Called only by FJTaskRunner.scan().
+ **/
+
+ public FJTaskRunner[] getArray() { return threads; }
+
+
+ /**
+ * Return a task from entry queue, or null if empty.
+ * Called only by FJTaskRunner.scan().
+ **/
+
+ public FJTask pollEntryQueue() {
+ try {
+ FJTask t = (FJTask)(entryQueue.poll(0));
+ return t;
+ }
+ catch(InterruptedException ex) { // ignore interrupts
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ }
+
+
+ /**
+ * Return active status of t.
+ * Per-thread active status can only be accessed and
+ * modified via synchronized method here in the group class.
+ **/
+
+ protected synchronized boolean getActive(FJTaskRunner t) {
+ return t.active;
+ }
+
+
+ /**
+ * Set active status of thread t to true, and notify others
+ * that might be waiting for work.
+ **/
+
+ public synchronized void setActive(FJTaskRunner t) {
+ if (!t.active) {
+ t.active = true;
+ ++activeCount;
+ if (nstarted < threads.length)
+ threads[nstarted++].start();
+ else
+ notifyAll();
+ }
+ }
+
+ /**
+ * Set active status of thread t to false.
+ **/
+
+ public synchronized void setInactive(FJTaskRunner t) {
+ if (t.active) {
+ t.active = false;
+ --activeCount;
+ }
+ }
+
+ /**
+ * The number of times to scan other threads for tasks
+ * before transitioning to a mode where scans are
+ * interleaved with sleeps (actually timed waits).
+ * Upon transition, sleeps are for duration of
+ * scans / SCANS_PER_SLEEP milliseconds.
+ * <p>
+ * This is not treated as a user-tunable parameter because
+ * good values do not appear to vary much across JVMs or
+ * applications. Its main role is to help avoid some
+ * useless spinning and contention during task startup.
+ **/
+ static final long SCANS_PER_SLEEP = 15;
+
+ /**
+ * The maximum time (in msecs) to sleep when a thread is idle,
+ * yet others are not, so may eventually generate work that
+ * the current thread can steal. This value reflects the maximum time
+ * that a thread may sleep when it possibly should not, because there
+ * are other active threads that might generate work. In practice,
+ * designs in which some threads become stalled because others
+ * are running yet not generating tasks are not likely to work
+ * well in this framework anyway, so the exact value does not matter
+ * too much. However, keeping it in the sub-second range does
+ * help smooth out startup and shutdown effects.
+ **/
+
+ static final long MAX_SLEEP_TIME = 100;
+
+ /**
+ * Set active status of thread t to false, and
+ * then wait until: (a) there is a task in the entry
+ * queue, or (b) other threads are active, or (c) the current
+ * thread is interrupted. Upon return, it
+ * is not certain that there will be work available.
+ * The thread must itself check.
+ * <p>
+ * The main underlying reason
+ * for these mechanics is that threads do not
+ * signal each other when they add elements to their queues.
+ * (This would add to task overhead, reduce locality.
+ * and increase contention.)
+ * So we must rely on a tamed form of polling. However, tasks
+ * inserted into the entry queue do result in signals, so
+ * tasks can wait on these if all of them are otherwise idle.
+ **/
+
+ public synchronized void checkActive(FJTaskRunner t, long scans) {
+
+ setInactive(t);
+
+ try {
+ // if nothing available, do a hard wait
+ if (activeCount == 0 && entryQueue.peek() == null) {
+ wait();
+ }
+ else {
+ // If there is possibly some work,
+ // sleep for a while before rechecking
+
+ long msecs = scans / SCANS_PER_SLEEP;
+ if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME;
+ int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep
+ wait(msecs, nsecs);
+ }
+ }
+ catch (InterruptedException ex) {
+ notify(); // avoid lost notifies on interrupts
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /* ------------ Utility methods ------------- */
+
+ /**
+ * Start or wake up any threads waiting for work
+ **/
+
+ protected synchronized void signalNewTask() {
+ if (COLLECT_STATS) ++entries;
+ if (nstarted < threads.length)
+ threads[nstarted++].start();
+ else
+ notify();
+ }
+
+ /**
+ * Create all FJTaskRunner threads in this group.
+ **/
+
+ protected void initializeThreads() {
+ for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this);
+ }
+
+
+
+
+ /**
+ * Wrap wait/notify mechanics around a task so that
+ * invoke() can wait it out
+ **/
+ protected static final class InvokableFJTask extends FJTask {
+ protected final Runnable wrapped;
+ protected boolean terminated = false;
+
+ protected InvokableFJTask(Runnable r) { wrapped = r; }
+
+ public void run() {
+ try {
+ if (wrapped instanceof FJTask)
+ FJTask.invoke((FJTask)(wrapped));
+ else
+ wrapped.run();
+ }
+ finally {
+ setTerminated();
+ }
+ }
+
+ protected synchronized void setTerminated() {
+ terminated = true;
+ notifyAll();
+ }
+
+ protected synchronized void awaitTermination() throws InterruptedException {
+ while (!terminated) wait();
+ }
+ }
+
+
+}
+
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
new file mode 100644
index 0000000000..ddb0908fd7
--- /dev/null
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -0,0 +1,144 @@
+
+package scala.actors
+
+import compat.Platform
+
+import java.lang.{Runnable, Thread, InterruptedException, System}
+
+import scala.collection.Set
+import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet}
+
+/**
+ * FJTaskScheduler2
+ *
+ * @version 0.9.4
+ * @author Philipp Haller
+ */
+class FJTaskScheduler2 extends Thread with IScheduler {
+
+ val printStats = false
+ //val printStats = true
+
+ val coreProp = System.getProperty("actors.corePoolSize")
+ val maxProp = System.getProperty("actors.maxPoolSize")
+
+ val initCoreSize =
+ if (null ne coreProp) Integer.parseInt(coreProp)
+ else 4
+
+ val maxSize =
+ if (null ne maxProp) Integer.parseInt(maxProp)
+ else 256
+
+ private var coreSize = initCoreSize
+
+ private val executor =
+ new FJTaskRunnerGroup(coreSize)
+
+ private var terminating = false
+
+ private var lastActivity = Platform.currentTime
+
+ private var submittedTasks = 0
+
+ private var pendingReactions = 0
+ def pendReaction: unit = synchronized {
+ pendingReactions = pendingReactions + 1
+ }
+ def unPendReaction: unit = synchronized {
+ pendingReactions = pendingReactions - 1
+ }
+
+ def printActorDump {}
+ def terminated(a: Actor) {}
+
+ private val TICK_FREQ = 50
+ private val CHECK_FREQ = 100
+
+ def onLockup(handler: () => unit) =
+ lockupHandler = handler
+
+ def onLockup(millis: int)(handler: () => unit) = {
+ //LOCKUP_CHECK_FREQ = millis / CHECK_FREQ
+ lockupHandler = handler
+ }
+
+ private var lockupHandler: () => unit = null
+
+ override def run(): unit = {
+ try {
+ while (!terminating) {
+ this.synchronized {
+ try {
+ wait(CHECK_FREQ)
+ } catch {
+ case _: InterruptedException =>
+ if (terminating) throw new QuitException
+ }
+
+ // check if we need more threads
+ if (Platform.currentTime - lastActivity >= TICK_FREQ
+ && coreSize < maxSize
+ && executor.checkPoolSize()) {
+ // do nothing
+ }
+ else {
+ if (pendingReactions == 0) {
+ // if all worker threads idle terminate
+ if (executor.getActiveCount() == 0) {
+ // Note that we don't have to shutdown
+ // the FJTaskRunnerGroup since there is
+ // no separate thread associated with it,
+ // and FJTaskRunner threads have daemon status.
+
+ // terminate timer thread
+ TimerThread.t.interrupt()
+ throw new QuitException
+ }
+ }
+ }
+ } // sync
+
+ } // while (!terminating)
+ } catch {
+ case _: QuitException =>
+ // allow thread to exit
+ if (printStats) executor.stats()
+ }
+ }
+
+ /**
+ * @param item the task to be executed.
+ */
+ def execute(task: Reaction) {
+ executor.execute(task)
+ }
+
+ def start(task: Reaction) {
+ this.synchronized {
+ pendingReactions = pendingReactions + 1
+ }
+ executor.execute(task)
+ }
+
+ /**
+ * @param worker the worker thread executing tasks
+ * @return the executed task
+ */
+ def getTask(worker: WorkerThread) = null
+
+ /**
+ * @param a the actor
+ */
+ def tick(a: Actor) {
+ lastActivity = Platform.currentTime
+ }
+
+ /** Shuts down all idle worker threads.
+ */
+ def shutdown(): unit = synchronized {
+ terminating = true
+ // terminate timer thread
+ TimerThread.t.interrupt()
+ }
+}
diff --git a/src/actors/scala/actors/Future.scala b/src/actors/scala/actors/Future.scala
new file mode 100644
index 0000000000..3ea87ae29d
--- /dev/null
+++ b/src/actors/scala/actors/Future.scala
@@ -0,0 +1,105 @@
+
+package scala.actors
+
+/**
+ * A Future is a function of arity 0 that returns a value of type Any.
+ * Applying a future blocks the current actor until its value
+ * is available.
+ * A future can be queried to find out whether its value
+ * is already available.
+ *
+ * @version 0.9.4
+ * @author Philipp Haller
+ */
+abstract class Future[T](val ch: InputChannel[Any]) extends Function0[T] {
+ def isSet: boolean
+ var value: Option[T] = None
+}
+
+/**
+ * The Futures object contains methods that operate on Futures.
+ *
+ * @version 0.9.4
+ * @author Philipp Haller
+ */
+object Futures {
+
+ def spawn[T](body: => T): Future[T] = {
+ case object Eval
+ val a = Actor.actor {
+ Actor.react {
+ case Eval => Actor.reply(body)
+ }
+ }
+ a !! (Eval, { case any => any.asInstanceOf[T] })
+ }
+
+ def alarm(t: long) = spawn { Thread.sleep(t) }
+
+ def awaitEither[a, b](ft1: Future[a], ft2: Future[b]): Any = {
+ val FutCh1 = ft1.ch; val FutCh2 = ft2.ch
+ Actor.receive {
+ case FutCh1 ! arg1 => arg1
+ case FutCh2 ! arg2 => arg2
+ }
+ }
+
+ /**
+ * Awaits all futures returning an option containing a list of replies,
+ * or timeouts returning None.
+ * Note that some of the futures might already have been awaited.
+ */
+ def awaitAll(timeout: long, fts: Future[Any]*): List[Option[Any]] = {
+ var resultsMap: collection.mutable.Map[Int, Option[Any]] = new collection.mutable.HashMap[Int, Option[Any]]
+
+ var cnt = 0
+ val mappedFts = fts.map(ft =>
+ Pair({cnt=cnt+1; cnt-1}, ft))
+
+ val unsetFts = mappedFts.filter((p: Pair[int, Future[Any]]) => {
+ if (p._2.isSet) { resultsMap(p._1) = Some(p._2()); false }
+ else { resultsMap(p._1) = None; true }
+ })
+
+ val partFuns = unsetFts.map((p: Pair[int, Future[Any]]) => {
+ val FutCh = p._2.ch
+ val singleCase: PartialFunction[Any, Pair[int, Any]] = {
+ case FutCh ! any => Pair(p._1, any)
+ }
+ singleCase
+ })
+
+ def awaitWith(partFuns: Seq[PartialFunction[Any, Pair[int, Any]]]) {
+ val reaction: PartialFunction[Any, unit] = new PartialFunction[Any, unit] {
+ def isDefinedAt(msg: Any) = msg match {
+ case TIMEOUT => true
+ case _ => partFuns exists (.isDefinedAt(msg))
+ }
+ def apply(msg: Any): unit = msg match {
+ case TIMEOUT => // do nothing
+ case _ => {
+ val pfOpt = partFuns find (.isDefinedAt(msg))
+ val pf = pfOpt.get // succeeds always
+ val Pair(idx, subres) = pf(msg)
+ resultsMap(idx) = Some(subres)
+
+ val partFunsRest = partFuns filter (.!=(pf))
+ // wait on rest of partial functions
+ if (partFunsRest.length > 0)
+ awaitWith(partFunsRest)
+ }
+ }
+ }
+ Actor.receiveWithin(timeout)(reaction)
+ }
+
+ awaitWith(partFuns)
+
+ var results: List[Option[Any]] = Nil
+ val size = resultsMap.size
+ for (val i <- 0 until size) {
+ results = resultsMap(size - i - 1) :: results
+ }
+ results
+ }
+}
diff --git a/src/actors/scala/actors/IFJTaskRunnerGroup.java b/src/actors/scala/actors/IFJTaskRunnerGroup.java
new file mode 100644
index 0000000000..2e9f3359b8
--- /dev/null
+++ b/src/actors/scala/actors/IFJTaskRunnerGroup.java
@@ -0,0 +1,12 @@
+
+package scala.actors;
+
+interface IFJTaskRunnerGroup {
+ public void executeTask(FJTask t);
+ public FJTaskRunner[] getArray();
+ public FJTask pollEntryQueue();
+ public void setActive(FJTaskRunner t);
+ public void checkActive(FJTaskRunner t, long scans);
+ public void setInactive(FJTaskRunner t);
+
+}
diff --git a/src/actors/scala/actors/InputChannel.scala b/src/actors/scala/actors/InputChannel.scala
index 80f334b30d..f537c754a0 100644
--- a/src/actors/scala/actors/InputChannel.scala
+++ b/src/actors/scala/actors/InputChannel.scala
@@ -14,12 +14,12 @@ package scala.actors
* The <code>InputChannel</code> trait provides a common interface
* for all channels from which values can be received.
*
- * @version 0.9.2
+ * @version 0.9.4
* @author Philipp Haller
*/
-trait InputChannel[Msg] {
- def receive[R](f: PartialFunction[Any, R]): R
+trait InputChannel[+Msg] {
+ def receive[R](f: PartialFunction[Msg, R]): R
def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R
- def react(f: PartialFunction[Any, Unit]): Nothing
+ def react(f: PartialFunction[Msg, Unit]): Nothing
def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing
}
diff --git a/src/actors/scala/actors/LinkedNode.java b/src/actors/scala/actors/LinkedNode.java
new file mode 100644
index 0000000000..bf8ca02a74
--- /dev/null
+++ b/src/actors/scala/actors/LinkedNode.java
@@ -0,0 +1,25 @@
+/*
+ File: LinkedNode.java
+
+ Originally written by Doug Lea and released into the public domain.
+ This may be used for any purposes whatsoever without acknowledgment.
+ Thanks for the assistance and support of Sun Microsystems Labs,
+ and everyone contributing, testing, and using this code.
+
+ History:
+ Date Who What
+ 11Jun1998 dl Create public version
+ 25may2000 dl Change class access to public
+ 26nov2001 dl Added no-arg constructor, all public access.
+*/
+
+package scala.actors;
+
+/** A standard linked list node used in various queue classes **/
+public class LinkedNode {
+ public Object value;
+ public LinkedNode next;
+ public LinkedNode() {}
+ public LinkedNode(Object x) { value = x; }
+ public LinkedNode(Object x, LinkedNode n) { value = x; next = n; }
+}
diff --git a/src/actors/scala/actors/LinkedQueue.java b/src/actors/scala/actors/LinkedQueue.java
new file mode 100644
index 0000000000..796f428cf5
--- /dev/null
+++ b/src/actors/scala/actors/LinkedQueue.java
@@ -0,0 +1,185 @@
+/*
+ File: LinkedQueue.java
+
+ Originally written by Doug Lea and released into the public domain.
+ This may be used for any purposes whatsoever without acknowledgment.
+ Thanks for the assistance and support of Sun Microsystems Labs,
+ and everyone contributing, testing, and using this code.
+
+ History:
+ Date Who What
+ 11Jun1998 dl Create public version
+ 25aug1998 dl added peek
+ 10dec1998 dl added isEmpty
+ 10oct1999 dl lock on node object to ensure visibility
+*/
+
+package scala.actors;
+
+/**
+ * A linked list based channel implementation.
+ * The algorithm avoids contention between puts
+ * and takes when the queue is not empty.
+ * Normally a put and a take can proceed simultaneously.
+ * (Although it does not allow multiple concurrent puts or takes.)
+ * This class tends to perform more efficently than
+ * other Channel implementations in producer/consumer
+ * applications.
+ * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
+ **/
+
+public class LinkedQueue {
+
+
+ /**
+ * Dummy header node of list. The first actual node, if it exists, is always
+ * at head_.next. After each take, the old first node becomes the head.
+ **/
+ protected LinkedNode head_;
+
+ /**
+ * Helper monitor for managing access to last node.
+ **/
+ protected final Object putLock_ = new Object();
+
+ /**
+ * The last node of list. Put() appends to list, so modifies last_
+ **/
+ protected LinkedNode last_;
+
+ /**
+ * The number of threads waiting for a take.
+ * Notifications are provided in put only if greater than zero.
+ * The bookkeeping is worth it here since in reasonably balanced
+ * usages, the notifications will hardly ever be necessary, so
+ * the call overhead to notify can be eliminated.
+ **/
+ protected int waitingForTake_ = 0;
+
+ public LinkedQueue() {
+ head_ = new LinkedNode(null);
+ last_ = head_;
+ }
+
+ /** Main mechanics for put/offer **/
+ protected void insert(Object x) {
+ synchronized(putLock_) {
+ LinkedNode p = new LinkedNode(x);
+ synchronized(last_) {
+ last_.next = p;
+ last_ = p;
+ }
+ if (waitingForTake_ > 0)
+ putLock_.notify();
+ }
+ }
+
+ /** Main mechanics for take/poll **/
+ protected synchronized Object extract() {
+ synchronized(head_) {
+ Object x = null;
+ LinkedNode first = head_.next;
+ if (first != null) {
+ x = first.value;
+ first.value = null;
+ head_ = first;
+ }
+ return x;
+ }
+ }
+
+
+ public void put(Object x) throws InterruptedException {
+ if (x == null) throw new IllegalArgumentException();
+ if (Thread.interrupted()) throw new InterruptedException();
+ insert(x);
+ }
+
+ public boolean offer(Object x, long msecs) throws InterruptedException {
+ if (x == null) throw new IllegalArgumentException();
+ if (Thread.interrupted()) throw new InterruptedException();
+ insert(x);
+ return true;
+ }
+
+ public Object take() throws InterruptedException {
+ if (Thread.interrupted()) throw new InterruptedException();
+ // try to extract. If fail, then enter wait-based retry loop
+ Object x = extract();
+ if (x != null)
+ return x;
+ else {
+ synchronized(putLock_) {
+ try {
+ ++waitingForTake_;
+ for (;;) {
+ x = extract();
+ if (x != null) {
+ --waitingForTake_;
+ return x;
+ }
+ else {
+ putLock_.wait();
+ }
+ }
+ }
+ catch(InterruptedException ex) {
+ --waitingForTake_;
+ putLock_.notify();
+ throw ex;
+ }
+ }
+ }
+ }
+
+ public Object peek() {
+ synchronized(head_) {
+ LinkedNode first = head_.next;
+ if (first != null)
+ return first.value;
+ else
+ return null;
+ }
+ }
+
+
+ public boolean isEmpty() {
+ synchronized(head_) {
+ return head_.next == null;
+ }
+ }
+
+ public Object poll(long msecs) throws InterruptedException {
+ if (Thread.interrupted()) throw new InterruptedException();
+ Object x = extract();
+ if (x != null)
+ return x;
+ else {
+ synchronized(putLock_) {
+ try {
+ long waitTime = msecs;
+ long start = (msecs <= 0)? 0 : System.currentTimeMillis();
+ ++waitingForTake_;
+ for (;;) {
+ x = extract();
+ if (x != null || waitTime <= 0) {
+ --waitingForTake_;
+ return x;
+ }
+ else {
+ putLock_.wait(waitTime);
+ waitTime = msecs - (System.currentTimeMillis() - start);
+ }
+ }
+ }
+ catch(InterruptedException ex) {
+ --waitingForTake_;
+ putLock_.notify();
+ throw ex;
+ }
+ }
+ }
+ }
+}
+
+
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 947ad9947d..fe9fe9d52b 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -22,14 +22,15 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
* The <code>Scheduler</code> object is used by
* <code>Actor</code> to execute tasks of an execution of an actor.
*
- * @version 0.9.2
+ * @version 0.9.4
* @author Philipp Haller
*/
object Scheduler {
private var sched: IScheduler =
{
- var s: IScheduler = null
+ var s: IScheduler = new FJTaskScheduler2
+/*
// Check for JDK version >= 1.5
var olderThanJDK5 = false
try {
@@ -43,6 +44,7 @@ object Scheduler {
new TickedScheduler
else
Class.forName("scala.actors.ThreadPoolScheduler").newInstance().asInstanceOf[IScheduler]
+*/
s.start()
s
}
@@ -53,7 +55,18 @@ object Scheduler {
}
def start(task: Reaction) = sched.start(task)
- def execute(task: Reaction) = sched.execute(task)
+ def execute(task: Reaction) = {
+ val t = currentThread
+ if (t.isInstanceOf[FJTaskRunner]) {
+ val tr = t.asInstanceOf[FJTaskRunner]
+ tr.push(new FJTask {
+ def run() {
+ task.run()
+ }
+ })
+ } else sched.execute(task)
+ }
+
def tick(a: Actor) = sched.tick(a)
def terminated(a: Actor) = sched.terminated(a)
def pendReaction: unit = sched.pendReaction
@@ -71,7 +84,7 @@ object Scheduler {
* This abstract class provides a common interface for all
* schedulers used to execute actor tasks.
*
- * @version 0.9.2
+ * @version 0.9.4
* @author Philipp Haller
*/
trait IScheduler {
@@ -101,7 +114,7 @@ trait IScheduler {
* This scheduler executes the tasks of an actor on a single
* thread (the current thread).
*
- * @version 0.9.2
+ * @version 0.9.4
* @author Philipp Haller
*/
class SingleThreadedScheduler extends IScheduler {
@@ -135,7 +148,7 @@ class SingleThreadedScheduler extends IScheduler {
* The <code>QuickException</code> class is used to manage control flow
* of certain schedulers and worker threads.
*
- * @version 0.9.2
+ * @version 0.9.4
* @author Philipp Haller
*/
private[actors] class QuitException extends Throwable {
@@ -195,7 +208,7 @@ private[actors] class QuitException extends Throwable {
* execution. QED
* </p>
*
- * @version 0.9.2
+ * @version 0.9.4
* @author Philipp Haller
*/
class WorkerThread(sched: IScheduler) extends Thread {