summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorVojin Jovanovic <vojin.jovanovic@epfl.ch>2012-05-18 17:53:05 +0200
committerVojin Jovanovic <vojin.jovanovic@epfl.ch>2012-05-18 17:53:05 +0200
commite99fb0c93842d517b8a185458f405bace2bbb46b (patch)
tree070e9117c43fc09e3c850594b76f1957fb68c045 /src/actors
parentef7708812fac32ca0c2a05330222a6b0806c9054 (diff)
downloadscala-e99fb0c93842d517b8a185458f405bace2bbb46b.tar.gz
scala-e99fb0c93842d517b8a185458f405bace2bbb46b.tar.bz2
scala-e99fb0c93842d517b8a185458f405bace2bbb46b.zip
Adding the Actor Migration Kit.
Kit consists of: 1) The StashingActor which adopts an interface similar to Akka. 2) Props mockup for creating Akka like code 3) Pattern mockup 4) Test cases for every step in the migration. 5) MigrationSystem which will paired on the Akka side. Review of the code : @phaller Review of the build: @jsuereth
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/ActorRef.scala119
-rw-r--r--src/actors/scala/actors/ActorTask.scala1
-rw-r--r--src/actors/scala/actors/InternalActor.scala53
-rw-r--r--src/actors/scala/actors/MQueue.scala14
-rw-r--r--src/actors/scala/actors/Reactor.scala10
5 files changed, 186 insertions, 11 deletions
diff --git a/src/actors/scala/actors/ActorRef.scala b/src/actors/scala/actors/ActorRef.scala
new file mode 100644
index 0000000000..8f70b13e01
--- /dev/null
+++ b/src/actors/scala/actors/ActorRef.scala
@@ -0,0 +1,119 @@
+package scala.actors
+
+import java.util.concurrent.TimeoutException
+import scala.concurrent.util.Duration
+
+/**
+ * Trait used for migration of Scala actors to Akka.
+ */
+@deprecated("ActorRef ought to be used only with the Actor Migration Kit.")
+trait ActorRef {
+
+ /**
+ * Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
+ * <p/>
+ *
+ * If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument.
+ * <p/>
+ *
+ * This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable,
+ * if invoked from within an Actor. If not then no sender is available.
+ * <pre>
+ * actor ! message
+ * </pre>
+ * <p/>
+ */
+ def !(message: Any)(implicit sender: ActorRef = null): Unit
+
+ /**
+ * Sends a message asynchronously, returning a future which may eventually hold the reply.
+ */
+ private[actors] def ?(message: Any, timeout: Duration): Future[Any]
+
+ /**
+ * Forwards the message and passes the original sender actor as the sender.
+ * <p/>
+ * Works with '!' and '?'.
+ */
+ def forward(message: Any)
+
+ private[actors] def localActor: AbstractActor
+
+}
+
+private[actors] class OutputChannelRef(val actor: OutputChannel[Any]) extends ActorRef {
+
+ override private[actors] def ?(message: Any, timeout: Duration): Future[Any] =
+ throw new UnsupportedOperationException("Output channel does not support ?")
+
+ /**
+ * Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
+ * <p/>
+ *
+ * <p/>
+ * <pre>
+ * actor ! message
+ * </pre>
+ * <p/>
+ */
+ def !(message: Any)(implicit sender: ActorRef = null): Unit =
+ if (sender != null)
+ actor.send(message, sender.localActor)
+ else
+ actor ! message
+
+ override def equals(that: Any) =
+ that.isInstanceOf[OutputChannelRef] && that.asInstanceOf[OutputChannelRef].actor == this.actor
+
+ private[actors] override def localActor: AbstractActor =
+ throw new UnsupportedOperationException("Output channel does not have an instance of the actor")
+
+ def forward(message: Any): Unit = throw new UnsupportedOperationException("OutputChannel does not support forward.")
+
+}
+
+private[actors] class ReactorRef(override val actor: Reactor[Any]) extends OutputChannelRef(actor) {
+
+ /**
+ * Forwards the message and passes the original sender actor as the sender.
+ * <p/>
+ * Works with '!' and '?'.
+ */
+ override def forward(message: Any) = actor.forward(message)
+
+}
+
+private[actors] final class InternalActorRef(override val actor: InternalActor) extends ReactorRef(actor) {
+
+ /**
+ * Sends a message asynchronously, returning a future which may eventually hold the reply.
+ */
+ override private[actors] def ?(message: Any, timeout: Duration): Future[Any] =
+ Futures.future {
+ val dur = if (timeout.isFinite()) timeout.toMillis else (java.lang.Long.MAX_VALUE >> 2)
+ actor !? (dur, message) match {
+ case Some(x) => x
+ case None => new AskTimeoutException("? operation timed out.")
+ }
+ }
+
+ override def !(message: Any)(implicit sender: ActorRef = null): Unit =
+ if (message == PoisonPill)
+ actor.stop('normal)
+ else if (sender != null)
+ actor.send(message, sender.localActor)
+ else
+ actor ! message
+
+ private[actors] override def localActor: InternalActor = this.actor
+}
+
+/**
+ * This is what is used to complete a Future that is returned from an ask/? call,
+ * when it times out.
+ */
+class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException {
+ def this(message: String) = this(message, null: Throwable)
+}
+
+object PoisonPill
diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala
index bb04302238..045b00f5f2 100644
--- a/src/actors/scala/actors/ActorTask.scala
+++ b/src/actors/scala/actors/ActorTask.scala
@@ -51,7 +51,6 @@ private[actors] class ActorTask(actor: InternalActor,
super.terminateExecution(e)
() => {}
}
- actor.internalPostStop
res
}
diff --git a/src/actors/scala/actors/InternalActor.scala b/src/actors/scala/actors/InternalActor.scala
index c94da5b9fd..cb66021d1c 100644
--- a/src/actors/scala/actors/InternalActor.scala
+++ b/src/actors/scala/actors/InternalActor.scala
@@ -153,7 +153,7 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac
val matches = f.isDefinedAt(m)
senders = senders.tail
matches
- })
+ })
if (null eq qel) {
val todo = synchronized {
// in mean time new stuff might have arrived
@@ -317,6 +317,35 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac
}
/**
+ * Links <code>self</code> to actor <code>to</code>.
+ *
+ * @param to the actor to link to
+ * @return the parameter actor
+ */
+ def link(to: ActorRef): ActorRef = {
+ this.link(to.localActor)
+ to
+ }
+
+ /**
+ * Unidirectional linking. For migration purposes only
+ */
+ private[actors] def watch(subject: ActorRef): ActorRef = {
+ assert(Actor.self(scheduler) == this, "link called on actor different from self")
+ subject.localActor linkTo this
+ subject
+ }
+
+ /**
+ * Unidirectional linking. For migration purposes only
+ */
+ private[actors] def unwatch(subject: ActorRef): ActorRef = {
+ assert(Actor.self(scheduler) == this, "link called on actor different from self")
+ subject.localActor unlinkFrom this
+ subject
+ }
+
+ /**
* Links <code>self</code> to the actor defined by <code>body</code>.
*
* @param body the body of the actor to link to
@@ -346,17 +375,24 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac
from unlinkFrom this
}
+ /**
+ * Unlinks <code>self</code> from actor <code>from</code>.
+ */
+ def unlink(from: ActorRef) {
+ unlink(from.localActor)
+ }
+
private[actors] def unlinkFrom(from: AbstractActor) = synchronized {
links = links.filterNot(from.==)
}
- @volatile
+ @volatile
private[actors] var _trapExit = false
-
+
def trapExit = _trapExit
-
+
def trapExit_=(value: Boolean) = _trapExit = value
-
+
// guarded by this
private var exitReason: AnyRef = 'normal
// guarded by this
@@ -445,12 +481,11 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac
scheduler.onTerminate(this) { f }
}
- private[actors] def internalPostStop() = {}
- private[actors] def stop(reason: AnyRef): Unit = {
+ private[actors] def stop(reason: AnyRef): Unit = {
synchronized {
shouldExit = true
- exitReason = reason
+ exitReason = reason
// resume this Actor in a way that
// causes it to exit
// (because shouldExit == true)
@@ -464,7 +499,7 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac
/* Here we should not throw a SuspendActorControl,
since the current method is called from an actor that
is in the process of exiting.
-
+
Therefore, the contract for scheduleActor is that
it never throws a SuspendActorControl.
*/
diff --git a/src/actors/scala/actors/MQueue.scala b/src/actors/scala/actors/MQueue.scala
index 65427d68c5..4a148d2cb3 100644
--- a/src/actors/scala/actors/MQueue.scala
+++ b/src/actors/scala/actors/MQueue.scala
@@ -25,6 +25,20 @@ private[actors] class MQueue[Msg >: Null](protected val label: String) {
_size += diff
}
+ def prepend(other: MQueue[Msg]) {
+ if (!other.isEmpty) {
+ other.last.next = first
+ first = other.first
+ }
+ }
+
+ def clear() {
+ first = null
+ last = null
+ _size = 0
+ }
+
+
def append(msg: Msg, session: OutputChannel[Any]) {
changeSize(1) // size always increases by 1
val el = new MQueueElement(msg, session)
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index 206a97d97c..7a8d738758 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -214,11 +214,16 @@ trait Reactor[Msg >: Null] extends OutputChannel[Msg] with Combinators {
scheduler executeFromActor makeReaction(null, handler, msg)
}
+ private[actors] def preAct() = {}
+
// guarded by this
private[actors] def dostart() {
_state = Actor.State.Runnable
scheduler newActor this
- scheduler execute makeReaction(() => act(), null, null)
+ scheduler execute makeReaction(() => {
+ preAct()
+ act()
+ }, null, null)
}
/**
@@ -285,12 +290,15 @@ trait Reactor[Msg >: Null] extends OutputChannel[Msg] with Combinators {
throw Actor.suspendException
}
+ private[actors] def internalPostStop() = {}
+
private[actors] def terminated() {
synchronized {
_state = Actor.State.Terminated
// reset waitingFor, otherwise getState returns Suspended
waitingFor = Reactor.waitingForNone
}
+ internalPostStop()
scheduler.terminated(this)
}