diff options
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 42 | ||||
-rw-r--r-- | src/actors/scala/actors/Reaction.scala | 11 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 101 |
3 files changed, 104 insertions, 50 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 380486d87d..b09b9b4a1e 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -11,6 +11,7 @@ package scala.actors import scala.collection.mutable.{HashSet, Stack} +import compat.Platform /** * The <code>Actor</code> object provides functions for the definition of @@ -123,7 +124,6 @@ object Actor { def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = self.in.reactWithin(msec)(f) - /* def eventloop(f: PartialFunction[Any, Unit]): Nothing = self.in.react(new RecursiveProxyHandler(self, f)) @@ -136,7 +136,6 @@ object Actor { self.in.react(this) } } - */ /** * <p>Used for receiving a message from a specific actor.</p> @@ -374,11 +373,44 @@ trait Actor extends OutputChannel[Any] { private[actors] var detachActor: PartialFunction[Any, Unit] => Unit = _ private[actors] var kill: () => Unit = _ + private var continue = false + private[actors] def resetActor(): Unit = { - suspendActor = () => wait() - suspendActorFor = (msec: long) => wait(msec) - resumeActor = () => notify() + suspendActor = () => { + continue = false + while(!continue) { + try { + wait() + } catch { + case t: InterruptedException => + } + } + } + + suspendActorFor = (msec: long) => { + val ts = Platform.currentTime + var waittime = msec + continue = false + while(!continue) { + try { + wait(waittime) + } catch { + case t: InterruptedException => { + val now = Platform.currentTime + val waited = now-ts + waittime = msec-waited + } + } + } + } + + resumeActor = () => { + continue = true + notify() + } + detachActor = defaultDetachActor + kill = () => {} } diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala index f88e6b7c77..da57c0cd3c 100644 --- a/src/actors/scala/actors/Reaction.scala +++ b/src/actors/scala/actors/Reaction.scala @@ -53,12 +53,17 @@ private[actors] class StartTask(a: Actor) extends Reaction { a.exit("normal") } catch { - case _: InterruptedException => + case ie: InterruptedException => { + ie.printStackTrace() a.exitLinked() - case d: SuspendActorException => + } + case d: SuspendActorException => { // do nothing (continuation is already saved) - case t: Throwable => + } + case t: Throwable => { + t.printStackTrace() a.exit(t.toString()) + } } finally { Actor.selfs.put(t, saved) diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index 49c14f010f..c79f9c49ab 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -28,7 +28,11 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue} object Scheduler { private var sched: IScheduler = //new SpareWorkerScheduler - new TickedScheduler + { + val s = new TickedScheduler + s.start() + s + } def impl = sched def impl_= (scheduler: IScheduler) = { @@ -51,7 +55,7 @@ object Scheduler { * @version Beta2 * @author Philipp Haller */ -abstract class IScheduler { +trait IScheduler { def execute(task: Reaction): Unit def getTask(worker: WorkerThread): Runnable def tick(a: Actor): Unit @@ -156,7 +160,7 @@ class SpareWorkerScheduler extends IScheduler { * * @author Philipp Haller */ -class TickedScheduler extends IScheduler { +class TickedScheduler extends Thread with IScheduler { private val tasks = new Queue[Reaction] private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread] @@ -166,7 +170,8 @@ class TickedScheduler extends IScheduler { private var terminating = false - var TICKFREQ = 50 + var TICKFREQ = 5 + var CHECKFREQ = 50 def init() = { for (val i <- List.range(0, 2)) { @@ -177,6 +182,54 @@ class TickedScheduler extends IScheduler { } init() + override def run(): unit = { + try { + while (!terminating) { + this.synchronized { + try { + wait(CHECKFREQ) + } catch { + case _: InterruptedException => + if (terminating) throw new QuitException + } + + if (tasks.length > 0) { + // check if we need more threads + val iter = workers.elements + var foundBusy = false + while (iter.hasNext && !foundBusy) { + val wt = iter.next + ticks.get(wt) match { + case None => + foundBusy = false + case Some(ts) => + val currTime = Platform.currentTime + if (currTime - ts < TICKFREQ) + foundBusy = true + } + } + + if (!foundBusy) { + val newWorker = new WorkerThread(this) + workers += newWorker + + // dequeue item to be processed + val item = tasks.dequeue + + executing.update(item.actor, newWorker) + newWorker.execute(item) + newWorker.start() + } + } // tasks.length > 0 + } // sync + + } // while (!terminating) + } catch { + case _: QuitException => + // allow thread to exit + } + } + def execute(item: Reaction): unit = synchronized { if (!terminating) if (idle.length > 0) { @@ -184,44 +237,8 @@ class TickedScheduler extends IScheduler { executing.update(item.actor, wt) wt.execute(item) } - else { - /* - only create new worker thread when all are blocked - according to heuristic - - we check time stamps of latest send/receive ops of ALL - workers - - we stop if there is one which is not blocked - */ - - val iter = workers.elements - var foundBusy = false - while (iter.hasNext && !foundBusy) { - val wt = iter.next - ticks.get(wt) match { - case None => - foundBusy = true // assume not blocked - case Some(ts) => - val currTime = Platform.currentTime - if (currTime - ts < TICKFREQ) - foundBusy = true - } - } - - if (!foundBusy) { - val newWorker = new WorkerThread(this) - workers += newWorker - executing.update(item.actor, newWorker) - newWorker.execute(item) - newWorker.start() - } - else { - // wait assuming busy thread will be finished soon - // and ask for more work - tasks += item - } - } + else + tasks += item } def getTask(worker: WorkerThread) = synchronized { |