summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-05-27 22:45:16 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-05-27 22:45:16 +0000
commit359d4609491a10543c08f2e65db0ec0f05b52705 (patch)
tree54c2383a9168c208a5920cfc08c890ed47baee16 /src/actors
parent5abe77233b683a935860b6276db71669ff2c263e (diff)
downloadscala-359d4609491a10543c08f2e65db0ec0f05b52705.tar.gz
scala-359d4609491a10543c08f2e65db0ec0f05b52705.tar.bz2
scala-359d4609491a10543c08f2e65db0ec0f05b52705.zip
Commented instance vars of Actor trait and comp...
Commented instance vars of Actor trait and completed some scaladocs. Renamed sessions to senders.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala87
-rw-r--r--src/actors/scala/actors/Reaction.scala2
2 files changed, 58 insertions, 31 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index a61de1b249..29c57620a5 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.ExecutionException
* <code>receive</code>, <code>react</code>, <code>reply</code>,
* etc.
*
- * @version 0.9.18
* @author Philipp Haller
*/
object Actor {
@@ -293,15 +292,15 @@ object Actor {
* Links <code>self</code> to actor <code>to</code>.
*
* @param to the actor to link to
- * @return
+ * @return the parameter actor
*/
def link(to: AbstractActor): AbstractActor = self.link(to)
/**
- * Links <code>self</code> to actor defined by <code>body</code>.
+ * Links <code>self</code> to the actor defined by <code>body</code>.
*
- * @param body ...
- * @return ...
+ * @param body the body of the actor to link to
+ * @return the parameter actor
*/
def link(body: => Unit): Actor = self.link(body)
@@ -367,15 +366,23 @@ object Actor {
* </li>
* </ul>
*
- * @version 0.9.18
* @author Philipp Haller
*/
@serializable
trait Actor extends AbstractActor {
- private var received: Option[Any] = None
+ /* The actor's mailbox. */
+ private val mailbox = new MessageQueue
- private val waitingForNone = (m: Any) => false
+ /* A list of the current senders. The head of the list is
+ * the sender of the message that was received last.
+ */
+ private var senders: List[OutputChannel[Any]] = Nil
+
+ /* If the actor waits in a react, continuation holds the
+ * message handler that react was called with.
+ */
+ private var continuation: PartialFunction[Any, Unit] = null
/* Whenever this Actor executes on some thread, waitingFor is
* guaranteed to be equal to waitingForNone.
@@ -384,12 +391,26 @@ trait Actor extends AbstractActor {
* waitingForNone, this Actor is guaranteed not to execute on some
* thread.
*/
+ private val waitingForNone = (m: Any) => false
private var waitingFor: Any => Boolean = waitingForNone
+ /* The following two fields are only used when the actor
+ * suspends by blocking its underlying thread, for example,
+ * when waiting in a receive or synchronous send.
+ */
private var isSuspended = false
- protected val mailbox = new MessageQueue
- private var sessions: List[OutputChannel[Any]] = Nil
+ /* This field is used to communicate the received message from
+ * the invocation of send to the place where the thread of
+ * the receiving actor resumes inside receive/receiveWithin.
+ */
+ private var received: Option[Any] = None
+
+ /* This option holds a TimerTask when the actor waits in a
+ * reactWithin/receiveWithin. The TimerTask is cancelled when
+ * the actor can continue.
+ */
+ private var onTimeout: Option[TimerTask] = None
protected[actors] def scheduler: IScheduler =
Scheduler
@@ -420,11 +441,11 @@ trait Actor extends AbstractActor {
}
if (isSuspended) {
- sessions = replyTo :: sessions
+ senders = replyTo :: senders
received = Some(msg)
resumeActor()
} else {
- sessions = List(replyTo)
+ senders = List(replyTo)
// assert continuation != null
scheduler.execute(new Reaction(this, continuation, msg))
}
@@ -449,12 +470,12 @@ trait Actor extends AbstractActor {
suspendActor()
} else {
received = Some(qel.msg)
- sessions = qel.session :: sessions
+ senders = qel.session :: senders
}
}
val result = f(received.get)
received = None
- sessions = sessions.tail
+ senders = senders.tail
result
}
@@ -479,7 +500,7 @@ trait Actor extends AbstractActor {
if (msec == 0) {
if (f.isDefinedAt(TIMEOUT)) {
received = Some(TIMEOUT)
- sessions = this :: sessions
+ senders = this :: senders
} else
error("unhandled timeout")
}
@@ -493,7 +514,7 @@ trait Actor extends AbstractActor {
waitingFor = waitingForNone
if (f.isDefinedAt(TIMEOUT)) {
received = Some(TIMEOUT)
- sessions = this :: sessions
+ senders = this :: senders
}
else
error("unhandled timeout")
@@ -501,12 +522,12 @@ trait Actor extends AbstractActor {
}
} else {
received = Some(qel.msg)
- sessions = qel.session :: sessions
+ senders = qel.session :: senders
}
}
val result = f(received.get)
received = None
- sessions = sessions.tail
+ senders = senders.tail
result
}
@@ -527,7 +548,7 @@ trait Actor extends AbstractActor {
waitingFor = f.isDefinedAt
continuation = f
} else {
- sessions = List(qel.session)
+ senders = List(qel.session)
scheduleActor(f, qel.msg)
}
throw new SuspendActorException
@@ -548,6 +569,7 @@ trait Actor extends AbstractActor {
assert(Actor.self(scheduler) == this, "react on channel belonging to other actor")
this.synchronized {
if (shouldExit) exit() // links
+
// first, remove spurious TIMEOUT message from mailbox if any
val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT)
@@ -555,7 +577,7 @@ trait Actor extends AbstractActor {
if (null eq qel) {
if (msec == 0) {
if (f.isDefinedAt(TIMEOUT)) {
- sessions = List(this)
+ senders = List(this)
scheduleActor(f, TIMEOUT)
}
else
@@ -571,7 +593,7 @@ trait Actor extends AbstractActor {
continuation = f
}
} else {
- sessions = List(qel.session)
+ senders = List(qel.session)
scheduleActor(f, qel.msg)
}
throw new SuspendActorException
@@ -802,13 +824,10 @@ trait Actor extends AbstractActor {
case x => x
}
- def sender: OutputChannel[Any] = sessions.head
+ def sender: OutputChannel[Any] = senders.head
def receiver: Actor = this
- private var continuation: PartialFunction[Any, Unit] = null
- private var onTimeout: Option[TimerTask] = None
-
// guarded by lock of this
protected def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) =
if ((f eq null) && (continuation eq null)) {
@@ -821,8 +840,6 @@ trait Actor extends AbstractActor {
scheduler execute task
}
- private[actors] var kill: () => Unit = () => {}
-
private def suspendActor() {
isSuspended = true
while (isSuspended) {
@@ -887,6 +904,13 @@ trait Actor extends AbstractActor {
this
}
+ /* This closure is used to implement control-flow operations
+ * built on top of `seq`. Note that the only invocation of
+ * `kill` is supposed to be inside `Reaction.run`.
+ */
+ private[actors] var kill: () => Unit =
+ () => { exit() }
+
private def seq[a, b](first: => a, next: => b): Unit = {
val s = Actor.self(scheduler)
val killNext = s.kill
@@ -908,8 +932,8 @@ trait Actor extends AbstractActor {
/**
* Links <code>self</code> to actor <code>to</code>.
*
- * @param to ...
- * @return ...
+ * @param to the actor to link to
+ * @return the parameter actor
*/
def link(to: AbstractActor): AbstractActor = {
assert(Actor.self(scheduler) == this, "link called on actor different from self")
@@ -919,7 +943,10 @@ trait Actor extends AbstractActor {
}
/**
- * Links <code>self</code> to actor defined by <code>body</code>.
+ * Links <code>self</code> to the actor defined by <code>body</code>.
+ *
+ * @param body the body of the actor to link to
+ * @return the parameter actor
*/
def link(body: => Unit): Actor = {
assert(Actor.self(scheduler) == this, "link called on actor different from self")
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
index 1017e42080..54a6b032e2 100644
--- a/src/actors/scala/actors/Reaction.scala
+++ b/src/actors/scala/actors/Reaction.scala
@@ -61,7 +61,7 @@ class Reaction extends Runnable {
} catch {
case _: KillActorException =>
}
- a.kill(); a.exit()
+ a.kill()
}
}
catch {