diff options
author | Philipp Haller <hallerp@gmail.com> | 2007-01-17 15:08:21 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2007-01-17 15:08:21 +0000 |
commit | 0d6d353412537b3b51ee4b48ad0bbbc54ed0c01b (patch) | |
tree | fb62b4333d8329b56f3e51b94ee1fc0abcd19fb4 /src/actors | |
parent | 81d2963d4c9f959429ec2e38f4928ec62db9c410 (diff) | |
download | scala-0d6d353412537b3b51ee4b48ad0bbbc54ed0c01b.tar.gz scala-0d6d353412537b3b51ee4b48ad0bbbc54ed0c01b.tar.bz2 scala-0d6d353412537b3b51ee4b48ad0bbbc54ed0c01b.zip |
Added faster non-debugging version of basic sch...
Added faster non-debugging version of basic scheduler. Fixed bug in
termination detection.
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 5 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 2 | ||||
-rw-r--r-- | src/actors/scala/actors/TickedScheduler.scala | 172 |
3 files changed, 176 insertions, 3 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 4444378a5b..c2c9dd738f 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -548,7 +548,6 @@ trait Actor extends OutputChannel[Any] { if (!fromExc) throw new ExitSuspendLoop } } catch { case _: ExitSuspendLoop => } - Debug.info("leaving suspendActorFor("+msec+")") } resumeActor = () => { @@ -566,8 +565,10 @@ trait Actor extends OutputChannel[Any] { /** * Starts this actor. */ - def start(): Unit = + def start(): Unit = { + Scheduler.pendReaction Scheduler start new Reaction(this) + } private val links = new HashSet[Actor] diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index 1e78798337..83d5202974 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -90,7 +90,7 @@ trait IScheduler { * * @author Philipp Haller */ -class TickedScheduler extends Thread with IScheduler { +class TickedDebugScheduler extends Thread with IScheduler { private val tasks = new Queue[Reaction] // Worker threads diff --git a/src/actors/scala/actors/TickedScheduler.scala b/src/actors/scala/actors/TickedScheduler.scala new file mode 100644 index 0000000000..2806a420f6 --- /dev/null +++ b/src/actors/scala/actors/TickedScheduler.scala @@ -0,0 +1,172 @@ + +package scala.actors + +import compat.Platform + +import java.lang.{Runnable, Thread, InterruptedException} + +import scala.collection.Set +import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet} + +/** + * + * @version 0.9.2 + * @author Philipp Haller + */ +class TickedScheduler extends Thread with IScheduler { + private val tasks = new Queue[Reaction] + + // Worker threads + private val workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread] + private val idle = new Queue[WorkerThread] + private val ticks = new HashMap[WorkerThread, long] + + private var terminating = false + + private var lastActivity = Platform.currentTime + + private var pendingReactions = 0 + def pendReaction: unit = synchronized { + //Debug.info("pend reaction") + pendingReactions = pendingReactions + 1 + } + def unPendReaction: unit = synchronized { + //Debug.info("unpend reaction") + pendingReactions = pendingReactions - 1 + } + + def printActorDump {} + + def start(task: Reaction): unit = synchronized { + //Debug.info("Starting " + task.actor) + execute(task) + } + + def terminated(a: Actor) {} + + private var TICK_FREQ = 5 + private var CHECK_FREQ = 50 + + for (val i <- List.range(0, 2)) { + val worker = new WorkerThread(this) + workers += worker + worker.start() + } + + def onLockup(handler: () => unit) = + lockupHandler = handler + + def onLockup(millis: int)(handler: () => unit) = { + //LOCKUP_CHECK_FREQ = millis / CHECK_FREQ + lockupHandler = handler + } + + private var lockupHandler: () => unit = null + + override def run(): unit = { + try { + while (!terminating) { + this.synchronized { + try { + wait(CHECK_FREQ) + } catch { + case _: InterruptedException => + if (terminating) throw new QuitException + } + + Debug.info("tasks.length: "+tasks.length) + Debug.info("pendingReactions: "+pendingReactions) + + if (tasks.length > 0) { + // check if we need more threads + if (Platform.currentTime - lastActivity >= TICK_FREQ) { + val newWorker = new WorkerThread(this) + workers += newWorker + + // dequeue item to be processed + val item = tasks.dequeue + + newWorker.execute(item) + newWorker.start() + } + } // tasks.length > 0 + else { + if (pendingReactions == 0) { + // if all worker threads idle terminate + if (workers.length == idle.length) { + Debug.info("all threads idle, terminating") + val idleThreads = idle.elements + while (idleThreads.hasNext) { + val worker = idleThreads.next + worker.running = false + worker.interrupt() + } + // terminate timer thread + TimerThread.t.interrupt() + throw new QuitException + } + } + } + } // sync + + } // while (!terminating) + } catch { + case _: QuitException => + // allow thread to exit + } + } + + /** + * @param item the task to be executed. + */ + def execute(item: Reaction): unit = synchronized { + Debug.info("got new task") + if (!terminating) { + if (idle.length > 0) { + val wt = idle.dequeue + wt.execute(item) + } + else + tasks += item + } + } + + /** + * @param worker the worker thread executing tasks + * @return the executed task + */ + def getTask(worker: WorkerThread) = synchronized { + if (terminating) + QUIT_TASK + if (tasks.length > 0) { + val item = tasks.dequeue + item + } + else { + idle += worker + null + } + } + + /** + * @param a the actor + */ + def tick(a: Actor) { + lastActivity = Platform.currentTime + } + + /** Shuts down all idle worker threads. + */ + def shutdown(): unit = synchronized { + terminating = true + + val idleThreads = idle.elements + while (idleThreads.hasNext) { + val worker = idleThreads.next + worker.running = false + worker.interrupt() + // caused deadlock (tries to acquire lock of worker) + //worker.join() + } + } +} |