summaryrefslogtreecommitdiff
path: root/src/actors-migration/scala/actors/migration
diff options
context:
space:
mode:
Diffstat (limited to 'src/actors-migration/scala/actors/migration')
-rw-r--r--src/actors-migration/scala/actors/migration/MigrationSystem.scala37
-rw-r--r--src/actors-migration/scala/actors/migration/Pattern.scala27
-rw-r--r--src/actors-migration/scala/actors/migration/Props.scala14
-rw-r--r--src/actors-migration/scala/actors/migration/StashingActor.scala257
-rw-r--r--src/actors-migration/scala/actors/migration/Timeout.scala40
5 files changed, 375 insertions, 0 deletions
diff --git a/src/actors-migration/scala/actors/migration/MigrationSystem.scala b/src/actors-migration/scala/actors/migration/MigrationSystem.scala
new file mode 100644
index 0000000000..3dcb38e634
--- /dev/null
+++ b/src/actors-migration/scala/actors/migration/MigrationSystem.scala
@@ -0,0 +1,37 @@
+package scala.actors.migration
+
+import scala.actors._
+import scala.collection._
+
+object MigrationSystem {
+
+ private[migration] val contextStack = new ThreadLocal[immutable.Stack[Boolean]] {
+ override def initialValue() = immutable.Stack[Boolean]()
+ }
+
+ private[this] def withCleanContext(block: => ActorRef): ActorRef = {
+ // push clean marker
+ val old = contextStack.get
+ contextStack.set(old.push(true))
+ try {
+ val instance = block
+
+ if (instance eq null)
+ throw new Exception("Actor instance passed to actorOf can't be 'null'")
+
+ instance
+ } finally {
+ val stackAfter = contextStack.get
+ if (stackAfter.nonEmpty)
+ contextStack.set(if (!stackAfter.head) stackAfter.pop.pop else stackAfter.pop)
+ }
+ }
+
+ def actorOf(props: Props): ActorRef = withCleanContext {
+ val creator = props.creator()
+ val r = new InternalActorRef(creator)
+ creator.start()
+ r
+ }
+
+} \ No newline at end of file
diff --git a/src/actors-migration/scala/actors/migration/Pattern.scala b/src/actors-migration/scala/actors/migration/Pattern.scala
new file mode 100644
index 0000000000..25ba191ce7
--- /dev/null
+++ b/src/actors-migration/scala/actors/migration/Pattern.scala
@@ -0,0 +1,27 @@
+package scala.actors.migration
+
+import scala.actors._
+import scala.concurrent.duration.Duration
+import language.implicitConversions
+
+object pattern {
+
+ implicit def ask(ar: ActorRef): AskableActorRef =
+ new AskableActorRef(ar)
+}
+
+/**
+ * ActorRef with support for ask(?) operation.
+ */
+class AskableActorRef(val ar: ActorRef) extends ActorRef {
+
+ def !(message: Any)(implicit sender: ActorRef = null): Unit = ar.!(message)(sender)
+
+ def ?(message: Any)(implicit timeout: Timeout): scala.concurrent.Future[Any] = ar.?(message, timeout.duration)
+
+ private[actors] def ?(message: Any, timeout: Duration): scala.concurrent.Future[Any] = ar.?(message, timeout)
+
+ def forward(message: Any) = ar.forward(message)
+
+ private[actors] def localActor: AbstractActor = ar.localActor
+}
diff --git a/src/actors-migration/scala/actors/migration/Props.scala b/src/actors-migration/scala/actors/migration/Props.scala
new file mode 100644
index 0000000000..00bc9d93f8
--- /dev/null
+++ b/src/actors-migration/scala/actors/migration/Props.scala
@@ -0,0 +1,14 @@
+package scala.actors.migration
+
+import scala.actors._
+
+/**
+ * ActorRef configuration object. It represents the minimal subset of Akka Props class.
+ */
+case class Props(creator: () ⇒ InternalActor, dispatcher: String) {
+
+ /**
+ * Returns a new Props with the specified creator set
+ */
+ final def withCreator(c: ⇒ InternalActor) = copy(creator = () ⇒ c)
+}
diff --git a/src/actors-migration/scala/actors/migration/StashingActor.scala b/src/actors-migration/scala/actors/migration/StashingActor.scala
new file mode 100644
index 0000000000..d0a1432e72
--- /dev/null
+++ b/src/actors-migration/scala/actors/migration/StashingActor.scala
@@ -0,0 +1,257 @@
+package scala.actors.migration
+
+import scala.actors._
+import scala.actors.Actor._
+import scala.collection._
+import scala.concurrent.duration.Duration
+import java.util.concurrent.TimeUnit
+import scala.language.implicitConversions
+
+object StashingActor extends Combinators {
+ implicit def mkBody[A](body: => A) = new InternalActor.Body[A] {
+ def andThen[B](other: => B): Unit = Actor.rawSelf.seq(body, other)
+ }
+}
+
+@deprecated("Scala Actors are being removed from the standard library. Please refer to the migration guide.", "2.10")
+trait StashingActor extends InternalActor {
+ type Receive = PartialFunction[Any, Unit]
+
+ // checks if StashingActor is created within the actorOf block
+ creationCheck()
+
+ private[actors] val ref = new InternalActorRef(this)
+
+ val self: ActorRef = ref
+
+ protected[this] val context: ActorContext = new ActorContext(this)
+
+ @volatile
+ private var myTimeout: Option[Long] = None
+
+ private val stash = new MQueue[Any]("Stash")
+
+ /**
+ * Migration notes:
+ * this method replaces receiveWithin, receive and react methods from Scala Actors.
+ */
+ def receive: Receive
+
+ /**
+ * User overridable callback.
+ * <p/>
+ * Is called when an Actor is started by invoking 'actor'.
+ */
+ def preStart() {}
+
+ /**
+ * User overridable callback.
+ * <p/>
+ * Is called when 'actor.stop()' is invoked.
+ */
+ def postStop() {}
+
+ /**
+ * User overridable callback.
+ * <p/>
+ * Is called on a crashed Actor right BEFORE it is restarted to allow clean
+ * up of resources before Actor is terminated.
+ * By default it calls postStop()
+ */
+ def preRestart(reason: Throwable, message: Option[Any]) { postStop() }
+
+ /**
+ * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
+ * Puts the behavior on top of the hotswap stack.
+ * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
+ */
+ private def become(behavior: Receive, discardOld: Boolean = true) {
+ if (discardOld) unbecome()
+ behaviorStack = behaviorStack.push(wrapWithSystemMessageHandling(behavior))
+ }
+
+ /**
+ * Reverts the Actor behavior to the previous one in the hotswap stack.
+ */
+ private def unbecome() {
+ // never unbecome the initial behavior
+ if (behaviorStack.size > 1)
+ behaviorStack = behaviorStack.pop
+ }
+
+ /**
+ * User overridable callback.
+ * <p/>
+ * Is called when a message isn't handled by the current behavior of the actor
+ * by default it does: EventHandler.warning(self, message)
+ */
+ def unhandled(message: Any) {
+ message match {
+ case Terminated(dead) ⇒ throw new DeathPactException(dead)
+ case _ ⇒ System.err.println("Unhandeled message " + message)
+ }
+ }
+
+ protected def sender: ActorRef = new OutputChannelRef(internalSender)
+
+ override def act(): Unit = internalAct()
+
+ override def start(): StashingActor = {
+ super.start()
+ this
+ }
+
+ override def receive[R](f: PartialFunction[Any, R]): R
+
+ /*
+ * Internal implementation.
+ */
+
+ private[actors] var behaviorStack = immutable.Stack[PartialFunction[Any, Unit]]()
+
+ /*
+ * Checks that StashingActor can be created only by MigrationSystem.actorOf method.
+ */
+ private[this] def creationCheck(): Unit = {
+ // creation check (see ActorRef)
+ val context = MigrationSystem.contextStack.get
+ if (context.isEmpty)
+ throw new RuntimeException("In order to create StashingActor one must use actorOf.")
+ else {
+ if (!context.head)
+ throw new RuntimeException("Only one actor can be created per actorOf call.")
+ else
+ MigrationSystem.contextStack.set(context.push(false))
+ }
+
+ }
+
+ private[actors] override def preAct() {
+ preStart()
+ }
+
+ /**
+ * Adds message to a stash, to be processed later. Stashed messages can be fed back into the $actor's
+ * mailbox using <code>unstashAll()</code>.
+ *
+ * Temporarily stashing away messages that the $actor does not (yet) handle simplifies implementing
+ * certain messaging protocols.
+ */
+ final def stash(msg: Any): Unit = {
+ stash.append(msg, null)
+ }
+
+ final def unstashAll(): Unit = {
+ mailbox.prepend(stash)
+ stash.clear()
+ }
+
+ /**
+ * Wraps any partial function with Exit message handling.
+ */
+ private[actors] def wrapWithSystemMessageHandling(pf: PartialFunction[Any, Unit]): PartialFunction[Any, Unit] = {
+
+ def swapExitHandler(pf: PartialFunction[Any, Unit]) = new PartialFunction[Any, Unit] {
+ def swapExit(v: Any) = v match {
+ case Exit(from, reason) =>
+ Terminated(new InternalActorRef(from.asInstanceOf[InternalActor]))
+ case v => v
+ }
+
+ def isDefinedAt(v: Any) = pf.isDefinedAt(swapExit(v))
+ def apply(v: Any) = pf(swapExit(v))
+ }
+
+ swapExitHandler(pf orElse {
+ case m => unhandled(m)
+ })
+ }
+
+ /**
+ * Method that models the behavior of Akka actors.
+ */
+ private[actors] def internalAct() {
+ trapExit = true
+ behaviorStack = behaviorStack.push(wrapWithSystemMessageHandling(receive))
+ loop {
+ if (myTimeout.isDefined)
+ reactWithin(myTimeout.get)(behaviorStack.top)
+ else
+ react(behaviorStack.top)
+ }
+ }
+
+ private[actors] override def internalPostStop() = postStop()
+
+ // Used for pattern matching statement similar to Akka
+ lazy val ReceiveTimeout = TIMEOUT
+
+ /**
+ * Used to simulate Akka context behavior. Should be used only for migration purposes.
+ */
+ protected[actors] class ActorContext(val actr: StashingActor) {
+
+ /**
+ * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
+ * Puts the behavior on top of the hotswap stack.
+ * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
+ */
+ def become(behavior: Receive, discardOld: Boolean = true) = actr.become(behavior, discardOld)
+
+ /**
+ * Reverts the Actor behavior to the previous one in the hotswap stack.
+ */
+ def unbecome() = actr.unbecome()
+
+ /**
+ * Shuts down the actor its dispatcher and message queue.
+ */
+ def stop(subject: ActorRef): Nothing = if (subject != ref)
+ throw new RuntimeException("Only stoping of self is allowed during migration.")
+ else
+ actr.exit()
+
+ /**
+ * Registers this actor as a Monitor for the provided ActorRef.
+ * @return the provided ActorRef
+ */
+ def watch(subject: ActorRef): ActorRef = {
+ actr.watch(subject)
+ subject
+ }
+
+ /**
+ * Unregisters this actor as Monitor for the provided ActorRef.
+ * @return the provided ActorRef
+ */
+ def unwatch(subject: ActorRef): ActorRef = {
+ actr unwatch subject
+ subject
+ }
+
+ /**
+ * Defines the receiver timeout value.
+ */
+ final def setReceiveTimeout(timeout: Duration): Unit =
+ actr.myTimeout = Some(timeout.toMillis)
+
+ /**
+ * Gets the current receiveTimeout
+ */
+ final def receiveTimeout: Option[Duration] =
+ actr.myTimeout.map(Duration(_, TimeUnit.MILLISECONDS))
+
+ }
+}
+
+/**
+ * This message is thrown by default when an Actor does not handle termination.
+ */
+class DeathPactException(ref: ActorRef = null) extends Exception {
+ override def fillInStackTrace() = this //Don't waste cycles generating stack trace
+}
+
+/**
+ * Message that is sent to a watching actor when the watched actor terminates.
+ */
+case class Terminated(actor: ActorRef)
diff --git a/src/actors-migration/scala/actors/migration/Timeout.scala b/src/actors-migration/scala/actors/migration/Timeout.scala
new file mode 100644
index 0000000000..32ea5f20fc
--- /dev/null
+++ b/src/actors-migration/scala/actors/migration/Timeout.scala
@@ -0,0 +1,40 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.actors.migration
+
+import scala.concurrent.duration.Duration
+import java.util.concurrent.TimeUnit
+import scala.language.implicitConversions
+
+case class Timeout(duration: Duration) {
+ def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS))
+ def this(length: Long, unit: TimeUnit) = this(Duration(length, unit))
+}
+
+object Timeout {
+
+ /**
+ * A timeout with zero duration, will cause most requests to always timeout.
+ */
+ val zero = new Timeout(Duration.Zero)
+
+ /**
+ * A Timeout with infinite duration. Will never timeout. Use extreme caution with this
+ * as it may cause memory leaks, blocked threads, or may not even be supported by
+ * the receiver, which would result in an exception.
+ */
+ val never = new Timeout(Duration.Inf)
+
+ def apply(timeout: Long) = new Timeout(timeout)
+ def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit)
+
+ implicit def durationToTimeout(duration: Duration) = new Timeout(duration)
+ implicit def intToTimeout(timeout: Int) = new Timeout(timeout)
+ implicit def longToTimeout(timeout: Long) = new Timeout(timeout)
+}