diff options
author | Philipp Haller <hallerp@gmail.com> | 2007-03-16 17:34:00 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2007-03-16 17:34:00 +0000 |
commit | 3ee224f431b2563391e148d2a954d52308b67192 (patch) | |
tree | 4bab1910217923da3c16d452e25f32b40d798586 | |
parent | bf280fbf451109de93b9920c3cd4b4f441738e7e (diff) | |
download | scala-3ee224f431b2563391e148d2a954d52308b67192.tar.gz scala-3ee224f431b2563391e148d2a954d52308b67192.tar.bz2 scala-3ee224f431b2563391e148d2a954d52308b67192.zip |
scala.actors: timer thread survives snapshot/re...
scala.actors: timer thread survives snapshot/restart of Scheduler.
-rw-r--r-- | src/actors/scala/actors/FJTaskScheduler2.scala | 14 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 4 | ||||
-rw-r--r-- | src/actors/scala/actors/ThreadPoolScheduler.scala | 4 | ||||
-rw-r--r-- | src/actors/scala/actors/TickedScheduler.scala | 4 | ||||
-rw-r--r-- | src/actors/scala/actors/TimerThread.scala | 115 |
5 files changed, 83 insertions, 58 deletions
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index 6fed4bdd3a..e8e4ac0e14 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -43,13 +43,23 @@ class FJTaskScheduler2 extends Thread with IScheduler { private var submittedTasks = 0 private var pendingReactions = 0 + def pendReaction: unit = synchronized { pendingReactions = pendingReactions + 1 } + def unPendReaction: unit = synchronized { pendingReactions = pendingReactions - 1 } + def getPendingCount = synchronized { + pendingReactions + } + + def setPendingCount(cnt: int) = synchronized { + pendingReactions = cnt + } + def printActorDump {} def terminated(a: Actor) {} @@ -94,7 +104,7 @@ class FJTaskScheduler2 extends Thread with IScheduler { // and FJTaskRunner threads have daemon status. // terminate timer thread - TimerThread.t.interrupt() + TimerThread.shutdown() throw new QuitException } } @@ -146,7 +156,7 @@ class FJTaskScheduler2 extends Thread with IScheduler { def shutdown(): unit = synchronized { terminating = true // terminate timer thread - TimerThread.t.interrupt() + TimerThread.shutdown() } def snapshot(): LinkedQueue = synchronized { diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index b259817f04..17b6beeb94 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -39,18 +39,22 @@ object Scheduler { } var tasks: LinkedQueue = null + var pendingCount = 0 def snapshot(): unit = synchronized { tasks = sched.snapshot() + pendingCount = sched.asInstanceOf[FJTaskScheduler2].getPendingCount sched.shutdown() } def restart(): unit = synchronized { sched = { var s: IScheduler = new FJTaskScheduler2 + s.asInstanceOf[FJTaskScheduler2].setPendingCount(pendingCount) s.start() s } + TimerThread.restart() while (!tasks.isEmpty()) { sched.execute(tasks.take().asInstanceOf[FJTask]) } diff --git a/src/actors/scala/actors/ThreadPoolScheduler.scala b/src/actors/scala/actors/ThreadPoolScheduler.scala index caa18ecbdc..0722673da5 100644 --- a/src/actors/scala/actors/ThreadPoolScheduler.scala +++ b/src/actors/scala/actors/ThreadPoolScheduler.scala @@ -134,7 +134,7 @@ class ThreadPoolScheduler extends Thread with IScheduler { if (executor.getActiveCount() == 0) { executor.shutdown() // terminate timer thread - TimerThread.t.interrupt() + TimerThread.shutdown() throw new QuitException } } @@ -179,6 +179,6 @@ class ThreadPoolScheduler extends Thread with IScheduler { terminating = true executor.shutdown() // terminate timer thread - TimerThread.t.interrupt() + TimerThread.shutdown() } } diff --git a/src/actors/scala/actors/TickedScheduler.scala b/src/actors/scala/actors/TickedScheduler.scala index e17c1232c9..c4da9bd298 100644 --- a/src/actors/scala/actors/TickedScheduler.scala +++ b/src/actors/scala/actors/TickedScheduler.scala @@ -99,7 +99,7 @@ class TickedScheduler extends Thread with IScheduler { worker.interrupt() } // terminate timer thread - TimerThread.t.interrupt() + TimerThread.shutdown() throw new QuitException } } @@ -166,6 +166,6 @@ class TickedScheduler extends Thread with IScheduler { worker.interrupt() } // terminate timer thread - TimerThread.t.interrupt() + TimerThread.shutdown() } } diff --git a/src/actors/scala/actors/TimerThread.scala b/src/actors/scala/actors/TimerThread.scala index 397c489414..7f355761fc 100644 --- a/src/actors/scala/actors/TimerThread.scala +++ b/src/actors/scala/actors/TimerThread.scala @@ -21,72 +21,69 @@ import scala.collection.mutable.PriorityQueue * Note that the library deletes non-received <code>TIMEOUT</code> message if a * message is received before the time-out occurs. * - * @version 0.9.4 + * @version 0.9.5 * @author Sebastien Noir, Philipp Haller */ -object TimerThread extends AnyRef with Runnable { +object TimerThread { - case class WakedActor(actor: Actor, f: PartialFunction[Any, Unit], time: long) - extends Ordered[WakedActor] { + private case class WakedActor(actor: Actor, f: PartialFunction[Any, Unit], time: long) + extends Ordered[WakedActor] { var valid = true def compare(that: WakedActor): int = -(this.time compare that.time) } - var queue = new PriorityQueue[WakedActor] - val t = new Thread(this); t.start - - var lateList: List[WakedActor] = Nil + private var queue = new PriorityQueue[WakedActor] + private var lateList: List[WakedActor] = Nil + + private val timerTask = new Runnable { + override def run = { + try { + while(true) { + timerThread.synchronized { + try { + val sleepTime = dequeueLateAndGetSleepTime + if (lateList.isEmpty) timerThread.wait(sleepTime) + } catch { + case t: Throwable => { throw t } + } + } - /** - * @param a ... - */ - def trashRequest(a: Actor) = synchronized { - // keep in mind: killing dead people is a bad idea! - queue.elements.find((wa: WakedActor) => wa.actor == a && wa.valid) match { - case Some(b) => - b.valid = false - case None => - lateList.find((wa2: WakedActor) => wa2.actor == a && wa2.valid) match { - case Some(b2) => - b2.valid = false - case None => + // process guys waiting for signal and empty list + for (val wa <- lateList) { + if (wa.valid) { + wa.actor ! TIMEOUT + } + } + lateList = Nil } + } catch { + case consumed: InterruptedException => + // allow thread to quit + } } } - override def run = { - try { - while(true) { - this.synchronized { - try { - val sleepTime = dequeueLateAndGetSleepTime - if (lateList.isEmpty) wait(sleepTime) - } catch { - case t: Throwable => { throw t } - } - } + private var timerThread: Thread = { + val t = new Thread(timerTask) + t.start() + t + } - // process guys waiting for signal and empty list - for (val wa <- lateList) { - if (wa.valid) { - wa.actor ! TIMEOUT - } - } - lateList = Nil - } - } catch { - case consumed: InterruptedException => - // allow thread to quit + def shutdown() { + timerThread.interrupt() + } + + def restart() { + timerThread = { + val t = new Thread(timerTask) + t.start() + t } } - /** - * @param a ... - * @param f ... - * @param waitMillis ... - */ - def requestTimeout(a: Actor, f: PartialFunction[Any, Unit], waitMillis: long): unit = synchronized { + def requestTimeout(a: Actor, f: PartialFunction[Any, Unit], + waitMillis: long): unit = timerThread.synchronized { val wakeTime = now + waitMillis if (waitMillis <= 0) { a ! TIMEOUT @@ -95,16 +92,30 @@ object TimerThread extends AnyRef with Runnable { if (queue.isEmpty) { // add to queue and restart sleeping queue += WakedActor(a, f, wakeTime) - notify() + timerThread.notify() } else if (queue.max.time > wakeTime) { // add to 1st position and restart sleeping queue += WakedActor (a, f, wakeTime) - notify() + timerThread.notify() } else // simply add to queue queue += WakedActor (a, f, wakeTime) } + def trashRequest(a: Actor) = timerThread.synchronized { + // keep in mind: killing dead people is a bad idea! + queue.elements.find((wa: WakedActor) => wa.actor == a && wa.valid) match { + case Some(b) => + b.valid = false + case None => + lateList.find((wa2: WakedActor) => wa2.actor == a && wa2.valid) match { + case Some(b2) => + b2.valid = false + case None => + } + } + } + private def dequeueLateAndGetSleepTime: long = { val FOREVER: long = 0 var waitingList: List[WakedActor] = Nil @@ -125,5 +136,5 @@ object TimerThread extends AnyRef with Runnable { return FOREVER } - def now = Platform.currentTime + private def now = Platform.currentTime } |