summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-11-08 21:11:33 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-11-08 21:11:33 +0000
commitf18a26d8b9bf51d1f25035558988a744f320a527 (patch)
treef1ac2dd01c1e40e94a944dde6aba5421afd03988 /src/actors
parent939774370edfca1b44e21243b4fbfee55fa17b5e (diff)
downloadscala-f18a26d8b9bf51d1f25035558988a744f320a527.tar.gz
scala-f18a26d8b9bf51d1f25035558988a744f320a527.tar.bz2
scala-f18a26d8b9bf51d1f25035558988a744f320a527.zip
added pending reactions to prevent scheduler fr...
added pending reactions to prevent scheduler from terminating early
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala4
-rw-r--r--src/actors/scala/actors/Channel.scala2
-rw-r--r--src/actors/scala/actors/Reaction.scala2
-rw-r--r--src/actors/scala/actors/Scheduler.scala46
4 files changed, 40 insertions, 14 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 712fb41d85..dcf5b64f4a 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -195,6 +195,7 @@ object Actor {
}
s.detachActor = f => {
s.in.waitingFor = s.in.waitingForNone
+ Scheduler.unPendReaction
throw new SuspendActorException
}
@@ -233,7 +234,8 @@ object Actor {
*/
def seq[a, b >: a](first: => a, next: => b): b = {
val s = self
- s.kill = () => { next; s.kill() }
+ val killNext = s.kill
+ s.kill = () => { s.kill = killNext; next; s.kill() }
first
}
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index 3ec944846d..fa474f3fc1 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -263,6 +263,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
*/
def react(f: PartialFunction[Any, Unit]): Nothing = {
assert(Actor.self == receiver, "react on channel belonging to other actor")
+ Scheduler.pendReaction
receiver.synchronized {
receiver.tick()
waitingFor = f.isDefinedAt
@@ -304,6 +305,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
*/
def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = {
assert(Actor.self == receiver, "react on channel belonging to other actor")
+ Scheduler.pendReaction
receiver.synchronized {
receiver.tick()
waitingFor = f.isDefinedAt
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
index 79e54a52d7..ee25c73e3a 100644
--- a/src/actors/scala/actors/Reaction.scala
+++ b/src/actors/scala/actors/Reaction.scala
@@ -43,6 +43,7 @@ private[actors] class StartTask(a: Actor) extends Reaction {
val t = currentThread
val saved = Actor.selfs.get(t).asInstanceOf[Actor]
Actor.selfs.put(t, a)
+ Scheduler.unPendReaction
try {
a.act()
if (currentThread.isInterrupted())
@@ -89,6 +90,7 @@ private[actors] class ActorTask(a: Actor,
val t = currentThread
val saved = Actor.selfs.get(t).asInstanceOf[Actor]
Actor.selfs.put(t, a)
+ Scheduler.unPendReaction
try {
f(msg)
if (currentThread.isInterrupted())
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index a206a7899b..ad2050c871 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -15,7 +15,7 @@ import java.lang.{Runnable, Thread}
import java.lang.InterruptedException
import compat.Platform
-import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue}
+import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack}
/**
* The <code>Scheduler</code> object is used by
@@ -46,6 +46,9 @@ object Scheduler {
def tick(a: Actor) = sched.tick(a)
def shutdown(): Unit = sched.shutdown()
+
+ def pendReaction: unit = sched.pendReaction
+ def unPendReaction: unit = sched.unPendReaction
}
/**
@@ -67,6 +70,9 @@ trait IScheduler {
def run(): Unit = {}
override def toString() = "QUIT_TASK"
}
+
+ def pendReaction: unit
+ def unPendReaction: unit
}
/**
@@ -76,7 +82,7 @@ trait IScheduler {
* @version Beta2
* @author Philipp Haller
*/
-class SingleThreadedScheduler extends IScheduler {
+abstract class SingleThreadedScheduler extends IScheduler {
def execute(task: Reaction): Unit = {
// execute task immediately on same thread
task.run()
@@ -96,7 +102,7 @@ class SingleThreadedScheduler extends IScheduler {
* @version Beta2
* @author Philipp Haller
*/
-class SpareWorkerScheduler extends IScheduler {
+abstract class SpareWorkerScheduler extends IScheduler {
private val tasks = new Queue[Reaction]
private val idle = new Queue[WorkerThread]
private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]
@@ -170,6 +176,17 @@ class TickedScheduler extends Thread with IScheduler {
private var terminating = false
+ private var pendingReactions = new Stack[unit]
+ def pendReaction: unit = {
+ Debug.info("pend reaction")
+ pendingReactions push ()
+ }
+ def unPendReaction: unit = {
+ Debug.info("unpend reaction")
+ if (!pendingReactions.isEmpty)
+ pendingReactions.pop
+ }
+
var TICKFREQ = 5
var CHECKFREQ = 50
@@ -222,17 +239,20 @@ class TickedScheduler extends Thread with IScheduler {
}
} // tasks.length > 0
else {
- Debug.info("task queue empty, checking...")
- // if all worker threads idle terminate
- if (workers.length == idle.length) {
- Debug.info("all threads idle, terminating")
- val idleThreads = idle.elements
- while (idleThreads.hasNext) {
- val worker = idleThreads.next
- worker.running = false
- worker.interrupt()
+ Debug.info("task queue empty")
+ if (pendingReactions.isEmpty) {
+ Debug.info("no pending reactions")
+ // if all worker threads idle terminate
+ if (workers.length == idle.length) {
+ Debug.info("all threads idle, terminating")
+ val idleThreads = idle.elements
+ while (idleThreads.hasNext) {
+ val worker = idleThreads.next
+ worker.running = false
+ worker.interrupt()
+ }
+ throw new QuitException
}
- throw new QuitException
}
}
} // sync