diff options
author | Philipp Haller <hallerp@gmail.com> | 2010-03-08 15:01:53 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2010-03-08 15:01:53 +0000 |
commit | 57261cf375a8442a267b918ed582af526f8491fa (patch) | |
tree | 7198dd7ee431697803bf865ccb7343aa2f939664 /src/actors/scala/actors/ReplyReactor.scala | |
parent | 13f24056a444fd5038cebdb294a0959bfe979492 (diff) | |
download | scala-57261cf375a8442a267b918ed582af526f8491fa.tar.gz scala-57261cf375a8442a267b918ed582af526f8491fa.tar.bz2 scala-57261cf375a8442a267b918ed582af526f8491fa.zip |
Reactor now has type parameter.
Diffstat (limited to 'src/actors/scala/actors/ReplyReactor.scala')
-rw-r--r-- | src/actors/scala/actors/ReplyReactor.scala | 102 |
1 files changed, 88 insertions, 14 deletions
diff --git a/src/actors/scala/actors/ReplyReactor.scala b/src/actors/scala/actors/ReplyReactor.scala index 05e4590162..196f7b1f4c 100644 --- a/src/actors/scala/actors/ReplyReactor.scala +++ b/src/actors/scala/actors/ReplyReactor.scala @@ -10,6 +10,8 @@ package scala.actors +import java.util.{Timer, TimerTask} + /** <p> * The <code>ReplyReactor</code> trait extends the <code>Reactor</code> * trait with methods to reply to the sender of a message. @@ -19,17 +21,26 @@ package scala.actors * * @author Philipp Haller */ -trait ReplyReactor extends Reactor with ReplyableReactor { +trait ReplyReactor extends Reactor[Any] with ReplyableReactor { /* A list of the current senders. The head of the list is * the sender of the message that was received last. */ @volatile - private[actors] var senders: List[OutputChannel[Any]] = - Nil + private[actors] var senders: List[OutputChannel[Any]] = List() + + /* This option holds a TimerTask when the actor waits in a + * reactWithin. The TimerTask is cancelled when the actor + * resumes. + * + * guarded by this + */ + private[actors] var onTimeout: Option[TimerTask] = None - protected[actors] def sender: OutputChannel[Any] = - senders.head + /** + * Returns the actor which sent the last received message. + */ + protected[actors] def sender: OutputChannel[Any] = senders.head /** * Replies with <code>msg</code> to the sender. @@ -53,17 +64,17 @@ trait ReplyReactor extends Reactor with ReplyableReactor { } private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) { - senders = List(item._2) - if (onSameThread) - handler(item._1) - else { - scheduleActor(handler, item._1) - // see Reactor.resumeReceiver - throw Actor.suspendException + synchronized { + if (!onTimeout.isEmpty) { + onTimeout.get.cancel() + onTimeout = None + } } + senders = List(item._2) + super.resumeReceiver(item, handler, onSameThread) } - private[actors] override def searchMailbox(startMbox: MQueue, + private[actors] override def searchMailbox(startMbox: MQueue[Any], handler: PartialFunction[Any, Any], resumeOnSameThread: Boolean) { var tmpMbox = startMbox @@ -79,7 +90,7 @@ trait ReplyReactor extends Reactor with ReplyableReactor { synchronized { // in mean time new stuff might have arrived if (!sendBuffer.isEmpty) { - tmpMbox = new MQueue("Temp") + tmpMbox = new MQueue[Any]("Temp") drainSendBuffer(tmpMbox) // keep going } else { @@ -98,4 +109,67 @@ trait ReplyReactor extends Reactor with ReplyableReactor { private[actors] override def makeReaction(fun: () => Unit): Runnable = new ReplyReactorTask(this, fun) + protected[actors] override def react(handler: PartialFunction[Any, Unit]): Nothing = { + assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor") + super.react(handler) + } + + /** + * Receives a message from this actor's mailbox within a certain + * time span. + * <p> + * This method never returns. Therefore, the rest of the computation + * has to be contained in the actions of the partial function. + * + * @param msec the time span before timeout + * @param handler a partial function with message patterns and actions + */ + protected[actors] def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = { + assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor") + + synchronized { drainSendBuffer(mailbox) } + + // first, remove spurious TIMEOUT message from mailbox if any + mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT) + + while (true) { + val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => { + senders = List(replyTo) + handler isDefinedAt m + }) + if (null eq qel) { + synchronized { + // in mean time new messages might have arrived + if (!sendBuffer.isEmpty) { + drainSendBuffer(mailbox) + // keep going + } else if (msec == 0L) { + // throws Actor.suspendException + resumeReceiver((TIMEOUT, this), handler, false) + } else { + waitingFor = handler + val thisActor = this + onTimeout = Some(new TimerTask { + def run() { thisActor.send(TIMEOUT, thisActor) } + }) + Actor.timer.schedule(onTimeout.get, msec) + throw Actor.suspendException + } + } + } else + resumeReceiver((qel.msg, qel.session), handler, false) + } + throw Actor.suspendException + } + + override def getState: Actor.State.Value = synchronized { + if (waitingFor ne Reactor.waitingForNone) { + if (onTimeout.isEmpty) + Actor.State.Suspended + else + Actor.State.TimedSuspended + } else + _state + } + } |