summaryrefslogtreecommitdiff
path: root/src/actors-migration
diff options
context:
space:
mode:
authorVojin Jovanovic <vojin.jovanovic@epfl.ch>2012-05-18 17:53:05 +0200
committerVojin Jovanovic <vojin.jovanovic@epfl.ch>2012-05-18 17:53:05 +0200
commite99fb0c93842d517b8a185458f405bace2bbb46b (patch)
tree070e9117c43fc09e3c850594b76f1957fb68c045 /src/actors-migration
parentef7708812fac32ca0c2a05330222a6b0806c9054 (diff)
downloadscala-e99fb0c93842d517b8a185458f405bace2bbb46b.tar.gz
scala-e99fb0c93842d517b8a185458f405bace2bbb46b.tar.bz2
scala-e99fb0c93842d517b8a185458f405bace2bbb46b.zip
Adding the Actor Migration Kit.
Kit consists of: 1) The StashingActor which adopts an interface similar to Akka. 2) Props mockup for creating Akka like code 3) Pattern mockup 4) Test cases for every step in the migration. 5) MigrationSystem which will paired on the Akka side. Review of the code : @phaller Review of the build: @jsuereth
Diffstat (limited to 'src/actors-migration')
-rw-r--r--src/actors-migration/scala/actors/MigrationSystem.scala36
-rw-r--r--src/actors-migration/scala/actors/Pattern.scala25
-rw-r--r--src/actors-migration/scala/actors/Props.scala14
-rw-r--r--src/actors-migration/scala/actors/StashingActor.scala261
-rw-r--r--src/actors-migration/scala/actors/Timeout.scala34
5 files changed, 370 insertions, 0 deletions
diff --git a/src/actors-migration/scala/actors/MigrationSystem.scala b/src/actors-migration/scala/actors/MigrationSystem.scala
new file mode 100644
index 0000000000..ffc93d9c6f
--- /dev/null
+++ b/src/actors-migration/scala/actors/MigrationSystem.scala
@@ -0,0 +1,36 @@
+package scala.actors
+
+import scala.collection._
+
+object MigrationSystem {
+
+ private[actors] val contextStack = new ThreadLocal[immutable.Stack[Boolean]] {
+ override def initialValue() = immutable.Stack[Boolean]()
+ }
+
+ private[this] def withCleanContext(block: => ActorRef): ActorRef = {
+ // push clean marker
+ val old = contextStack.get
+ contextStack.set(old.push(true))
+ try {
+ val instance = block
+
+ if (instance eq null)
+ throw new Exception("Actor instance passed to actorOf can't be 'null'")
+
+ instance
+ } finally {
+ val stackAfter = contextStack.get
+ if (stackAfter.nonEmpty)
+ contextStack.set(if (!stackAfter.head) stackAfter.pop.pop else stackAfter.pop)
+ }
+ }
+
+ def actorOf(props: Props): ActorRef = withCleanContext {
+ val creator = props.creator()
+ val r = new InternalActorRef(creator)
+ creator.start()
+ r
+ }
+
+} \ No newline at end of file
diff --git a/src/actors-migration/scala/actors/Pattern.scala b/src/actors-migration/scala/actors/Pattern.scala
new file mode 100644
index 0000000000..97dbd2cccd
--- /dev/null
+++ b/src/actors-migration/scala/actors/Pattern.scala
@@ -0,0 +1,25 @@
+package scala.actors
+
+import scala.concurrent.util.Duration
+
+object pattern {
+
+ implicit def askSupport(ar: ActorRef): AskableActorRef =
+ new AskableActorRef(ar)
+}
+
+/**
+ * ActorRef with support for ask(?) operation.
+ */
+class AskableActorRef(val ar: ActorRef) extends ActorRef {
+
+ def !(message: Any)(implicit sender: ActorRef = null): Unit = ar.!(message)(sender)
+
+ def ?(message: Any)(timeout: Timeout): Future[Any] = ar.?(message, timeout.duration)
+
+ private[actors] def ?(message: Any, timeout: Duration): Future[Any] = ar.?(message, timeout)
+
+ def forward(message: Any) = ar.forward(message)
+
+ private[actors] def localActor: AbstractActor = ar.localActor
+} \ No newline at end of file
diff --git a/src/actors-migration/scala/actors/Props.scala b/src/actors-migration/scala/actors/Props.scala
new file mode 100644
index 0000000000..b4d18d5fad
--- /dev/null
+++ b/src/actors-migration/scala/actors/Props.scala
@@ -0,0 +1,14 @@
+package scala.actors
+
+/**
+ * ActorRef configuration object. It represents the minimal subset of Akka Props class.
+ */
+case class Props(creator: () ⇒ InternalActor, dispatcher: String) {
+
+ /**
+ * Returns a new Props with the specified creator set
+ * Scala API
+ */
+ def withCreator(c: ⇒ InternalActor) = copy(creator = () ⇒ c)
+
+} \ No newline at end of file
diff --git a/src/actors-migration/scala/actors/StashingActor.scala b/src/actors-migration/scala/actors/StashingActor.scala
new file mode 100644
index 0000000000..4bca879d9f
--- /dev/null
+++ b/src/actors-migration/scala/actors/StashingActor.scala
@@ -0,0 +1,261 @@
+package scala.actors
+
+import scala.collection._
+import scala.concurrent.util.Duration
+import java.util.concurrent.TimeUnit
+
+object StashingActor extends Combinators {
+ implicit def mkBody[A](body: => A) = new InternalActor.Body[A] {
+ def andThen[B](other: => B): Unit = Actor.rawSelf.seq(body, other)
+ }
+}
+
+@deprecated("Scala Actors are beeing removed from the standard library. Please refer to the migration guide.", "2.10")
+trait StashingActor extends InternalActor {
+ type Receive = PartialFunction[Any, Unit]
+
+ // checks if StashingActor is created within the actorOf block
+ creationCheck;
+
+ private[actors] val ref = new InternalActorRef(this)
+
+ val self: ActorRef = ref
+
+ protected[this] val context: ActorContext = new ActorContext(this)
+
+ @volatile
+ private var myTimeout: Option[Long] = None
+
+ private val stash = new MQueue[Any]("Stash")
+
+ /**
+ * Migration notes:
+ * this method replaces receiveWithin, receive and react methods from Scala Actors.
+ */
+ def receive: Receive
+
+ /**
+ * User overridable callback.
+ * <p/>
+ * Is called when an Actor is started by invoking 'actor'.
+ */
+ def preStart() {}
+
+ /**
+ * User overridable callback.
+ * <p/>
+ * Is called when 'actor.stop()' is invoked.
+ */
+ def postStop() {}
+
+ /**
+ * User overridable callback.
+ * <p/>
+ * Is called on a crashed Actor right BEFORE it is restarted to allow clean
+ * up of resources before Actor is terminated.
+ * By default it calls postStop()
+ */
+ def preRestart(reason: Throwable, message: Option[Any]) { postStop() }
+
+ /**
+ * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
+ * Puts the behavior on top of the hotswap stack.
+ * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
+ */
+ private def become(behavior: Receive, discardOld: Boolean = true) {
+ if (discardOld) unbecome()
+ behaviorStack = behaviorStack.push(wrapWithSystemMessageHandling(behavior))
+ }
+
+ /**
+ * Reverts the Actor behavior to the previous one in the hotswap stack.
+ */
+ private def unbecome() {
+ // never unbecome the initial behavior
+ if (behaviorStack.size > 1)
+ behaviorStack = behaviorStack.pop
+ }
+
+ /**
+ * User overridable callback.
+ * <p/>
+ * Is called when a message isn't handled by the current behavior of the actor
+ * by default it does: EventHandler.warning(self, message)
+ */
+ def unhandled(message: Any) {
+ println("unhandeld")
+ message match {
+ case _ => throw new UnhandledMessageException(message, self)
+ }
+ }
+
+ protected def sender: ActorRef = new OutputChannelRef(internalSender)
+
+ override def act(): Unit = internalAct()
+
+ override def start(): StashingActor = {
+ super.start()
+ this
+ }
+
+ override def receive[R](f: PartialFunction[Any, R]): R
+
+ /*
+ * Internal implementation.
+ */
+
+ private[actors] var behaviorStack = immutable.Stack[PartialFunction[Any, Unit]]()
+
+ /*
+ * Checks that StashingActor can be created only by MigrationSystem.actorOf method.
+ */
+ private[this] def creationCheck = {
+
+ // creation check (see ActorRef)
+ val context = MigrationSystem.contextStack.get
+ if (context.isEmpty)
+ throw new RuntimeException("In order to create StashingActor one must use actorOf.")
+ else {
+ if (!context.head)
+ throw new RuntimeException("Only one actor can be created per actorOf call.")
+ else
+ MigrationSystem.contextStack.set(context.push(false))
+ }
+
+ }
+
+ private[actors] override def preAct() {
+ preStart()
+ }
+
+ /**
+ * Adds message to a stash, to be processed later. Stashed messages can be fed back into the $actor's
+ * mailbox using <code>unstashAll()</code>.
+ *
+ * Temporarily stashing away messages that the $actor does not (yet) handle simplifies implementing
+ * certain messaging protocols.
+ */
+ final def stash(msg: Any): Unit = {
+ stash.append(msg, null)
+ }
+
+ final def unstashAll(): Unit = {
+ mailbox.prepend(stash)
+ stash.clear()
+ }
+
+ /**
+ * Wraps any partial function with Exit message handling.
+ */
+ private[actors] def wrapWithSystemMessageHandling(pf: PartialFunction[Any, Unit]): PartialFunction[Any, Unit] = {
+
+ def swapExitHandler(pf: PartialFunction[Any, Unit]) = new PartialFunction[Any, Unit] {
+ def swapExit(v: Any) = v match {
+ case Exit(from, reason) =>
+
+ Terminated(new InternalActorRef(from.asInstanceOf[InternalActor]))
+ case v => v
+ }
+
+ def isDefinedAt(v: Any) = pf.isDefinedAt(swapExit(v))
+ def apply(v: Any) = pf(swapExit(v))
+ }
+
+ swapExitHandler(pf orElse {
+ case m => unhandled(m)
+ })
+ }
+
+ /**
+ * Method that models the behavior of Akka actors.
+ */
+ private[actors] def internalAct() {
+ trapExit = true
+ behaviorStack = behaviorStack.push(wrapWithSystemMessageHandling(receive))
+ loop {
+ if (myTimeout.isDefined)
+ reactWithin(myTimeout.get)(behaviorStack.top)
+ else
+ react(behaviorStack.top)
+ }
+ }
+
+ private[actors] override def internalPostStop() = postStop()
+
+ // Used for pattern matching statement similar to Akka
+ lazy val ReceiveTimeout = TIMEOUT
+
+ /**
+ * Used to simulate Akka context behavior. Should be used only for migration purposes.
+ */
+ protected[actors] class ActorContext(val actr: StashingActor) {
+
+ /**
+ * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
+ * Puts the behavior on top of the hotswap stack.
+ * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
+ */
+ def become(behavior: Receive, discardOld: Boolean = true) = actr.become(behavior, discardOld)
+
+ /**
+ * Reverts the Actor behavior to the previous one in the hotswap stack.
+ */
+ def unbecome() = actr.unbecome()
+
+ /**
+ * Shuts down the actor its dispatcher and message queue.
+ */
+ def stop(subject: ActorRef): Nothing = if (subject != ref)
+ throw new RuntimeException("Only stoping of self is allowed during migration.")
+ else
+ actr.exit()
+
+ /**
+ * Registers this actor as a Monitor for the provided ActorRef.
+ * @return the provided ActorRef
+ */
+ def watch(subject: ActorRef): ActorRef = {
+ actr.watch(subject)
+ subject
+ }
+
+ /**
+ * Unregisters this actor as Monitor for the provided ActorRef.
+ * @return the provided ActorRef
+ */
+ def unwatch(subject: ActorRef): ActorRef = {
+ actr unwatch subject
+ subject
+ }
+
+ /**
+ * Defines the receiver timeout value.
+ */
+ final def setReceiveTimeout(timeout: Duration): Unit =
+ actr.myTimeout = Some(timeout.toMillis)
+
+ /**
+ * Gets the current receiveTimeout
+ */
+ final def receiveTimeout: Option[Duration] =
+ actr.myTimeout.map(Duration(_, TimeUnit.MILLISECONDS))
+
+ }
+}
+
+/**
+ * This message is thrown by default when an Actors behavior doesn't match a message
+ */
+case class UnhandledMessageException(msg: Any, ref: ActorRef = null) extends Exception {
+
+ def this(msg: String) = this(msg, null)
+
+ // constructor with 'null' ActorRef needed to work with client instantiation of remote exception
+ override def getMessage =
+ if (ref ne null) "Actor %s does not handle [%s]".format(ref, msg)
+ else "Actor does not handle [%s]".format(msg)
+
+ override def fillInStackTrace() = this //Don't waste cycles generating stack trace
+}
+
+case class Terminated(actor: ActorRef)
diff --git a/src/actors-migration/scala/actors/Timeout.scala b/src/actors-migration/scala/actors/Timeout.scala
new file mode 100644
index 0000000000..bb3c8c0476
--- /dev/null
+++ b/src/actors-migration/scala/actors/Timeout.scala
@@ -0,0 +1,34 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
+ */
+package scala.actors
+
+import scala.concurrent.util.Duration
+import java.util.concurrent.TimeUnit
+
+case class Timeout(duration: Duration) {
+ def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS))
+ def this(length: Long, unit: TimeUnit) = this(Duration(length, unit))
+}
+
+object Timeout {
+
+ /**
+ * A timeout with zero duration, will cause most requests to always timeout.
+ */
+ val zero = new Timeout(Duration.Zero)
+
+ /**
+ * A Timeout with infinite duration. Will never timeout. Use extreme caution with this
+ * as it may cause memory leaks, blocked threads, or may not even be supported by
+ * the receiver, which would result in an exception.
+ */
+ val never = new Timeout(Duration.Inf)
+
+ def apply(timeout: Long) = new Timeout(timeout)
+ def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit)
+
+ implicit def durationToTimeout(duration: Duration) = new Timeout(duration)
+ implicit def intToTimeout(timeout: Int) = new Timeout(timeout)
+ implicit def longToTimeout(timeout: Long) = new Timeout(timeout)
+}