summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-01-17 15:08:21 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-01-17 15:08:21 +0000
commit0d6d353412537b3b51ee4b48ad0bbbc54ed0c01b (patch)
treefb62b4333d8329b56f3e51b94ee1fc0abcd19fb4
parent81d2963d4c9f959429ec2e38f4928ec62db9c410 (diff)
downloadscala-0d6d353412537b3b51ee4b48ad0bbbc54ed0c01b.tar.gz
scala-0d6d353412537b3b51ee4b48ad0bbbc54ed0c01b.tar.bz2
scala-0d6d353412537b3b51ee4b48ad0bbbc54ed0c01b.zip
Added faster non-debugging version of basic sch...
Added faster non-debugging version of basic scheduler. Fixed bug in termination detection.
-rw-r--r--src/actors/scala/actors/Actor.scala5
-rw-r--r--src/actors/scala/actors/Scheduler.scala2
-rw-r--r--src/actors/scala/actors/TickedScheduler.scala172
3 files changed, 176 insertions, 3 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 4444378a5b..c2c9dd738f 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -548,7 +548,6 @@ trait Actor extends OutputChannel[Any] {
if (!fromExc) throw new ExitSuspendLoop
}
} catch { case _: ExitSuspendLoop => }
- Debug.info("leaving suspendActorFor("+msec+")")
}
resumeActor = () => {
@@ -566,8 +565,10 @@ trait Actor extends OutputChannel[Any] {
/**
* Starts this actor.
*/
- def start(): Unit =
+ def start(): Unit = {
+ Scheduler.pendReaction
Scheduler start new Reaction(this)
+ }
private val links = new HashSet[Actor]
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 1e78798337..83d5202974 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -90,7 +90,7 @@ trait IScheduler {
*
* @author Philipp Haller
*/
-class TickedScheduler extends Thread with IScheduler {
+class TickedDebugScheduler extends Thread with IScheduler {
private val tasks = new Queue[Reaction]
// Worker threads
diff --git a/src/actors/scala/actors/TickedScheduler.scala b/src/actors/scala/actors/TickedScheduler.scala
new file mode 100644
index 0000000000..2806a420f6
--- /dev/null
+++ b/src/actors/scala/actors/TickedScheduler.scala
@@ -0,0 +1,172 @@
+
+package scala.actors
+
+import compat.Platform
+
+import java.lang.{Runnable, Thread, InterruptedException}
+
+import scala.collection.Set
+import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet}
+
+/**
+ *
+ * @version 0.9.2
+ * @author Philipp Haller
+ */
+class TickedScheduler extends Thread with IScheduler {
+ private val tasks = new Queue[Reaction]
+
+ // Worker threads
+ private val workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]
+ private val idle = new Queue[WorkerThread]
+ private val ticks = new HashMap[WorkerThread, long]
+
+ private var terminating = false
+
+ private var lastActivity = Platform.currentTime
+
+ private var pendingReactions = 0
+ def pendReaction: unit = synchronized {
+ //Debug.info("pend reaction")
+ pendingReactions = pendingReactions + 1
+ }
+ def unPendReaction: unit = synchronized {
+ //Debug.info("unpend reaction")
+ pendingReactions = pendingReactions - 1
+ }
+
+ def printActorDump {}
+
+ def start(task: Reaction): unit = synchronized {
+ //Debug.info("Starting " + task.actor)
+ execute(task)
+ }
+
+ def terminated(a: Actor) {}
+
+ private var TICK_FREQ = 5
+ private var CHECK_FREQ = 50
+
+ for (val i <- List.range(0, 2)) {
+ val worker = new WorkerThread(this)
+ workers += worker
+ worker.start()
+ }
+
+ def onLockup(handler: () => unit) =
+ lockupHandler = handler
+
+ def onLockup(millis: int)(handler: () => unit) = {
+ //LOCKUP_CHECK_FREQ = millis / CHECK_FREQ
+ lockupHandler = handler
+ }
+
+ private var lockupHandler: () => unit = null
+
+ override def run(): unit = {
+ try {
+ while (!terminating) {
+ this.synchronized {
+ try {
+ wait(CHECK_FREQ)
+ } catch {
+ case _: InterruptedException =>
+ if (terminating) throw new QuitException
+ }
+
+ Debug.info("tasks.length: "+tasks.length)
+ Debug.info("pendingReactions: "+pendingReactions)
+
+ if (tasks.length > 0) {
+ // check if we need more threads
+ if (Platform.currentTime - lastActivity >= TICK_FREQ) {
+ val newWorker = new WorkerThread(this)
+ workers += newWorker
+
+ // dequeue item to be processed
+ val item = tasks.dequeue
+
+ newWorker.execute(item)
+ newWorker.start()
+ }
+ } // tasks.length > 0
+ else {
+ if (pendingReactions == 0) {
+ // if all worker threads idle terminate
+ if (workers.length == idle.length) {
+ Debug.info("all threads idle, terminating")
+ val idleThreads = idle.elements
+ while (idleThreads.hasNext) {
+ val worker = idleThreads.next
+ worker.running = false
+ worker.interrupt()
+ }
+ // terminate timer thread
+ TimerThread.t.interrupt()
+ throw new QuitException
+ }
+ }
+ }
+ } // sync
+
+ } // while (!terminating)
+ } catch {
+ case _: QuitException =>
+ // allow thread to exit
+ }
+ }
+
+ /**
+ * @param item the task to be executed.
+ */
+ def execute(item: Reaction): unit = synchronized {
+ Debug.info("got new task")
+ if (!terminating) {
+ if (idle.length > 0) {
+ val wt = idle.dequeue
+ wt.execute(item)
+ }
+ else
+ tasks += item
+ }
+ }
+
+ /**
+ * @param worker the worker thread executing tasks
+ * @return the executed task
+ */
+ def getTask(worker: WorkerThread) = synchronized {
+ if (terminating)
+ QUIT_TASK
+ if (tasks.length > 0) {
+ val item = tasks.dequeue
+ item
+ }
+ else {
+ idle += worker
+ null
+ }
+ }
+
+ /**
+ * @param a the actor
+ */
+ def tick(a: Actor) {
+ lastActivity = Platform.currentTime
+ }
+
+ /** Shuts down all idle worker threads.
+ */
+ def shutdown(): unit = synchronized {
+ terminating = true
+
+ val idleThreads = idle.elements
+ while (idleThreads.hasNext) {
+ val worker = idleThreads.next
+ worker.running = false
+ worker.interrupt()
+ // caused deadlock (tries to acquire lock of worker)
+ //worker.join()
+ }
+ }
+}