diff options
author | Vojin Jovanovic <vojin.jovanovic@epfl.ch> | 2012-03-19 22:25:26 +0100 |
---|---|---|
committer | Vojin Jovanovic <vojin.jovanovic@epfl.ch> | 2012-03-20 20:31:22 +0100 |
commit | 66f0679169bd8d5dc749c2288777c5a217ae3d43 (patch) | |
tree | 66faeb3cc69b2c725788f4a20a5752493c43e9cf /src/actors/scala/actors/ReplyReactor.scala | |
parent | d9d46a8bbb1b30d322057bb513ea4317bda735d3 (diff) | |
download | scala-66f0679169bd8d5dc749c2288777c5a217ae3d43.tar.gz scala-66f0679169bd8d5dc749c2288777c5a217ae3d43.tar.bz2 scala-66f0679169bd8d5dc749c2288777c5a217ae3d43.zip |
Prepared actors hierarchy for migration.
Internal nodes added so methods relevant to akka can be overridden.
Review by: @phaller
Diffstat (limited to 'src/actors/scala/actors/ReplyReactor.scala')
-rw-r--r-- | src/actors/scala/actors/ReplyReactor.scala | 165 |
1 files changed, 6 insertions, 159 deletions
diff --git a/src/actors/scala/actors/ReplyReactor.scala b/src/actors/scala/actors/ReplyReactor.scala index 0e5ce00c91..0ffbbd3cce 100644 --- a/src/actors/scala/actors/ReplyReactor.scala +++ b/src/actors/scala/actors/ReplyReactor.scala @@ -5,165 +5,12 @@ ** /____/\___/_/ |_/____/_/ | | ** ** |/ ** \* */ - package scala.actors -import java.util.{Timer, TimerTask} - -/** - * Extends the [[scala.actors.Reactor]] trait with methods to reply to the - * sender of a message. - * - * Sending a message to a `ReplyReactor` implicitly passes a reference to - * the sender together with the message. - * - * @author Philipp Haller - * - * @define actor `ReplyReactor` - */ -trait ReplyReactor extends Reactor[Any] with ReactorCanReply { - - /* A list of the current senders. The head of the list is - * the sender of the message that was received last. - */ - @volatile - private[actors] var senders: List[OutputChannel[Any]] = List() - - /* This option holds a TimerTask when the actor waits in a - * reactWithin. The TimerTask is cancelled when the actor - * resumes. - * - * guarded by this - */ - private[actors] var onTimeout: Option[TimerTask] = None - - /** - * Returns the $actor which sent the last received message. - */ - protected[actors] def sender: OutputChannel[Any] = senders.head - - /** - * Replies with `msg` to the sender. - */ - protected[actors] def reply(msg: Any) { - sender ! msg - } - - override def !(msg: Any) { - send(msg, Actor.rawSelf(scheduler)) - } - - override def forward(msg: Any) { - send(msg, Actor.sender) - } - - private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) { - synchronized { - if (!onTimeout.isEmpty) { - onTimeout.get.cancel() - onTimeout = None - } - } - senders = List(item._2) - super.resumeReceiver(item, handler, onSameThread) - } - - private[actors] override def searchMailbox(startMbox: MQueue[Any], - handler: PartialFunction[Any, Any], - resumeOnSameThread: Boolean) { - var tmpMbox = startMbox - var done = false - while (!done) { - val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => { - senders = List(replyTo) - handler.isDefinedAt(msg) - }) - if (tmpMbox ne mailbox) - tmpMbox.foreach((m, s) => mailbox.append(m, s)) - if (null eq qel) { - synchronized { - // in mean time new stuff might have arrived - if (!sendBuffer.isEmpty) { - tmpMbox = new MQueue[Any]("Temp") - drainSendBuffer(tmpMbox) - // keep going - } else { - waitingFor = handler - // see Reactor.searchMailbox - throw Actor.suspendException - } - } - } else { - resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread) - done = true - } - } - } - - private[actors] override def makeReaction(fun: () => Unit, handler: PartialFunction[Any, Any], msg: Any): Runnable = - new ReplyReactorTask(this, fun, handler, msg) - - protected[actors] override def react(handler: PartialFunction[Any, Unit]): Nothing = { - assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor") - super.react(handler) - } - - /** - * Receives a message from this $actor's mailbox within a certain - * time span. - * - * This method never returns. Therefore, the rest of the computation - * has to be contained in the actions of the partial function. - * - * @param msec the time span before timeout - * @param handler a partial function with message patterns and actions - */ - protected[actors] def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = { - assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor") - - synchronized { drainSendBuffer(mailbox) } - - // first, remove spurious TIMEOUT message from mailbox if any - mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT) - - while (true) { - val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => { - senders = List(replyTo) - handler isDefinedAt m - }) - if (null eq qel) { - synchronized { - // in mean time new messages might have arrived - if (!sendBuffer.isEmpty) { - drainSendBuffer(mailbox) - // keep going - } else if (msec == 0L) { - // throws Actor.suspendException - resumeReceiver((TIMEOUT, this), handler, false) - } else { - waitingFor = handler - val thisActor = this - onTimeout = Some(new TimerTask { - def run() { thisActor.send(TIMEOUT, thisActor) } - }) - Actor.timer.schedule(onTimeout.get, msec) - throw Actor.suspendException - } - } - } else - resumeReceiver((qel.msg, qel.session), handler, false) - } - throw Actor.suspendException - } - - override def getState: Actor.State.Value = synchronized { - if (waitingFor ne Reactor.waitingForNone) { - if (onTimeout.isEmpty) - Actor.State.Suspended - else - Actor.State.TimedSuspended - } else - _state - } - +@deprecated("Scala Actors are beeing removed from the standard library. Please refer to the migration guide.", "2.10") +trait ReplyReactor extends InternalReplyReactor { + + protected[actors] def sender: OutputChannel[Any] = super.internalSender + } + |