diff options
author | Philipp Haller <hallerp@gmail.com> | 2006-11-21 15:07:53 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2006-11-21 15:07:53 +0000 |
commit | 4a2b662fa8140e65a0217c7c3b93bbebcfc918ca (patch) | |
tree | 08319a9688c64333e811537d3cc16a7279bbe20e /src/actors | |
parent | 216f8bf4c23b66c786d93444f57e95beadb0bac7 (diff) | |
download | scala-4a2b662fa8140e65a0217c7c3b93bbebcfc918ca.tar.gz scala-4a2b662fa8140e65a0217c7c3b93bbebcfc918ca.tar.bz2 scala-4a2b662fa8140e65a0217c7c3b93bbebcfc918ca.zip |
Checked-in enhanced debugging support for actors.
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 43 | ||||
-rw-r--r-- | src/actors/scala/actors/Channel.scala | 17 | ||||
-rw-r--r-- | src/actors/scala/actors/Reaction.scala | 11 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 184 |
4 files changed, 202 insertions, 53 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index c4d36824eb..3e8bf190c3 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -362,6 +362,8 @@ trait Actor extends OutputChannel[Any] { private[actors] var continuation: PartialFunction[Any, Unit] = null private[actors] var timeoutPending = false + private[actors] var isDetached = false + private[actors] var isWaiting = false private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = if ((f eq null) && (continuation eq null)) { @@ -371,15 +373,16 @@ trait Actor extends OutputChannel[Any] { val task = new Reaction(this, if (f eq null) continuation else f, msg) - Scheduler.execute(task) + Scheduler execute task } private[actors] def tick(): Unit = - Scheduler.tick(this) + Scheduler tick this private[actors] def defaultDetachActor: PartialFunction[Any, Unit] => Unit = (f: PartialFunction[Any, Unit]) => { continuation = f + isDetached = true throw new SuspendActorException } @@ -389,14 +392,12 @@ trait Actor extends OutputChannel[Any] { private[actors] var detachActor: PartialFunction[Any, Unit] => Unit = _ private[actors] var kill: () => Unit = _ - private var continue = false - private class ExitSuspendLoop extends Throwable private[actors] def resetActor(): Unit = { suspendActor = () => { - continue = false - while(!continue) { + isWaiting = true + while(isWaiting) { try { wait() } catch { @@ -408,11 +409,11 @@ trait Actor extends OutputChannel[Any] { suspendActorFor = (msec: long) => { val ts = Platform.currentTime var waittime = msec - continue = false var fromExc = false + isWaiting = true try { - while(!continue) { + while(isWaiting) { try { fromExc = false wait(waittime) @@ -422,7 +423,7 @@ trait Actor extends OutputChannel[Any] { val now = Platform.currentTime val waited = now-ts waittime = msec-waited - if (waittime < 0) { continue = true } + if (waittime < 0) { isWaiting = false } } } if (!fromExc) throw new ExitSuspendLoop @@ -432,7 +433,7 @@ trait Actor extends OutputChannel[Any] { } resumeActor = () => { - continue = true + isWaiting = false notify() } @@ -447,7 +448,24 @@ trait Actor extends OutputChannel[Any] { * Starts this actor. */ def start(): Unit = - Scheduler.execute(new Reaction(this)) + Scheduler start new Reaction(this) + + + /* + * Debugging support. + */ + private[actors] var name = "" + + private var childCnt = 0 + + private[actors] def nextChildName = { + val s = childCnt + name + childCnt = childCnt + 1 + s + } + + private[actors] def setName(n: String) = + name = n private val links = new HashSet[Actor] @@ -548,6 +566,9 @@ trait Actor extends OutputChannel[Any] { linked.exit(this, reason) } exitMarks -= this + + // unregister in scheduler + Scheduler terminated this } } } diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala index 11497e8a10..2010c8c1e6 100644 --- a/src/actors/scala/actors/Channel.scala +++ b/src/actors/scala/actors/Channel.scala @@ -296,6 +296,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { } case None => { this.synchronized { + //Scheduler.detached(receiver) receiver.detachActor(f) } } @@ -350,4 +351,20 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { throw new SuspendActorException } } + + /* + * Prints contents of mailbox to standard out. + * This is used for printing actor dumps. + */ + private[actors] def printMailbox = { + Console.print("[") + val msgs = mailbox.elements + if (msgs.hasNext) + Console.print(msgs.next._1.toString()) + while (msgs.hasNext) { + Console.print(", "+msgs.next._1.toString()) + } + Console.println("]") + } + } diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala index 571d32aa44..06410ff899 100644 --- a/src/actors/scala/actors/Reaction.scala +++ b/src/actors/scala/actors/Reaction.scala @@ -50,6 +50,7 @@ private[actors] class Reaction(a: Actor, val saved = Actor.selfs.get(t).asInstanceOf[Actor] Actor.selfs.put(t, a) Scheduler.unPendReaction + a.isDetached = false try { if (f == null) a.act() @@ -86,4 +87,14 @@ private[actors] class Reaction(a: Actor, Actor.selfs.put(t, saved) } } + + private var runnable = false + + def isRunnable = synchronized { + runnable + } + + def setRunnable(on: boolean) = synchronized { + runnable = on + } } diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index 2f4c2827c6..b6696f2793 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -16,7 +16,8 @@ import compat.Platform import java.lang.{Runnable, Thread, InterruptedException} import java.util.logging.{Logger, FileHandler, Level} -import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack} +import scala.collection.Set +import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet} /** * The <code>Scheduler</code> object is used by @@ -42,16 +43,18 @@ object Scheduler { sched = scheduler } - def execute(task: Reaction) = synchronized { - sched.execute(task) - } - + def start(task: Reaction) = sched.start(task) + def execute(task: Reaction) = sched.execute(task) def tick(a: Actor) = sched.tick(a) - - def shutdown(): unit = sched.shutdown() - + def terminated(a: Actor) = sched.terminated(a) def pendReaction: unit = sched.pendReaction def unPendReaction: unit = sched.unPendReaction + + def shutdown() = sched.shutdown() + + def onLockup(handler: () => unit) = sched.onLockup(handler) + def onLockup(millis: int)(handler: () => unit) = sched.onLockup(millis)(handler) + def printActorDump = sched.printActorDump } /** @@ -62,19 +65,24 @@ object Scheduler { * @author Philipp Haller */ trait IScheduler { + def start(task: Reaction): unit def execute(task: Reaction): unit def getTask(worker: WorkerThread): Runnable def tick(a: Actor): unit + def terminated(a: Actor): unit + def pendReaction: unit + def unPendReaction: unit def shutdown(): unit + def onLockup(handler: () => unit): unit + def onLockup(millis: int)(handler: () => unit): unit + def printActorDump: unit + val QUIT_TASK = new Reaction(null) { override def run(): unit = {} override def toString() = "QUIT_TASK" } - - def pendReaction: unit - def unPendReaction: unit } /** @@ -84,11 +92,12 @@ trait IScheduler { */ class TickedScheduler extends Thread with IScheduler { private val tasks = new Queue[Reaction] - private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread] + // Worker threads + private val workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread] private val idle = new Queue[WorkerThread] private val ticks = new HashMap[WorkerThread, long] - private val executing = new HashMap[Actor, WorkerThread] + private val occupied = new HashMap[Actor, WorkerThread] private var terminating = false @@ -103,24 +112,110 @@ class TickedScheduler extends Thread with IScheduler { pendingReactions.pop } - var TICKFREQ = 5 - var CHECKFREQ = 50 + /* + * An actor is alive if it has been started and + * has not yet terminated. + */ + private val alive = new HashSet[Actor] + + def printActorDump { + var num = 0 + for (val a <- alive.elements) { + Console.println("Actor `"+a.name+"' ("+num+"): "+a) + if (a.isDetached) + Console.println("Detached") + else { + val flag = if (isActive(a)) "ACTIVE" else "INACTIVE" + Console.println("Occupies thread: "+occupied.get(a)+" ["+flag+"]") + } + + if (a.isDetached || a.isWaiting) { + // dump contents of mailbox + Console.println("Waiting with mailbox:") + a.in.printMailbox + } - def init() = { - for (val i <- List.range(0, 2)) { - val worker = new WorkerThread(this) - workers += worker - worker.start() + Console.println + num = num + 1 } } - init() + + def start(task: Reaction): unit = synchronized { + Debug.info("Starting " + task.actor) + alive += task.actor + + // determine name of actor + val creator = Actor.self + if (creator.isInstanceOf[ActorProxy]) { + // created by Java thread + // only ok, if it is the main thread + val tname = currentThread.toString() + if (tname.indexOf("main") == -1) { + // print/log warning + Console.println("Warning: Some debugging features not available if actors are created by non-main Java threads.") + } else task.actor.name = creator.nextChildName + } else task.actor.name = creator.nextChildName + + execute(task) + } + + def terminated(a: Actor): unit = + alive -= a + + private var TICK_FREQ = 5 + private var CHECK_FREQ = 50 + + private var LOCKUP_CHECK_FREQ = 10 // 10 * CHECK_FREQ + private var lockupCnt = 0 + private var stateChanged = false + + for (val i <- List.range(0, 2)) { + val worker = new WorkerThread(this) + workers += worker + worker.start() + } + + def onLockup(handler: () => unit) = + lockupHandler = handler + + def onLockup(millis: int)(handler: () => unit) = { + LOCKUP_CHECK_FREQ = millis / CHECK_FREQ + lockupHandler = handler + } + + private var lockupHandler: () => unit = null + + def isActive(a: Actor): boolean = occupied.get(a) match { + case None => + // thread outside of scheduler; + // error("No worker thread associated with actor " + a) + false + case Some(wt) => isActive(wt) + } + + def isActive(wt: WorkerThread): boolean = ticks.get(wt) match { + case None => false + case Some(ts) => + val currTime = Platform.currentTime + if (currTime - ts < TICK_FREQ) true + else false + } override def run(): unit = { try { while (!terminating) { this.synchronized { try { - wait(CHECKFREQ) + wait(CHECK_FREQ) + + if (!stateChanged) lockupCnt = lockupCnt + 1 + else stateChanged = false + + if (lockupCnt == LOCKUP_CHECK_FREQ) { + lockupCnt = 0 + if (lockupHandler != null) + lockupHandler() + } } catch { case _: InterruptedException => if (terminating) throw new QuitException @@ -132,14 +227,7 @@ class TickedScheduler extends Thread with IScheduler { var foundBusy = false while (iter.hasNext && !foundBusy) { val wt = iter.next - ticks.get(wt) match { - case None => - foundBusy = false - case Some(ts) => - val currTime = Platform.currentTime - if (currTime - ts < TICKFREQ) - foundBusy = true - } + foundBusy = isActive(wt) } if (!foundBusy) { @@ -149,15 +237,15 @@ class TickedScheduler extends Thread with IScheduler { // dequeue item to be processed val item = tasks.dequeue - executing.update(item.actor, newWorker) + occupied.update(item.actor, newWorker) newWorker.execute(item) newWorker.start() + + stateChanged = true } } // tasks.length > 0 else { - //Debug.info("task queue empty") if (pendingReactions.isEmpty) { - //Debug.info("no pending reactions") // if all worker threads idle terminate if (workers.length == idle.length) { Debug.info("all threads idle, terminating") @@ -186,14 +274,16 @@ class TickedScheduler extends Thread with IScheduler { * @param item the task to be executed. */ def execute(item: Reaction): unit = synchronized { - if (!terminating) + if (!terminating) { if (idle.length > 0) { val wt = idle.dequeue - executing.update(item.actor, wt) + occupied.update(item.actor, wt) wt.execute(item) } else tasks += item + stateChanged = true + } } /** @@ -203,9 +293,10 @@ class TickedScheduler extends Thread with IScheduler { def getTask(worker: WorkerThread) = synchronized { if (terminating) QUIT_TASK + stateChanged = true if (tasks.length > 0) { val item = tasks.dequeue - executing.update(item.actor, worker) + occupied.update(item.actor, worker) item } else { @@ -214,14 +305,12 @@ class TickedScheduler extends Thread with IScheduler { } } - var ticksCnt = 0 - /** * @param a the actor */ def tick(a: Actor): unit = synchronized { - ticksCnt = ticksCnt + 1 - executing.get(a) match { + stateChanged = true + occupied.get(a) match { case None => // thread outside of scheduler; // error("No worker thread associated with actor " + a) @@ -269,6 +358,11 @@ class QuitException extends Throwable { * @author Philipp Haller */ class SingleThreadedScheduler extends IScheduler { + def start(task: Reaction): unit = { + // execute task immediately on same thread + task.run() + } + def execute(task: Reaction): unit = { // execute task immediately on same thread task.run() @@ -276,9 +370,15 @@ class SingleThreadedScheduler extends IScheduler { def getTask(worker: WorkerThread): Runnable = null def tick(a: Actor): Unit = {} - def shutdown(): Unit = {} + def terminated(a: Actor): unit = {} def pendReaction: unit = {} def unPendReaction: unit = {} + + def shutdown(): Unit = {} + + def onLockup(handler: () => unit): unit = {} + def onLockup(millis: int)(handler: () => unit): unit = {} + def printActorDump: unit = {} } /** @@ -288,7 +388,7 @@ class SingleThreadedScheduler extends IScheduler { * @version 0.9.0 * @author Philipp Haller */ -class SpareWorkerScheduler extends IScheduler { +abstract class SpareWorkerScheduler extends IScheduler { private val tasks = new Queue[Reaction] private val idle = new Queue[WorkerThread] private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread] |