diff options
author | Philipp Haller <hallerp@gmail.com> | 2010-03-08 15:01:53 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2010-03-08 15:01:53 +0000 |
commit | 57261cf375a8442a267b918ed582af526f8491fa (patch) | |
tree | 7198dd7ee431697803bf865ccb7343aa2f939664 /src/actors/scala/actors/Actor.scala | |
parent | 13f24056a444fd5038cebdb294a0959bfe979492 (diff) | |
download | scala-57261cf375a8442a267b918ed582af526f8491fa.tar.gz scala-57261cf375a8442a267b918ed582af526f8491fa.tar.bz2 scala-57261cf375a8442a267b918ed582af526f8491fa.zip |
Reactor now has type parameter.
Diffstat (limited to 'src/actors/scala/actors/Actor.scala')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 201 |
1 files changed, 82 insertions, 119 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 6c8647daaa..69e3bd243e 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -24,7 +24,35 @@ import java.util.concurrent.{ExecutionException, Callable} */ object Actor extends Combinators { - private[actors] val tl = new ThreadLocal[Reactor] + /** An actor state. An actor can be in one of the following states: + * <ul> + * <li>New<br> + * An actor that has not yet started is in this state.</li> + * <li>Runnable<br> + * An actor executing is in this state.</li> + * <li>Suspended<br> + * An actor that is suspended waiting in a react is in this state.</li> + * <li>TimedSuspended<br> + * An actor that is suspended waiting in a reactWithin is in this state.</li> + * <li>Blocked<br> + * An actor that is blocked waiting in a receive is in this state.</li> + * <li>TimedBlocked<br> + * An actor that is blocked waiting in a receiveWithin is in this state.</li> + * <li>Terminated<br> + * An actor that has terminated is in this state.</li> + * </ul> + */ + object State extends Enumeration { + val New, + Runnable, + Suspended, + TimedSuspended, + Blocked, + TimedBlocked, + Terminated = Value + } + + private[actors] val tl = new ThreadLocal[ReplyReactor] // timer thread runs as daemon private[actors] val timer = new Timer(true) @@ -43,9 +71,10 @@ object Actor extends Combinators { private[actors] def self(sched: IScheduler): Actor = rawSelf(sched).asInstanceOf[Actor] - private[actors] def rawSelf: Reactor = rawSelf(Scheduler) + private[actors] def rawSelf: ReplyReactor = + rawSelf(Scheduler) - private[actors] def rawSelf(sched: IScheduler): Reactor = { + private[actors] def rawSelf(sched: IScheduler): ReplyReactor = { val s = tl.get if (s eq null) { val r = new ActorProxy(currentThread, sched) @@ -208,7 +237,7 @@ object Actor extends Combinators { def eventloop(f: PartialFunction[Any, Unit]): Nothing = rawSelf.react(new RecursiveProxyHandler(rawSelf, f)) - private class RecursiveProxyHandler(a: Reactor, f: PartialFunction[Any, Unit]) + private class RecursiveProxyHandler(a: ReplyReactor, f: PartialFunction[Any, Unit]) extends PartialFunction[Any, Unit] { def isDefinedAt(m: Any): Boolean = true // events are immediately removed from the mailbox @@ -222,21 +251,21 @@ object Actor extends Combinators { * Returns the actor which sent the last received message. */ def sender: OutputChannel[Any] = - rawSelf.asInstanceOf[ReplyReactor].sender + rawSelf.sender /** * Send <code>msg</code> to the actor waiting in a call to * <code>!?</code>. */ def reply(msg: Any): Unit = - rawSelf.asInstanceOf[ReplyReactor].reply(msg) + rawSelf.reply(msg) /** * Send <code>()</code> to the actor waiting in a call to * <code>!?</code>. */ def reply(): Unit = - rawSelf.asInstanceOf[ReplyReactor].reply(()) + rawSelf.reply(()) /** * Returns the number of messages in <code>self</code>'s mailbox @@ -328,7 +357,7 @@ object Actor extends Combinators { * <code>Exit(self, 'normal)</code> to <code>a</code>. * </p> */ - def exit(): Nothing = self.exit() + def exit(): Nothing = rawSelf.exit() } @@ -371,12 +400,6 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { @volatile private var received: Option[Any] = None - /* This option holds a TimerTask when the actor waits in a - * reactWithin/receiveWithin. The TimerTask is cancelled when - * the actor can continue. - */ - private var onTimeout: Option[TimerTask] = None - protected[actors] override def scheduler: IScheduler = Scheduler private[actors] override def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: PartialFunction[Any, Any]) = @@ -390,17 +413,6 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { private[actors] override def makeReaction(fun: () => Unit): Runnable = new ActorTask(this, fun) - private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) { - synchronized { - if (!onTimeout.isEmpty) { - onTimeout.get.cancel() - onTimeout = None - } - } - senders = List(item._2) - super.resumeReceiver(item, handler, onSameThread) - } - /** * Receives a message from this actor's mailbox. * @@ -530,89 +542,18 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { result } - /** - * Receives a message from this actor's mailbox. - * <p> - * This method never returns. Therefore, the rest of the computation - * has to be contained in the actions of the partial function. - * - * @param f a partial function with message patterns and actions - */ - override def react(f: PartialFunction[Any, Unit]): Nothing = { - assert(Actor.self(scheduler) == this, "react on channel belonging to other actor") + override def react(handler: PartialFunction[Any, Unit]): Nothing = { synchronized { - if (shouldExit) exit() // links - drainSendBuffer(mailbox) + if (shouldExit) exit() } - searchMailbox(mailbox, f, false) - throw Actor.suspendException + super.react(handler) } - /** - * Receives a message from this actor's mailbox within a certain - * time span. - * <p> - * This method never returns. Therefore, the rest of the computation - * has to be contained in the actions of the partial function. - * - * @param msec the time span before timeout - * @param f a partial function with message patterns and actions - */ - def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = { - assert(Actor.self(scheduler) == this, "react on channel belonging to other actor") - + override def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = { synchronized { - if (shouldExit) exit() // links - drainSendBuffer(mailbox) - } - - // first, remove spurious TIMEOUT message from mailbox if any - mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT) - - val receiveTimeout = () => { - if (f.isDefinedAt(TIMEOUT)) { - senders = List(this) - scheduleActor(f, TIMEOUT) - } else - error("unhandled timeout") - } - - var done = false - while (!done) { - val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => { - senders = List(replyTo) - f.isDefinedAt(m) - }) - if (null eq qel) { - val todo = synchronized { - // in mean time new stuff might have arrived - if (!sendBuffer.isEmpty) { - drainSendBuffer(mailbox) - // keep going - () => {} - } else if (msec == 0L) { - done = true - receiveTimeout - } else { - waitingFor = f - val thisActor = this - onTimeout = Some(new TimerTask { - def run() { thisActor.send(TIMEOUT, thisActor) } - }) - Actor.timer.schedule(onTimeout.get, msec) - done = true - () => {} - } - } - todo() - } else { - senders = List(qel.session) - scheduleActor(f, qel.msg) - done = true - } + if (shouldExit) exit() } - - throw Actor.suspendException + super.reactWithin(msec)(handler) } /** @@ -660,25 +601,44 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { notify() } + private[actors] override def exiting = synchronized { + _state == Actor.State.Terminated + } + /** * Starts this actor. */ override def start(): Actor = synchronized { - // Reset various flags. - // - // Note that we do *not* reset `trapExit`. The reason is that - // users should be able to set the field in the constructor - // and before `act` is called. - exitReason = 'normal - exiting = false - shouldExit = false + if (_state == Actor.State.New) { + _state = Actor.State.Runnable + + // Reset various flags. + // + // Note that we do *not* reset `trapExit`. The reason is that + // users should be able to set the field in the constructor + // and before `act` is called. + exitReason = 'normal + shouldExit = false - scheduler newActor this - scheduler execute (new Reaction(this)) + scheduler newActor this + scheduler execute (new Reaction(this)) - this + this + } else + this } + override def getState: Actor.State.Value = synchronized { + if (isSuspended) { + if (onTimeout.isEmpty) + Actor.State.Blocked + else + Actor.State.TimedBlocked + } else + super.getState + } + + // guarded by this private[actors] var links: List[AbstractActor] = Nil /** @@ -728,8 +688,11 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { links = links.filterNot(from.==) } + @volatile var trapExit = false + // guarded by this private var exitReason: AnyRef = 'normal + // guarded by this private[actors] var shouldExit = false /** @@ -749,7 +712,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { * <code>reason != 'normal</code>. * </p> */ - protected[actors] def exit(reason: AnyRef): Nothing = { + protected[actors] def exit(reason: AnyRef): Nothing = synchronized { exitReason = reason exit() } @@ -757,17 +720,16 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { /** * Terminates with exit reason <code>'normal</code>. */ - protected[actors] override def exit(): Nothing = { - // links + protected[actors] override def exit(): Nothing = synchronized { if (!links.isEmpty) exitLinked() - terminated() - throw Actor.suspendException + super.exit() } // Assume !links.isEmpty + // guarded by this private[actors] def exitLinked() { - exiting = true + _state = Actor.State.Terminated // remove this from links val mylinks = links.filterNot(this.==) // exit linked processes @@ -779,6 +741,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { } // Assume !links.isEmpty + // guarded by this private[actors] def exitLinked(reason: AnyRef) { exitReason = reason exitLinked() |