diff options
author | Philipp Haller <hallerp@gmail.com> | 2007-01-21 14:18:54 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2007-01-21 14:18:54 +0000 |
commit | 9e58ed4d399947b263b46bb250ca0b645ea55c80 (patch) | |
tree | 1c73ae7b43ccf560ab4a97b89cf8e42bc8c0c629 /src/actors | |
parent | e4282e0148efd03f06eb03d347d110c15ab28ab9 (diff) | |
download | scala-9e58ed4d399947b263b46bb250ca0b645ea55c80.tar.gz scala-9e58ed4d399947b263b46bb250ca0b645ea55c80.tar.bz2 scala-9e58ed4d399947b263b46bb250ca0b645ea55c80.zip |
Changed sender stack and links to be simple Lists.
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 291 | ||||
-rw-r--r-- | src/actors/scala/actors/JDK5Scheduler.scala | 2 | ||||
-rw-r--r-- | src/actors/scala/actors/Reaction.scala | 39 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 13 |
4 files changed, 142 insertions, 203 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 3fce66b1a8..f07cdd92db 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -13,8 +13,6 @@ package scala.actors import scala.collection.mutable.{HashSet, Queue} import compat.Platform -import java.util.Stack - /** * The <code>Actor</code> object provides functions for the definition of * actors, as well as all actor operations, such as @@ -264,15 +262,14 @@ trait Actor extends OutputChannel[Any] { private[actors] var waitingFor: Any => boolean = waitingForNone private[actors] var isSuspended = false - private val sessions = new Stack//[Channel[Any]] - - private val mailbox = new Queue[Pair[Any, Channel[Any]]] + private val mailbox = new MessageQueue + private var sessions: List[Channel[Any]] = Nil private def send(msg: Any, session: Channel[Any]) = synchronized { tick() if (waitingFor(msg)) { received = Some(msg) - sessions push session + sessions = session :: sessions waitingFor = waitingForNone if (timeoutPending) { @@ -285,90 +282,84 @@ trait Actor extends OutputChannel[Any] { else scheduleActor(null, msg) } else { - mailbox += Pair(msg, session) + mailbox.append(msg, session) } } def receive[R](f: PartialFunction[Any, R]): R = { assert(Actor.self == this, "receive from channel belonging to other actor") + // links + if (shouldExit) exit() this.synchronized { tick() - mailbox.dequeueFirst((p: Pair[Any, Channel[Any]]) => { - f.isDefinedAt(p._1) - }) match { - case Some(Pair(msg, session)) => { - received = Some(msg) - sessions push session - } - case None => { - waitingFor = f.isDefinedAt - isSuspended = true - suspendActor() - } + val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) + if (null eq qel) { + waitingFor = f.isDefinedAt + isSuspended = true + suspendActor() + } else { + received = Some(qel.msg) + sessions = qel.session :: sessions } waitingFor = waitingForNone isSuspended = false } val result = f(received.get) - sessions.pop + sessions = sessions.tail result } def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = { assert(Actor.self == this, "receive from channel belonging to other actor") + // links + if (shouldExit) exit() this.synchronized { tick() - mailbox.dequeueFirst((p: Pair[Any, Channel[Any]]) => { - f.isDefinedAt(p._1) - }) match { - case Some(Pair(msg, session)) => { - received = Some(msg) - sessions push session - } - case None => { - waitingFor = f.isDefinedAt - isSuspended = true - received = None - suspendActorFor(msec) - Debug.info("received: "+received) - if (received.isEmpty) { - Debug.info("no message received after "+msec+" millis") - if (f.isDefinedAt(TIMEOUT)) { - Debug.info("executing TIMEOUT action") - waitingFor = waitingForNone - isSuspended = false - val result = f(TIMEOUT) - return result - } - else - error("unhandled timeout") + val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) + if (null eq qel) { + waitingFor = f.isDefinedAt + isSuspended = true + received = None + suspendActorFor(msec) + Debug.info("received: "+received) + if (received.isEmpty) { + Debug.info("no message received after "+msec+" millis") + if (f.isDefinedAt(TIMEOUT)) { + Debug.info("executing TIMEOUT action") + waitingFor = waitingForNone + isSuspended = false + val result = f(TIMEOUT) + return result } + else + error("unhandled timeout") } + } else { + received = Some(qel.msg) + sessions = qel.session :: sessions } waitingFor = waitingForNone isSuspended = false } val result = f(received.get) - sessions.pop + sessions = sessions.tail result } def react(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.self == this, "react on channel belonging to other actor") + // links + if (shouldExit) exit() Scheduler.pendReaction this.synchronized { tick() - mailbox.dequeueFirst((p: Pair[Any, Channel[Any]]) => { - f.isDefinedAt(p._1) - }) match { - case Some(Pair(msg, session)) => { - sessions push session - scheduleActor(f, msg) - } - case None => { - waitingFor = f.isDefinedAt - detachActor(f) - } + val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) + if (null eq qel) { + waitingFor = f.isDefinedAt + detachActor(f) + } else { + sessions = qel.session :: sessions + scheduleActor(f, qel.msg) } throw new SuspendActorException } @@ -376,22 +367,20 @@ trait Actor extends OutputChannel[Any] { def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.self == this, "react on channel belonging to other actor") + // links + if (shouldExit) exit() Scheduler.pendReaction this.synchronized { tick() - mailbox.dequeueFirst((p: Pair[Any, Channel[Any]]) => { - f.isDefinedAt(p._1) - }) match { - case Some(Pair(msg, session)) => { - sessions push session - scheduleActor(f, msg) - } - case None => { - waitingFor = f.isDefinedAt - TimerThread.requestTimeout(this, f, msec) - timeoutPending = true - detachActor(f) - } + val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) + if (null eq qel) { + waitingFor = f.isDefinedAt + TimerThread.requestTimeout(this, f, msec) + timeoutPending = true + detachActor(f) + } else { + sessions = qel.session :: sessions + scheduleActor(f, qel.msg) } throw new SuspendActorException } @@ -444,12 +433,12 @@ trait Actor extends OutputChannel[Any] { } private[actors] def sender: Actor = - if (sessions.empty) null - else sessions.peek.asInstanceOf[Channel[Any]].receiver + if (sessions.isEmpty) null + else sessions.head.asInstanceOf[Channel[Any]].receiver private[actors] def session: Channel[Any] = - if (sessions.empty) null - else sessions.peek.asInstanceOf[Channel[Any]] + if (sessions.isEmpty) null + else sessions.head.asInstanceOf[Channel[Any]] private[actors] var continuation: PartialFunction[Any, Unit] = null @@ -490,6 +479,8 @@ trait Actor extends OutputChannel[Any] { case _: InterruptedException => } } + // links: check if we should exit + if (shouldExit) exit() } def suspendActorFor(msec: long) { @@ -515,6 +506,8 @@ trait Actor extends OutputChannel[Any] { if (!fromExc) throw new ExitSuspendLoop } } catch { case _: ExitSuspendLoop => } + // links: check if we should exit + if (shouldExit) exit() } def resumeActor() { @@ -530,7 +523,7 @@ trait Actor extends OutputChannel[Any] { Scheduler start new Reaction(this) } - private val links = new HashSet[Actor] + private[actors] var links: List[Actor] = Nil /** * Links <code>self</code> to actor <code>to</code>. @@ -539,7 +532,7 @@ trait Actor extends OutputChannel[Any] { * @return ... */ def link(to: Actor): Actor = { - links += to + links = to :: links to.linkTo(this) to } @@ -556,23 +549,26 @@ trait Actor extends OutputChannel[Any] { actor } - private[actors] def linkTo(to: Actor): Unit = - links += to + private[actors] def linkTo(to: Actor) { + links = to :: links + } /** Unlinks <code>self</code> from actor <code>from</code>. */ - def unlink(from: Actor): Unit = { - links -= from + def unlink(from: Actor) { + links = links.remove(from.==) from.unlinkFrom(this) } - private[actors] def unlinkFrom(from: Actor): Unit = - links -= from + private[actors] def unlinkFrom(from: Actor) { + links = links.remove(from.==) + } var trapExit = false - - private[actors] var exitReason: String = "" + private[actors] var exitReason: String = "normal" + private[actors] var exiting = false + private[actors] var shouldExit = false /** * <p> @@ -593,49 +589,51 @@ trait Actor extends OutputChannel[Any] { */ def exit(reason: String): Nothing = { kill() - exitReason = reason - //currentThread.interrupt() + // links + if (!links.isEmpty) { + exitReason = reason + exitLinked() + } throw new ExitActorException } - private[actors] def exit(from: Actor, reason: String): Unit = { - if (from == this) { - exit(reason) - } - else { - if (trapExit) - this ! Exit(from, reason) - else if (!reason.equals("normal")) - exit(reason) + def exit(): Nothing = { + kill() + // links + if (!links.isEmpty) { + exitLinked() } + throw new ExitActorException } - private[actors] def exitLinked(): Unit = - exitLinked(exitReason, new HashSet[Actor]) - - private[actors] def exitLinked(reason: String): Unit = - exitLinked(reason, new HashSet[Actor]) + // Assume !links.isEmpty + private[actors] def exitLinked() { + exiting = true + // remove this from links + links = links.remove(this.==) + // exit linked processes + links.foreach((linked: Actor) => { + unlink(linked) + if (!linked.exiting) + linked.exit(this, exitReason) + }) + } - private[actors] def exitLinked(reason: String, - exitMarks: HashSet[Actor]): Unit = { - if (exitMarks contains this) { - // we are marked, do nothing - } - else { - exitMarks += this // mark this as exiting - // exit linked processes - val iter = links.elements - while (iter.hasNext) { - val linked = iter.next - unlink(linked) - linked.exit(this, reason) + // Assume !this.exiting + private[actors] def exit(from: Actor, reason: String) { + if (trapExit) + this ! Exit(from, reason) + else if (!reason.equals("normal")) + this.synchronized { + shouldExit = true + exitReason = reason + if (isSuspended) + resumeActor() + else if (isDetached) + scheduleActor(null, null) } - exitMarks -= this - - // unregister in scheduler - Scheduler terminated this - } } + } @@ -650,62 +648,3 @@ trait Actor extends OutputChannel[Any] { */ case class Exit(from: Actor, reason: String) - -/** - * This class is used by our efficient message queue - * implementation. - */ -private[actors] abstract class MessageQueueResult[Msg] { - def msg: Msg - def sender: Actor -} - -/** - * The class <code>MessageQueue</code> provides an efficient - * implementation of a message queue specialized for this actor - * library. Classes in this package are supposed to be the only - * clients of this class. - * - * @author Martin Odersky, Philipp Haller - */ -private[actors] class MessageQueue[Msg] extends MessageQueueResult[Msg] { - var msg: Msg = _ - var sender: Actor = _ - private var next: MessageQueue[Msg] = this - - def append(msg: Msg, sender: Actor) = { - val q = new MessageQueue[Msg] - q.msg = msg - q.sender = sender - q.next = next - next = q - } - - def extractFirst(p: Msg => boolean): MessageQueueResult[Msg] = { - var q = this - var qnext = q.next - while (qnext != this) { - if (p(qnext.msg)) { - q.next = qnext.next - return qnext - } - q = qnext - qnext = qnext.next - } - null - } - - def dequeueFirst(p: MessageQueueResult[Msg] => boolean): MessageQueueResult[Msg] = { - var q = this - var qnext = q.next - while (qnext != this) { - if (p(qnext)) { - q.next = qnext.next - return qnext - } - q = qnext - qnext = qnext.next - } - null - } -} diff --git a/src/actors/scala/actors/JDK5Scheduler.scala b/src/actors/scala/actors/JDK5Scheduler.scala index f3496585fb..8361fc43fb 100644 --- a/src/actors/scala/actors/JDK5Scheduler.scala +++ b/src/actors/scala/actors/JDK5Scheduler.scala @@ -115,7 +115,7 @@ class JDK5Scheduler(initCoreSize: int, maxSize: int) extends Thread with ISchedu executor.shutdown() // terminate timer thread TimerThread.t.interrupt() - Console.println("threads used: "+coreSize) + //Debug.info("threads used: "+coreSize) throw new QuitException } } diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala index 9bb6c849f5..e8d9ee7c72 100644 --- a/src/actors/scala/actors/Reaction.scala +++ b/src/actors/scala/actors/Reaction.scala @@ -20,7 +20,7 @@ class ExitActorException extends Throwable * an instance of an <code>Actor</code> with a * <code>java.lang.Runnable</code>. * - * @version 0.9.0 + * @version 0.9.2 * @author Philipp Haller */ private[actors] class Reaction(a: Actor, @@ -39,25 +39,29 @@ private[actors] class Reaction(a: Actor, a.isDetached = false try { try { - if (f == null) - a.act() - else - f(msg) - a.exit("normal") + // links + if (a.shouldExit) + a.exit() + else { + if (f == null) + a.act() + else + f(msg) + a.exit() + } } catch { case _: ExitActorException => - throw new InterruptedException } } catch { - case ie: InterruptedException => { - a.exitLinked() - } - case d: SuspendActorException => { + case _: SuspendActorException => { // do nothing (continuation is already saved) } - case t: Throwable => { - a.exitLinked() + case _: Throwable => { + // links + if (!a.links.isEmpty) { + a.exitLinked() + } } } /*finally { @@ -65,13 +69,4 @@ private[actors] class Reaction(a: Actor, }*/ } - private var runnable = false - - def isRunnable = synchronized { - runnable - } - - def setRunnable(on: boolean) = synchronized { - runnable = on - } } diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index f61d44a5a7..eb5e92ef88 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -43,10 +43,15 @@ object Scheduler { s = if (olderThanJDK5) new TickedScheduler else { - val corePoolSize = - Integer.parseInt(java.lang.System.getProperty("actors.corePoolSize")) - val maxPoolSize = - Integer.parseInt(java.lang.System.getProperty("actors.maxPoolSize")) + var corePoolSize = 4 + var maxPoolSize = 16 + val prop = java.lang.System.getProperty("actors.corePoolSize") + if (null ne prop) { + corePoolSize = + Integer.parseInt(java.lang.System.getProperty("actors.corePoolSize")) + maxPoolSize = + Integer.parseInt(java.lang.System.getProperty("actors.maxPoolSize")) + } new JDK5Scheduler(corePoolSize, maxPoolSize) } s.start() |