diff options
author | Philipp Haller <hallerp@gmail.com> | 2008-09-29 09:28:09 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2008-09-29 09:28:09 +0000 |
commit | ee9e91a10702300bb2ab282ca36c9bebf43d62f1 (patch) | |
tree | 64a971d854afc99982a1958204efa55aa76bca73 /src/actors | |
parent | 184383a519ab637530899ffd446a30c919e4a991 (diff) | |
download | scala-ee9e91a10702300bb2ab282ca36c9bebf43d62f1.tar.gz scala-ee9e91a10702300bb2ab282ca36c9bebf43d62f1.tar.bz2 scala-ee9e91a10702300bb2ab282ca36c9bebf43d62f1.zip |
Replaced TimerThread with java.util.Timer.
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 21 | ||||
-rw-r--r-- | src/actors/scala/actors/FJTaskScheduler2.scala | 4 | ||||
-rw-r--r-- | src/actors/scala/actors/Future.scala | 5 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 2 | ||||
-rw-r--r-- | src/actors/scala/actors/TickedScheduler.scala | 4 | ||||
-rw-r--r-- | src/actors/scala/actors/TimerThread.scala | 141 |
6 files changed, 24 insertions, 153 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index be25478fb4..384bfadc1a 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -13,6 +13,8 @@ package scala.actors import scala.collection.mutable.{HashSet, Queue} import scala.compat.Platform +import java.util.{Timer, TimerTask} + /** * The <code>Actor</code> object provides functions for the definition of * actors, as well as actor operations, such as @@ -26,6 +28,8 @@ object Actor { private[actors] val tl = new ThreadLocal[Actor] + private[actors] var timer = new Timer + /** * Returns the currently executing actor. Should be used instead * of <code>this</code> in all blocks of code executed by @@ -396,9 +400,9 @@ trait Actor extends AbstractActor { waitingFor = waitingForNone - if (timeoutPending) { - timeoutPending = false - TimerThread.trashRequest(this) + if (!onTimeout.isEmpty) { + onTimeout.get.cancel() + onTimeout = None } if (isSuspended) @@ -546,8 +550,13 @@ trait Actor extends AbstractActor { } else { waitingFor = f.isDefinedAt - TimerThread.requestTimeout(this, f, msec) - timeoutPending = true + + val thisActor = this + onTimeout = Some(new TimerTask { + def run() { thisActor ! TIMEOUT } + }) + Actor.timer.schedule(onTimeout.get, msec) + continuation = f isDetached = true } @@ -697,7 +706,7 @@ trait Actor extends AbstractActor { def receiver: Actor = this private var continuation: PartialFunction[Any, Unit] = null - private var timeoutPending = false + private var onTimeout: Option[TimerTask] = None // accessed in Reaction private[actors] var isDetached = false private var isWaiting = false diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index e7d1d7656d..54c0f9dc28 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -122,7 +122,7 @@ class FJTaskScheduler2 extends Thread with IScheduler { // and FJTaskRunner threads have daemon status. // terminate timer thread - TimerThread.shutdown() + Actor.timer.cancel() throw new QuitException } } @@ -161,7 +161,7 @@ class FJTaskScheduler2 extends Thread with IScheduler { def shutdown(): Unit = synchronized { terminating = true // terminate timer thread - TimerThread.shutdown() + Actor.timer.cancel() } def snapshot(): LinkedQueue = { diff --git a/src/actors/scala/actors/Future.scala b/src/actors/scala/actors/Future.scala index d796c15dcd..9347131ad9 100644 --- a/src/actors/scala/actors/Future.scala +++ b/src/actors/scala/actors/Future.scala @@ -72,7 +72,10 @@ object Futures { * </p> */ def awaitAll(timeout: Long, fts: Future[Any]*): List[Option[Any]] = { - TimerThread.requestTimeout(Actor.self, null, timeout) + val thisActor = Actor.self + Actor.timer.schedule(new java.util.TimerTask { + def run() { thisActor ! TIMEOUT } + }, timeout) var resultsMap: collection.mutable.Map[Int, Option[Any]] = new collection.mutable.HashMap[Int, Option[Any]] diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index 12429e83e1..764236f9d8 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -62,7 +62,7 @@ object Scheduler extends IScheduler { s.start() s } - TimerThread.restart() + Actor.timer = new java.util.Timer while (!tasks.isEmpty()) { sched.execute(tasks.take().asInstanceOf[FJTask]) } diff --git a/src/actors/scala/actors/TickedScheduler.scala b/src/actors/scala/actors/TickedScheduler.scala index 3ce1fab731..69b4eaa050 100644 --- a/src/actors/scala/actors/TickedScheduler.scala +++ b/src/actors/scala/actors/TickedScheduler.scala @@ -102,7 +102,7 @@ class TickedScheduler extends Thread with WorkerThreadScheduler { worker.interrupt() } // terminate timer thread - TimerThread.shutdown() + Actor.timer.cancel() throw new QuitException } } @@ -170,6 +170,6 @@ class TickedScheduler extends Thread with WorkerThreadScheduler { worker.interrupt() } // terminate timer thread - TimerThread.shutdown() + Actor.timer.cancel() } } diff --git a/src/actors/scala/actors/TimerThread.scala b/src/actors/scala/actors/TimerThread.scala deleted file mode 100644 index 6a4abccd23..0000000000 --- a/src/actors/scala/actors/TimerThread.scala +++ /dev/null @@ -1,141 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2007, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - - -package scala.actors - -import java.lang.{InterruptedException, Runnable, Thread} - -import scala.collection.mutable.PriorityQueue -import scala.compat.Platform - -/** - * This class allows the (local) sending of a message to an actor after - * a timeout. Used by the library to build <code>receiveWithin(time: long)</code>. - * Note that the library deletes non-received <code>TIMEOUT</code> message if a - * message is received before the time-out occurs. - * - * @version 0.9.8 - * @author Sebastien Noir, Philipp Haller - */ - -object TimerThread { - - private case class WakedActor(actor: Actor, f: PartialFunction[Any, Unit], time: Long) - extends Ordered[WakedActor] { - var valid = true - def compare(that: WakedActor): Int = -(this.time compare that.time) - } - - private var queue = new PriorityQueue[WakedActor] - private var lateList: List[WakedActor] = Nil - - private val timerTask = new Runnable { - override def run = { - try { - while(true) { - timerThread.synchronized { - try { - val sleepTime = dequeueLateAndGetSleepTime - if (lateList.isEmpty) timerThread.wait(sleepTime) - } catch { - case t: Throwable => { throw t } - } - } - - // process guys waiting for signal and empty list - for (wa <- lateList) { - if (wa.valid) { - wa.actor ! TIMEOUT - } - } - lateList = Nil - } - } catch { - case consumed: InterruptedException => - // allow thread to quit - } - } - } - - private var timerThread: Thread = { - val t = new Thread(timerTask) - t.start() - t - } - - def shutdown() { - timerThread.interrupt() - } - - def restart() { - timerThread = { - val t = new Thread(timerTask) - t.start() - t - } - } - - def requestTimeout(a: Actor, f: PartialFunction[Any, Unit], - waitMillis: Long): Unit = timerThread.synchronized { - val wakeTime = now + waitMillis - if (waitMillis <= 0) { - a ! TIMEOUT - return - } - - if (queue.isEmpty) { // add to queue and restart sleeping - queue += WakedActor(a, f, wakeTime) - timerThread.notify() - } else - if (queue.max.time > wakeTime) { // add to 1st position and restart sleeping - queue += WakedActor (a, f, wakeTime) - timerThread.notify() - } - else // simply add to queue - queue += WakedActor (a, f, wakeTime) - } - - def trashRequest(a: Actor) = timerThread.synchronized { - // keep in mind: killing dead people is a bad idea! - queue.elements.find((wa: WakedActor) => wa.actor == a && wa.valid) match { - case Some(b) => - b.valid = false - case None => - lateList.find((wa2: WakedActor) => wa2.actor == a && wa2.valid) match { - case Some(b2) => - b2.valid = false - case None => - } - } - } - - private def dequeueLateAndGetSleepTime: Long = { - val FOREVER: Long = 0 - var waitingList: List[WakedActor] = Nil - - while (!queue.isEmpty) { - val next = queue.max.time - val amount = next - now - if (amount > 0) { // guy in queue is not late - lateList = waitingList // give back the list of waiting guys for signaling - return amount - } - else // we're late: dequeue and examine next guy - waitingList = queue.dequeue :: waitingList - } - - // empty queue => sleep forever - lateList = waitingList - FOREVER - } - - private def now = Platform.currentTime -} |