diff options
author | Philipp Haller <hallerp@gmail.com> | 2006-11-08 21:11:33 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2006-11-08 21:11:33 +0000 |
commit | f18a26d8b9bf51d1f25035558988a744f320a527 (patch) | |
tree | f1ac2dd01c1e40e94a944dde6aba5421afd03988 /src/actors | |
parent | 939774370edfca1b44e21243b4fbfee55fa17b5e (diff) | |
download | scala-f18a26d8b9bf51d1f25035558988a744f320a527.tar.gz scala-f18a26d8b9bf51d1f25035558988a744f320a527.tar.bz2 scala-f18a26d8b9bf51d1f25035558988a744f320a527.zip |
added pending reactions to prevent scheduler fr...
added pending reactions to prevent scheduler from terminating early
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 4 | ||||
-rw-r--r-- | src/actors/scala/actors/Channel.scala | 2 | ||||
-rw-r--r-- | src/actors/scala/actors/Reaction.scala | 2 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 46 |
4 files changed, 40 insertions, 14 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 712fb41d85..dcf5b64f4a 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -195,6 +195,7 @@ object Actor { } s.detachActor = f => { s.in.waitingFor = s.in.waitingForNone + Scheduler.unPendReaction throw new SuspendActorException } @@ -233,7 +234,8 @@ object Actor { */ def seq[a, b >: a](first: => a, next: => b): b = { val s = self - s.kill = () => { next; s.kill() } + val killNext = s.kill + s.kill = () => { s.kill = killNext; next; s.kill() } first } diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala index 3ec944846d..fa474f3fc1 100644 --- a/src/actors/scala/actors/Channel.scala +++ b/src/actors/scala/actors/Channel.scala @@ -263,6 +263,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { */ def react(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.self == receiver, "react on channel belonging to other actor") + Scheduler.pendReaction receiver.synchronized { receiver.tick() waitingFor = f.isDefinedAt @@ -304,6 +305,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { */ def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.self == receiver, "react on channel belonging to other actor") + Scheduler.pendReaction receiver.synchronized { receiver.tick() waitingFor = f.isDefinedAt diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala index 79e54a52d7..ee25c73e3a 100644 --- a/src/actors/scala/actors/Reaction.scala +++ b/src/actors/scala/actors/Reaction.scala @@ -43,6 +43,7 @@ private[actors] class StartTask(a: Actor) extends Reaction { val t = currentThread val saved = Actor.selfs.get(t).asInstanceOf[Actor] Actor.selfs.put(t, a) + Scheduler.unPendReaction try { a.act() if (currentThread.isInterrupted()) @@ -89,6 +90,7 @@ private[actors] class ActorTask(a: Actor, val t = currentThread val saved = Actor.selfs.get(t).asInstanceOf[Actor] Actor.selfs.put(t, a) + Scheduler.unPendReaction try { f(msg) if (currentThread.isInterrupted()) diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index a206a7899b..ad2050c871 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -15,7 +15,7 @@ import java.lang.{Runnable, Thread} import java.lang.InterruptedException import compat.Platform -import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue} +import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack} /** * The <code>Scheduler</code> object is used by @@ -46,6 +46,9 @@ object Scheduler { def tick(a: Actor) = sched.tick(a) def shutdown(): Unit = sched.shutdown() + + def pendReaction: unit = sched.pendReaction + def unPendReaction: unit = sched.unPendReaction } /** @@ -67,6 +70,9 @@ trait IScheduler { def run(): Unit = {} override def toString() = "QUIT_TASK" } + + def pendReaction: unit + def unPendReaction: unit } /** @@ -76,7 +82,7 @@ trait IScheduler { * @version Beta2 * @author Philipp Haller */ -class SingleThreadedScheduler extends IScheduler { +abstract class SingleThreadedScheduler extends IScheduler { def execute(task: Reaction): Unit = { // execute task immediately on same thread task.run() @@ -96,7 +102,7 @@ class SingleThreadedScheduler extends IScheduler { * @version Beta2 * @author Philipp Haller */ -class SpareWorkerScheduler extends IScheduler { +abstract class SpareWorkerScheduler extends IScheduler { private val tasks = new Queue[Reaction] private val idle = new Queue[WorkerThread] private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread] @@ -170,6 +176,17 @@ class TickedScheduler extends Thread with IScheduler { private var terminating = false + private var pendingReactions = new Stack[unit] + def pendReaction: unit = { + Debug.info("pend reaction") + pendingReactions push () + } + def unPendReaction: unit = { + Debug.info("unpend reaction") + if (!pendingReactions.isEmpty) + pendingReactions.pop + } + var TICKFREQ = 5 var CHECKFREQ = 50 @@ -222,17 +239,20 @@ class TickedScheduler extends Thread with IScheduler { } } // tasks.length > 0 else { - Debug.info("task queue empty, checking...") - // if all worker threads idle terminate - if (workers.length == idle.length) { - Debug.info("all threads idle, terminating") - val idleThreads = idle.elements - while (idleThreads.hasNext) { - val worker = idleThreads.next - worker.running = false - worker.interrupt() + Debug.info("task queue empty") + if (pendingReactions.isEmpty) { + Debug.info("no pending reactions") + // if all worker threads idle terminate + if (workers.length == idle.length) { + Debug.info("all threads idle, terminating") + val idleThreads = idle.elements + while (idleThreads.hasNext) { + val worker = idleThreads.next + worker.running = false + worker.interrupt() + } + throw new QuitException } - throw new QuitException } } } // sync |