diff options
author | Philipp Haller <hallerp@gmail.com> | 2008-08-04 14:53:04 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2008-08-04 14:53:04 +0000 |
commit | 8b7c4138c6d90cd4bcac90867141e568041c9de0 (patch) | |
tree | e9a1831ffe41aee4520493d9aced6777c8d1d01a /src/actors | |
parent | 699e811f1a1a9970ebed6ee5ec366aa8aefea6cf (diff) | |
download | scala-8b7c4138c6d90cd4bcac90867141e568041c9de0.tar.gz scala-8b7c4138c6d90cd4bcac90867141e568041c9de0.tar.bz2 scala-8b7c4138c6d90cd4bcac90867141e568041c9de0.zip |
Actor trait abstracts from scheduling strategy.
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 19 | ||||
-rw-r--r-- | src/actors/scala/actors/FJTaskScheduler2.scala | 28 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 137 | ||||
-rw-r--r-- | src/actors/scala/actors/SchedulerAdapter.scala | 49 | ||||
-rw-r--r-- | src/actors/scala/actors/TickedScheduler.scala | 36 |
5 files changed, 164 insertions, 105 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 2edb360ae9..3bd27be895 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -19,7 +19,7 @@ import scala.compat.Platform * <code>receive</code>, <code>react</code>, <code>reply</code>, * etc. * - * @version 0.9.14 + * @version 0.9.18 * @author Philipp Haller */ object Actor { @@ -350,7 +350,7 @@ object Actor { * </li> * </ul> * - * @version 0.9.16 + * @version 0.9.18 * @author Philipp Haller */ @serializable @@ -365,6 +365,9 @@ trait Actor extends AbstractActor { protected val mailbox = new MessageQueue private var sessions: List[OutputChannel[Any]] = Nil + protected val scheduler: IScheduler = + Scheduler + /** * Returns the number of messages in this actor's mailbox * @@ -401,7 +404,7 @@ trait Actor extends AbstractActor { if (isSuspended) resumeActor() else // assert continuation != null - Scheduler.execute(new Reaction(this, continuation, msg)) + scheduler.execute(new Reaction(this, continuation, msg)) } else { mailbox.append(msg, replyTo) } @@ -708,11 +711,11 @@ trait Actor extends AbstractActor { val task = new Reaction(this, if (f eq null) continuation else f, msg) - Scheduler execute task + scheduler execute task } private def tick(): Unit = - Scheduler tick this + scheduler tick this private[actors] var kill: () => Unit = () => {} @@ -772,7 +775,11 @@ trait Actor extends AbstractActor { exiting = false shouldExit = false - Scheduler start new Reaction(this) + scheduler execute { + ActorGC.newActor(Actor.this) + (new Reaction(Actor.this)).run() + } + this } diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index 34095da299..e7d1d7656d 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -20,7 +20,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has /** * FJTaskScheduler2 * - * @version 0.9.12 + * @version 0.9.18 * @author Philipp Haller */ class FJTaskScheduler2 extends Thread with IScheduler { @@ -139,28 +139,18 @@ class FJTaskScheduler2 extends Thread with IScheduler { } /** - * @param item the task to be executed. + * @param task the task to be executed */ - def execute(task: Runnable) { - executor.execute(task) - } - - def start(task: Runnable) { - if (task.isInstanceOf[Reaction]) { - val reaction = task.asInstanceOf[Reaction] - ActorGC.newActor(reaction.a) - } - executor.execute(task) - } + def execute(task: Runnable): Unit = + executor execute task - /** - * @param worker the worker thread executing tasks - * @return the executed task - */ - def getTask(worker: WorkerThread) = null + def execute(fun: => Unit): Unit = + executor.execute(new Runnable { + def run() { fun } + }) /** - * @param a the actor + * @param a the actor */ def tick(a: Actor) { lastActivity = Platform.currentTime diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index 41a69d12e0..12429e83e1 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -8,7 +8,6 @@ // $Id$ - package scala.actors import compat.Platform @@ -22,12 +21,13 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has * The <code>Scheduler</code> object is used by * <code>Actor</code> to execute tasks of an execution of an actor. * - * @version 0.9.10 + * @version 0.9.18 * @author Philipp Haller */ -object Scheduler { +object Scheduler extends IScheduler { + private var sched: IScheduler = { - var s: IScheduler = new FJTaskScheduler2 + val s = new FJTaskScheduler2 s.start() s } @@ -35,24 +35,29 @@ object Scheduler { def impl = sched def impl_= (scheduler: IScheduler) = { sched = scheduler - sched.start() } private var tasks: LinkedQueue = null private var pendingCount = 0 - def snapshot(): Unit = { - tasks = sched.snapshot() - pendingCount = ActorGC.getPendingCount - sched.shutdown() - } + /* Assumes <code>sched</code> holds an instance + * of <code>FJTaskScheduler2</code>. + */ + def snapshot(): Unit = + if (sched.isInstanceOf[FJTaskScheduler2]) { + val fjts = sched.asInstanceOf[FJTaskScheduler2] + tasks = fjts.snapshot() + pendingCount = ActorGC.getPendingCount + fjts.shutdown() + } else + error("snapshot operation not supported.") /* Creates an instance of class <code>FJTaskScheduler2</code> * and submits <code>tasks</code> for execution. */ def restart(): Unit = synchronized { sched = { - var s: IScheduler = new FJTaskScheduler2 + val s = new FJTaskScheduler2 ActorGC.setPendingCount(pendingCount) s.start() s @@ -64,30 +69,32 @@ object Scheduler { tasks = null } - /* The following two methods (<code>start</code> and - * <code>execute</code>) are called from within - * <code>Actor</code> to submit tasks for execution. - */ - def start(task: Runnable) = sched.start(task) + def execute(task: Runnable) { + val t = currentThread + if (t.isInstanceOf[FJTaskRunner]) { + val tr = t.asInstanceOf[FJTaskRunner] + tr.push(new FJTask { + def run() { task.run() } + }) + } else + sched execute task + } - def execute(task: Runnable) = { + def execute(fun: => Unit) { val t = currentThread if (t.isInstanceOf[FJTaskRunner]) { val tr = t.asInstanceOf[FJTaskRunner] tr.push(new FJTask { - def run() { - task.run() - } + def run() { fun } }) - } else sched.execute(task) + } else + sched execute { fun } } /* This method is used to notify the scheduler * of library activity by the argument Actor. - * - * It is only called from within <code>Actor</code>. */ - def tick(a: Actor) = sched.tick(a) + def tick(a: Actor) = sched tick a def shutdown() = sched.shutdown() @@ -98,22 +105,40 @@ object Scheduler { /** - * This abstract class provides a common interface for all - * schedulers used to execute actor tasks. + * The <code>IScheduler</code> trait provides a common interface + * for all schedulers used to execute actor tasks. * - * @version 0.9.8 + * Subclasses of <code>Actor</code> that override its + * <code>scheduler</code> member value must provide + * an implementation of the <code>IScheduler</code> + * trait. + * + * @version 0.9.18 * @author Philipp Haller */ trait IScheduler { - def start(): Unit - def start(task: Runnable): Unit + /** Submits a closure for execution. + * + * @param fun the closure to be executed + */ + def execute(fun: => Unit): Unit + + /** Submits a <code>Runnable</code> for execution. + * + * @param task the task to be executed + */ def execute(task: Runnable): Unit - def getTask(worker: WorkerThread): Runnable + /** Notifies the scheduler about activity of the + * executing actor. + * + * @param a the active actor + */ def tick(a: Actor): Unit - def snapshot(): LinkedQueue + /** Shuts down the scheduler. + */ def shutdown(): Unit def onLockup(handler: () => Unit): Unit @@ -127,47 +152,36 @@ trait IScheduler { } +trait WorkerThreadScheduler extends IScheduler { + /** + * @param worker the worker thread executing tasks + * @return the task to be executed + */ + def getTask(worker: WorkerThread): Runnable +} + + /** * This scheduler executes the tasks of an actor on a single * thread (the current thread). * - * @version 0.9.9 + * @version 0.9.18 * @author Philipp Haller */ class SingleThreadedScheduler extends IScheduler { - def start() {} - val taskQ = new scala.collection.mutable.Queue[Runnable] - - def start(task: Runnable) { - // execute task immediately on same thread + def execute(task: Runnable) { task.run() - while (taskQ.length > 0) { - val nextTask = taskQ.dequeue - nextTask.run() - } } - def execute(task: Runnable) { - val a = Actor.tl.get.asInstanceOf[Actor] - if ((null ne a) && a.isInstanceOf[ActorProxy]) { - // execute task immediately on same thread - task.run() - while (taskQ.length > 0) { - val nextTask = taskQ.dequeue - nextTask.run() - } - } else { - // queue task for later execution - taskQ += task - } - } + def execute(fun: => Unit): Unit = + execute(new Runnable { + def run() { fun } + }) - def getTask(worker: WorkerThread): Runnable = null def tick(a: Actor) {} def shutdown() {} - def snapshot(): LinkedQueue = { null } def onLockup(handler: () => Unit) {} def onLockup(millis: Int)(handler: () => Unit) {} @@ -176,7 +190,7 @@ class SingleThreadedScheduler extends IScheduler { /** - * The <code>QuickException</code> class is used to manage control flow + * The <code>QuitException</code> class is used to manage control flow * of certain schedulers and worker threads. * * @version 0.9.8 @@ -190,6 +204,7 @@ private[actors] class QuitException extends Throwable { override def fillInStackTrace(): Throwable = this } + /** * <p> * The class <code>WorkerThread</code> is used by schedulers to execute @@ -239,10 +254,10 @@ private[actors] class QuitException extends Throwable { * execution. QED * </p> * - * @version 0.9.8 + * @version 0.9.18 * @author Philipp Haller */ -class WorkerThread(sched: IScheduler) extends Thread { +class WorkerThread(sched: WorkerThreadScheduler) extends Thread { private var task: Runnable = null private[actors] var running = true @@ -263,7 +278,7 @@ class WorkerThread(sched: IScheduler) extends Thread { } } this.synchronized { - task = sched.getTask(this) + task = sched getTask this while (task eq null) { try { diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala new file mode 100644 index 0000000000..dc1e442f9a --- /dev/null +++ b/src/actors/scala/actors/SchedulerAdapter.scala @@ -0,0 +1,49 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2008, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.actors + +/** The <code>SchedulerAdapter</code> trait is used to adapt + * the behavior of the standard <code>Scheduler</code> object. + * + * Providing an implementation for the + * <code>execute(f: => Unit)</code> method is sufficient to + * obtain a concrete <code>IScheduler</code> class. + * + * @version 0.9.18 + * @author Philipp Haller + */ +trait SchedulerAdapter extends IScheduler { + + /** Submits a <code>Runnable</code> for execution. + * + * @param task the task to be executed + */ + def execute(task: Runnable): Unit = + execute { task.run() } + + /** Notifies the scheduler about activity of the + * executing actor. + * + * @param a the active actor + */ + def tick(a: Actor): Unit = + Scheduler tick a + + /** Shuts down the scheduler. + */ + def shutdown(): Unit = + Scheduler.shutdown() + + def onLockup(handler: () => Unit) {} + + def onLockup(millis: Int)(handler: () => Unit) {} + + def printActorDump {} + +} diff --git a/src/actors/scala/actors/TickedScheduler.scala b/src/actors/scala/actors/TickedScheduler.scala index 2d90a906cd..3ce1fab731 100644 --- a/src/actors/scala/actors/TickedScheduler.scala +++ b/src/actors/scala/actors/TickedScheduler.scala @@ -20,10 +20,16 @@ import scala.compat.Platform * <p>This scheduler uses a thread pool to execute tasks that are generated * by the execution of actors.</p> * - * @version 0.9.8 + * Use class <code>FJTaskScheduler2</code> instead. + * + * @version 0.9.18 * @author Philipp Haller */ -class TickedScheduler extends Thread with IScheduler { +@deprecated +class TickedScheduler extends Thread with WorkerThreadScheduler { + // as long as this thread runs, JVM should not exit + setDaemon(false) + private val tasks = new Queue[Runnable] // Worker threads @@ -35,23 +41,8 @@ class TickedScheduler extends Thread with IScheduler { private var lastActivity = Platform.currentTime - private var pendingReactions = 0 - def pendReaction: Unit = synchronized { - pendingReactions += 1 - } - def unPendReaction: Unit = synchronized { - pendingReactions -= 1 - } - def printActorDump {} - def start(task: Runnable): Unit = synchronized { - pendingReactions += 1 - execute(task) - } - - def terminated(a: Actor) {} - private var TICK_FREQ = 5 private var CHECK_FREQ = 50 @@ -83,6 +74,8 @@ class TickedScheduler extends Thread with IScheduler { if (terminating) throw new QuitException } + ActorGC.gc() + if (tasks.length > 0) { // check if we need more threads if (Platform.currentTime - lastActivity >= TICK_FREQ) { @@ -97,9 +90,11 @@ class TickedScheduler extends Thread with IScheduler { } } // tasks.length > 0 else { - if (pendingReactions == 0) { + if (ActorGC.allTerminated) { // if all worker threads idle terminate if (workers.length == idle.length) { + Debug.info(this+": initiating shutdown...") + val idleThreads = idle.elements while (idleThreads.hasNext) { val worker = idleThreads.next @@ -135,7 +130,10 @@ class TickedScheduler extends Thread with IScheduler { } } - def snapshot(): LinkedQueue = null + def execute(fun: => Unit): Unit = + execute(new Runnable { + def run() { fun } + }) /** * @param worker the worker thread executing tasks |