summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-11-21 15:07:53 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-11-21 15:07:53 +0000
commit4a2b662fa8140e65a0217c7c3b93bbebcfc918ca (patch)
tree08319a9688c64333e811537d3cc16a7279bbe20e /src/actors
parent216f8bf4c23b66c786d93444f57e95beadb0bac7 (diff)
downloadscala-4a2b662fa8140e65a0217c7c3b93bbebcfc918ca.tar.gz
scala-4a2b662fa8140e65a0217c7c3b93bbebcfc918ca.tar.bz2
scala-4a2b662fa8140e65a0217c7c3b93bbebcfc918ca.zip
Checked-in enhanced debugging support for actors.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala43
-rw-r--r--src/actors/scala/actors/Channel.scala17
-rw-r--r--src/actors/scala/actors/Reaction.scala11
-rw-r--r--src/actors/scala/actors/Scheduler.scala184
4 files changed, 202 insertions, 53 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index c4d36824eb..3e8bf190c3 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -362,6 +362,8 @@ trait Actor extends OutputChannel[Any] {
private[actors] var continuation: PartialFunction[Any, Unit] = null
private[actors] var timeoutPending = false
+ private[actors] var isDetached = false
+ private[actors] var isWaiting = false
private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) =
if ((f eq null) && (continuation eq null)) {
@@ -371,15 +373,16 @@ trait Actor extends OutputChannel[Any] {
val task = new Reaction(this,
if (f eq null) continuation else f,
msg)
- Scheduler.execute(task)
+ Scheduler execute task
}
private[actors] def tick(): Unit =
- Scheduler.tick(this)
+ Scheduler tick this
private[actors] def defaultDetachActor: PartialFunction[Any, Unit] => Unit =
(f: PartialFunction[Any, Unit]) => {
continuation = f
+ isDetached = true
throw new SuspendActorException
}
@@ -389,14 +392,12 @@ trait Actor extends OutputChannel[Any] {
private[actors] var detachActor: PartialFunction[Any, Unit] => Unit = _
private[actors] var kill: () => Unit = _
- private var continue = false
-
private class ExitSuspendLoop extends Throwable
private[actors] def resetActor(): Unit = {
suspendActor = () => {
- continue = false
- while(!continue) {
+ isWaiting = true
+ while(isWaiting) {
try {
wait()
} catch {
@@ -408,11 +409,11 @@ trait Actor extends OutputChannel[Any] {
suspendActorFor = (msec: long) => {
val ts = Platform.currentTime
var waittime = msec
- continue = false
var fromExc = false
+ isWaiting = true
try {
- while(!continue) {
+ while(isWaiting) {
try {
fromExc = false
wait(waittime)
@@ -422,7 +423,7 @@ trait Actor extends OutputChannel[Any] {
val now = Platform.currentTime
val waited = now-ts
waittime = msec-waited
- if (waittime < 0) { continue = true }
+ if (waittime < 0) { isWaiting = false }
}
}
if (!fromExc) throw new ExitSuspendLoop
@@ -432,7 +433,7 @@ trait Actor extends OutputChannel[Any] {
}
resumeActor = () => {
- continue = true
+ isWaiting = false
notify()
}
@@ -447,7 +448,24 @@ trait Actor extends OutputChannel[Any] {
* Starts this actor.
*/
def start(): Unit =
- Scheduler.execute(new Reaction(this))
+ Scheduler start new Reaction(this)
+
+
+ /*
+ * Debugging support.
+ */
+ private[actors] var name = ""
+
+ private var childCnt = 0
+
+ private[actors] def nextChildName = {
+ val s = childCnt + name
+ childCnt = childCnt + 1
+ s
+ }
+
+ private[actors] def setName(n: String) =
+ name = n
private val links = new HashSet[Actor]
@@ -548,6 +566,9 @@ trait Actor extends OutputChannel[Any] {
linked.exit(this, reason)
}
exitMarks -= this
+
+ // unregister in scheduler
+ Scheduler terminated this
}
}
}
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index 11497e8a10..2010c8c1e6 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -296,6 +296,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
}
case None => {
this.synchronized {
+ //Scheduler.detached(receiver)
receiver.detachActor(f)
}
}
@@ -350,4 +351,20 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
throw new SuspendActorException
}
}
+
+ /*
+ * Prints contents of mailbox to standard out.
+ * This is used for printing actor dumps.
+ */
+ private[actors] def printMailbox = {
+ Console.print("[")
+ val msgs = mailbox.elements
+ if (msgs.hasNext)
+ Console.print(msgs.next._1.toString())
+ while (msgs.hasNext) {
+ Console.print(", "+msgs.next._1.toString())
+ }
+ Console.println("]")
+ }
+
}
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
index 571d32aa44..06410ff899 100644
--- a/src/actors/scala/actors/Reaction.scala
+++ b/src/actors/scala/actors/Reaction.scala
@@ -50,6 +50,7 @@ private[actors] class Reaction(a: Actor,
val saved = Actor.selfs.get(t).asInstanceOf[Actor]
Actor.selfs.put(t, a)
Scheduler.unPendReaction
+ a.isDetached = false
try {
if (f == null)
a.act()
@@ -86,4 +87,14 @@ private[actors] class Reaction(a: Actor,
Actor.selfs.put(t, saved)
}
}
+
+ private var runnable = false
+
+ def isRunnable = synchronized {
+ runnable
+ }
+
+ def setRunnable(on: boolean) = synchronized {
+ runnable = on
+ }
}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 2f4c2827c6..b6696f2793 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -16,7 +16,8 @@ import compat.Platform
import java.lang.{Runnable, Thread, InterruptedException}
import java.util.logging.{Logger, FileHandler, Level}
-import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack}
+import scala.collection.Set
+import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet}
/**
* The <code>Scheduler</code> object is used by
@@ -42,16 +43,18 @@ object Scheduler {
sched = scheduler
}
- def execute(task: Reaction) = synchronized {
- sched.execute(task)
- }
-
+ def start(task: Reaction) = sched.start(task)
+ def execute(task: Reaction) = sched.execute(task)
def tick(a: Actor) = sched.tick(a)
-
- def shutdown(): unit = sched.shutdown()
-
+ def terminated(a: Actor) = sched.terminated(a)
def pendReaction: unit = sched.pendReaction
def unPendReaction: unit = sched.unPendReaction
+
+ def shutdown() = sched.shutdown()
+
+ def onLockup(handler: () => unit) = sched.onLockup(handler)
+ def onLockup(millis: int)(handler: () => unit) = sched.onLockup(millis)(handler)
+ def printActorDump = sched.printActorDump
}
/**
@@ -62,19 +65,24 @@ object Scheduler {
* @author Philipp Haller
*/
trait IScheduler {
+ def start(task: Reaction): unit
def execute(task: Reaction): unit
def getTask(worker: WorkerThread): Runnable
def tick(a: Actor): unit
+ def terminated(a: Actor): unit
+ def pendReaction: unit
+ def unPendReaction: unit
def shutdown(): unit
+ def onLockup(handler: () => unit): unit
+ def onLockup(millis: int)(handler: () => unit): unit
+ def printActorDump: unit
+
val QUIT_TASK = new Reaction(null) {
override def run(): unit = {}
override def toString() = "QUIT_TASK"
}
-
- def pendReaction: unit
- def unPendReaction: unit
}
/**
@@ -84,11 +92,12 @@ trait IScheduler {
*/
class TickedScheduler extends Thread with IScheduler {
private val tasks = new Queue[Reaction]
- private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]
+ // Worker threads
+ private val workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]
private val idle = new Queue[WorkerThread]
private val ticks = new HashMap[WorkerThread, long]
- private val executing = new HashMap[Actor, WorkerThread]
+ private val occupied = new HashMap[Actor, WorkerThread]
private var terminating = false
@@ -103,24 +112,110 @@ class TickedScheduler extends Thread with IScheduler {
pendingReactions.pop
}
- var TICKFREQ = 5
- var CHECKFREQ = 50
+ /*
+ * An actor is alive if it has been started and
+ * has not yet terminated.
+ */
+ private val alive = new HashSet[Actor]
+
+ def printActorDump {
+ var num = 0
+ for (val a <- alive.elements) {
+ Console.println("Actor `"+a.name+"' ("+num+"): "+a)
+ if (a.isDetached)
+ Console.println("Detached")
+ else {
+ val flag = if (isActive(a)) "ACTIVE" else "INACTIVE"
+ Console.println("Occupies thread: "+occupied.get(a)+" ["+flag+"]")
+ }
+
+ if (a.isDetached || a.isWaiting) {
+ // dump contents of mailbox
+ Console.println("Waiting with mailbox:")
+ a.in.printMailbox
+ }
- def init() = {
- for (val i <- List.range(0, 2)) {
- val worker = new WorkerThread(this)
- workers += worker
- worker.start()
+ Console.println
+ num = num + 1
}
}
- init()
+
+ def start(task: Reaction): unit = synchronized {
+ Debug.info("Starting " + task.actor)
+ alive += task.actor
+
+ // determine name of actor
+ val creator = Actor.self
+ if (creator.isInstanceOf[ActorProxy]) {
+ // created by Java thread
+ // only ok, if it is the main thread
+ val tname = currentThread.toString()
+ if (tname.indexOf("main") == -1) {
+ // print/log warning
+ Console.println("Warning: Some debugging features not available if actors are created by non-main Java threads.")
+ } else task.actor.name = creator.nextChildName
+ } else task.actor.name = creator.nextChildName
+
+ execute(task)
+ }
+
+ def terminated(a: Actor): unit =
+ alive -= a
+
+ private var TICK_FREQ = 5
+ private var CHECK_FREQ = 50
+
+ private var LOCKUP_CHECK_FREQ = 10 // 10 * CHECK_FREQ
+ private var lockupCnt = 0
+ private var stateChanged = false
+
+ for (val i <- List.range(0, 2)) {
+ val worker = new WorkerThread(this)
+ workers += worker
+ worker.start()
+ }
+
+ def onLockup(handler: () => unit) =
+ lockupHandler = handler
+
+ def onLockup(millis: int)(handler: () => unit) = {
+ LOCKUP_CHECK_FREQ = millis / CHECK_FREQ
+ lockupHandler = handler
+ }
+
+ private var lockupHandler: () => unit = null
+
+ def isActive(a: Actor): boolean = occupied.get(a) match {
+ case None =>
+ // thread outside of scheduler;
+ // error("No worker thread associated with actor " + a)
+ false
+ case Some(wt) => isActive(wt)
+ }
+
+ def isActive(wt: WorkerThread): boolean = ticks.get(wt) match {
+ case None => false
+ case Some(ts) =>
+ val currTime = Platform.currentTime
+ if (currTime - ts < TICK_FREQ) true
+ else false
+ }
override def run(): unit = {
try {
while (!terminating) {
this.synchronized {
try {
- wait(CHECKFREQ)
+ wait(CHECK_FREQ)
+
+ if (!stateChanged) lockupCnt = lockupCnt + 1
+ else stateChanged = false
+
+ if (lockupCnt == LOCKUP_CHECK_FREQ) {
+ lockupCnt = 0
+ if (lockupHandler != null)
+ lockupHandler()
+ }
} catch {
case _: InterruptedException =>
if (terminating) throw new QuitException
@@ -132,14 +227,7 @@ class TickedScheduler extends Thread with IScheduler {
var foundBusy = false
while (iter.hasNext && !foundBusy) {
val wt = iter.next
- ticks.get(wt) match {
- case None =>
- foundBusy = false
- case Some(ts) =>
- val currTime = Platform.currentTime
- if (currTime - ts < TICKFREQ)
- foundBusy = true
- }
+ foundBusy = isActive(wt)
}
if (!foundBusy) {
@@ -149,15 +237,15 @@ class TickedScheduler extends Thread with IScheduler {
// dequeue item to be processed
val item = tasks.dequeue
- executing.update(item.actor, newWorker)
+ occupied.update(item.actor, newWorker)
newWorker.execute(item)
newWorker.start()
+
+ stateChanged = true
}
} // tasks.length > 0
else {
- //Debug.info("task queue empty")
if (pendingReactions.isEmpty) {
- //Debug.info("no pending reactions")
// if all worker threads idle terminate
if (workers.length == idle.length) {
Debug.info("all threads idle, terminating")
@@ -186,14 +274,16 @@ class TickedScheduler extends Thread with IScheduler {
* @param item the task to be executed.
*/
def execute(item: Reaction): unit = synchronized {
- if (!terminating)
+ if (!terminating) {
if (idle.length > 0) {
val wt = idle.dequeue
- executing.update(item.actor, wt)
+ occupied.update(item.actor, wt)
wt.execute(item)
}
else
tasks += item
+ stateChanged = true
+ }
}
/**
@@ -203,9 +293,10 @@ class TickedScheduler extends Thread with IScheduler {
def getTask(worker: WorkerThread) = synchronized {
if (terminating)
QUIT_TASK
+ stateChanged = true
if (tasks.length > 0) {
val item = tasks.dequeue
- executing.update(item.actor, worker)
+ occupied.update(item.actor, worker)
item
}
else {
@@ -214,14 +305,12 @@ class TickedScheduler extends Thread with IScheduler {
}
}
- var ticksCnt = 0
-
/**
* @param a the actor
*/
def tick(a: Actor): unit = synchronized {
- ticksCnt = ticksCnt + 1
- executing.get(a) match {
+ stateChanged = true
+ occupied.get(a) match {
case None =>
// thread outside of scheduler;
// error("No worker thread associated with actor " + a)
@@ -269,6 +358,11 @@ class QuitException extends Throwable {
* @author Philipp Haller
*/
class SingleThreadedScheduler extends IScheduler {
+ def start(task: Reaction): unit = {
+ // execute task immediately on same thread
+ task.run()
+ }
+
def execute(task: Reaction): unit = {
// execute task immediately on same thread
task.run()
@@ -276,9 +370,15 @@ class SingleThreadedScheduler extends IScheduler {
def getTask(worker: WorkerThread): Runnable = null
def tick(a: Actor): Unit = {}
- def shutdown(): Unit = {}
+ def terminated(a: Actor): unit = {}
def pendReaction: unit = {}
def unPendReaction: unit = {}
+
+ def shutdown(): Unit = {}
+
+ def onLockup(handler: () => unit): unit = {}
+ def onLockup(millis: int)(handler: () => unit): unit = {}
+ def printActorDump: unit = {}
}
/**
@@ -288,7 +388,7 @@ class SingleThreadedScheduler extends IScheduler {
* @version 0.9.0
* @author Philipp Haller
*/
-class SpareWorkerScheduler extends IScheduler {
+abstract class SpareWorkerScheduler extends IScheduler {
private val tasks = new Queue[Reaction]
private val idle = new Queue[WorkerThread]
private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]