summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/ReplyReactor.scala
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2010-03-08 15:01:53 +0000
committerPhilipp Haller <hallerp@gmail.com>2010-03-08 15:01:53 +0000
commit57261cf375a8442a267b918ed582af526f8491fa (patch)
tree7198dd7ee431697803bf865ccb7343aa2f939664 /src/actors/scala/actors/ReplyReactor.scala
parent13f24056a444fd5038cebdb294a0959bfe979492 (diff)
downloadscala-57261cf375a8442a267b918ed582af526f8491fa.tar.gz
scala-57261cf375a8442a267b918ed582af526f8491fa.tar.bz2
scala-57261cf375a8442a267b918ed582af526f8491fa.zip
Reactor now has type parameter.
Diffstat (limited to 'src/actors/scala/actors/ReplyReactor.scala')
-rw-r--r--src/actors/scala/actors/ReplyReactor.scala102
1 files changed, 88 insertions, 14 deletions
diff --git a/src/actors/scala/actors/ReplyReactor.scala b/src/actors/scala/actors/ReplyReactor.scala
index 05e4590162..196f7b1f4c 100644
--- a/src/actors/scala/actors/ReplyReactor.scala
+++ b/src/actors/scala/actors/ReplyReactor.scala
@@ -10,6 +10,8 @@
package scala.actors
+import java.util.{Timer, TimerTask}
+
/** <p>
* The <code>ReplyReactor</code> trait extends the <code>Reactor</code>
* trait with methods to reply to the sender of a message.
@@ -19,17 +21,26 @@ package scala.actors
*
* @author Philipp Haller
*/
-trait ReplyReactor extends Reactor with ReplyableReactor {
+trait ReplyReactor extends Reactor[Any] with ReplyableReactor {
/* A list of the current senders. The head of the list is
* the sender of the message that was received last.
*/
@volatile
- private[actors] var senders: List[OutputChannel[Any]] =
- Nil
+ private[actors] var senders: List[OutputChannel[Any]] = List()
+
+ /* This option holds a TimerTask when the actor waits in a
+ * reactWithin. The TimerTask is cancelled when the actor
+ * resumes.
+ *
+ * guarded by this
+ */
+ private[actors] var onTimeout: Option[TimerTask] = None
- protected[actors] def sender: OutputChannel[Any] =
- senders.head
+ /**
+ * Returns the actor which sent the last received message.
+ */
+ protected[actors] def sender: OutputChannel[Any] = senders.head
/**
* Replies with <code>msg</code> to the sender.
@@ -53,17 +64,17 @@ trait ReplyReactor extends Reactor with ReplyableReactor {
}
private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) {
- senders = List(item._2)
- if (onSameThread)
- handler(item._1)
- else {
- scheduleActor(handler, item._1)
- // see Reactor.resumeReceiver
- throw Actor.suspendException
+ synchronized {
+ if (!onTimeout.isEmpty) {
+ onTimeout.get.cancel()
+ onTimeout = None
+ }
}
+ senders = List(item._2)
+ super.resumeReceiver(item, handler, onSameThread)
}
- private[actors] override def searchMailbox(startMbox: MQueue,
+ private[actors] override def searchMailbox(startMbox: MQueue[Any],
handler: PartialFunction[Any, Any],
resumeOnSameThread: Boolean) {
var tmpMbox = startMbox
@@ -79,7 +90,7 @@ trait ReplyReactor extends Reactor with ReplyableReactor {
synchronized {
// in mean time new stuff might have arrived
if (!sendBuffer.isEmpty) {
- tmpMbox = new MQueue("Temp")
+ tmpMbox = new MQueue[Any]("Temp")
drainSendBuffer(tmpMbox)
// keep going
} else {
@@ -98,4 +109,67 @@ trait ReplyReactor extends Reactor with ReplyableReactor {
private[actors] override def makeReaction(fun: () => Unit): Runnable =
new ReplyReactorTask(this, fun)
+ protected[actors] override def react(handler: PartialFunction[Any, Unit]): Nothing = {
+ assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
+ super.react(handler)
+ }
+
+ /**
+ * Receives a message from this actor's mailbox within a certain
+ * time span.
+ * <p>
+ * This method never returns. Therefore, the rest of the computation
+ * has to be contained in the actions of the partial function.
+ *
+ * @param msec the time span before timeout
+ * @param handler a partial function with message patterns and actions
+ */
+ protected[actors] def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = {
+ assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
+
+ synchronized { drainSendBuffer(mailbox) }
+
+ // first, remove spurious TIMEOUT message from mailbox if any
+ mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT)
+
+ while (true) {
+ val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
+ senders = List(replyTo)
+ handler isDefinedAt m
+ })
+ if (null eq qel) {
+ synchronized {
+ // in mean time new messages might have arrived
+ if (!sendBuffer.isEmpty) {
+ drainSendBuffer(mailbox)
+ // keep going
+ } else if (msec == 0L) {
+ // throws Actor.suspendException
+ resumeReceiver((TIMEOUT, this), handler, false)
+ } else {
+ waitingFor = handler
+ val thisActor = this
+ onTimeout = Some(new TimerTask {
+ def run() { thisActor.send(TIMEOUT, thisActor) }
+ })
+ Actor.timer.schedule(onTimeout.get, msec)
+ throw Actor.suspendException
+ }
+ }
+ } else
+ resumeReceiver((qel.msg, qel.session), handler, false)
+ }
+ throw Actor.suspendException
+ }
+
+ override def getState: Actor.State.Value = synchronized {
+ if (waitingFor ne Reactor.waitingForNone) {
+ if (onTimeout.isEmpty)
+ Actor.State.Suspended
+ else
+ Actor.State.TimedSuspended
+ } else
+ _state
+ }
+
}