diff options
author | Philipp Haller <hallerp@gmail.com> | 2007-01-21 14:42:50 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2007-01-21 14:42:50 +0000 |
commit | ec85d6ce0c8728a64b155b00b681b5f04768ad3a (patch) | |
tree | 1431c3d3509755c87dcbe1076c6473b5ae20e740 /src/actors | |
parent | cf7c5917c984a424af718863d2cf0574cd710651 (diff) | |
download | scala-ec85d6ce0c8728a64b155b00b681b5f04768ad3a.tar.gz scala-ec85d6ce0c8728a64b155b00b681b5f04768ad3a.tar.bz2 scala-ec85d6ce0c8728a64b155b00b681b5f04768ad3a.zip |
Removed obsolete TickedDebugScheduler and Spare...
Removed obsolete TickedDebugScheduler and SpareWorkerScheduler. Pending
reactions are updated inside IScheduler.start() instead of outside.
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 3 | ||||
-rw-r--r-- | src/actors/scala/actors/JDK5Scheduler.scala | 4 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 330 | ||||
-rw-r--r-- | src/actors/scala/actors/TickedScheduler.scala | 4 |
4 files changed, 15 insertions, 326 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index f07cdd92db..b220af1396 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -518,8 +518,7 @@ trait Actor extends OutputChannel[Any] { /** * Starts this actor. */ - def start(): Unit = { - Scheduler.pendReaction + def start() { Scheduler start new Reaction(this) } diff --git a/src/actors/scala/actors/JDK5Scheduler.scala b/src/actors/scala/actors/JDK5Scheduler.scala index 8361fc43fb..e56a891385 100644 --- a/src/actors/scala/actors/JDK5Scheduler.scala +++ b/src/actors/scala/actors/JDK5Scheduler.scala @@ -55,18 +55,16 @@ class JDK5Scheduler(initCoreSize: int, maxSize: int) extends Thread with ISchedu private var pendingReactions = 0 def pendReaction: unit = synchronized { - //Debug.info("pend reaction") pendingReactions = pendingReactions + 1 } def unPendReaction: unit = synchronized { - //Debug.info("unpend reaction") pendingReactions = pendingReactions - 1 } def printActorDump {} def start(task: Reaction): unit = synchronized { - //Debug.info("Starting " + task.actor) + pendingReactions = pendingReactions + 1 execute(task) } diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index eb5e92ef88..0f3d2424fe 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -77,6 +77,7 @@ object Scheduler { def printActorDump = sched.printActorDump } + /** * This abstract class provides a common interface for all * schedulers used to execute reactors. @@ -105,257 +106,6 @@ trait IScheduler { } } -/** - * The class <code>TickedScheduler</code> ... - * - * @author Philipp Haller - */ -class TickedDebugScheduler extends Thread with IScheduler { - private val tasks = new Queue[Reaction] - - // Worker threads - private val workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread] - private val idle = new Queue[WorkerThread] - private val ticks = new HashMap[WorkerThread, long] - private val occupied = new HashMap[Actor, WorkerThread] - - private var terminating = false - - private var pendingReactions = new Stack[unit] - def pendReaction: unit = { - //Debug.info("pend reaction") - pendingReactions push () - } - def unPendReaction: unit = { - //Debug.info("unpend reaction") - if (!pendingReactions.isEmpty) - pendingReactions.pop - } - - /* - * An actor is alive if it has been started and - * has not yet terminated. - */ - private val alive = new HashSet[Actor] - - def printActorDump { - var num = 0 - for (val a <- alive.elements) { - Console.println("Actor ("+num+"): "+a) - if (a.isDetached) - Console.println("Detached") - else { - val flag = if (isActive(a)) "ACTIVE" else "INACTIVE" - Console.println("Occupies thread: "+occupied.get(a)+" ["+flag+"]") - } - - if (a.isDetached || a.isWaiting) { - // dump contents of mailbox - Console.println("Waiting with mailbox:") - //a.printMailbox - } - - Console.println - num = num + 1 - } - } - - def start(task: Reaction): unit = synchronized { - Debug.info("Starting " + task.actor) - alive += task.actor - execute(task) - } - - def terminated(a: Actor): unit = - alive -= a - - private var TICK_FREQ = 5 - private var CHECK_FREQ = 50 - - private var LOCKUP_CHECK_FREQ = 10 // 10 * CHECK_FREQ - private var lockupCnt = 0 - private var stateChanged = false - - for (val i <- List.range(0, 2)) { - val worker = new WorkerThread(this) - workers += worker - worker.start() - } - - def onLockup(handler: () => unit) = - lockupHandler = handler - - def onLockup(millis: int)(handler: () => unit) = { - LOCKUP_CHECK_FREQ = millis / CHECK_FREQ - lockupHandler = handler - } - - private var lockupHandler: () => unit = null - - def isActive(a: Actor): boolean = occupied.get(a) match { - case None => - // thread outside of scheduler; - // error("No worker thread associated with actor " + a) - false - case Some(wt) => isActive(wt) - } - - def isActive(wt: WorkerThread): boolean = ticks.get(wt) match { - case None => false - case Some(ts) => - val currTime = Platform.currentTime - if (currTime - ts < TICK_FREQ) true - else false - } - - override def run(): unit = { - try { - while (!terminating) { - this.synchronized { - try { - wait(CHECK_FREQ) - - if (!stateChanged) lockupCnt = lockupCnt + 1 - else stateChanged = false - - if (lockupCnt == LOCKUP_CHECK_FREQ) { - lockupCnt = 0 - if (lockupHandler != null) - lockupHandler() - } - } catch { - case _: InterruptedException => - if (terminating) throw new QuitException - } - - if (tasks.length > 0) { - // check if we need more threads - val iter = workers.elements - var foundBusy = false - while (iter.hasNext && !foundBusy) { - val wt = iter.next - foundBusy = isActive(wt) - } - - if (!foundBusy) { - val newWorker = new WorkerThread(this) - workers += newWorker - - // dequeue item to be processed - val item = tasks.dequeue - - occupied.update(item.actor, newWorker) - newWorker.execute(item) - newWorker.start() - - stateChanged = true - } - } // tasks.length > 0 - else { - if (pendingReactions.isEmpty) { - // if all worker threads idle terminate - if (workers.length == idle.length) { - Debug.info("all threads idle, terminating") - val idleThreads = idle.elements - while (idleThreads.hasNext) { - val worker = idleThreads.next - worker.running = false - worker.interrupt() - } - // terminate timer thread - TimerThread.t.interrupt() - throw new QuitException - } - } - } - } // sync - - } // while (!terminating) - } catch { - case _: QuitException => - // allow thread to exit - } - } - - /** - * @param item the task to be executed. - */ - def execute(item: Reaction): unit = synchronized { - if (!terminating) { - if (idle.length > 0) { - val wt = idle.dequeue - occupied.update(item.actor, wt) - wt.execute(item) - } - else - tasks += item - stateChanged = true - } - } - - /** - * @param worker the worker thread executing tasks - * @return the executed task - */ - def getTask(worker: WorkerThread) = synchronized { - if (terminating) - QUIT_TASK - stateChanged = true - if (tasks.length > 0) { - val item = tasks.dequeue - occupied.update(item.actor, worker) - item - } - else { - idle += worker - null - } - } - - /** - * @param a the actor - */ - def tick(a: Actor): unit = synchronized { - stateChanged = true - occupied.get(a) match { - case None => - // thread outside of scheduler; - // error("No worker thread associated with actor " + a) - case Some(wt) => - ticks.update(wt, Platform.currentTime) - } - } - - /** Shuts down all idle worker threads. - */ - def shutdown(): unit = synchronized { - terminating = true - - val idleThreads = idle.elements - while (idleThreads.hasNext) { - val worker = idleThreads.next - worker.running = false - worker.interrupt() - // caused deadlock (tries to acquire lock of worker) - //worker.join() - } - } -} - - -/** - * The <code>QuickException</code> class ... - */ -class QuitException extends Throwable { - /* - For efficiency reasons we do not fill in - the execution stack trace. - */ - override def fillInStackTrace(): Throwable = { - this - } -} - /** * This scheduler executes the tasks of a reactor on a single @@ -365,12 +115,12 @@ class QuitException extends Throwable { * @author Philipp Haller */ class SingleThreadedScheduler extends IScheduler { - def start(task: Reaction): unit = { + def start(task: Reaction) { // execute task immediately on same thread task.run() } - def execute(task: Reaction): unit = { + def execute(task: Reaction) { // execute task immediately on same thread task.run() } @@ -388,76 +138,20 @@ class SingleThreadedScheduler extends IScheduler { def printActorDump: unit = {} } + /** - * This scheduler creates additional threads whenever there is no - * idle thread available. - * - * @version 0.9.0 - * @author Philipp Haller + * The <code>QuickException</code> class ... */ -abstract class SpareWorkerScheduler extends IScheduler { - private val tasks = new Queue[Reaction] - private val idle = new Queue[WorkerThread] - private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread] - - private var terminating = false - - def init() = { - for (val i <- 0 until 2) { - val worker = new WorkerThread(this) - workers += worker - worker.start() - } - } - init() - - def execute(task: Reaction): unit = synchronized { - if (!terminating) { - if (idle.length == 0) { - tasks += task - val newWorker = new WorkerThread(this) - workers += newWorker - newWorker.start() - } - else { - val worker = idle.dequeue - worker.execute(task) - } - } - } - - def getTask(worker: WorkerThread) = synchronized { - if (terminating) - QUIT_TASK - else { - if (tasks.length > 0) tasks.dequeue - else { - idle += worker - null - } - } - } - - def tick(a: Actor): unit = {} - - def shutdown(): unit = synchronized { - terminating = true - - val idleThreads = idle.elements - while (idleThreads.hasNext) { - val worker = idleThreads.next - worker.running = false - worker.interrupt() - // caused deadlock (tries to acquire lock of worker) - //worker.join() - } +class QuitException extends Throwable { + /* + For efficiency reasons we do not fill in + the execution stack trace. + */ + override def fillInStackTrace(): Throwable = { + this } - - def pendReaction: unit = {} - def unPendReaction: unit = {} } - /** * <p> * The class <code>WorkerThread</code> is used by schedulers to execute diff --git a/src/actors/scala/actors/TickedScheduler.scala b/src/actors/scala/actors/TickedScheduler.scala index 63651cff40..48326aaaf9 100644 --- a/src/actors/scala/actors/TickedScheduler.scala +++ b/src/actors/scala/actors/TickedScheduler.scala @@ -27,18 +27,16 @@ class TickedScheduler extends Thread with IScheduler { private var pendingReactions = 0 def pendReaction: unit = synchronized { - //Debug.info("pend reaction") pendingReactions = pendingReactions + 1 } def unPendReaction: unit = synchronized { - //Debug.info("unpend reaction") pendingReactions = pendingReactions - 1 } def printActorDump {} def start(task: Reaction): unit = synchronized { - //Debug.info("Starting " + task.actor) + pendingReactions = pendingReactions + 1 execute(task) } |