summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-01-21 14:42:50 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-01-21 14:42:50 +0000
commitec85d6ce0c8728a64b155b00b681b5f04768ad3a (patch)
tree1431c3d3509755c87dcbe1076c6473b5ae20e740
parentcf7c5917c984a424af718863d2cf0574cd710651 (diff)
downloadscala-ec85d6ce0c8728a64b155b00b681b5f04768ad3a.tar.gz
scala-ec85d6ce0c8728a64b155b00b681b5f04768ad3a.tar.bz2
scala-ec85d6ce0c8728a64b155b00b681b5f04768ad3a.zip
Removed obsolete TickedDebugScheduler and Spare...
Removed obsolete TickedDebugScheduler and SpareWorkerScheduler. Pending reactions are updated inside IScheduler.start() instead of outside.
-rw-r--r--src/actors/scala/actors/Actor.scala3
-rw-r--r--src/actors/scala/actors/JDK5Scheduler.scala4
-rw-r--r--src/actors/scala/actors/Scheduler.scala330
-rw-r--r--src/actors/scala/actors/TickedScheduler.scala4
4 files changed, 15 insertions, 326 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index f07cdd92db..b220af1396 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -518,8 +518,7 @@ trait Actor extends OutputChannel[Any] {
/**
* Starts this actor.
*/
- def start(): Unit = {
- Scheduler.pendReaction
+ def start() {
Scheduler start new Reaction(this)
}
diff --git a/src/actors/scala/actors/JDK5Scheduler.scala b/src/actors/scala/actors/JDK5Scheduler.scala
index 8361fc43fb..e56a891385 100644
--- a/src/actors/scala/actors/JDK5Scheduler.scala
+++ b/src/actors/scala/actors/JDK5Scheduler.scala
@@ -55,18 +55,16 @@ class JDK5Scheduler(initCoreSize: int, maxSize: int) extends Thread with ISchedu
private var pendingReactions = 0
def pendReaction: unit = synchronized {
- //Debug.info("pend reaction")
pendingReactions = pendingReactions + 1
}
def unPendReaction: unit = synchronized {
- //Debug.info("unpend reaction")
pendingReactions = pendingReactions - 1
}
def printActorDump {}
def start(task: Reaction): unit = synchronized {
- //Debug.info("Starting " + task.actor)
+ pendingReactions = pendingReactions + 1
execute(task)
}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index eb5e92ef88..0f3d2424fe 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -77,6 +77,7 @@ object Scheduler {
def printActorDump = sched.printActorDump
}
+
/**
* This abstract class provides a common interface for all
* schedulers used to execute reactors.
@@ -105,257 +106,6 @@ trait IScheduler {
}
}
-/**
- * The class <code>TickedScheduler</code> ...
- *
- * @author Philipp Haller
- */
-class TickedDebugScheduler extends Thread with IScheduler {
- private val tasks = new Queue[Reaction]
-
- // Worker threads
- private val workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]
- private val idle = new Queue[WorkerThread]
- private val ticks = new HashMap[WorkerThread, long]
- private val occupied = new HashMap[Actor, WorkerThread]
-
- private var terminating = false
-
- private var pendingReactions = new Stack[unit]
- def pendReaction: unit = {
- //Debug.info("pend reaction")
- pendingReactions push ()
- }
- def unPendReaction: unit = {
- //Debug.info("unpend reaction")
- if (!pendingReactions.isEmpty)
- pendingReactions.pop
- }
-
- /*
- * An actor is alive if it has been started and
- * has not yet terminated.
- */
- private val alive = new HashSet[Actor]
-
- def printActorDump {
- var num = 0
- for (val a <- alive.elements) {
- Console.println("Actor ("+num+"): "+a)
- if (a.isDetached)
- Console.println("Detached")
- else {
- val flag = if (isActive(a)) "ACTIVE" else "INACTIVE"
- Console.println("Occupies thread: "+occupied.get(a)+" ["+flag+"]")
- }
-
- if (a.isDetached || a.isWaiting) {
- // dump contents of mailbox
- Console.println("Waiting with mailbox:")
- //a.printMailbox
- }
-
- Console.println
- num = num + 1
- }
- }
-
- def start(task: Reaction): unit = synchronized {
- Debug.info("Starting " + task.actor)
- alive += task.actor
- execute(task)
- }
-
- def terminated(a: Actor): unit =
- alive -= a
-
- private var TICK_FREQ = 5
- private var CHECK_FREQ = 50
-
- private var LOCKUP_CHECK_FREQ = 10 // 10 * CHECK_FREQ
- private var lockupCnt = 0
- private var stateChanged = false
-
- for (val i <- List.range(0, 2)) {
- val worker = new WorkerThread(this)
- workers += worker
- worker.start()
- }
-
- def onLockup(handler: () => unit) =
- lockupHandler = handler
-
- def onLockup(millis: int)(handler: () => unit) = {
- LOCKUP_CHECK_FREQ = millis / CHECK_FREQ
- lockupHandler = handler
- }
-
- private var lockupHandler: () => unit = null
-
- def isActive(a: Actor): boolean = occupied.get(a) match {
- case None =>
- // thread outside of scheduler;
- // error("No worker thread associated with actor " + a)
- false
- case Some(wt) => isActive(wt)
- }
-
- def isActive(wt: WorkerThread): boolean = ticks.get(wt) match {
- case None => false
- case Some(ts) =>
- val currTime = Platform.currentTime
- if (currTime - ts < TICK_FREQ) true
- else false
- }
-
- override def run(): unit = {
- try {
- while (!terminating) {
- this.synchronized {
- try {
- wait(CHECK_FREQ)
-
- if (!stateChanged) lockupCnt = lockupCnt + 1
- else stateChanged = false
-
- if (lockupCnt == LOCKUP_CHECK_FREQ) {
- lockupCnt = 0
- if (lockupHandler != null)
- lockupHandler()
- }
- } catch {
- case _: InterruptedException =>
- if (terminating) throw new QuitException
- }
-
- if (tasks.length > 0) {
- // check if we need more threads
- val iter = workers.elements
- var foundBusy = false
- while (iter.hasNext && !foundBusy) {
- val wt = iter.next
- foundBusy = isActive(wt)
- }
-
- if (!foundBusy) {
- val newWorker = new WorkerThread(this)
- workers += newWorker
-
- // dequeue item to be processed
- val item = tasks.dequeue
-
- occupied.update(item.actor, newWorker)
- newWorker.execute(item)
- newWorker.start()
-
- stateChanged = true
- }
- } // tasks.length > 0
- else {
- if (pendingReactions.isEmpty) {
- // if all worker threads idle terminate
- if (workers.length == idle.length) {
- Debug.info("all threads idle, terminating")
- val idleThreads = idle.elements
- while (idleThreads.hasNext) {
- val worker = idleThreads.next
- worker.running = false
- worker.interrupt()
- }
- // terminate timer thread
- TimerThread.t.interrupt()
- throw new QuitException
- }
- }
- }
- } // sync
-
- } // while (!terminating)
- } catch {
- case _: QuitException =>
- // allow thread to exit
- }
- }
-
- /**
- * @param item the task to be executed.
- */
- def execute(item: Reaction): unit = synchronized {
- if (!terminating) {
- if (idle.length > 0) {
- val wt = idle.dequeue
- occupied.update(item.actor, wt)
- wt.execute(item)
- }
- else
- tasks += item
- stateChanged = true
- }
- }
-
- /**
- * @param worker the worker thread executing tasks
- * @return the executed task
- */
- def getTask(worker: WorkerThread) = synchronized {
- if (terminating)
- QUIT_TASK
- stateChanged = true
- if (tasks.length > 0) {
- val item = tasks.dequeue
- occupied.update(item.actor, worker)
- item
- }
- else {
- idle += worker
- null
- }
- }
-
- /**
- * @param a the actor
- */
- def tick(a: Actor): unit = synchronized {
- stateChanged = true
- occupied.get(a) match {
- case None =>
- // thread outside of scheduler;
- // error("No worker thread associated with actor " + a)
- case Some(wt) =>
- ticks.update(wt, Platform.currentTime)
- }
- }
-
- /** Shuts down all idle worker threads.
- */
- def shutdown(): unit = synchronized {
- terminating = true
-
- val idleThreads = idle.elements
- while (idleThreads.hasNext) {
- val worker = idleThreads.next
- worker.running = false
- worker.interrupt()
- // caused deadlock (tries to acquire lock of worker)
- //worker.join()
- }
- }
-}
-
-
-/**
- * The <code>QuickException</code> class ...
- */
-class QuitException extends Throwable {
- /*
- For efficiency reasons we do not fill in
- the execution stack trace.
- */
- override def fillInStackTrace(): Throwable = {
- this
- }
-}
-
/**
* This scheduler executes the tasks of a reactor on a single
@@ -365,12 +115,12 @@ class QuitException extends Throwable {
* @author Philipp Haller
*/
class SingleThreadedScheduler extends IScheduler {
- def start(task: Reaction): unit = {
+ def start(task: Reaction) {
// execute task immediately on same thread
task.run()
}
- def execute(task: Reaction): unit = {
+ def execute(task: Reaction) {
// execute task immediately on same thread
task.run()
}
@@ -388,76 +138,20 @@ class SingleThreadedScheduler extends IScheduler {
def printActorDump: unit = {}
}
+
/**
- * This scheduler creates additional threads whenever there is no
- * idle thread available.
- *
- * @version 0.9.0
- * @author Philipp Haller
+ * The <code>QuickException</code> class ...
*/
-abstract class SpareWorkerScheduler extends IScheduler {
- private val tasks = new Queue[Reaction]
- private val idle = new Queue[WorkerThread]
- private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]
-
- private var terminating = false
-
- def init() = {
- for (val i <- 0 until 2) {
- val worker = new WorkerThread(this)
- workers += worker
- worker.start()
- }
- }
- init()
-
- def execute(task: Reaction): unit = synchronized {
- if (!terminating) {
- if (idle.length == 0) {
- tasks += task
- val newWorker = new WorkerThread(this)
- workers += newWorker
- newWorker.start()
- }
- else {
- val worker = idle.dequeue
- worker.execute(task)
- }
- }
- }
-
- def getTask(worker: WorkerThread) = synchronized {
- if (terminating)
- QUIT_TASK
- else {
- if (tasks.length > 0) tasks.dequeue
- else {
- idle += worker
- null
- }
- }
- }
-
- def tick(a: Actor): unit = {}
-
- def shutdown(): unit = synchronized {
- terminating = true
-
- val idleThreads = idle.elements
- while (idleThreads.hasNext) {
- val worker = idleThreads.next
- worker.running = false
- worker.interrupt()
- // caused deadlock (tries to acquire lock of worker)
- //worker.join()
- }
+class QuitException extends Throwable {
+ /*
+ For efficiency reasons we do not fill in
+ the execution stack trace.
+ */
+ override def fillInStackTrace(): Throwable = {
+ this
}
-
- def pendReaction: unit = {}
- def unPendReaction: unit = {}
}
-
/**
* <p>
* The class <code>WorkerThread</code> is used by schedulers to execute
diff --git a/src/actors/scala/actors/TickedScheduler.scala b/src/actors/scala/actors/TickedScheduler.scala
index 63651cff40..48326aaaf9 100644
--- a/src/actors/scala/actors/TickedScheduler.scala
+++ b/src/actors/scala/actors/TickedScheduler.scala
@@ -27,18 +27,16 @@ class TickedScheduler extends Thread with IScheduler {
private var pendingReactions = 0
def pendReaction: unit = synchronized {
- //Debug.info("pend reaction")
pendingReactions = pendingReactions + 1
}
def unPendReaction: unit = synchronized {
- //Debug.info("unpend reaction")
pendingReactions = pendingReactions - 1
}
def printActorDump {}
def start(task: Reaction): unit = synchronized {
- //Debug.info("Starting " + task.actor)
+ pendingReactions = pendingReactions + 1
execute(task)
}