diff options
-rw-r--r-- | src/actors/scala/actors/multi/MailBox.scala | 100 | ||||
-rw-r--r-- | src/actors/scala/actors/multi/Process.scala | 16 | ||||
-rw-r--r-- | src/actors/scala/actors/multi/ReceiverTask.scala | 1 | ||||
-rw-r--r-- | src/actors/scala/actors/multi/Scheduler.scala | 62 | ||||
-rw-r--r-- | src/actors/scala/actors/multi/TimerThread.scala | 4 |
5 files changed, 88 insertions, 95 deletions
diff --git a/src/actors/scala/actors/multi/MailBox.scala b/src/actors/scala/actors/multi/MailBox.scala index 7d6da8924f..52aa0a7486 100644 --- a/src/actors/scala/actors/multi/MailBox.scala +++ b/src/actors/scala/actors/multi/MailBox.scala @@ -19,10 +19,10 @@ trait MailBox { /** Unconsumed messages. */ var sent = new Queue[Any] - var continuation: PartialFunction[Any,Unit] = null + var continuation: PartialFunction[Any, Unit] = null // more complex continuation - var contCases: PartialFunction[Any,Any] = null - var contThen: Any => unit = null + var contCases: PartialFunction[Any, Any] = null + var contThen: Any => Unit = null def hasCont = if ((continuation == null) && (contCases == null)) false @@ -40,7 +40,14 @@ trait MailBox { private var pendingSignal = false - def send(msg: Any): unit = synchronized { + def scheduleContinuation(msg: Any): Unit = { + val task = new ReceiverTask(this, msg) + //Debug.info("ready to receive. dispatch new task " + task) + scheduled = true + Scheduler.execute(task) + } + + def send(msg: Any): Unit = synchronized { if (isAlive) { if (!hasCont || scheduled) { //Debug.info("no cont avail/task already scheduled. appending msg to mailbox.") @@ -55,12 +62,8 @@ trait MailBox { msg match { case Signal() => if (!contDefinedAt(TIMEOUT())) die() - else { - val task = new ReceiverTask(this, TIMEOUT()) - //Debug.info("ready to receive. dispatch new task " + task) - scheduled = true - Scheduler.execute(task) - } + else + scheduleContinuation(TIMEOUT()) case _ => if (!contDefinedAt(msg)) sent += msg @@ -69,10 +72,7 @@ trait MailBox { pendingSignal = false TimerThread.trashRequest(this) } - val task = new ReceiverTask(this, msg) - //Debug.info("ready to receive. dispatch new task " + task) - scheduled = true - Scheduler.execute(task) + scheduleContinuation(msg) } } } @@ -82,8 +82,10 @@ trait MailBox { //Debug.info("" + Thread.currentThread() + ": Resuming " + this) if (continuation != null) { val f = continuation - continuation = null - scheduled = false + this.synchronized { + continuation = null + scheduled = false + } f(msg) die() } @@ -100,14 +102,14 @@ trait MailBox { } } - def receive(f: PartialFunction[Any,Unit]): Nothing = { + def receive(f: PartialFunction[Any, Unit]): Nothing = synchronized { if (isAlive) { Scheduler.tick(this) continuation = null sent.dequeueFirst(f.isDefinedAt) match { case Some(msg) => - f(msg) - die() + continuation = f + scheduleContinuation(msg) case None => continuation = f //Debug.info("No msg found. " + this + " has continuation " + continuation + ".") @@ -116,33 +118,37 @@ trait MailBox { throw new Done } - def receiveWithin(msec: long)(f: PartialFunction[Any, unit]): Nothing = { - Scheduler.tick(this) - continuation = null - sent.dequeueFirst(f.isDefinedAt) match { - case Some(msg) => - f(msg) - die() - case None => - // if timeout == 0 then execute timeout action if specified (see Erlang book) - if (msec == 0) { - if (f.isDefinedAt(TIMEOUT())) - f(TIMEOUT()) - die() - } - else { - if (msec > 0) { - TimerThread.requestTimeout(this, msec) - pendingSignal = true - } + def receiveWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = synchronized { + if (isAlive) { + Scheduler.tick(this) + continuation = null + sent.dequeueFirst(f.isDefinedAt) match { + case Some(msg) => { continuation = f - //Debug.info("No msg found. " + this + " has continuation " + continuation + ".") + scheduleContinuation(msg) } + case None => + // if timeout == 0 then execute timeout action if specified (see Erlang book) + if (msec == 0) { + if (f.isDefinedAt(TIMEOUT())) { + continuation = f + scheduleContinuation(TIMEOUT()) + } + die() + } else { + if (msec > 0) { + TimerThread.requestTimeout(this, msec) + pendingSignal = true + } + continuation = f + //Debug.info("No msg found. " + this + " has continuation " + continuation + ".") + } + } } throw new Done } - def receiveAndReturn(cases: PartialFunction[Any,Any], then: Any => unit): unit = { + def receiveAndReturn(cases: PartialFunction[Any, Any], then: Any => Unit): Unit = { contCases = null contThen = null sent.dequeueFirst(cases.isDefinedAt) match { @@ -162,17 +168,15 @@ trait MailBox { // receiv {...} then (msg => {...msg...}) - class ReceiveAndReturn(cases: PartialFunction[Any,Any]) { - def then(body: Any => unit): unit = receiveAndReturn(cases, body) + class ReceiveAndReturn(cases: PartialFunction[Any, Any]) { + def then(body: Any => Unit): Unit = receiveAndReturn(cases, body) } - def receiv(cases: PartialFunction[Any,Any]): ReceiveAndReturn = + def receiv(cases: PartialFunction[Any, Any]): ReceiveAndReturn = new ReceiveAndReturn(cases) - def die() = { - if (isAlive) { - isAlive = false - //Debug.info("" + this + " died.") - } + def die() = if (isAlive) { + isAlive = false + //Debug.info("" + this + " died.") } } diff --git a/src/actors/scala/actors/multi/Process.scala b/src/actors/scala/actors/multi/Process.scala index afac16cbdb..6eba3a995a 100644 --- a/src/actors/scala/actors/multi/Process.scala +++ b/src/actors/scala/actors/multi/Process.scala @@ -211,22 +211,6 @@ class Process extends scala.actors.Process with Actor[Any] { } } - override def receive(f: PartialFunction[Any,unit]): Nothing = { - if (isAlive) { - Scheduler.tick(this) - continuation = null - sent.dequeueFirst(f.isDefinedAt) match { - case Some(msg) => - process(f, msg) - die() - case None => - continuation = f - //Debug.info("No msg found. " + this + " has continuation " + continuation + ".") - } - } - throw new Done - } - override def receiveMsg(msg: Any) = { //Debug.info("" + Thread.currentThread() + ": Resuming " + this) if (continuation != null) { diff --git a/src/actors/scala/actors/multi/ReceiverTask.scala b/src/actors/scala/actors/multi/ReceiverTask.scala index 033df00ef0..348c2247d3 100644 --- a/src/actors/scala/actors/multi/ReceiverTask.scala +++ b/src/actors/scala/actors/multi/ReceiverTask.scala @@ -15,6 +15,7 @@ package scala.actors.multi */ class ReceiverTask(val actor: MailBox, msg: Any) extends Runnable { def run(): Unit = { + Scheduler.setProcess(Thread.currentThread(), actor) try { actor receiveMsg msg } diff --git a/src/actors/scala/actors/multi/Scheduler.scala b/src/actors/scala/actors/multi/Scheduler.scala index 62d4f80cc7..88ec685368 100644 --- a/src/actors/scala/actors/multi/Scheduler.scala +++ b/src/actors/scala/actors/multi/Scheduler.scala @@ -15,22 +15,9 @@ import scala.collection.mutable._ /** * @author Philipp Haller */ -abstract class IScheduler /*extends java.util.concurrent.Executor*/ { - def execute(item: ReceiverTask): Unit - def getTask(worker: WorkerThread): Runnable - def tick(a: MailBox): Unit - def getProcess(t: Thread): Process - - val QUIT_TASK = new Runnable() { - def run(): Unit = {} - override def toString() = "QUIT_TASK" - } -} - - object Scheduler /*extends java.util.concurrent.Executor*/ { private var sched: /*java.util.concurrent.Executor*/ IScheduler = - //java.util.concurrent.Executors.newFixedThreadPool(2); + //java.util.concurrent.Executors.newFixedThreadPool(4); //new FixedWorkersScheduler(2); new SpareWorkerScheduler2 //new SpareWorkerScheduler @@ -41,17 +28,45 @@ object Scheduler /*extends java.util.concurrent.Executor*/ { sched = scheduler } - def execute(item: ReceiverTask) = + def execute(item: ReceiverTask) = synchronized { sched.execute(item) + } - def tick(a: MailBox) = + def tick(a: MailBox) = { sched.tick(a) + } - def getProcess(t: Thread): Process = - sched.getProcess(t) + private val process = new HashMap[Thread, MailBox] + + def getProcess(t: Thread): Process = synchronized { + process.get(t) match { + case None => null + case Some(p: Process) => p + } + } + + def setProcess(t: Thread, m: MailBox) = synchronized { + process.update(t, m) + } } +/** + * @author Philipp Haller + */ +abstract class IScheduler /*extends java.util.concurrent.Executor*/ { + def execute(item: ReceiverTask): Unit + def getTask(worker: WorkerThread): Runnable + def tick(a: MailBox): Unit + + val QUIT_TASK = new Runnable() { + def run(): Unit = {} + override def toString() = "QUIT_TASK" + } +} +/** + * @author Philipp Haller + */ class SpareWorkerScheduler2 extends IScheduler { private val tasks = new Queue[ReceiverTask]; private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]; @@ -59,7 +74,6 @@ class SpareWorkerScheduler2 extends IScheduler { val idle = new Queue[WorkerThread]; val ticks = new HashMap[WorkerThread, long] val executing = new HashMap[MailBox, WorkerThread] - val rexec = new HashMap[Thread, MailBox] var TICKFREQ = 50 @@ -84,18 +98,10 @@ class SpareWorkerScheduler2 extends IScheduler { } } - def getProcess(t: Thread): Process = synchronized { - rexec.get(t) match { - case None => null - case Some(p: Process) => p - } - } - def execute(item: ReceiverTask): unit = synchronized { if (idle.length > 0) { val wt = idle.dequeue executing.update(item.actor, wt) - rexec.update(wt, item.actor) wt.execute(item) } else { @@ -127,7 +133,6 @@ class SpareWorkerScheduler2 extends IScheduler { maxWorkers = workers.length // statistics executing.update(item.actor, newWorker) - rexec.update(newWorker, item.actor) newWorker.execute(item) newWorker.start() @@ -144,7 +149,6 @@ class SpareWorkerScheduler2 extends IScheduler { if (tasks.length > 0) { val item = tasks.dequeue executing.update(item.actor, worker) - rexec.update(worker, item.actor) item } else { diff --git a/src/actors/scala/actors/multi/TimerThread.scala b/src/actors/scala/actors/multi/TimerThread.scala index 436b27354f..d344e7acef 100644 --- a/src/actors/scala/actors/multi/TimerThread.scala +++ b/src/actors/scala/actors/multi/TimerThread.scala @@ -102,8 +102,8 @@ object TimerThread extends AnyRef with Runnable { notify() } else if (queue.max.time > wakeTime) { // add to 1st position and restart sleeping - queue += WakedActor (a, wakeTime, "") - notify() + queue += WakedActor (a, wakeTime, "") + notify() } else // simply add to queue queue += WakedActor (a, wakeTime, "") |