diff options
author | Philipp Haller <hallerp@gmail.com> | 2007-02-19 10:45:23 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2007-02-19 10:45:23 +0000 |
commit | d710f4e615de1b2875c72ba7d4a56c9a893b2a16 (patch) | |
tree | 684fc4cc0c62ea47bc11b24f496ffb7f09cc6340 | |
parent | 2a27ffb80e27685e36c170b096d71efec0c45357 (diff) | |
download | scala-d710f4e615de1b2875c72ba7d4a56c9a893b2a16.tar.gz scala-d710f4e615de1b2875c72ba7d4a56c9a893b2a16.tar.bz2 scala-d710f4e615de1b2875c72ba7d4a56c9a893b2a16.zip |
scala.actors: integrated FJ. added futures.
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 84 | ||||
-rw-r--r-- | src/actors/scala/actors/Channel.scala | 14 | ||||
-rw-r--r-- | src/actors/scala/actors/FJTask.java | 531 | ||||
-rw-r--r-- | src/actors/scala/actors/FJTaskRunner.java | 974 | ||||
-rw-r--r-- | src/actors/scala/actors/FJTaskRunnerGroup.java | 685 | ||||
-rw-r--r-- | src/actors/scala/actors/FJTaskScheduler2.scala | 144 | ||||
-rw-r--r-- | src/actors/scala/actors/Future.scala | 105 | ||||
-rw-r--r-- | src/actors/scala/actors/IFJTaskRunnerGroup.java | 12 | ||||
-rw-r--r-- | src/actors/scala/actors/InputChannel.scala | 8 | ||||
-rw-r--r-- | src/actors/scala/actors/LinkedNode.java | 25 | ||||
-rw-r--r-- | src/actors/scala/actors/LinkedQueue.java | 185 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 27 |
12 files changed, 2758 insertions, 36 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 42b870a383..701bfa6172 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -19,7 +19,7 @@ import compat.Platform * <code>receive</code>, <code>react</code>, <code>reply</code>, * etc. * - * @version 0.9.2 + * @version 0.9.4 * @author Philipp Haller */ object Actor { @@ -257,12 +257,15 @@ object Actor { * implementation of event-based actors. * </p> * <p> - * The main ideas of our approach are explained in the paper<br> + * The main ideas of our approach are explained in the papers<br> * <b>Event-Based Programming without Inversion of Control</b>, - * Philipp Haller, Martin Odersky <i>Proc. JMLC 2006</i> + * Philipp Haller and Martin Odersky, <i>Proc. JMLC 2006</i> + * <br><br> + * <b>Actors that Unify Threads and Events</b>, + * Philipp Haller and Martin Odersky, <i>LAMP-REPORT-2007-001, EPFL</i> * </p> * - * @version 0.9.2 + * @version 0.9.4 * @author Philipp Haller */ trait Actor extends OutputChannel[Any] { @@ -328,20 +331,28 @@ trait Actor extends OutputChannel[Any] { tick() val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { - waitingFor = f.isDefinedAt - isSuspended = true - received = None - suspendActorFor(msec) - if (received.isEmpty) { - if (f.isDefinedAt(TIMEOUT)) { - waitingFor = waitingForNone - isSuspended = false - val result = f(TIMEOUT) - return result - } + if (msec == 0) { + if (f.isDefinedAt(TIMEOUT)) + return f(TIMEOUT) else error("unhandled timeout") } + else { + waitingFor = f.isDefinedAt + isSuspended = true + received = None + suspendActorFor(msec) + if (received.isEmpty) { + if (f.isDefinedAt(TIMEOUT)) { + waitingFor = waitingForNone + isSuspended = false + val result = f(TIMEOUT) + return result + } + else + error("unhandled timeout") + } + } } else { received = Some(qel.msg) sessions = qel.session :: sessions @@ -432,6 +443,44 @@ trait Actor extends OutputChannel[Any] { } } + def !!(msg: Any): Future[Any] = { + val ftch = new Channel[Any](Actor.self) + send(msg, ftch) + new Future[Any](ftch) { + def apply() = + if (isSet) value.get + else ch.receive { + case any => value = Some(any); any + } + def isSet = value match { + case None => ch.receiveWithin(0) { + case TIMEOUT => false + case any => value = Some(any); true + } + case Some(_) => true + } + } + } + + def !![a](msg: Any, f: PartialFunction[Any, a]): Future[a] = { + val ftch = new Channel[Any](Actor.self) + send(msg, ftch) + new Future[a](ftch) { + def apply() = + if (isSet) value.get + else ch.receive { + case any => value = Some(f(any)); value.get + } + def isSet = value match { + case None => ch.receiveWithin(0) { + case TIMEOUT => false + case any => value = Some(f(any)); true + } + case Some(_) => true + } + } + } + def reply(msg: Any): Unit = session ! msg private var rc = new Channel[Any](this) @@ -450,7 +499,6 @@ trait Actor extends OutputChannel[Any] { if (sessions.isEmpty) null else sessions.head.asInstanceOf[Channel[Any]] - private[actors] var continuation: PartialFunction[Any, Unit] = null private[actors] var timeoutPending = false private[actors] var isDetached = false @@ -654,7 +702,7 @@ trait Actor extends OutputChannel[Any] { * } * </pre> * - * @version 0.9.2 + * @version 0.9.4 * @author Philipp Haller */ case object TIMEOUT @@ -664,7 +712,7 @@ case object TIMEOUT * <p>This class is used to manage control flow of actor * executions.</p> * - * @version 0.9.2 + * @version 0.9.4 * @author Philipp Haller */ private[actors] class SuspendActorException extends Throwable { diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala index fc473d76cc..8031ea297e 100644 --- a/src/actors/scala/actors/Channel.scala +++ b/src/actors/scala/actors/Channel.scala @@ -25,17 +25,17 @@ package scala.actors * } * </pre> * - * @version 0.9.2 + * @version 0.9.4 * @author Philipp Haller */ -case class ! [a](ch: Channel[a], msg: a) +case class ! [a](ch: InputChannel[a], msg: a) /** * This class provides a means for typed communication among * actors. Only the actor creating an instance of a * <code>Channel</code> may receive from it. * - * @version 0.9.2 + * @version 0.9.4 * @author Philipp Haller */ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { @@ -74,10 +74,10 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { * @param f a partial function with message patterns and actions * @return result of processing the received value */ - def receive[R](f: PartialFunction[Any, R]): R = { + def receive[R](f: PartialFunction[Msg, R]): R = { val C = this.asInstanceOf[Channel[Any]] receiver.receive { - case C ! msg if (f.isDefinedAt(msg)) => f(msg) + case C ! msg if (f.isDefinedAt(msg.asInstanceOf[Msg])) => f(msg.asInstanceOf[Msg]) } } @@ -105,10 +105,10 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { * * @param f a partial function with message patterns and actions */ - def react(f: PartialFunction[Any, Unit]): Nothing = { + def react(f: PartialFunction[Msg, Unit]): Nothing = { val C = this.asInstanceOf[Channel[Any]] receiver.react { - case C ! msg if (f.isDefinedAt(msg)) => f(msg) + case C ! msg if (f.isDefinedAt(msg.asInstanceOf[Msg])) => f(msg.asInstanceOf[Msg]) } } diff --git a/src/actors/scala/actors/FJTask.java b/src/actors/scala/actors/FJTask.java new file mode 100644 index 0000000000..6398f1400e --- /dev/null +++ b/src/actors/scala/actors/FJTask.java @@ -0,0 +1,531 @@ +/* + File: Task.java + + Originally written by Doug Lea and released into the public domain. + This may be used for any purposes whatsoever without acknowledgment. + Thanks for the assistance and support of Sun Microsystems Labs, + and everyone contributing, testing, and using this code. + + History: + Date Who What + 7Jan1999 dl first release + 14jan1999 dl simplify start() semantics; + improve documentation + 18Jan1999 dl Eliminate useless time-based waits. + 7Mar1999 dl Add reset method, + add array-based composite operations + 27Apr1999 dl Rename +*/ + +package scala.actors; + + +/** + * Abstract base class for Fork/Join Tasks. + * + * <p> + * FJTasks are lightweight, stripped-down analogs of Threads. + * Many FJTasks share the same pool of Java threads. This is + * supported by the FJTaskRunnerGroup and FJTaskRunner classes, that + * mainly contain + * methods called only internally by FJTasks. + * FJTasks support versions of the most common methods found in class Thread, + * including start(), yield() and join(). However, they + * don't support priorities, ThreadGroups or other bookkeeping + * or control methods of class Thread. + * <p> + * FJTasks should normally be defined by subclassing and adding a run() method. + * Alternatively, static inner class <code>Wrap(Runnable r)</code> + * can be used to + * wrap an existing Runnable object in a FJTask. + * <p> + * <code>FJTaskRunnerGroup.execute(FJTask)</code> can be used to + * initiate a FJTask from a non-FJTask thread. + * And <code>FJTaskRunnerGroup.invoke(FJTask)</code> can be used to initiate + * a FJTask and then wait for it to complete before returning. + * These are the only entry-points from normal threads to FJTasks. + * Most FJTask methods themselves may only be called from within running FJTasks. + * They throw ClassCastExceptions if they are not, + * reflecting the fact that these methods + * can only be executed using FJTaskRunner threads, not generic + * java.lang.Threads. + * <p> + * There are three different ways to run a FJTask, + * with different scheduling semantics: + * <ul> + * <li> FJTask.start() (as well as FJTaskRunnerGroup.execute(FJTask)) + * behaves pretty much like Thread.start(). It enqueues a task to be + * run the next time any FJTaskRunner thread is otherwise idle. + * It maintains standard FIFO ordering with respect to + * the group of worker threads. + * <li> FJTask.fork() (as well as the two-task spawning method, + * coInvoke(task1, task2), and the array version + * coInvoke(FJTask[] tasks)) starts a task + * that will be executed in + * procedure-call-like LIFO order if executed by the + * same worker thread as the one that created it, but is FIFO + * with respect to other tasks if it is run by + * other worker threads. That is, earlier-forked + * tasks are preferred to later-forked tasks by other idle workers. + * Fork() is noticeably faster than start(), but can only be + * used when these scheduling semantics are acceptable. + * <li> FJTask.invoke(FJTask) just executes the run method + * of one task from within another. It is the analog of a + * direct call. + * </ul> + * <p> + * The main economies of FJTasks stem from the fact that + * FJTasks do not support blocking operations of any kind. + * FJTasks should just run to completion without + * issuing waits or performing blocking IO. + * There are several styles for creating the run methods that + * execute as tasks, including + * event-style methods, and pure computational methods. + * Generally, the best kinds of FJTasks are those that in turn + * generate other FJTasks. + * <p> + * There is nothing actually + * preventing you from blocking within a FJTask, and very short waits/blocks are + * completely well behaved. But FJTasks are not designed + * to support arbitrary synchronization + * since there is no way to suspend and resume individual tasks + * once they have begun executing. FJTasks should also be finite + * in duration -- they should not contain infinite loops. + * FJTasks that might need to perform a blocking + * action, or hold locks for extended periods, or + * loop forever can instead create normal + * java Thread objects that will do so. FJTasks are just not + * designed to support these things. + * FJTasks may however yield() control to allow their FJTaskRunner threads + * to run other tasks, + * and may wait for other dependent tasks via join(). These + * are the only coordination mechanisms supported by FJTasks. + * <p> + * FJTasks, and the FJTaskRunners that execute them are not + * intrinsically robust with respect to exceptions. + * A FJTask that aborts via an exception does not automatically + * have its completion flag (isDone) set. + * As with ordinary Threads, an uncaught exception will normally cause + * its FJTaskRunner thread to die, which in turn may sometimes + * cause other computations being performed to hang or abort. + * You can of course + * do better by trapping exceptions inside the run methods of FJTasks. + * <p> + * The overhead differences between FJTasks and Threads are substantial, + * especially when using fork() or coInvoke(). + * FJTasks can be two or three orders of magnitude faster than Threads, + * at least when run on JVMs with high-performance garbage collection + * (every FJTask quickly becomes garbage) and good native thread support. + * <p> + * Given these overhead savings, you might be tempted to use FJTasks for + * everything you would use a normal Thread to do. Don't. Java Threads + * remain better for general purpose thread-based programming. Remember + * that FJTasks cannot be used for designs involving arbitrary blocking + * synchronization or I/O. Extending FJTasks to support such capabilities + * would amount to re-inventing the Thread class, and would make them + * less optimal in the contexts that they were designed for. + * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] + * <p> + * @see FJTaskRunner + * @see FJTaskRunnerGroup + **/ + +public abstract class FJTask implements Runnable { + + /** + * The only status information associated with FJTasks is whether + * the they are considered to have completed. + * It is set true automatically within + * FJTaskRunner methods upon completion + * of the run method, or manually via cancel. + **/ + + private volatile boolean done; // = false; + + /** + * Return the FJTaskRunner thread running the current FJTask. + * Most FJTask methods are just relays to their current + * FJTaskRunners, that perform the indicated actions. + * @exception ClassCastException if caller thread is not a + * running FJTask. + **/ + + public static FJTaskRunner getFJTaskRunner() { + return (FJTaskRunner)(Thread.currentThread()); + } + + /** + * Return the FJTaskRunnerGroup of the thread running the current FJTask. + * @exception ClassCastException if caller thread is not a + * running FJTask. + **/ + public static IFJTaskRunnerGroup getFJTaskRunnerGroup() { + return getFJTaskRunner().getGroup(); + } + + + /** + * Return true if current task has terminated or been cancelled. + * The method is a simple analog of the Thread.isAlive() + * method. However, it reports true only when the task has terminated + * or has been cancelled. It does not distinguish these two cases. + * And there is no way to determine whether a FJTask has been started + * or is currently executing. + **/ + + public final boolean isDone() { return done; } + + /** + * Indicate termination. Intended only to be called by FJTaskRunner. + * FJTasks themselves should use (non-final) method + * cancel() to suppress execution. + **/ + + protected final void setDone() { done = true; } + + /** + * Set the termination status of this task. This simple-minded + * analog of Thread.interrupt + * causes the task not to execute if it has not already been started. + * Cancelling a running FJTask + * has no effect unless the run method itself uses isDone() + * to probe cancellation and take appropriate action. + * Individual run() methods may sense status and + * act accordingly, normally by returning early. + **/ + + public void cancel() { setDone(); } + + + /** + * Clear the termination status of this task. + * This method is intended to be used + * only as a means to allow task objects to be recycled. It should + * be called only when you are sure that the previous + * execution of this task has terminated and, if applicable, has + * been joined by all other waiting tasks. Usage in any other + * context is a very bad idea. + **/ + + public void reset() { done = false; } + + + /** + * Execute this task. This method merely places the task in a + * group-wide scheduling queue. + * It will be run + * the next time any TaskRunner thread is otherwise idle. + * This scheduling maintains FIFO ordering of started tasks + * with respect to + * the group of worker threads. + * @exception ClassCastException if caller thread is not + * running in a FJTaskRunner thread. + **/ + + public void start() { getFJTaskRunnerGroup().executeTask(this); } + + + /** + * Arrange for execution of a strictly dependent task. + * The task that will be executed in + * procedure-call-like LIFO order if executed by the + * same worker thread, but is FIFO with respect to other tasks + * forked by this thread when taken by other worker threads. + * That is, earlier-forked + * tasks are preferred to later-forked tasks by other idle workers. + * <p> + * Fork() is noticeably + * faster than start(). However, it may only + * be used for strictly dependent tasks -- generally, those that + * could logically be issued as straight method calls without + * changing the logic of the program. + * The method is optimized for use in parallel fork/join designs + * in which the thread that issues one or more forks + * cannot continue until at least some of the forked + * threads terminate and are joined. + * @exception ClassCastException if caller thread is not + * running in a FJTaskRunner thread. + **/ + + public void fork() { getFJTaskRunner().push(this); } + + /** + * Allow the current underlying FJTaskRunner thread to process other tasks. + * <p> + * Spinloops based on yield() are well behaved so long + * as the event or condition being waited for is produced via another + * FJTask. Additionally, you must never hold a lock + * while performing a yield or join. (This is because + * multiple FJTasks can be run by the same Thread during + * a yield. Since java locks are held per-thread, the lock would not + * maintain the conceptual exclusion you have in mind.) + * <p> + * Otherwise, spinloops using + * yield are the main construction of choice when a task must wait + * for a condition that it is sure will eventually occur because it + * is being produced by some other FJTask. The most common + * such condition is built-in: join() repeatedly yields until a task + * has terminated after producing some needed results. You can also + * use yield to wait for callbacks from other FJTasks, to wait for + * status flags to be set, and so on. However, in all these cases, + * you should be confident that the condition being waited for will + * occur, essentially always because it is produced by + * a FJTask generated by the current task, or one of its subtasks. + * + * @exception ClassCastException if caller thread is not + * running in a FJTaskRunner thread. + **/ + + public static void yield() { getFJTaskRunner().taskYield(); } + + /** + * Yield until this task isDone. + * Equivalent to <code>while(!isDone()) yield(); </code> + * @exception ClassCastException if caller thread is not + * running in a FJTaskRunner thread. + **/ + + public void join() { getFJTaskRunner().taskJoin(this); } + + /** + * Immediately execute task t by calling its run method. Has no + * effect if t has already been run or has been cancelled. + * It is equivalent to calling t.run except that it + * deals with completion status, so should always be used + * instead of directly calling run. + * The method can be useful + * when a computation has been packaged as a FJTask, but you just need to + * directly execute its body from within some other task. + **/ + + public static void invoke(FJTask t) { + if (!t.isDone()) { + t.run(); + t.setDone(); + } + } + + /** + * Fork both tasks and then wait for their completion. It behaves as: + * <pre> + * task1.fork(); task2.fork(); task2.join(); task1.join(); + * </pre> + * As a simple classic example, here is + * a class that computes the Fibonacci function: + * <pre> + * public class Fib extends FJTask { + * + * // Computes fibonacci(n) = fibonacci(n-1) + fibonacci(n-2); for n> 1 + * // fibonacci(0) = 0; + * // fibonacci(1) = 1. + * + * // Value to compute fibonacci function for. + * // It is replaced with the answer when computed. + * private volatile int number; + * + * public Fib(int n) { number = n; } + * + * public int getAnswer() { + * if (!isDone()) throw new Error("Not yet computed"); + * return number; + * } + * + * public void run() { + * int n = number; + * if (n > 1) { + * Fib f1 = new Fib(n - 1); + * Fib f2 = new Fib(n - 2); + * + * coInvoke(f1, f2); // run these in parallel + * + * // we know f1 and f2 are computed, so just directly access numbers + * number = f1.number + f2.number; + * } + * } + * + * public static void main(String[] args) { // sample driver + * try { + * int groupSize = 2; // 2 worker threads + * int num = 35; // compute fib(35) + * FJTaskRunnerGroup group = new FJTaskRunnerGroup(groupSize); + * Fib f = new Fib(num); + * group.invoke(f); + * int result = f.getAnswer(); + * System.out.println(" Answer: " + result); + * } + * catch (InterruptedException ex) { + * System.out.println("Interrupted"); + * } + * } + * } + * </pre> + * + * @exception ClassCastException if caller thread is not + * running in a FJTaskRunner thread. + **/ + + public static void coInvoke(FJTask task1, FJTask task2) { + getFJTaskRunner().coInvoke(task1, task2); + } + + + /** + * Fork all tasks in array, and await their completion. + * Behaviorally equivalent to: + * <pre> + * for (int i = 0; i < tasks.length; ++i) tasks[i].fork(); + * for (int i = 0; i < tasks.length; ++i) tasks[i].join(); + * </pre> + **/ + + public static void coInvoke(FJTask[] tasks) { + getFJTaskRunner().coInvoke(tasks); + } + + /** + * A FJTask that holds a Runnable r, and calls r.run when executed. + * The class is a simple utilty to allow arbitrary Runnables + * to be used as FJTasks. + **/ + + public static class Wrap extends FJTask { + protected final Runnable runnable; + public Wrap(Runnable r) { runnable = r; } + public void run() { runnable.run(); } + } + + + /** + * A <code>new Seq</code>, when executed, + * invokes each task provided in the constructor, in order. + * The class is a simple utility + * that makes it easier to create composite FJTasks. + **/ + public static class Seq extends FJTask { + protected final FJTask[] tasks; + + /** + * Construct a Seq that, when executed, will process each of the + * tasks in the tasks array in order + **/ + public Seq(FJTask[] tasks) { + this.tasks = tasks; + } + + /** + * Two-task constructor, for compatibility with previous release. + **/ + public Seq(FJTask task1, FJTask task2) { + this.tasks = new FJTask[] { task1, task2 }; + } + + public void run() { + for (int i = 0; i < tasks.length; ++i) FJTask.invoke(tasks[i]); + } + } + + /** + * Construct and return a FJTask object that, when executed, will + * invoke the tasks in the tasks array in array order + **/ + + public static FJTask seq(FJTask[] tasks) { + return new Seq(tasks); + } + + /** + * A <code>new Par</code>, when executed, + * runs the tasks provided in the constructor in parallel using + * coInvoke(tasks). + * The class is a simple utility + * that makes it easier to create composite FJTasks. + **/ + public static class Par extends FJTask { + protected final FJTask[] tasks; + + /** + * Construct a Seq that, when executed, will process each of the + * tasks in the tasks array in parallel + **/ + public Par(FJTask[] tasks) { + this.tasks = tasks; + } + + /** + * Two-task constructor, for compatibility with previous release. + **/ + public Par(FJTask task1, FJTask task2) { + this.tasks = new FJTask[] { task1, task2 }; + } + + + public void run() { + FJTask.coInvoke(tasks); + } + } + + + /** + * Construct and return a FJTask object that, when executed, will + * invoke the tasks in the tasks array in parallel using coInvoke + **/ + public static FJTask par(FJTask[] tasks) { + return new Par(tasks); + } + + /** + * A <code>new Seq2(task1, task2)</code>, when executed, + * invokes task1 and then task2, in order. + * The class is a simple utility + * that makes it easier to create composite Tasks. + **/ + public static class Seq2 extends FJTask { + protected final FJTask fst; + protected final FJTask snd; + public Seq2(FJTask task1, FJTask task2) { + fst = task1; + snd = task2; + } + public void run() { + FJTask.invoke(fst); + FJTask.invoke(snd); + } + } + + /** + * Construct and return a FJTask object that, when executed, will + * invoke task1 and task2, in order + **/ + + public static FJTask seq(FJTask task1, FJTask task2) { + return new Seq2(task1, task2); + } + + /** + * A <code>new Par(task1, task2)</code>, when executed, + * runs task1 and task2 in parallel using coInvoke(task1, task2). + * The class is a simple utility + * that makes it easier to create composite Tasks. + **/ + public static class Par2 extends FJTask { + protected final FJTask fst; + protected final FJTask snd; + public Par2(FJTask task1, FJTask task2) { + fst = task1; + snd = task2; + } + public void run() { + FJTask.coInvoke(fst, snd); + } + } + + + /** + * Construct and return a FJTask object that, when executed, will + * invoke task1 and task2, in parallel + **/ + public static FJTask par(FJTask task1, FJTask task2) { + return new Par2(task1, task2); + } + +} diff --git a/src/actors/scala/actors/FJTaskRunner.java b/src/actors/scala/actors/FJTaskRunner.java new file mode 100644 index 0000000000..991ff97083 --- /dev/null +++ b/src/actors/scala/actors/FJTaskRunner.java @@ -0,0 +1,974 @@ +/* + File: FJTaskRunner.java + + Originally written by Doug Lea and released into the public domain. + This may be used for any purposes whatsoever without acknowledgment. + Thanks for the assistance and support of Sun Microsystems Labs, + and everyone contributing, testing, and using this code. + + History: + Date Who What + 7Jan1999 dl First public release + 13Jan1999 dl correct a stat counter update; + ensure inactive status on run termination; + misc minor cleaup + 14Jan1999 dl Use random starting point in scan; + variable renamings. + 18Jan1999 dl Runloop allowed to die on task exception; + remove useless timed join + 22Jan1999 dl Rework scan to allow use of priorities. + 6Feb1999 dl Documentation updates. + 7Mar1999 dl Add array-based coInvoke + 31Mar1999 dl Revise scan to remove need for NullTasks + 27Apr1999 dl Renamed + 23oct1999 dl Earlier detect of interrupt in scanWhileIdling + 24nov1999 dl Now works on JVMs that do not properly + implement read-after-write of 2 volatiles. +*/ + +package scala.actors; + +import java.util.Random; + +/** + * Specialized Thread subclass for running FJTasks. + * <p> + * Each FJTaskRunner keeps FJTasks in a double-ended queue (DEQ). + * Double-ended queues support stack-based operations + * push and pop, as well as queue-based operations put and take. + * Normally, threads run their own tasks. But they + * may also steal tasks from each others DEQs. + * <p> + * The algorithms are minor variants of those used + * in <A href="http://supertech.lcs.mit.edu/cilk/"> Cilk</A> and + * <A href="http://www.cs.utexas.edu/users/hood/"> Hood</A>, and + * to a lesser extent + * <A href="http://www.cs.uga.edu/~dkl/filaments/dist.html"> Filaments</A>, + * but are adapted to work in Java. + * <p> + * The two most important capabilities are: + * <ul> + * <li> Fork a FJTask: + * <pre> + * Push task onto DEQ + * </pre> + * <li> Get a task to run (for example within taskYield) + * <pre> + * If DEQ is not empty, + * Pop a task and run it. + * Else if any other DEQ is not empty, + * Take ("steal") a task from it and run it. + * Else if the entry queue for our group is not empty, + * Take a task from it and run it. + * Else if current thread is otherwise idling + * If all threads are idling + * Wait for a task to be put on group entry queue + * Else + * Yield or Sleep for a while, and then retry + * </pre> + * </ul> + * The push, pop, and put are designed to only ever called by the + * current thread, and take (steal) is only ever called by + * other threads. + * All other operations are composites and variants of these, + * plus a few miscellaneous bookkeeping methods. + * <p> + * Implementations of the underlying representations and operations + * are geared for use on JVMs operating on multiple CPUs (although + * they should of course work fine on single CPUs as well). + * <p> + * A possible snapshot of a FJTaskRunner's DEQ is: + * <pre> + * 0 1 2 3 4 5 6 ... + * +-----+-----+-----+-----+-----+-----+-----+-- + * | | t | t | t | t | | | ... deq array + * +-----+-----+-----+-----+-----+-----+-----+-- + * ^ ^ + * base top + * (incremented (incremented + * on take, on push + * decremented decremented + * on put) on pop) + * </pre> + * <p> + * FJTasks are held in elements of the DEQ. + * They are maintained in a bounded array that + * works similarly to a circular bounded buffer. To ensure + * visibility of stolen FJTasks across threads, the array elements + * must be <code>volatile</code>. + * Using volatile rather than synchronizing suffices here since + * each task accessed by a thread is either one that it + * created or one that has never seen before. Thus we cannot + * encounter any staleness problems executing run methods, + * although FJTask programmers must be still sure to either synch or use + * volatile for shared data within their run methods. + * <p> + * However, since there is no way + * to declare an array of volatiles in Java, the DEQ elements actually + * hold VolatileTaskRef objects, each of which in turn holds a + * volatile reference to a FJTask. + * Even with the double-indirection overhead of + * volatile refs, using an array for the DEQ works out + * better than linking them since fewer shared + * memory locations need to be + * touched or modified by the threads while using the DEQ. + * Further, the double indirection may alleviate cache-line + * sharing effects (which cannot otherwise be directly dealt with in Java). + * <p> + * The indices for the <code>base</code> and <code>top</code> of the DEQ + * are declared as volatile. The main contention point with + * multiple FJTaskRunner threads occurs when one thread is trying + * to pop its own stack while another is trying to steal from it. + * This is handled via a specialization of Dekker's algorithm, + * in which the popping thread pre-decrements <code>top</code>, + * and then checks it against <code>base</code>. + * To be conservative in the face of JVMs that only partially + * honor the specification for volatile, the pop proceeds + * without synchronization only if there are apparently enough + * items for both a simultaneous pop and take to succeed. + * It otherwise enters a + * synchronized lock to check if the DEQ is actually empty, + * if so failing. The stealing thread + * does almost the opposite, but is set up to be less likely + * to win in cases of contention: Steals always run under synchronized + * locks in order to avoid conflicts with other ongoing steals. + * They pre-increment <code>base</code>, and then check against + * <code>top</code>. They back out (resetting the base index + * and failing to steal) if the + * DEQ is empty or is about to become empty by an ongoing pop. + * <p> + * A push operation can normally run concurrently with a steal. + * A push enters a synch lock only if the DEQ appears full so must + * either be resized or have indices adjusted due to wrap-around + * of the bounded DEQ. The put operation always requires synchronization. + * <p> + * When a FJTaskRunner thread has no tasks of its own to run, + * it tries to be a good citizen. + * Threads run at lower priority while scanning for work. + * <p> + * If the task is currently waiting + * via yield, the thread alternates scans (starting at a randomly + * chosen victim) with Thread.yields. This is + * well-behaved so long as the JVM handles Thread.yield in a + * sensible fashion. (It need not. Thread.yield is so underspecified + * that it is legal for a JVM to treat it as a no-op.) This also + * keeps things well-behaved even if we are running on a uniprocessor + * JVM using a simple cooperative threading model. + * <p> + * If a thread needing work is + * is otherwise idle (which occurs only in the main runloop), and + * there are no available tasks to steal or poll, it + * instead enters into a sleep-based (actually timed wait(msec)) + * phase in which it progressively sleeps for longer durations + * (up to a maximum of FJTaskRunnerGroup.MAX_SLEEP_TIME, + * currently 100ms) between scans. + * If all threads in the group + * are idling, they further progress to a hard wait phase, suspending + * until a new task is entered into the FJTaskRunnerGroup entry queue. + * A sleeping FJTaskRunner thread may be awakened by a new + * task being put into the group entry queue or by another FJTaskRunner + * becoming active, but not merely by some DEQ becoming non-empty. + * Thus the MAX_SLEEP_TIME provides a bound for sleep durations + * in cases where all but one worker thread start sleeping + * even though there will eventually be work produced + * by a thread that is taking a long time to place tasks in DEQ. + * These sleep mechanics are handled in the FJTaskRunnerGroup class. + * <p> + * Composite operations such as taskJoin include heavy + * manual inlining of the most time-critical operations + * (mainly FJTask.invoke). + * This opens up a few opportunities for further hand-optimizations. + * Until Java compilers get a lot smarter, these tweaks + * improve performance significantly enough for task-intensive + * programs to be worth the poorer maintainability and code duplication. + * <p> + * Because they are so fragile and performance-sensitive, nearly + * all methods are declared as final. However, nearly all fields + * and methods are also declared as protected, so it is possible, + * with much care, to extend functionality in subclasses. (Normally + * you would also need to subclass FJTaskRunnerGroup.) + * <p> + * None of the normal java.lang.Thread class methods should ever be called + * on FJTaskRunners. For this reason, it might have been nicer to + * declare FJTaskRunner as a Runnable to run within a Thread. However, + * this would have complicated many minor logistics. And since + * no FJTaskRunner methods should normally be called from outside the + * FJTask and FJTaskRunnerGroup classes either, this decision doesn't impact + * usage. + * <p> + * You might think that layering this kind of framework on top of + * Java threads, which are already several levels removed from raw CPU + * scheduling on most systems, would lead to very poor performance. + * But on the platforms + * tested, the performance is quite good. + * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] + * @see FJTask + * @see FJTaskRunnerGroup + **/ + +public class FJTaskRunner extends Thread { + + /** The group of which this FJTaskRunner is a member **/ + protected final IFJTaskRunnerGroup group; + + /** + * Constructor called only during FJTaskRunnerGroup initialization + **/ + + /*protected*/ public FJTaskRunner(IFJTaskRunnerGroup g) { + group = g; + victimRNG = new Random(System.identityHashCode(this)); + runPriority = getPriority(); + setDaemon(true); + } + + /** + * Return the FJTaskRunnerGroup of which this thread is a member + **/ + + protected final IFJTaskRunnerGroup getGroup() { return group; } + + + /* ------------ DEQ Representation ------------------- */ + + + /** + * FJTasks are held in an array-based DEQ with INITIAL_CAPACITY + * elements. The DEQ is grown if necessary, but default value is + * normally much more than sufficient unless there are + * user programming errors or questionable operations generating + * large numbers of Tasks without running them. + * Capacities must be a power of two. + **/ + + protected static final int INITIAL_CAPACITY = 4096; + + /** + * The maximum supported DEQ capacity. + * When exceeded, FJTaskRunner operations throw Errors + **/ + + protected static final int MAX_CAPACITY = 1 << 30; + + /** + * An object holding a single volatile reference to a FJTask. + **/ + + protected final static class VolatileTaskRef { + /** The reference **/ + protected volatile FJTask ref; + + /** Set the reference **/ + protected final void put(FJTask r) { ref = r; } + /** Return the reference **/ + protected final FJTask get() { return ref; } + /** Return the reference and clear it **/ + protected final FJTask take() { FJTask r = ref; ref = null; return r; } + + /** + * Initialization utility for constructing arrays. + * Make an array of given capacity and fill it with + * VolatileTaskRefs. + **/ + protected static VolatileTaskRef[] newArray(int cap) { + VolatileTaskRef[] a = new VolatileTaskRef[cap]; + for (int k = 0; k < cap; k++) a[k] = new VolatileTaskRef(); + return a; + } + + } + + /** + * The DEQ array. + **/ + + protected VolatileTaskRef[] deq = VolatileTaskRef.newArray(INITIAL_CAPACITY); + + /** Current size of the task DEQ **/ + protected int deqSize() { return deq.length; } + + /** + * Current top of DEQ. Generally acts just like a stack pointer in an + * array-based stack, except that it circularly wraps around the + * array, as in an array-based queue. The value is NOT + * always kept within <code>0 ... deq.length</code> though. + * The current top element is always at <code>top & (deq.length-1)</code>. + * To avoid integer overflow, top is reset down + * within bounds whenever it is noticed to be out out bounds; + * at worst when it is at <code>2 * deq.length</code>. + **/ + protected volatile int top = 0; + + + /** + * Current base of DEQ. Acts like a take-pointer in an + * array-based bounded queue. Same bounds and usage as top. + **/ + + protected volatile int base = 0; + + + /** + * An extra object to synchronize on in order to + * achieve a memory barrier. + **/ + + protected final Object barrier = new Object(); + + /* ------------ Other BookKeeping ------------------- */ + + /** + * Record whether current thread may be processing a task + * (i.e., has been started and is not in an idle wait). + * Accessed, under synch, ONLY by FJTaskRunnerGroup, but the field is + * stored here for simplicity. + **/ + + /*protected*/ public boolean active = false; + + /** Random starting point generator for scan() **/ + protected final Random victimRNG; + + + /** Priority to use while scanning for work **/ + protected int scanPriority = Thread.MIN_PRIORITY + 1; + + /** Priority to use while running tasks **/ + protected int runPriority; + + /** + * Set the priority to use while scanning. + * We do not bother synchronizing access, since + * by the time the value is needed, both this FJTaskRunner + * and its FJTaskRunnerGroup will + * necessarily have performed enough synchronization + * to avoid staleness problems of any consequence. + **/ + protected void setScanPriority(int pri) { scanPriority = pri; } + + + /** + * Set the priority to use while running tasks. + * Same usage and rationale as setScanPriority. + **/ + protected void setRunPriority(int pri) { runPriority = pri; } + + /** + * Compile-time constant for statistics gathering. + * Even when set, reported values may not be accurate + * since all are read and written without synchronization. + **/ + + + + static final boolean COLLECT_STATS = true; + // static final boolean COLLECT_STATS = false; + + + // for stat collection + + /** Total number of tasks run **/ + protected int runs = 0; + + /** Total number of queues scanned for work **/ + protected int scans = 0; + + /** Total number of tasks obtained via scan **/ + protected int steals = 0; + + + + + /* ------------ DEQ operations ------------------- */ + + + /** + * Push a task onto DEQ. + * Called ONLY by current thread. + **/ + + /*protected*/ public final void push(final FJTask r) { + int t = top; + + /* + This test catches both overflows and index wraps. It doesn't + really matter if base value is in the midst of changing in take. + As long as deq length is < 2^30, we are guaranteed to catch wrap in + time since base can only be incremented at most length times + between pushes (or puts). + */ + + if (t < (base & (deq.length-1)) + deq.length) { + + deq[t & (deq.length-1)].put(r); + top = t + 1; + } + + else // isolate slow case to increase chances push is inlined + slowPush(r); // check overflow and retry + } + + + /** + * Handle slow case for push + **/ + + protected synchronized void slowPush(final FJTask r) { + checkOverflow(); + push(r); // just recurse -- this one is sure to succeed. + } + + + /** + * Enqueue task at base of DEQ. + * Called ONLY by current thread. + * This method is currently not called from class FJTask. It could be used + * as a faster way to do FJTask.start, but most users would + * find the semantics too confusing and unpredictable. + **/ + + protected final synchronized void put(final FJTask r) { + for (;;) { + int b = base - 1; + if (top < b + deq.length) { + + int newBase = b & (deq.length-1); + deq[newBase].put(r); + base = newBase; + + if (b != newBase) { // Adjust for index underflow + int newTop = top & (deq.length-1); + if (newTop < newBase) newTop += deq.length; + top = newTop; + } + return; + } + else { + checkOverflow(); + // ... and retry + } + } + } + + /** + * Return a popped task, or null if DEQ is empty. + * Called ONLY by current thread. + * <p> + * This is not usually called directly but is + * instead inlined in callers. This version differs from the + * cilk algorithm in that pop does not fully back down and + * retry in the case of potential conflict with take. It simply + * rechecks under synch lock. This gives a preference + * for threads to run their own tasks, which seems to + * reduce flailing a bit when there are few tasks to run. + **/ + + protected final FJTask pop() { + /* + Decrement top, to force a contending take to back down. + */ + + int t = --top; + + /* + To avoid problems with JVMs that do not properly implement + read-after-write of a pair of volatiles, we conservatively + grab without lock only if the DEQ appears to have at least two + elements, thus guaranteeing that both a pop and take will succeed, + even if the pre-increment in take is not seen by current thread. + Otherwise we recheck under synch. + */ + + if (base + 1 < t) + return deq[t & (deq.length-1)].take(); + else + return confirmPop(t); + + } + + + /** + * Check under synch lock if DEQ is really empty when doing pop. + * Return task if not empty, else null. + **/ + + protected final synchronized FJTask confirmPop(int provisionalTop) { + if (base <= provisionalTop) + return deq[provisionalTop & (deq.length-1)].take(); + else { // was empty + /* + Reset DEQ indices to zero whenever it is empty. + This both avoids unnecessary calls to checkOverflow + in push, and helps keep the DEQ from accumulating garbage + */ + + top = base = 0; + return null; + } + } + + + /** + * Take a task from the base of the DEQ. + * Always called by other threads via scan() + **/ + + + protected final synchronized FJTask take() { + + /* + Increment base in order to suppress a contending pop + */ + + int b = base++; + + if (b < top) + return confirmTake(b); + else { + // back out + base = b; + return null; + } + } + + + /** + * double-check a potential take + **/ + + protected FJTask confirmTake(int oldBase) { + + /* + Use a second (guaranteed uncontended) synch + to serve as a barrier in case JVM does not + properly process read-after-write of 2 volatiles + */ + + synchronized(barrier) { + if (oldBase < top) { + /* + We cannot call deq[oldBase].take here because of possible races when + nulling out versus concurrent push operations. Resulting + accumulated garbage is swept out periodically in + checkOverflow, or more typically, just by keeping indices + zero-based when found to be empty in pop, which keeps active + region small and constantly overwritten. + */ + + return deq[oldBase & (deq.length-1)].get(); + } + else { + base = oldBase; + return null; + } + } + } + + + /** + * Adjust top and base, and grow DEQ if necessary. + * Called only while DEQ synch lock being held. + * We don't expect this to be called very often. In most + * programs using FJTasks, it is never called. + **/ + + protected void checkOverflow() { + int t = top; + int b = base; + + if (t - b < deq.length-1) { // check if just need an index reset + + int newBase = b & (deq.length-1); + int newTop = top & (deq.length-1); + if (newTop < newBase) newTop += deq.length; + top = newTop; + base = newBase; + + /* + Null out refs to stolen tasks. + This is the only time we can safely do it. + */ + + int i = newBase; + while (i != newTop && deq[i].ref != null) { + deq[i].ref = null; + i = (i - 1) & (deq.length-1); + } + + } + else { // grow by doubling array + + int newTop = t - b; + int oldcap = deq.length; + int newcap = oldcap * 2; + + if (newcap >= MAX_CAPACITY) + throw new Error("FJTask queue maximum capacity exceeded"); + + VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap]; + + // copy in bottom half of new deq with refs from old deq + for (int j = 0; j < oldcap; ++j) newdeq[j] = deq[b++ & (oldcap-1)]; + + // fill top half of new deq with new refs + for (int j = oldcap; j < newcap; ++j) newdeq[j] = new VolatileTaskRef(); + + deq = newdeq; + base = 0; + top = newTop; + } + } + + + /* ------------ Scheduling ------------------- */ + + + /** + * Do all but the pop() part of yield or join, by + * traversing all DEQs in our group looking for a task to + * steal. If none, it checks the entry queue. + * <p> + * Since there are no good, portable alternatives, + * we rely here on a mixture of Thread.yield and priorities + * to reduce wasted spinning, even though these are + * not well defined. We are hoping here that the JVM + * does something sensible. + * @param waitingFor if non-null, the current task being joined + **/ + + protected void scan(final FJTask waitingFor) { + + FJTask task = null; + + // to delay lowering priority until first failure to steal + boolean lowered = false; + + /* + Circularly traverse from a random start index. + + This differs slightly from cilk version that uses a random index + for each attempted steal. + Exhaustive scanning might impede analytic tractablity of + the scheduling policy, but makes it much easier to deal with + startup and shutdown. + */ + + FJTaskRunner[] ts = group.getArray(); + int idx = victimRNG.nextInt(ts.length); + + for (int i = 0; i < ts.length; ++i) { + + FJTaskRunner t = ts[idx]; + if (++idx >= ts.length) idx = 0; // circularly traverse + + if (t != null && t != this) { + + if (waitingFor != null && waitingFor.isDone()) { + break; + } + else { + if (COLLECT_STATS) ++scans; + task = t.take(); + if (task != null) { + if (COLLECT_STATS) ++steals; + break; + } + else if (isInterrupted()) { + break; + } + else if (!lowered) { // if this is first fail, lower priority + lowered = true; + setPriority(scanPriority); + } + else { // otherwise we are at low priority; just yield + yield(); + } + } + } + + } + + if (task == null) { + if (COLLECT_STATS) ++scans; + task = group.pollEntryQueue(); + if (COLLECT_STATS) if (task != null) ++steals; + } + + if (lowered) setPriority(runPriority); + + if (task != null && !task.isDone()) { + if (COLLECT_STATS) ++runs; + task.run(); + task.setDone(); + } + + } + + /** + * Same as scan, but called when current thread is idling. + * It repeatedly scans other threads for tasks, + * sleeping while none are available. + * <p> + * This differs from scan mainly in that + * since there is no reason to return to recheck any + * condition, we iterate until a task is found, backing + * off via sleeps if necessary. + **/ + + protected void scanWhileIdling() { + FJTask task = null; + + boolean lowered = false; + long iters = 0; + + FJTaskRunner[] ts = group.getArray(); + int idx = victimRNG.nextInt(ts.length); + + do { + for (int i = 0; i < ts.length; ++i) { + + FJTaskRunner t = ts[idx]; + if (++idx >= ts.length) idx = 0; // circularly traverse + + if (t != null && t != this) { + if (COLLECT_STATS) ++scans; + + task = t.take(); + if (task != null) { + if (COLLECT_STATS) ++steals; + if (lowered) setPriority(runPriority); + group.setActive(this); + break; + } + } + } + + if (task == null) { + if (isInterrupted()) + return; + + if (COLLECT_STATS) ++scans; + task = group.pollEntryQueue(); + + if (task != null) { + if (COLLECT_STATS) ++steals; + if (lowered) setPriority(runPriority); + group.setActive(this); + } + else { + ++iters; + // Check here for yield vs sleep to avoid entering group synch lock + if (iters >= /*group.SCANS_PER_SLEEP*/ 15) { + group.checkActive(this, iters); + if (isInterrupted()) + return; + } + else if (!lowered) { + lowered = true; + setPriority(scanPriority); + } + else { + yield(); + } + } + } + } while (task == null); + + + if (!task.isDone()) { + if (COLLECT_STATS) ++runs; + task.run(); + task.setDone(); + } + + } + + /* ------------ composite operations ------------------- */ + + + /** + * Main runloop + **/ + + public void run() { + try{ + while (!interrupted()) { + + FJTask task = pop(); + if (task != null) { + if (!task.isDone()) { + // inline FJTask.invoke + if (COLLECT_STATS) ++runs; + task.run(); + task.setDone(); + } + } + else + scanWhileIdling(); + } + } + finally { + group.setInactive(this); + } + } + + /** + * Execute a task in this thread. Generally called when current task + * cannot otherwise continue. + **/ + + + protected final void taskYield() { + FJTask task = pop(); + if (task != null) { + if (!task.isDone()) { + if (COLLECT_STATS) ++runs; + task.run(); + task.setDone(); + } + } + else + scan(null); + } + + + /** + * Process tasks until w is done. + * Equivalent to <code>while(!w.isDone()) taskYield(); </code> + **/ + + protected final void taskJoin(final FJTask w) { + + while (!w.isDone()) { + + FJTask task = pop(); + if (task != null) { + if (!task.isDone()) { + if (COLLECT_STATS) ++runs; + task.run(); + task.setDone(); + if (task == w) return; // fast exit if we just ran w + } + } + else + scan(w); + } + } + + /** + * A specialized expansion of + * <code> w.fork(); invoke(v); w.join(); </code> + **/ + + + protected final void coInvoke(final FJTask w, final FJTask v) { + + // inline push + + int t = top; + if (t < (base & (deq.length-1)) + deq.length) { + + deq[t & (deq.length-1)].put(w); + top = t + 1; + + // inline invoke + + if (!v.isDone()) { + if (COLLECT_STATS) ++runs; + v.run(); + v.setDone(); + } + + // inline taskJoin + + while (!w.isDone()) { + FJTask task = pop(); + if (task != null) { + if (!task.isDone()) { + if (COLLECT_STATS) ++runs; + task.run(); + task.setDone(); + if (task == w) return; // fast exit if we just ran w + } + } + else + scan(w); + } + } + + else // handle non-inlinable cases + slowCoInvoke(w, v); + } + + + /** + * Backup to handle noninlinable cases of coInvoke + **/ + + protected void slowCoInvoke(final FJTask w, final FJTask v) { + push(w); // let push deal with overflow + FJTask.invoke(v); + taskJoin(w); + } + + + /** + * Array-based version of coInvoke + **/ + + protected final void coInvoke(FJTask[] tasks) { + int nforks = tasks.length - 1; + + // inline bulk push of all but one task + + int t = top; + + if (nforks >= 0 && t + nforks < (base & (deq.length-1)) + deq.length) { + for (int i = 0; i < nforks; ++i) { + deq[t++ & (deq.length-1)].put(tasks[i]); + top = t; + } + + // inline invoke of one task + FJTask v = tasks[nforks]; + if (!v.isDone()) { + if (COLLECT_STATS) ++runs; + v.run(); + v.setDone(); + } + + // inline taskJoins + + for (int i = 0; i < nforks; ++i) { + FJTask w = tasks[i]; + while (!w.isDone()) { + + FJTask task = pop(); + if (task != null) { + if (!task.isDone()) { + if (COLLECT_STATS) ++runs; + task.run(); + task.setDone(); + } + } + else + scan(w); + } + } + } + + else // handle non-inlinable cases + slowCoInvoke(tasks); + } + + /** + * Backup to handle atypical or noninlinable cases of coInvoke + **/ + + protected void slowCoInvoke(FJTask[] tasks) { + for (int i = 0; i < tasks.length; ++i) push(tasks[i]); + for (int i = 0; i < tasks.length; ++i) taskJoin(tasks[i]); + } + +} + diff --git a/src/actors/scala/actors/FJTaskRunnerGroup.java b/src/actors/scala/actors/FJTaskRunnerGroup.java new file mode 100644 index 0000000000..27c870453f --- /dev/null +++ b/src/actors/scala/actors/FJTaskRunnerGroup.java @@ -0,0 +1,685 @@ +/* + File: FJTaskRunnerGroup.java + + Originally written by Doug Lea and released into the public domain. + This may be used for any purposes whatsoever without acknowledgment. + Thanks for the assistance and support of Sun Microsystems Labs, + and everyone contributing, testing, and using this code. + + History: + Date Who What + 7Jan1999 dl First public release + 12Jan1999 dl made getActiveCount public; misc minor cleanup. + 14Jan1999 dl Added executeTask + 20Jan1999 dl Allow use of priorities; reformat stats + 6Feb1999 dl Lazy thread starts + 27Apr1999 dl Renamed +*/ + +package scala.actors; + +/** + * A stripped down analog of a ThreadGroup used for + * establishing and managing FJTaskRunner threads. + * ThreadRunnerGroups serve as the control boundary separating + * the general world of normal threads from the specialized world + * of FJTasks. + * <p> + * By intent, this class does not subclass java.lang.ThreadGroup, and + * does not support most methods found in ThreadGroups, since they + * would make no sense for FJTaskRunner threads. In fact, the class + * does not deal with ThreadGroups at all. If you want to restrict + * a FJTaskRunnerGroup to a particular ThreadGroup, you can create + * it from within that ThreadGroup. + * <p> + * The main contextual parameter for a FJTaskRunnerGroup is + * the group size, established in the constructor. + * Groups must be of a fixed size. + * There is no way to dynamically increase or decrease the number + * of threads in an existing group. + * <p> + * In general, the group size should be equal to the number + * of CPUs on the system. (Unfortunately, there is no portable + * means of automatically detecting the number of CPUs on a JVM, so there is + * no good way to automate defaults.) In principle, when + * FJTasks are used for computation-intensive tasks, having only + * as many threads as CPUs should minimize bookkeeping overhead + * and contention, and so maximize throughput. However, because + * FJTaskRunners lie atop Java threads, and in turn operating system + * thread support and scheduling policies, + * it is very possible that using more threads + * than CPUs will improve overall throughput even though it adds + * to overhead. This will always be so if FJTasks are I/O bound. + * So it may pay to experiment a bit when tuning on particular platforms. + * You can also use <code>setRunPriorities</code> to either + * increase or decrease the priorities of active threads, which + * may interact with group size choice. + * <p> + * In any case, overestimating group sizes never + * seriously degrades performance (at least within reasonable bounds). + * You can also use a value + * less than the number of CPUs in order to reserve processing + * for unrelated threads. + * <p> + * There are two general styles for using a FJTaskRunnerGroup. + * You can create one group per entire program execution, for example + * as a static singleton, and use it for all parallel tasks: + * <pre> + * class Tasks { + * static FJTaskRunnerGroup group; + * public void initialize(int groupsize) { + * group = new FJTaskRunnerGroup(groupSize); + * } + * // ... + * } + * </pre> + * Alternatively, you can make new groups on the fly and use them only for + * particular task sets. This is more flexible,, + * and leads to more controllable and deterministic execution patterns, + * but it encounters greater overhead on startup. Also, to reclaim + * system resources, you should + * call <code>FJTaskRunnerGroup.interruptAll</code> when you are done + * using one-shot groups. Otherwise, because FJTaskRunners set + * <code>Thread.isDaemon</code> + * status, they will not normally be reclaimed until program termination. + * <p> + * The main supported methods are <code>execute</code>, + * which starts a task processed by FJTaskRunner threads, + * and <code>invoke</code>, which starts one and waits for completion. + * For example, you might extend the above <code>FJTasks</code> + * class to support a task-based computation, say, the + * <code>Fib</code> class from the <code>FJTask</code> documentation: + * <pre> + * class Tasks { // continued + * // ... + * static int fib(int n) { + * try { + * Fib f = new Fib(n); + * group.invoke(f); + * return f.getAnswer(); + * } + * catch (InterruptedException ex) { + * throw new Error("Interrupted during computation"); + * } + * } + * } + * </pre> + * <p> + * Method <code>stats()</code> can be used to monitor performance. + * Both FJTaskRunnerGroup and FJTaskRunner may be compiled with + * the compile-time constant COLLECT_STATS set to false. In this + * case, various simple counts reported in stats() are not collected. + * On platforms tested, + * this leads to such a tiny performance improvement that there is + * very little motivation to bother. + * + * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] + * <p> + * @see FJTask + * @see FJTaskRunner + **/ + +public class FJTaskRunnerGroup implements IFJTaskRunnerGroup { + + /** The threads in this group **/ + protected /*final*/ FJTaskRunner[] threads; + + /** Group-wide queue for tasks entered via execute() **/ + protected final LinkedQueue entryQueue = new LinkedQueue(); + + /** Number of threads that are not waiting for work **/ + protected int activeCount = 0; + + /** Number of threads that have been started. Used to avoid + unecessary contention during startup of task sets. + **/ + protected int nstarted = 0; + + /** + * Compile-time constant. If true, various counts of + * runs, waits, etc., are maintained. These are NOT + * updated with synchronization, so statistics reports + * might not be accurate. + **/ + + static final boolean COLLECT_STATS = true; + // static final boolean COLLECT_STATS = false; + + // for stats + + /** The time at which this ThreadRunnerGroup was constructed **/ + long initTime = 0; + + /** Total number of executes or invokes **/ + int entries = 0; + + static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY+1; + + /** + * Create a FJTaskRunnerGroup with the indicated number + * of FJTaskRunner threads. Normally, the best size to use is + * the number of CPUs on the system. + * <p> + * The threads in a FJTaskRunnerGroup are created with their + * isDaemon status set, so do not normally need to be + * shut down manually upon program termination. + **/ + + public FJTaskRunnerGroup(int groupSize) { + //System.out.println("Creating FJTaskRunnerGroup of size "+groupSize); + threads = new FJTaskRunner[groupSize]; + initializeThreads(); + initTime = System.currentTimeMillis(); + } + + + public boolean existsTask() { + FJTask task = null; + /* + Circularly traverse from a random start index. + + This differs slightly from cilk version that uses a random index + for each attempted steal. + Exhaustive scanning might impede analytic tractablity of + the scheduling policy, but makes it much easier to deal with + startup and shutdown. + */ + + FJTaskRunner[] ts = threads; + int idx = /*victimRNG.nextInt(ts.length)*/ 0; + + for (int i = 0; i < ts.length; ++i) { + FJTaskRunner t = ts[idx]; + if (++idx >= ts.length) idx = 0; // circularly traverse + + if (t != null) { + task = t.take(); + if (task != null) { + break; + } + } + } + + if (task == null) { + task = pollEntryQueue(); + } + + if (task != null && !task.isDone()) { + boolean quit = false; + while (!quit) { + quit = true; + try { + // put task back into entry queue and return true + entryQueue.put(task); + } catch (InterruptedException ie) { + quit = false; + } + } + return true; + } + else return false; + } + + public boolean checkPoolSize() { + // if there is a task in the entryQueue or any of the + // worker queues, increase thread pool size by one + if (!entryQueue.isEmpty() || + existsTask()) { + int newsize = threads.length + 1; + FJTaskRunner[] newar = new FJTaskRunner[newsize]; + System.arraycopy(threads, 0, newar, 0, newsize-1); + synchronized(this) { + threads = newar; + threads[newsize-1] = new FJTaskRunner(this); + } + return true; + } + else return false; + } + + + /** + * Arrange for execution of the given task + * by placing it in a work queue. If the argument + * is not of type FJTask, it is embedded in a FJTask via + * <code>FJTask.Wrap</code>. + * @exception InterruptedException if current Thread is + * currently interrupted + **/ + + public void execute(Runnable r) throws InterruptedException { + if (r instanceof FJTask) { + entryQueue.put((FJTask)r); + } + else { + entryQueue.put(new FJTask.Wrap(r)); + } + signalNewTask(); + } + + + /** + * Specialized form of execute called only from within FJTasks + **/ + public void executeTask(FJTask t) { + try { + entryQueue.put(t); + signalNewTask(); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + + /** + * Start a task and wait it out. Returns when the task completes. + * @exception InterruptedException if current Thread is + * interrupted before completion of the task. + **/ + + public void invoke(Runnable r) throws InterruptedException { + InvokableFJTask w = new InvokableFJTask(r); + entryQueue.put(w); + signalNewTask(); + w.awaitTermination(); + } + + + /** + * Try to shut down all FJTaskRunner threads in this group + * by interrupting them all. This method is designed + * to be used during cleanup when it is somehow known + * that all threads are idle. + * FJTaskRunners only + * check for interruption when they are not otherwise + * processing a task (and its generated subtasks, + * if any), so if any threads are active, shutdown may + * take a while, and may lead to unpredictable + * task processing. + **/ + + public void interruptAll() { + // paranoically interrupt current thread last if in group. + Thread current = Thread.currentThread(); + boolean stopCurrent = false; + + for (int i = 0; i < threads.length; ++i) { + Thread t = threads[i]; + if (t == current) + stopCurrent = true; + else + t.interrupt(); + } + if (stopCurrent) + current.interrupt(); + } + + + /** + * Set the priority to use while a FJTaskRunner is + * polling for new tasks to perform. Default + * is currently Thread.MIN_PRIORITY+1. The value + * set may not go into effect immediately, but + * will be used at least the next time a thread scans for work. + **/ + public synchronized void setScanPriorities(int pri) { + for (int i = 0; i < threads.length; ++i) { + FJTaskRunner t = threads[i]; + t.setScanPriority(pri); + if (!t.active) t.setPriority(pri); + } + } + + + /** + * Set the priority to use while a FJTaskRunner is + * actively running tasks. Default + * is the priority that was in effect by the thread that + * constructed this FJTaskRunnerGroup. Setting this value + * while threads are running may momentarily result in + * them running at this priority even when idly waiting for work. + **/ + public synchronized void setRunPriorities(int pri) { + for (int i = 0; i < threads.length; ++i) { + FJTaskRunner t = threads[i]; + t.setRunPriority(pri); + if (t.active) t.setPriority(pri); + } + } + + + + /** Return the number of FJTaskRunner threads in this group **/ + + public int size() { return threads.length; } + + + /** + * Return the number of threads that are not idly waiting for work. + * Beware that even active threads might not be doing any useful + * work, but just spinning waiting for other dependent tasks. + * Also, since this is just a snapshot value, some tasks + * may be in the process of becoming idle. + **/ + public synchronized int getActiveCount() { return activeCount; } + + /** + * Prints various snapshot statistics to System.out. + * <ul> + * <li> For each FJTaskRunner thread (labeled as T<em>n</em>, for + * <em>n</em> from zero to group size - 1): + * <ul> + * <li> A star "*" is printed if the thread is currently active; + * that is, not sleeping while waiting for work. Because + * threads gradually enter sleep modes, an active thread + * may in fact be about to sleep (or wake up). + * <li> <em>Q Cap</em> The current capacity of its task queue. + * <li> <em>Run</em> The total number of tasks that have been run. + * <li> <em>New</em> The number of these tasks that were + * taken from either the entry queue or from other + * thread queues; that is, the number of tasks run + * that were <em>not</em> forked by the thread itself. + * <li> <em>Scan</em> The number of times other task + * queues or the entry queue were polled for tasks. + * </ul> + * <li> <em>Execute</em> The total number of tasks entered + * (but not necessarily yet run) via execute or invoke. + * <li> <em>Time</em> Time in seconds since construction of this + * FJTaskRunnerGroup. + * <li> <em>Rate</em> The total number of tasks processed + * per second across all threads. This + * may be useful as a simple throughput indicator + * if all processed tasks take approximately the + * same time to run. + * </ul> + * <p> + * Cautions: Some statistics are updated and gathered + * without synchronization, + * so may not be accurate. However, reported counts may be considered + * as lower bounds of actual values. + * Some values may be zero if classes are compiled + * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup + * classes can be independently compiled with different values of + * COLLECT_STATS.) Also, the counts are maintained as ints so could + * overflow in exceptionally long-lived applications. + * <p> + * These statistics can be useful when tuning algorithms or diagnosing + * problems. For example: + * <ul> + * <li> High numbers of scans may mean that there is insufficient + * parallelism to keep threads busy. However, high scan rates + * are expected if the number + * of Executes is also high or there is a lot of global + * synchronization in the application, and the system is not otherwise + * busy. Threads may scan + * for work hundreds of times upon startup, shutdown, and + * global synch points of task sets. + * <li> Large imbalances in tasks run across different threads might + * just reflect contention with unrelated threads on a system + * (possibly including JVM threads such as GC), but may also + * indicate some systematic bias in how you generate tasks. + * <li> Large task queue capacities may mean that too many tasks are being + * generated before they can be run. + * Capacities are reported rather than current numbers of tasks + * in queues because they are better indicators of the existence + * of these kinds of possibly-transient problems. + * Queue capacities are + * resized on demand from their initial value of 4096 elements, + * which is much more than sufficient for the kinds of + * applications that this framework is intended to best support. + * </ul> + **/ + + public void stats() { + long time = System.currentTimeMillis() - initTime; + double secs = ((double)time) / 1000.0; + long totalRuns = 0; + long totalScans = 0; + long totalSteals = 0; + + System.out.print("Thread" + + "\tQ Cap" + + "\tScans" + + "\tNew" + + "\tRuns" + + "\n"); + + for (int i = 0; i < threads.length; ++i) { + FJTaskRunner t = threads[i]; + int truns = t.runs; + totalRuns += truns; + + int tscans = t.scans; + totalScans += tscans; + + int tsteals = t.steals; + totalSteals += tsteals; + + String star = (getActive(t))? "*" : " "; + + + System.out.print("T" + i + star + + "\t" + t.deqSize() + + "\t" + tscans + + "\t" + tsteals + + "\t" + truns + + "\n"); + } + + System.out.print("Total" + + "\t " + + "\t" + totalScans + + "\t" + totalSteals + + "\t" + totalRuns + + "\n"); + + System.out.print("Execute: " + entries); + + System.out.print("\tTime: " + secs); + + long rps = 0; + if (secs != 0) rps = Math.round((double)(totalRuns) / secs); + + System.out.println("\tRate: " + rps); + } + + + /* ------------ Methods called only by FJTaskRunners ------------- */ + + + /** + * Return the array of threads in this group. + * Called only by FJTaskRunner.scan(). + **/ + + public FJTaskRunner[] getArray() { return threads; } + + + /** + * Return a task from entry queue, or null if empty. + * Called only by FJTaskRunner.scan(). + **/ + + public FJTask pollEntryQueue() { + try { + FJTask t = (FJTask)(entryQueue.poll(0)); + return t; + } + catch(InterruptedException ex) { // ignore interrupts + Thread.currentThread().interrupt(); + return null; + } + } + + + /** + * Return active status of t. + * Per-thread active status can only be accessed and + * modified via synchronized method here in the group class. + **/ + + protected synchronized boolean getActive(FJTaskRunner t) { + return t.active; + } + + + /** + * Set active status of thread t to true, and notify others + * that might be waiting for work. + **/ + + public synchronized void setActive(FJTaskRunner t) { + if (!t.active) { + t.active = true; + ++activeCount; + if (nstarted < threads.length) + threads[nstarted++].start(); + else + notifyAll(); + } + } + + /** + * Set active status of thread t to false. + **/ + + public synchronized void setInactive(FJTaskRunner t) { + if (t.active) { + t.active = false; + --activeCount; + } + } + + /** + * The number of times to scan other threads for tasks + * before transitioning to a mode where scans are + * interleaved with sleeps (actually timed waits). + * Upon transition, sleeps are for duration of + * scans / SCANS_PER_SLEEP milliseconds. + * <p> + * This is not treated as a user-tunable parameter because + * good values do not appear to vary much across JVMs or + * applications. Its main role is to help avoid some + * useless spinning and contention during task startup. + **/ + static final long SCANS_PER_SLEEP = 15; + + /** + * The maximum time (in msecs) to sleep when a thread is idle, + * yet others are not, so may eventually generate work that + * the current thread can steal. This value reflects the maximum time + * that a thread may sleep when it possibly should not, because there + * are other active threads that might generate work. In practice, + * designs in which some threads become stalled because others + * are running yet not generating tasks are not likely to work + * well in this framework anyway, so the exact value does not matter + * too much. However, keeping it in the sub-second range does + * help smooth out startup and shutdown effects. + **/ + + static final long MAX_SLEEP_TIME = 100; + + /** + * Set active status of thread t to false, and + * then wait until: (a) there is a task in the entry + * queue, or (b) other threads are active, or (c) the current + * thread is interrupted. Upon return, it + * is not certain that there will be work available. + * The thread must itself check. + * <p> + * The main underlying reason + * for these mechanics is that threads do not + * signal each other when they add elements to their queues. + * (This would add to task overhead, reduce locality. + * and increase contention.) + * So we must rely on a tamed form of polling. However, tasks + * inserted into the entry queue do result in signals, so + * tasks can wait on these if all of them are otherwise idle. + **/ + + public synchronized void checkActive(FJTaskRunner t, long scans) { + + setInactive(t); + + try { + // if nothing available, do a hard wait + if (activeCount == 0 && entryQueue.peek() == null) { + wait(); + } + else { + // If there is possibly some work, + // sleep for a while before rechecking + + long msecs = scans / SCANS_PER_SLEEP; + if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME; + int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep + wait(msecs, nsecs); + } + } + catch (InterruptedException ex) { + notify(); // avoid lost notifies on interrupts + Thread.currentThread().interrupt(); + } + } + + /* ------------ Utility methods ------------- */ + + /** + * Start or wake up any threads waiting for work + **/ + + protected synchronized void signalNewTask() { + if (COLLECT_STATS) ++entries; + if (nstarted < threads.length) + threads[nstarted++].start(); + else + notify(); + } + + /** + * Create all FJTaskRunner threads in this group. + **/ + + protected void initializeThreads() { + for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this); + } + + + + + /** + * Wrap wait/notify mechanics around a task so that + * invoke() can wait it out + **/ + protected static final class InvokableFJTask extends FJTask { + protected final Runnable wrapped; + protected boolean terminated = false; + + protected InvokableFJTask(Runnable r) { wrapped = r; } + + public void run() { + try { + if (wrapped instanceof FJTask) + FJTask.invoke((FJTask)(wrapped)); + else + wrapped.run(); + } + finally { + setTerminated(); + } + } + + protected synchronized void setTerminated() { + terminated = true; + notifyAll(); + } + + protected synchronized void awaitTermination() throws InterruptedException { + while (!terminated) wait(); + } + } + + +} + diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala new file mode 100644 index 0000000000..ddb0908fd7 --- /dev/null +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -0,0 +1,144 @@ + +package scala.actors + +import compat.Platform + +import java.lang.{Runnable, Thread, InterruptedException, System} + +import scala.collection.Set +import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet} + +/** + * FJTaskScheduler2 + * + * @version 0.9.4 + * @author Philipp Haller + */ +class FJTaskScheduler2 extends Thread with IScheduler { + + val printStats = false + //val printStats = true + + val coreProp = System.getProperty("actors.corePoolSize") + val maxProp = System.getProperty("actors.maxPoolSize") + + val initCoreSize = + if (null ne coreProp) Integer.parseInt(coreProp) + else 4 + + val maxSize = + if (null ne maxProp) Integer.parseInt(maxProp) + else 256 + + private var coreSize = initCoreSize + + private val executor = + new FJTaskRunnerGroup(coreSize) + + private var terminating = false + + private var lastActivity = Platform.currentTime + + private var submittedTasks = 0 + + private var pendingReactions = 0 + def pendReaction: unit = synchronized { + pendingReactions = pendingReactions + 1 + } + def unPendReaction: unit = synchronized { + pendingReactions = pendingReactions - 1 + } + + def printActorDump {} + def terminated(a: Actor) {} + + private val TICK_FREQ = 50 + private val CHECK_FREQ = 100 + + def onLockup(handler: () => unit) = + lockupHandler = handler + + def onLockup(millis: int)(handler: () => unit) = { + //LOCKUP_CHECK_FREQ = millis / CHECK_FREQ + lockupHandler = handler + } + + private var lockupHandler: () => unit = null + + override def run(): unit = { + try { + while (!terminating) { + this.synchronized { + try { + wait(CHECK_FREQ) + } catch { + case _: InterruptedException => + if (terminating) throw new QuitException + } + + // check if we need more threads + if (Platform.currentTime - lastActivity >= TICK_FREQ + && coreSize < maxSize + && executor.checkPoolSize()) { + // do nothing + } + else { + if (pendingReactions == 0) { + // if all worker threads idle terminate + if (executor.getActiveCount() == 0) { + // Note that we don't have to shutdown + // the FJTaskRunnerGroup since there is + // no separate thread associated with it, + // and FJTaskRunner threads have daemon status. + + // terminate timer thread + TimerThread.t.interrupt() + throw new QuitException + } + } + } + } // sync + + } // while (!terminating) + } catch { + case _: QuitException => + // allow thread to exit + if (printStats) executor.stats() + } + } + + /** + * @param item the task to be executed. + */ + def execute(task: Reaction) { + executor.execute(task) + } + + def start(task: Reaction) { + this.synchronized { + pendingReactions = pendingReactions + 1 + } + executor.execute(task) + } + + /** + * @param worker the worker thread executing tasks + * @return the executed task + */ + def getTask(worker: WorkerThread) = null + + /** + * @param a the actor + */ + def tick(a: Actor) { + lastActivity = Platform.currentTime + } + + /** Shuts down all idle worker threads. + */ + def shutdown(): unit = synchronized { + terminating = true + // terminate timer thread + TimerThread.t.interrupt() + } +} diff --git a/src/actors/scala/actors/Future.scala b/src/actors/scala/actors/Future.scala new file mode 100644 index 0000000000..3ea87ae29d --- /dev/null +++ b/src/actors/scala/actors/Future.scala @@ -0,0 +1,105 @@ + +package scala.actors + +/** + * A Future is a function of arity 0 that returns a value of type Any. + * Applying a future blocks the current actor until its value + * is available. + * A future can be queried to find out whether its value + * is already available. + * + * @version 0.9.4 + * @author Philipp Haller + */ +abstract class Future[T](val ch: InputChannel[Any]) extends Function0[T] { + def isSet: boolean + var value: Option[T] = None +} + +/** + * The Futures object contains methods that operate on Futures. + * + * @version 0.9.4 + * @author Philipp Haller + */ +object Futures { + + def spawn[T](body: => T): Future[T] = { + case object Eval + val a = Actor.actor { + Actor.react { + case Eval => Actor.reply(body) + } + } + a !! (Eval, { case any => any.asInstanceOf[T] }) + } + + def alarm(t: long) = spawn { Thread.sleep(t) } + + def awaitEither[a, b](ft1: Future[a], ft2: Future[b]): Any = { + val FutCh1 = ft1.ch; val FutCh2 = ft2.ch + Actor.receive { + case FutCh1 ! arg1 => arg1 + case FutCh2 ! arg2 => arg2 + } + } + + /** + * Awaits all futures returning an option containing a list of replies, + * or timeouts returning None. + * Note that some of the futures might already have been awaited. + */ + def awaitAll(timeout: long, fts: Future[Any]*): List[Option[Any]] = { + var resultsMap: collection.mutable.Map[Int, Option[Any]] = new collection.mutable.HashMap[Int, Option[Any]] + + var cnt = 0 + val mappedFts = fts.map(ft => + Pair({cnt=cnt+1; cnt-1}, ft)) + + val unsetFts = mappedFts.filter((p: Pair[int, Future[Any]]) => { + if (p._2.isSet) { resultsMap(p._1) = Some(p._2()); false } + else { resultsMap(p._1) = None; true } + }) + + val partFuns = unsetFts.map((p: Pair[int, Future[Any]]) => { + val FutCh = p._2.ch + val singleCase: PartialFunction[Any, Pair[int, Any]] = { + case FutCh ! any => Pair(p._1, any) + } + singleCase + }) + + def awaitWith(partFuns: Seq[PartialFunction[Any, Pair[int, Any]]]) { + val reaction: PartialFunction[Any, unit] = new PartialFunction[Any, unit] { + def isDefinedAt(msg: Any) = msg match { + case TIMEOUT => true + case _ => partFuns exists (.isDefinedAt(msg)) + } + def apply(msg: Any): unit = msg match { + case TIMEOUT => // do nothing + case _ => { + val pfOpt = partFuns find (.isDefinedAt(msg)) + val pf = pfOpt.get // succeeds always + val Pair(idx, subres) = pf(msg) + resultsMap(idx) = Some(subres) + + val partFunsRest = partFuns filter (.!=(pf)) + // wait on rest of partial functions + if (partFunsRest.length > 0) + awaitWith(partFunsRest) + } + } + } + Actor.receiveWithin(timeout)(reaction) + } + + awaitWith(partFuns) + + var results: List[Option[Any]] = Nil + val size = resultsMap.size + for (val i <- 0 until size) { + results = resultsMap(size - i - 1) :: results + } + results + } +} diff --git a/src/actors/scala/actors/IFJTaskRunnerGroup.java b/src/actors/scala/actors/IFJTaskRunnerGroup.java new file mode 100644 index 0000000000..2e9f3359b8 --- /dev/null +++ b/src/actors/scala/actors/IFJTaskRunnerGroup.java @@ -0,0 +1,12 @@ + +package scala.actors; + +interface IFJTaskRunnerGroup { + public void executeTask(FJTask t); + public FJTaskRunner[] getArray(); + public FJTask pollEntryQueue(); + public void setActive(FJTaskRunner t); + public void checkActive(FJTaskRunner t, long scans); + public void setInactive(FJTaskRunner t); + +} diff --git a/src/actors/scala/actors/InputChannel.scala b/src/actors/scala/actors/InputChannel.scala index 80f334b30d..f537c754a0 100644 --- a/src/actors/scala/actors/InputChannel.scala +++ b/src/actors/scala/actors/InputChannel.scala @@ -14,12 +14,12 @@ package scala.actors * The <code>InputChannel</code> trait provides a common interface * for all channels from which values can be received. * - * @version 0.9.2 + * @version 0.9.4 * @author Philipp Haller */ -trait InputChannel[Msg] { - def receive[R](f: PartialFunction[Any, R]): R +trait InputChannel[+Msg] { + def receive[R](f: PartialFunction[Msg, R]): R def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R - def react(f: PartialFunction[Any, Unit]): Nothing + def react(f: PartialFunction[Msg, Unit]): Nothing def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing } diff --git a/src/actors/scala/actors/LinkedNode.java b/src/actors/scala/actors/LinkedNode.java new file mode 100644 index 0000000000..bf8ca02a74 --- /dev/null +++ b/src/actors/scala/actors/LinkedNode.java @@ -0,0 +1,25 @@ +/* + File: LinkedNode.java + + Originally written by Doug Lea and released into the public domain. + This may be used for any purposes whatsoever without acknowledgment. + Thanks for the assistance and support of Sun Microsystems Labs, + and everyone contributing, testing, and using this code. + + History: + Date Who What + 11Jun1998 dl Create public version + 25may2000 dl Change class access to public + 26nov2001 dl Added no-arg constructor, all public access. +*/ + +package scala.actors; + +/** A standard linked list node used in various queue classes **/ +public class LinkedNode { + public Object value; + public LinkedNode next; + public LinkedNode() {} + public LinkedNode(Object x) { value = x; } + public LinkedNode(Object x, LinkedNode n) { value = x; next = n; } +} diff --git a/src/actors/scala/actors/LinkedQueue.java b/src/actors/scala/actors/LinkedQueue.java new file mode 100644 index 0000000000..796f428cf5 --- /dev/null +++ b/src/actors/scala/actors/LinkedQueue.java @@ -0,0 +1,185 @@ +/* + File: LinkedQueue.java + + Originally written by Doug Lea and released into the public domain. + This may be used for any purposes whatsoever without acknowledgment. + Thanks for the assistance and support of Sun Microsystems Labs, + and everyone contributing, testing, and using this code. + + History: + Date Who What + 11Jun1998 dl Create public version + 25aug1998 dl added peek + 10dec1998 dl added isEmpty + 10oct1999 dl lock on node object to ensure visibility +*/ + +package scala.actors; + +/** + * A linked list based channel implementation. + * The algorithm avoids contention between puts + * and takes when the queue is not empty. + * Normally a put and a take can proceed simultaneously. + * (Although it does not allow multiple concurrent puts or takes.) + * This class tends to perform more efficently than + * other Channel implementations in producer/consumer + * applications. + * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] + **/ + +public class LinkedQueue { + + + /** + * Dummy header node of list. The first actual node, if it exists, is always + * at head_.next. After each take, the old first node becomes the head. + **/ + protected LinkedNode head_; + + /** + * Helper monitor for managing access to last node. + **/ + protected final Object putLock_ = new Object(); + + /** + * The last node of list. Put() appends to list, so modifies last_ + **/ + protected LinkedNode last_; + + /** + * The number of threads waiting for a take. + * Notifications are provided in put only if greater than zero. + * The bookkeeping is worth it here since in reasonably balanced + * usages, the notifications will hardly ever be necessary, so + * the call overhead to notify can be eliminated. + **/ + protected int waitingForTake_ = 0; + + public LinkedQueue() { + head_ = new LinkedNode(null); + last_ = head_; + } + + /** Main mechanics for put/offer **/ + protected void insert(Object x) { + synchronized(putLock_) { + LinkedNode p = new LinkedNode(x); + synchronized(last_) { + last_.next = p; + last_ = p; + } + if (waitingForTake_ > 0) + putLock_.notify(); + } + } + + /** Main mechanics for take/poll **/ + protected synchronized Object extract() { + synchronized(head_) { + Object x = null; + LinkedNode first = head_.next; + if (first != null) { + x = first.value; + first.value = null; + head_ = first; + } + return x; + } + } + + + public void put(Object x) throws InterruptedException { + if (x == null) throw new IllegalArgumentException(); + if (Thread.interrupted()) throw new InterruptedException(); + insert(x); + } + + public boolean offer(Object x, long msecs) throws InterruptedException { + if (x == null) throw new IllegalArgumentException(); + if (Thread.interrupted()) throw new InterruptedException(); + insert(x); + return true; + } + + public Object take() throws InterruptedException { + if (Thread.interrupted()) throw new InterruptedException(); + // try to extract. If fail, then enter wait-based retry loop + Object x = extract(); + if (x != null) + return x; + else { + synchronized(putLock_) { + try { + ++waitingForTake_; + for (;;) { + x = extract(); + if (x != null) { + --waitingForTake_; + return x; + } + else { + putLock_.wait(); + } + } + } + catch(InterruptedException ex) { + --waitingForTake_; + putLock_.notify(); + throw ex; + } + } + } + } + + public Object peek() { + synchronized(head_) { + LinkedNode first = head_.next; + if (first != null) + return first.value; + else + return null; + } + } + + + public boolean isEmpty() { + synchronized(head_) { + return head_.next == null; + } + } + + public Object poll(long msecs) throws InterruptedException { + if (Thread.interrupted()) throw new InterruptedException(); + Object x = extract(); + if (x != null) + return x; + else { + synchronized(putLock_) { + try { + long waitTime = msecs; + long start = (msecs <= 0)? 0 : System.currentTimeMillis(); + ++waitingForTake_; + for (;;) { + x = extract(); + if (x != null || waitTime <= 0) { + --waitingForTake_; + return x; + } + else { + putLock_.wait(waitTime); + waitTime = msecs - (System.currentTimeMillis() - start); + } + } + } + catch(InterruptedException ex) { + --waitingForTake_; + putLock_.notify(); + throw ex; + } + } + } + } +} + + diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index 947ad9947d..fe9fe9d52b 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -22,14 +22,15 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has * The <code>Scheduler</code> object is used by * <code>Actor</code> to execute tasks of an execution of an actor. * - * @version 0.9.2 + * @version 0.9.4 * @author Philipp Haller */ object Scheduler { private var sched: IScheduler = { - var s: IScheduler = null + var s: IScheduler = new FJTaskScheduler2 +/* // Check for JDK version >= 1.5 var olderThanJDK5 = false try { @@ -43,6 +44,7 @@ object Scheduler { new TickedScheduler else Class.forName("scala.actors.ThreadPoolScheduler").newInstance().asInstanceOf[IScheduler] +*/ s.start() s } @@ -53,7 +55,18 @@ object Scheduler { } def start(task: Reaction) = sched.start(task) - def execute(task: Reaction) = sched.execute(task) + def execute(task: Reaction) = { + val t = currentThread + if (t.isInstanceOf[FJTaskRunner]) { + val tr = t.asInstanceOf[FJTaskRunner] + tr.push(new FJTask { + def run() { + task.run() + } + }) + } else sched.execute(task) + } + def tick(a: Actor) = sched.tick(a) def terminated(a: Actor) = sched.terminated(a) def pendReaction: unit = sched.pendReaction @@ -71,7 +84,7 @@ object Scheduler { * This abstract class provides a common interface for all * schedulers used to execute actor tasks. * - * @version 0.9.2 + * @version 0.9.4 * @author Philipp Haller */ trait IScheduler { @@ -101,7 +114,7 @@ trait IScheduler { * This scheduler executes the tasks of an actor on a single * thread (the current thread). * - * @version 0.9.2 + * @version 0.9.4 * @author Philipp Haller */ class SingleThreadedScheduler extends IScheduler { @@ -135,7 +148,7 @@ class SingleThreadedScheduler extends IScheduler { * The <code>QuickException</code> class is used to manage control flow * of certain schedulers and worker threads. * - * @version 0.9.2 + * @version 0.9.4 * @author Philipp Haller */ private[actors] class QuitException extends Throwable { @@ -195,7 +208,7 @@ private[actors] class QuitException extends Throwable { * execution. QED * </p> * - * @version 0.9.2 + * @version 0.9.4 * @author Philipp Haller */ class WorkerThread(sched: IScheduler) extends Thread { |