summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/Actor.scala
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2010-03-08 15:01:53 +0000
committerPhilipp Haller <hallerp@gmail.com>2010-03-08 15:01:53 +0000
commit57261cf375a8442a267b918ed582af526f8491fa (patch)
tree7198dd7ee431697803bf865ccb7343aa2f939664 /src/actors/scala/actors/Actor.scala
parent13f24056a444fd5038cebdb294a0959bfe979492 (diff)
downloadscala-57261cf375a8442a267b918ed582af526f8491fa.tar.gz
scala-57261cf375a8442a267b918ed582af526f8491fa.tar.bz2
scala-57261cf375a8442a267b918ed582af526f8491fa.zip
Reactor now has type parameter.
Diffstat (limited to 'src/actors/scala/actors/Actor.scala')
-rw-r--r--src/actors/scala/actors/Actor.scala201
1 files changed, 82 insertions, 119 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 6c8647daaa..69e3bd243e 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -24,7 +24,35 @@ import java.util.concurrent.{ExecutionException, Callable}
*/
object Actor extends Combinators {
- private[actors] val tl = new ThreadLocal[Reactor]
+ /** An actor state. An actor can be in one of the following states:
+ * <ul>
+ * <li>New<br>
+ * An actor that has not yet started is in this state.</li>
+ * <li>Runnable<br>
+ * An actor executing is in this state.</li>
+ * <li>Suspended<br>
+ * An actor that is suspended waiting in a react is in this state.</li>
+ * <li>TimedSuspended<br>
+ * An actor that is suspended waiting in a reactWithin is in this state.</li>
+ * <li>Blocked<br>
+ * An actor that is blocked waiting in a receive is in this state.</li>
+ * <li>TimedBlocked<br>
+ * An actor that is blocked waiting in a receiveWithin is in this state.</li>
+ * <li>Terminated<br>
+ * An actor that has terminated is in this state.</li>
+ * </ul>
+ */
+ object State extends Enumeration {
+ val New,
+ Runnable,
+ Suspended,
+ TimedSuspended,
+ Blocked,
+ TimedBlocked,
+ Terminated = Value
+ }
+
+ private[actors] val tl = new ThreadLocal[ReplyReactor]
// timer thread runs as daemon
private[actors] val timer = new Timer(true)
@@ -43,9 +71,10 @@ object Actor extends Combinators {
private[actors] def self(sched: IScheduler): Actor =
rawSelf(sched).asInstanceOf[Actor]
- private[actors] def rawSelf: Reactor = rawSelf(Scheduler)
+ private[actors] def rawSelf: ReplyReactor =
+ rawSelf(Scheduler)
- private[actors] def rawSelf(sched: IScheduler): Reactor = {
+ private[actors] def rawSelf(sched: IScheduler): ReplyReactor = {
val s = tl.get
if (s eq null) {
val r = new ActorProxy(currentThread, sched)
@@ -208,7 +237,7 @@ object Actor extends Combinators {
def eventloop(f: PartialFunction[Any, Unit]): Nothing =
rawSelf.react(new RecursiveProxyHandler(rawSelf, f))
- private class RecursiveProxyHandler(a: Reactor, f: PartialFunction[Any, Unit])
+ private class RecursiveProxyHandler(a: ReplyReactor, f: PartialFunction[Any, Unit])
extends PartialFunction[Any, Unit] {
def isDefinedAt(m: Any): Boolean =
true // events are immediately removed from the mailbox
@@ -222,21 +251,21 @@ object Actor extends Combinators {
* Returns the actor which sent the last received message.
*/
def sender: OutputChannel[Any] =
- rawSelf.asInstanceOf[ReplyReactor].sender
+ rawSelf.sender
/**
* Send <code>msg</code> to the actor waiting in a call to
* <code>!?</code>.
*/
def reply(msg: Any): Unit =
- rawSelf.asInstanceOf[ReplyReactor].reply(msg)
+ rawSelf.reply(msg)
/**
* Send <code>()</code> to the actor waiting in a call to
* <code>!?</code>.
*/
def reply(): Unit =
- rawSelf.asInstanceOf[ReplyReactor].reply(())
+ rawSelf.reply(())
/**
* Returns the number of messages in <code>self</code>'s mailbox
@@ -328,7 +357,7 @@ object Actor extends Combinators {
* <code>Exit(self, 'normal)</code> to <code>a</code>.
* </p>
*/
- def exit(): Nothing = self.exit()
+ def exit(): Nothing = rawSelf.exit()
}
@@ -371,12 +400,6 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
@volatile
private var received: Option[Any] = None
- /* This option holds a TimerTask when the actor waits in a
- * reactWithin/receiveWithin. The TimerTask is cancelled when
- * the actor can continue.
- */
- private var onTimeout: Option[TimerTask] = None
-
protected[actors] override def scheduler: IScheduler = Scheduler
private[actors] override def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: PartialFunction[Any, Any]) =
@@ -390,17 +413,6 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
private[actors] override def makeReaction(fun: () => Unit): Runnable =
new ActorTask(this, fun)
- private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) {
- synchronized {
- if (!onTimeout.isEmpty) {
- onTimeout.get.cancel()
- onTimeout = None
- }
- }
- senders = List(item._2)
- super.resumeReceiver(item, handler, onSameThread)
- }
-
/**
* Receives a message from this actor's mailbox.
*
@@ -530,89 +542,18 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
result
}
- /**
- * Receives a message from this actor's mailbox.
- * <p>
- * This method never returns. Therefore, the rest of the computation
- * has to be contained in the actions of the partial function.
- *
- * @param f a partial function with message patterns and actions
- */
- override def react(f: PartialFunction[Any, Unit]): Nothing = {
- assert(Actor.self(scheduler) == this, "react on channel belonging to other actor")
+ override def react(handler: PartialFunction[Any, Unit]): Nothing = {
synchronized {
- if (shouldExit) exit() // links
- drainSendBuffer(mailbox)
+ if (shouldExit) exit()
}
- searchMailbox(mailbox, f, false)
- throw Actor.suspendException
+ super.react(handler)
}
- /**
- * Receives a message from this actor's mailbox within a certain
- * time span.
- * <p>
- * This method never returns. Therefore, the rest of the computation
- * has to be contained in the actions of the partial function.
- *
- * @param msec the time span before timeout
- * @param f a partial function with message patterns and actions
- */
- def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = {
- assert(Actor.self(scheduler) == this, "react on channel belonging to other actor")
-
+ override def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = {
synchronized {
- if (shouldExit) exit() // links
- drainSendBuffer(mailbox)
- }
-
- // first, remove spurious TIMEOUT message from mailbox if any
- mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT)
-
- val receiveTimeout = () => {
- if (f.isDefinedAt(TIMEOUT)) {
- senders = List(this)
- scheduleActor(f, TIMEOUT)
- } else
- error("unhandled timeout")
- }
-
- var done = false
- while (!done) {
- val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
- senders = List(replyTo)
- f.isDefinedAt(m)
- })
- if (null eq qel) {
- val todo = synchronized {
- // in mean time new stuff might have arrived
- if (!sendBuffer.isEmpty) {
- drainSendBuffer(mailbox)
- // keep going
- () => {}
- } else if (msec == 0L) {
- done = true
- receiveTimeout
- } else {
- waitingFor = f
- val thisActor = this
- onTimeout = Some(new TimerTask {
- def run() { thisActor.send(TIMEOUT, thisActor) }
- })
- Actor.timer.schedule(onTimeout.get, msec)
- done = true
- () => {}
- }
- }
- todo()
- } else {
- senders = List(qel.session)
- scheduleActor(f, qel.msg)
- done = true
- }
+ if (shouldExit) exit()
}
-
- throw Actor.suspendException
+ super.reactWithin(msec)(handler)
}
/**
@@ -660,25 +601,44 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
notify()
}
+ private[actors] override def exiting = synchronized {
+ _state == Actor.State.Terminated
+ }
+
/**
* Starts this actor.
*/
override def start(): Actor = synchronized {
- // Reset various flags.
- //
- // Note that we do *not* reset `trapExit`. The reason is that
- // users should be able to set the field in the constructor
- // and before `act` is called.
- exitReason = 'normal
- exiting = false
- shouldExit = false
+ if (_state == Actor.State.New) {
+ _state = Actor.State.Runnable
+
+ // Reset various flags.
+ //
+ // Note that we do *not* reset `trapExit`. The reason is that
+ // users should be able to set the field in the constructor
+ // and before `act` is called.
+ exitReason = 'normal
+ shouldExit = false
- scheduler newActor this
- scheduler execute (new Reaction(this))
+ scheduler newActor this
+ scheduler execute (new Reaction(this))
- this
+ this
+ } else
+ this
}
+ override def getState: Actor.State.Value = synchronized {
+ if (isSuspended) {
+ if (onTimeout.isEmpty)
+ Actor.State.Blocked
+ else
+ Actor.State.TimedBlocked
+ } else
+ super.getState
+ }
+
+ // guarded by this
private[actors] var links: List[AbstractActor] = Nil
/**
@@ -728,8 +688,11 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
links = links.filterNot(from.==)
}
+ @volatile
var trapExit = false
+ // guarded by this
private var exitReason: AnyRef = 'normal
+ // guarded by this
private[actors] var shouldExit = false
/**
@@ -749,7 +712,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
* <code>reason != 'normal</code>.
* </p>
*/
- protected[actors] def exit(reason: AnyRef): Nothing = {
+ protected[actors] def exit(reason: AnyRef): Nothing = synchronized {
exitReason = reason
exit()
}
@@ -757,17 +720,16 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
/**
* Terminates with exit reason <code>'normal</code>.
*/
- protected[actors] override def exit(): Nothing = {
- // links
+ protected[actors] override def exit(): Nothing = synchronized {
if (!links.isEmpty)
exitLinked()
- terminated()
- throw Actor.suspendException
+ super.exit()
}
// Assume !links.isEmpty
+ // guarded by this
private[actors] def exitLinked() {
- exiting = true
+ _state = Actor.State.Terminated
// remove this from links
val mylinks = links.filterNot(this.==)
// exit linked processes
@@ -779,6 +741,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
}
// Assume !links.isEmpty
+ // guarded by this
private[actors] def exitLinked(reason: AnyRef) {
exitReason = reason
exitLinked()