summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-03-16 17:34:00 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-03-16 17:34:00 +0000
commit3ee224f431b2563391e148d2a954d52308b67192 (patch)
tree4bab1910217923da3c16d452e25f32b40d798586 /src/actors
parentbf280fbf451109de93b9920c3cd4b4f441738e7e (diff)
downloadscala-3ee224f431b2563391e148d2a954d52308b67192.tar.gz
scala-3ee224f431b2563391e148d2a954d52308b67192.tar.bz2
scala-3ee224f431b2563391e148d2a954d52308b67192.zip
scala.actors: timer thread survives snapshot/re...
scala.actors: timer thread survives snapshot/restart of Scheduler.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala14
-rw-r--r--src/actors/scala/actors/Scheduler.scala4
-rw-r--r--src/actors/scala/actors/ThreadPoolScheduler.scala4
-rw-r--r--src/actors/scala/actors/TickedScheduler.scala4
-rw-r--r--src/actors/scala/actors/TimerThread.scala115
5 files changed, 83 insertions, 58 deletions
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index 6fed4bdd3a..e8e4ac0e14 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -43,13 +43,23 @@ class FJTaskScheduler2 extends Thread with IScheduler {
private var submittedTasks = 0
private var pendingReactions = 0
+
def pendReaction: unit = synchronized {
pendingReactions = pendingReactions + 1
}
+
def unPendReaction: unit = synchronized {
pendingReactions = pendingReactions - 1
}
+ def getPendingCount = synchronized {
+ pendingReactions
+ }
+
+ def setPendingCount(cnt: int) = synchronized {
+ pendingReactions = cnt
+ }
+
def printActorDump {}
def terminated(a: Actor) {}
@@ -94,7 +104,7 @@ class FJTaskScheduler2 extends Thread with IScheduler {
// and FJTaskRunner threads have daemon status.
// terminate timer thread
- TimerThread.t.interrupt()
+ TimerThread.shutdown()
throw new QuitException
}
}
@@ -146,7 +156,7 @@ class FJTaskScheduler2 extends Thread with IScheduler {
def shutdown(): unit = synchronized {
terminating = true
// terminate timer thread
- TimerThread.t.interrupt()
+ TimerThread.shutdown()
}
def snapshot(): LinkedQueue = synchronized {
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index b259817f04..17b6beeb94 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -39,18 +39,22 @@ object Scheduler {
}
var tasks: LinkedQueue = null
+ var pendingCount = 0
def snapshot(): unit = synchronized {
tasks = sched.snapshot()
+ pendingCount = sched.asInstanceOf[FJTaskScheduler2].getPendingCount
sched.shutdown()
}
def restart(): unit = synchronized {
sched = {
var s: IScheduler = new FJTaskScheduler2
+ s.asInstanceOf[FJTaskScheduler2].setPendingCount(pendingCount)
s.start()
s
}
+ TimerThread.restart()
while (!tasks.isEmpty()) {
sched.execute(tasks.take().asInstanceOf[FJTask])
}
diff --git a/src/actors/scala/actors/ThreadPoolScheduler.scala b/src/actors/scala/actors/ThreadPoolScheduler.scala
index caa18ecbdc..0722673da5 100644
--- a/src/actors/scala/actors/ThreadPoolScheduler.scala
+++ b/src/actors/scala/actors/ThreadPoolScheduler.scala
@@ -134,7 +134,7 @@ class ThreadPoolScheduler extends Thread with IScheduler {
if (executor.getActiveCount() == 0) {
executor.shutdown()
// terminate timer thread
- TimerThread.t.interrupt()
+ TimerThread.shutdown()
throw new QuitException
}
}
@@ -179,6 +179,6 @@ class ThreadPoolScheduler extends Thread with IScheduler {
terminating = true
executor.shutdown()
// terminate timer thread
- TimerThread.t.interrupt()
+ TimerThread.shutdown()
}
}
diff --git a/src/actors/scala/actors/TickedScheduler.scala b/src/actors/scala/actors/TickedScheduler.scala
index e17c1232c9..c4da9bd298 100644
--- a/src/actors/scala/actors/TickedScheduler.scala
+++ b/src/actors/scala/actors/TickedScheduler.scala
@@ -99,7 +99,7 @@ class TickedScheduler extends Thread with IScheduler {
worker.interrupt()
}
// terminate timer thread
- TimerThread.t.interrupt()
+ TimerThread.shutdown()
throw new QuitException
}
}
@@ -166,6 +166,6 @@ class TickedScheduler extends Thread with IScheduler {
worker.interrupt()
}
// terminate timer thread
- TimerThread.t.interrupt()
+ TimerThread.shutdown()
}
}
diff --git a/src/actors/scala/actors/TimerThread.scala b/src/actors/scala/actors/TimerThread.scala
index 397c489414..7f355761fc 100644
--- a/src/actors/scala/actors/TimerThread.scala
+++ b/src/actors/scala/actors/TimerThread.scala
@@ -21,72 +21,69 @@ import scala.collection.mutable.PriorityQueue
* Note that the library deletes non-received <code>TIMEOUT</code> message if a
* message is received before the time-out occurs.
*
- * @version 0.9.4
+ * @version 0.9.5
* @author Sebastien Noir, Philipp Haller
*/
-object TimerThread extends AnyRef with Runnable {
+object TimerThread {
- case class WakedActor(actor: Actor, f: PartialFunction[Any, Unit], time: long)
- extends Ordered[WakedActor] {
+ private case class WakedActor(actor: Actor, f: PartialFunction[Any, Unit], time: long)
+ extends Ordered[WakedActor] {
var valid = true
def compare(that: WakedActor): int = -(this.time compare that.time)
}
- var queue = new PriorityQueue[WakedActor]
- val t = new Thread(this); t.start
-
- var lateList: List[WakedActor] = Nil
+ private var queue = new PriorityQueue[WakedActor]
+ private var lateList: List[WakedActor] = Nil
+
+ private val timerTask = new Runnable {
+ override def run = {
+ try {
+ while(true) {
+ timerThread.synchronized {
+ try {
+ val sleepTime = dequeueLateAndGetSleepTime
+ if (lateList.isEmpty) timerThread.wait(sleepTime)
+ } catch {
+ case t: Throwable => { throw t }
+ }
+ }
- /**
- * @param a ...
- */
- def trashRequest(a: Actor) = synchronized {
- // keep in mind: killing dead people is a bad idea!
- queue.elements.find((wa: WakedActor) => wa.actor == a && wa.valid) match {
- case Some(b) =>
- b.valid = false
- case None =>
- lateList.find((wa2: WakedActor) => wa2.actor == a && wa2.valid) match {
- case Some(b2) =>
- b2.valid = false
- case None =>
+ // process guys waiting for signal and empty list
+ for (val wa <- lateList) {
+ if (wa.valid) {
+ wa.actor ! TIMEOUT
+ }
+ }
+ lateList = Nil
}
+ } catch {
+ case consumed: InterruptedException =>
+ // allow thread to quit
+ }
}
}
- override def run = {
- try {
- while(true) {
- this.synchronized {
- try {
- val sleepTime = dequeueLateAndGetSleepTime
- if (lateList.isEmpty) wait(sleepTime)
- } catch {
- case t: Throwable => { throw t }
- }
- }
+ private var timerThread: Thread = {
+ val t = new Thread(timerTask)
+ t.start()
+ t
+ }
- // process guys waiting for signal and empty list
- for (val wa <- lateList) {
- if (wa.valid) {
- wa.actor ! TIMEOUT
- }
- }
- lateList = Nil
- }
- } catch {
- case consumed: InterruptedException =>
- // allow thread to quit
+ def shutdown() {
+ timerThread.interrupt()
+ }
+
+ def restart() {
+ timerThread = {
+ val t = new Thread(timerTask)
+ t.start()
+ t
}
}
- /**
- * @param a ...
- * @param f ...
- * @param waitMillis ...
- */
- def requestTimeout(a: Actor, f: PartialFunction[Any, Unit], waitMillis: long): unit = synchronized {
+ def requestTimeout(a: Actor, f: PartialFunction[Any, Unit],
+ waitMillis: long): unit = timerThread.synchronized {
val wakeTime = now + waitMillis
if (waitMillis <= 0) {
a ! TIMEOUT
@@ -95,16 +92,30 @@ object TimerThread extends AnyRef with Runnable {
if (queue.isEmpty) { // add to queue and restart sleeping
queue += WakedActor(a, f, wakeTime)
- notify()
+ timerThread.notify()
} else
if (queue.max.time > wakeTime) { // add to 1st position and restart sleeping
queue += WakedActor (a, f, wakeTime)
- notify()
+ timerThread.notify()
}
else // simply add to queue
queue += WakedActor (a, f, wakeTime)
}
+ def trashRequest(a: Actor) = timerThread.synchronized {
+ // keep in mind: killing dead people is a bad idea!
+ queue.elements.find((wa: WakedActor) => wa.actor == a && wa.valid) match {
+ case Some(b) =>
+ b.valid = false
+ case None =>
+ lateList.find((wa2: WakedActor) => wa2.actor == a && wa2.valid) match {
+ case Some(b2) =>
+ b2.valid = false
+ case None =>
+ }
+ }
+ }
+
private def dequeueLateAndGetSleepTime: long = {
val FOREVER: long = 0
var waitingList: List[WakedActor] = Nil
@@ -125,5 +136,5 @@ object TimerThread extends AnyRef with Runnable {
return FOREVER
}
- def now = Platform.currentTime
+ private def now = Platform.currentTime
}