summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/ReplyReactor.scala
diff options
context:
space:
mode:
authorVojin Jovanovic <vojin.jovanovic@epfl.ch>2012-03-19 22:25:26 +0100
committerVojin Jovanovic <vojin.jovanovic@epfl.ch>2012-03-20 20:31:22 +0100
commit66f0679169bd8d5dc749c2288777c5a217ae3d43 (patch)
tree66faeb3cc69b2c725788f4a20a5752493c43e9cf /src/actors/scala/actors/ReplyReactor.scala
parentd9d46a8bbb1b30d322057bb513ea4317bda735d3 (diff)
downloadscala-66f0679169bd8d5dc749c2288777c5a217ae3d43.tar.gz
scala-66f0679169bd8d5dc749c2288777c5a217ae3d43.tar.bz2
scala-66f0679169bd8d5dc749c2288777c5a217ae3d43.zip
Prepared actors hierarchy for migration.
Internal nodes added so methods relevant to akka can be overridden. Review by: @phaller
Diffstat (limited to 'src/actors/scala/actors/ReplyReactor.scala')
-rw-r--r--src/actors/scala/actors/ReplyReactor.scala165
1 files changed, 6 insertions, 159 deletions
diff --git a/src/actors/scala/actors/ReplyReactor.scala b/src/actors/scala/actors/ReplyReactor.scala
index 0e5ce00c91..0ffbbd3cce 100644
--- a/src/actors/scala/actors/ReplyReactor.scala
+++ b/src/actors/scala/actors/ReplyReactor.scala
@@ -5,165 +5,12 @@
** /____/\___/_/ |_/____/_/ | | **
** |/ **
\* */
-
package scala.actors
-import java.util.{Timer, TimerTask}
-
-/**
- * Extends the [[scala.actors.Reactor]] trait with methods to reply to the
- * sender of a message.
- *
- * Sending a message to a `ReplyReactor` implicitly passes a reference to
- * the sender together with the message.
- *
- * @author Philipp Haller
- *
- * @define actor `ReplyReactor`
- */
-trait ReplyReactor extends Reactor[Any] with ReactorCanReply {
-
- /* A list of the current senders. The head of the list is
- * the sender of the message that was received last.
- */
- @volatile
- private[actors] var senders: List[OutputChannel[Any]] = List()
-
- /* This option holds a TimerTask when the actor waits in a
- * reactWithin. The TimerTask is cancelled when the actor
- * resumes.
- *
- * guarded by this
- */
- private[actors] var onTimeout: Option[TimerTask] = None
-
- /**
- * Returns the $actor which sent the last received message.
- */
- protected[actors] def sender: OutputChannel[Any] = senders.head
-
- /**
- * Replies with `msg` to the sender.
- */
- protected[actors] def reply(msg: Any) {
- sender ! msg
- }
-
- override def !(msg: Any) {
- send(msg, Actor.rawSelf(scheduler))
- }
-
- override def forward(msg: Any) {
- send(msg, Actor.sender)
- }
-
- private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) {
- synchronized {
- if (!onTimeout.isEmpty) {
- onTimeout.get.cancel()
- onTimeout = None
- }
- }
- senders = List(item._2)
- super.resumeReceiver(item, handler, onSameThread)
- }
-
- private[actors] override def searchMailbox(startMbox: MQueue[Any],
- handler: PartialFunction[Any, Any],
- resumeOnSameThread: Boolean) {
- var tmpMbox = startMbox
- var done = false
- while (!done) {
- val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => {
- senders = List(replyTo)
- handler.isDefinedAt(msg)
- })
- if (tmpMbox ne mailbox)
- tmpMbox.foreach((m, s) => mailbox.append(m, s))
- if (null eq qel) {
- synchronized {
- // in mean time new stuff might have arrived
- if (!sendBuffer.isEmpty) {
- tmpMbox = new MQueue[Any]("Temp")
- drainSendBuffer(tmpMbox)
- // keep going
- } else {
- waitingFor = handler
- // see Reactor.searchMailbox
- throw Actor.suspendException
- }
- }
- } else {
- resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread)
- done = true
- }
- }
- }
-
- private[actors] override def makeReaction(fun: () => Unit, handler: PartialFunction[Any, Any], msg: Any): Runnable =
- new ReplyReactorTask(this, fun, handler, msg)
-
- protected[actors] override def react(handler: PartialFunction[Any, Unit]): Nothing = {
- assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
- super.react(handler)
- }
-
- /**
- * Receives a message from this $actor's mailbox within a certain
- * time span.
- *
- * This method never returns. Therefore, the rest of the computation
- * has to be contained in the actions of the partial function.
- *
- * @param msec the time span before timeout
- * @param handler a partial function with message patterns and actions
- */
- protected[actors] def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = {
- assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
-
- synchronized { drainSendBuffer(mailbox) }
-
- // first, remove spurious TIMEOUT message from mailbox if any
- mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT)
-
- while (true) {
- val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
- senders = List(replyTo)
- handler isDefinedAt m
- })
- if (null eq qel) {
- synchronized {
- // in mean time new messages might have arrived
- if (!sendBuffer.isEmpty) {
- drainSendBuffer(mailbox)
- // keep going
- } else if (msec == 0L) {
- // throws Actor.suspendException
- resumeReceiver((TIMEOUT, this), handler, false)
- } else {
- waitingFor = handler
- val thisActor = this
- onTimeout = Some(new TimerTask {
- def run() { thisActor.send(TIMEOUT, thisActor) }
- })
- Actor.timer.schedule(onTimeout.get, msec)
- throw Actor.suspendException
- }
- }
- } else
- resumeReceiver((qel.msg, qel.session), handler, false)
- }
- throw Actor.suspendException
- }
-
- override def getState: Actor.State.Value = synchronized {
- if (waitingFor ne Reactor.waitingForNone) {
- if (onTimeout.isEmpty)
- Actor.State.Suspended
- else
- Actor.State.TimedSuspended
- } else
- _state
- }
-
+@deprecated("Scala Actors are beeing removed from the standard library. Please refer to the migration guide.", "2.10")
+trait ReplyReactor extends InternalReplyReactor {
+
+ protected[actors] def sender: OutputChannel[Any] = super.internalSender
+
}
+