From e99fb0c93842d517b8a185458f405bace2bbb46b Mon Sep 17 00:00:00 2001 From: Vojin Jovanovic Date: Fri, 18 May 2012 17:53:05 +0200 Subject: Adding the Actor Migration Kit. Kit consists of: 1) The StashingActor which adopts an interface similar to Akka. 2) Props mockup for creating Akka like code 3) Pattern mockup 4) Test cases for every step in the migration. 5) MigrationSystem which will paired on the Akka side. Review of the code : @phaller Review of the build: @jsuereth --- build.xml | 44 +++- .../scala/actors/MigrationSystem.scala | 36 +++ src/actors-migration/scala/actors/Pattern.scala | 25 ++ src/actors-migration/scala/actors/Props.scala | 14 ++ .../scala/actors/StashingActor.scala | 261 +++++++++++++++++++++ src/actors-migration/scala/actors/Timeout.scala | 34 +++ src/actors/scala/actors/ActorRef.scala | 119 ++++++++++ src/actors/scala/actors/ActorTask.scala | 1 - src/actors/scala/actors/InternalActor.scala | 53 ++++- src/actors/scala/actors/MQueue.scala | 14 ++ src/actors/scala/actors/Reactor.scala | 10 +- src/build/maven/maven-deploy.xml | 3 + src/build/maven/scala-actors-migration.pom | 66 ++++++ src/build/pack.xml | 6 + src/partest/scala/tools/partest/PartestTask.scala | 11 + .../scala/tools/partest/nest/AntRunner.scala | 1 + .../tools/partest/nest/ConsoleFileManager.scala | 9 + .../scala/tools/partest/nest/DirectRunner.scala | 4 +- .../scala/tools/partest/nest/FileManager.scala | 1 + .../tools/partest/nest/ReflectiveRunner.scala | 4 +- .../scala/tools/partest/nest/SBTRunner.scala | 2 + src/partest/scala/tools/partest/nest/Worker.scala | 1 + test/files/jvm/actmig-PinS.check | 18 ++ test/files/jvm/actmig-PinS.scala | 118 ++++++++++ test/files/jvm/actmig-PinS_1.check | 18 ++ test/files/jvm/actmig-PinS_1.scala | 130 ++++++++++ test/files/jvm/actmig-PinS_2.check | 18 ++ test/files/jvm/actmig-PinS_2.scala | 150 ++++++++++++ test/files/jvm/actmig-PinS_3.check | 19 ++ test/files/jvm/actmig-PinS_3.scala | 159 +++++++++++++ test/files/jvm/actmig-hierarchy.check | 2 + test/files/jvm/actmig-hierarchy.scala | 44 ++++ test/files/jvm/actmig-hierarchy_1.check | 2 + test/files/jvm/actmig-hierarchy_1.scala | 41 ++++ test/files/jvm/actmig-instantiation.check | 8 + test/files/jvm/actmig-instantiation.scala | 91 +++++++ test/files/jvm/actmig-loop-react.check | 16 ++ test/files/jvm/actmig-loop-react.scala | 170 ++++++++++++++ test/files/jvm/actmig-public-methods.check | 6 + test/files/jvm/actmig-public-methods.scala | 69 ++++++ test/files/jvm/actmig-public-methods_1.check | 6 + test/files/jvm/actmig-public-methods_1.scala | 88 +++++++ test/files/jvm/actmig-react-receive.check | 16 ++ test/files/jvm/actmig-react-receive.scala | 104 ++++++++ 44 files changed, 1996 insertions(+), 16 deletions(-) create mode 100644 src/actors-migration/scala/actors/MigrationSystem.scala create mode 100644 src/actors-migration/scala/actors/Pattern.scala create mode 100644 src/actors-migration/scala/actors/Props.scala create mode 100644 src/actors-migration/scala/actors/StashingActor.scala create mode 100644 src/actors-migration/scala/actors/Timeout.scala create mode 100644 src/actors/scala/actors/ActorRef.scala create mode 100644 src/build/maven/scala-actors-migration.pom create mode 100644 test/files/jvm/actmig-PinS.check create mode 100644 test/files/jvm/actmig-PinS.scala create mode 100644 test/files/jvm/actmig-PinS_1.check create mode 100644 test/files/jvm/actmig-PinS_1.scala create mode 100644 test/files/jvm/actmig-PinS_2.check create mode 100644 test/files/jvm/actmig-PinS_2.scala create mode 100644 test/files/jvm/actmig-PinS_3.check create mode 100644 test/files/jvm/actmig-PinS_3.scala create mode 100644 test/files/jvm/actmig-hierarchy.check create mode 100644 test/files/jvm/actmig-hierarchy.scala create mode 100644 test/files/jvm/actmig-hierarchy_1.check create mode 100644 test/files/jvm/actmig-hierarchy_1.scala create mode 100644 test/files/jvm/actmig-instantiation.check create mode 100644 test/files/jvm/actmig-instantiation.scala create mode 100644 test/files/jvm/actmig-loop-react.check create mode 100644 test/files/jvm/actmig-loop-react.scala create mode 100644 test/files/jvm/actmig-public-methods.check create mode 100644 test/files/jvm/actmig-public-methods.scala create mode 100644 test/files/jvm/actmig-public-methods_1.check create mode 100644 test/files/jvm/actmig-public-methods_1.scala create mode 100644 test/files/jvm/actmig-react-receive.check create mode 100644 test/files/jvm/actmig-react-receive.scala diff --git a/build.xml b/build.xml index b28e6141c6..4727dffbdc 100644 --- a/build.xml +++ b/build.xml @@ -682,6 +682,7 @@ QUICK BUILD (QUICK) + @@ -879,7 +880,32 @@ QUICK BUILD (QUICK) - + + + + + + + + + + + + + + + + + + + + + @@ -1083,6 +1109,9 @@ PACKED QUICK BUILD (PACK) + + + @@ -1213,6 +1242,7 @@ PACKED QUICK BUILD (PACK) + @@ -1685,6 +1715,7 @@ DOCUMENTATION + @@ -1705,6 +1736,7 @@ DOCUMENTATION docRootContent="${src.dir}/library/rootdoc.txt" implicits="on" diagrams="on"> + @@ -1901,6 +1933,7 @@ BOOTRAPING TEST AND TEST SUITE + @@ -2079,6 +2112,9 @@ DISTRIBUTION + + + @@ -2258,6 +2294,12 @@ POSITIONS + + + + + + diff --git a/src/actors-migration/scala/actors/MigrationSystem.scala b/src/actors-migration/scala/actors/MigrationSystem.scala new file mode 100644 index 0000000000..ffc93d9c6f --- /dev/null +++ b/src/actors-migration/scala/actors/MigrationSystem.scala @@ -0,0 +1,36 @@ +package scala.actors + +import scala.collection._ + +object MigrationSystem { + + private[actors] val contextStack = new ThreadLocal[immutable.Stack[Boolean]] { + override def initialValue() = immutable.Stack[Boolean]() + } + + private[this] def withCleanContext(block: => ActorRef): ActorRef = { + // push clean marker + val old = contextStack.get + contextStack.set(old.push(true)) + try { + val instance = block + + if (instance eq null) + throw new Exception("Actor instance passed to actorOf can't be 'null'") + + instance + } finally { + val stackAfter = contextStack.get + if (stackAfter.nonEmpty) + contextStack.set(if (!stackAfter.head) stackAfter.pop.pop else stackAfter.pop) + } + } + + def actorOf(props: Props): ActorRef = withCleanContext { + val creator = props.creator() + val r = new InternalActorRef(creator) + creator.start() + r + } + +} \ No newline at end of file diff --git a/src/actors-migration/scala/actors/Pattern.scala b/src/actors-migration/scala/actors/Pattern.scala new file mode 100644 index 0000000000..97dbd2cccd --- /dev/null +++ b/src/actors-migration/scala/actors/Pattern.scala @@ -0,0 +1,25 @@ +package scala.actors + +import scala.concurrent.util.Duration + +object pattern { + + implicit def askSupport(ar: ActorRef): AskableActorRef = + new AskableActorRef(ar) +} + +/** + * ActorRef with support for ask(?) operation. + */ +class AskableActorRef(val ar: ActorRef) extends ActorRef { + + def !(message: Any)(implicit sender: ActorRef = null): Unit = ar.!(message)(sender) + + def ?(message: Any)(timeout: Timeout): Future[Any] = ar.?(message, timeout.duration) + + private[actors] def ?(message: Any, timeout: Duration): Future[Any] = ar.?(message, timeout) + + def forward(message: Any) = ar.forward(message) + + private[actors] def localActor: AbstractActor = ar.localActor +} \ No newline at end of file diff --git a/src/actors-migration/scala/actors/Props.scala b/src/actors-migration/scala/actors/Props.scala new file mode 100644 index 0000000000..b4d18d5fad --- /dev/null +++ b/src/actors-migration/scala/actors/Props.scala @@ -0,0 +1,14 @@ +package scala.actors + +/** + * ActorRef configuration object. It represents the minimal subset of Akka Props class. + */ +case class Props(creator: () ⇒ InternalActor, dispatcher: String) { + + /** + * Returns a new Props with the specified creator set + * Scala API + */ + def withCreator(c: ⇒ InternalActor) = copy(creator = () ⇒ c) + +} \ No newline at end of file diff --git a/src/actors-migration/scala/actors/StashingActor.scala b/src/actors-migration/scala/actors/StashingActor.scala new file mode 100644 index 0000000000..4bca879d9f --- /dev/null +++ b/src/actors-migration/scala/actors/StashingActor.scala @@ -0,0 +1,261 @@ +package scala.actors + +import scala.collection._ +import scala.concurrent.util.Duration +import java.util.concurrent.TimeUnit + +object StashingActor extends Combinators { + implicit def mkBody[A](body: => A) = new InternalActor.Body[A] { + def andThen[B](other: => B): Unit = Actor.rawSelf.seq(body, other) + } +} + +@deprecated("Scala Actors are beeing removed from the standard library. Please refer to the migration guide.", "2.10") +trait StashingActor extends InternalActor { + type Receive = PartialFunction[Any, Unit] + + // checks if StashingActor is created within the actorOf block + creationCheck; + + private[actors] val ref = new InternalActorRef(this) + + val self: ActorRef = ref + + protected[this] val context: ActorContext = new ActorContext(this) + + @volatile + private var myTimeout: Option[Long] = None + + private val stash = new MQueue[Any]("Stash") + + /** + * Migration notes: + * this method replaces receiveWithin, receive and react methods from Scala Actors. + */ + def receive: Receive + + /** + * User overridable callback. + *

+ * Is called when an Actor is started by invoking 'actor'. + */ + def preStart() {} + + /** + * User overridable callback. + *

+ * Is called when 'actor.stop()' is invoked. + */ + def postStop() {} + + /** + * User overridable callback. + *

+ * Is called on a crashed Actor right BEFORE it is restarted to allow clean + * up of resources before Actor is terminated. + * By default it calls postStop() + */ + def preRestart(reason: Throwable, message: Option[Any]) { postStop() } + + /** + * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. + * Puts the behavior on top of the hotswap stack. + * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack + */ + private def become(behavior: Receive, discardOld: Boolean = true) { + if (discardOld) unbecome() + behaviorStack = behaviorStack.push(wrapWithSystemMessageHandling(behavior)) + } + + /** + * Reverts the Actor behavior to the previous one in the hotswap stack. + */ + private def unbecome() { + // never unbecome the initial behavior + if (behaviorStack.size > 1) + behaviorStack = behaviorStack.pop + } + + /** + * User overridable callback. + *

+ * Is called when a message isn't handled by the current behavior of the actor + * by default it does: EventHandler.warning(self, message) + */ + def unhandled(message: Any) { + println("unhandeld") + message match { + case _ => throw new UnhandledMessageException(message, self) + } + } + + protected def sender: ActorRef = new OutputChannelRef(internalSender) + + override def act(): Unit = internalAct() + + override def start(): StashingActor = { + super.start() + this + } + + override def receive[R](f: PartialFunction[Any, R]): R + + /* + * Internal implementation. + */ + + private[actors] var behaviorStack = immutable.Stack[PartialFunction[Any, Unit]]() + + /* + * Checks that StashingActor can be created only by MigrationSystem.actorOf method. + */ + private[this] def creationCheck = { + + // creation check (see ActorRef) + val context = MigrationSystem.contextStack.get + if (context.isEmpty) + throw new RuntimeException("In order to create StashingActor one must use actorOf.") + else { + if (!context.head) + throw new RuntimeException("Only one actor can be created per actorOf call.") + else + MigrationSystem.contextStack.set(context.push(false)) + } + + } + + private[actors] override def preAct() { + preStart() + } + + /** + * Adds message to a stash, to be processed later. Stashed messages can be fed back into the $actor's + * mailbox using unstashAll(). + * + * Temporarily stashing away messages that the $actor does not (yet) handle simplifies implementing + * certain messaging protocols. + */ + final def stash(msg: Any): Unit = { + stash.append(msg, null) + } + + final def unstashAll(): Unit = { + mailbox.prepend(stash) + stash.clear() + } + + /** + * Wraps any partial function with Exit message handling. + */ + private[actors] def wrapWithSystemMessageHandling(pf: PartialFunction[Any, Unit]): PartialFunction[Any, Unit] = { + + def swapExitHandler(pf: PartialFunction[Any, Unit]) = new PartialFunction[Any, Unit] { + def swapExit(v: Any) = v match { + case Exit(from, reason) => + + Terminated(new InternalActorRef(from.asInstanceOf[InternalActor])) + case v => v + } + + def isDefinedAt(v: Any) = pf.isDefinedAt(swapExit(v)) + def apply(v: Any) = pf(swapExit(v)) + } + + swapExitHandler(pf orElse { + case m => unhandled(m) + }) + } + + /** + * Method that models the behavior of Akka actors. + */ + private[actors] def internalAct() { + trapExit = true + behaviorStack = behaviorStack.push(wrapWithSystemMessageHandling(receive)) + loop { + if (myTimeout.isDefined) + reactWithin(myTimeout.get)(behaviorStack.top) + else + react(behaviorStack.top) + } + } + + private[actors] override def internalPostStop() = postStop() + + // Used for pattern matching statement similar to Akka + lazy val ReceiveTimeout = TIMEOUT + + /** + * Used to simulate Akka context behavior. Should be used only for migration purposes. + */ + protected[actors] class ActorContext(val actr: StashingActor) { + + /** + * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. + * Puts the behavior on top of the hotswap stack. + * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack + */ + def become(behavior: Receive, discardOld: Boolean = true) = actr.become(behavior, discardOld) + + /** + * Reverts the Actor behavior to the previous one in the hotswap stack. + */ + def unbecome() = actr.unbecome() + + /** + * Shuts down the actor its dispatcher and message queue. + */ + def stop(subject: ActorRef): Nothing = if (subject != ref) + throw new RuntimeException("Only stoping of self is allowed during migration.") + else + actr.exit() + + /** + * Registers this actor as a Monitor for the provided ActorRef. + * @return the provided ActorRef + */ + def watch(subject: ActorRef): ActorRef = { + actr.watch(subject) + subject + } + + /** + * Unregisters this actor as Monitor for the provided ActorRef. + * @return the provided ActorRef + */ + def unwatch(subject: ActorRef): ActorRef = { + actr unwatch subject + subject + } + + /** + * Defines the receiver timeout value. + */ + final def setReceiveTimeout(timeout: Duration): Unit = + actr.myTimeout = Some(timeout.toMillis) + + /** + * Gets the current receiveTimeout + */ + final def receiveTimeout: Option[Duration] = + actr.myTimeout.map(Duration(_, TimeUnit.MILLISECONDS)) + + } +} + +/** + * This message is thrown by default when an Actors behavior doesn't match a message + */ +case class UnhandledMessageException(msg: Any, ref: ActorRef = null) extends Exception { + + def this(msg: String) = this(msg, null) + + // constructor with 'null' ActorRef needed to work with client instantiation of remote exception + override def getMessage = + if (ref ne null) "Actor %s does not handle [%s]".format(ref, msg) + else "Actor does not handle [%s]".format(msg) + + override def fillInStackTrace() = this //Don't waste cycles generating stack trace +} + +case class Terminated(actor: ActorRef) diff --git a/src/actors-migration/scala/actors/Timeout.scala b/src/actors-migration/scala/actors/Timeout.scala new file mode 100644 index 0000000000..bb3c8c0476 --- /dev/null +++ b/src/actors-migration/scala/actors/Timeout.scala @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package scala.actors + +import scala.concurrent.util.Duration +import java.util.concurrent.TimeUnit + +case class Timeout(duration: Duration) { + def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS)) + def this(length: Long, unit: TimeUnit) = this(Duration(length, unit)) +} + +object Timeout { + + /** + * A timeout with zero duration, will cause most requests to always timeout. + */ + val zero = new Timeout(Duration.Zero) + + /** + * A Timeout with infinite duration. Will never timeout. Use extreme caution with this + * as it may cause memory leaks, blocked threads, or may not even be supported by + * the receiver, which would result in an exception. + */ + val never = new Timeout(Duration.Inf) + + def apply(timeout: Long) = new Timeout(timeout) + def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit) + + implicit def durationToTimeout(duration: Duration) = new Timeout(duration) + implicit def intToTimeout(timeout: Int) = new Timeout(timeout) + implicit def longToTimeout(timeout: Long) = new Timeout(timeout) +} diff --git a/src/actors/scala/actors/ActorRef.scala b/src/actors/scala/actors/ActorRef.scala new file mode 100644 index 0000000000..8f70b13e01 --- /dev/null +++ b/src/actors/scala/actors/ActorRef.scala @@ -0,0 +1,119 @@ +package scala.actors + +import java.util.concurrent.TimeoutException +import scala.concurrent.util.Duration + +/** + * Trait used for migration of Scala actors to Akka. + */ +@deprecated("ActorRef ought to be used only with the Actor Migration Kit.") +trait ActorRef { + + /** + * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. + *

+ * + * If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument. + *

+ * + * This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable, + * if invoked from within an Actor. If not then no sender is available. + *

+   *   actor ! message
+   * 
+ *

+ */ + def !(message: Any)(implicit sender: ActorRef = null): Unit + + /** + * Sends a message asynchronously, returning a future which may eventually hold the reply. + */ + private[actors] def ?(message: Any, timeout: Duration): Future[Any] + + /** + * Forwards the message and passes the original sender actor as the sender. + *

+ * Works with '!' and '?'. + */ + def forward(message: Any) + + private[actors] def localActor: AbstractActor + +} + +private[actors] class OutputChannelRef(val actor: OutputChannel[Any]) extends ActorRef { + + override private[actors] def ?(message: Any, timeout: Duration): Future[Any] = + throw new UnsupportedOperationException("Output channel does not support ?") + + /** + * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. + *

+ * + *

+ *

+   *   actor ! message
+   * 
+ *

+ */ + def !(message: Any)(implicit sender: ActorRef = null): Unit = + if (sender != null) + actor.send(message, sender.localActor) + else + actor ! message + + override def equals(that: Any) = + that.isInstanceOf[OutputChannelRef] && that.asInstanceOf[OutputChannelRef].actor == this.actor + + private[actors] override def localActor: AbstractActor = + throw new UnsupportedOperationException("Output channel does not have an instance of the actor") + + def forward(message: Any): Unit = throw new UnsupportedOperationException("OutputChannel does not support forward.") + +} + +private[actors] class ReactorRef(override val actor: Reactor[Any]) extends OutputChannelRef(actor) { + + /** + * Forwards the message and passes the original sender actor as the sender. + *

+ * Works with '!' and '?'. + */ + override def forward(message: Any) = actor.forward(message) + +} + +private[actors] final class InternalActorRef(override val actor: InternalActor) extends ReactorRef(actor) { + + /** + * Sends a message asynchronously, returning a future which may eventually hold the reply. + */ + override private[actors] def ?(message: Any, timeout: Duration): Future[Any] = + Futures.future { + val dur = if (timeout.isFinite()) timeout.toMillis else (java.lang.Long.MAX_VALUE >> 2) + actor !? (dur, message) match { + case Some(x) => x + case None => new AskTimeoutException("? operation timed out.") + } + } + + override def !(message: Any)(implicit sender: ActorRef = null): Unit = + if (message == PoisonPill) + actor.stop('normal) + else if (sender != null) + actor.send(message, sender.localActor) + else + actor ! message + + private[actors] override def localActor: InternalActor = this.actor +} + +/** + * This is what is used to complete a Future that is returned from an ask/? call, + * when it times out. + */ +class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException { + def this(message: String) = this(message, null: Throwable) +} + +object PoisonPill diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala index bb04302238..045b00f5f2 100644 --- a/src/actors/scala/actors/ActorTask.scala +++ b/src/actors/scala/actors/ActorTask.scala @@ -51,7 +51,6 @@ private[actors] class ActorTask(actor: InternalActor, super.terminateExecution(e) () => {} } - actor.internalPostStop res } diff --git a/src/actors/scala/actors/InternalActor.scala b/src/actors/scala/actors/InternalActor.scala index c94da5b9fd..cb66021d1c 100644 --- a/src/actors/scala/actors/InternalActor.scala +++ b/src/actors/scala/actors/InternalActor.scala @@ -153,7 +153,7 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac val matches = f.isDefinedAt(m) senders = senders.tail matches - }) + }) if (null eq qel) { val todo = synchronized { // in mean time new stuff might have arrived @@ -316,6 +316,35 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac to } + /** + * Links self to actor to. + * + * @param to the actor to link to + * @return the parameter actor + */ + def link(to: ActorRef): ActorRef = { + this.link(to.localActor) + to + } + + /** + * Unidirectional linking. For migration purposes only + */ + private[actors] def watch(subject: ActorRef): ActorRef = { + assert(Actor.self(scheduler) == this, "link called on actor different from self") + subject.localActor linkTo this + subject + } + + /** + * Unidirectional linking. For migration purposes only + */ + private[actors] def unwatch(subject: ActorRef): ActorRef = { + assert(Actor.self(scheduler) == this, "link called on actor different from self") + subject.localActor unlinkFrom this + subject + } + /** * Links self to the actor defined by body. * @@ -346,17 +375,24 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac from unlinkFrom this } + /** + * Unlinks self from actor from. + */ + def unlink(from: ActorRef) { + unlink(from.localActor) + } + private[actors] def unlinkFrom(from: AbstractActor) = synchronized { links = links.filterNot(from.==) } - @volatile + @volatile private[actors] var _trapExit = false - + def trapExit = _trapExit - + def trapExit_=(value: Boolean) = _trapExit = value - + // guarded by this private var exitReason: AnyRef = 'normal // guarded by this @@ -445,12 +481,11 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac scheduler.onTerminate(this) { f } } - private[actors] def internalPostStop() = {} - private[actors] def stop(reason: AnyRef): Unit = { + private[actors] def stop(reason: AnyRef): Unit = { synchronized { shouldExit = true - exitReason = reason + exitReason = reason // resume this Actor in a way that // causes it to exit // (because shouldExit == true) @@ -464,7 +499,7 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac /* Here we should not throw a SuspendActorControl, since the current method is called from an actor that is in the process of exiting. - + Therefore, the contract for scheduleActor is that it never throws a SuspendActorControl. */ diff --git a/src/actors/scala/actors/MQueue.scala b/src/actors/scala/actors/MQueue.scala index 65427d68c5..4a148d2cb3 100644 --- a/src/actors/scala/actors/MQueue.scala +++ b/src/actors/scala/actors/MQueue.scala @@ -25,6 +25,20 @@ private[actors] class MQueue[Msg >: Null](protected val label: String) { _size += diff } + def prepend(other: MQueue[Msg]) { + if (!other.isEmpty) { + other.last.next = first + first = other.first + } + } + + def clear() { + first = null + last = null + _size = 0 + } + + def append(msg: Msg, session: OutputChannel[Any]) { changeSize(1) // size always increases by 1 val el = new MQueueElement(msg, session) diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala index 206a97d97c..7a8d738758 100644 --- a/src/actors/scala/actors/Reactor.scala +++ b/src/actors/scala/actors/Reactor.scala @@ -214,11 +214,16 @@ trait Reactor[Msg >: Null] extends OutputChannel[Msg] with Combinators { scheduler executeFromActor makeReaction(null, handler, msg) } + private[actors] def preAct() = {} + // guarded by this private[actors] def dostart() { _state = Actor.State.Runnable scheduler newActor this - scheduler execute makeReaction(() => act(), null, null) + scheduler execute makeReaction(() => { + preAct() + act() + }, null, null) } /** @@ -285,12 +290,15 @@ trait Reactor[Msg >: Null] extends OutputChannel[Msg] with Combinators { throw Actor.suspendException } + private[actors] def internalPostStop() = {} + private[actors] def terminated() { synchronized { _state = Actor.State.Terminated // reset waitingFor, otherwise getState returns Suspended waitingFor = Reactor.waitingForNone } + internalPostStop() scheduler.terminated(this) } diff --git a/src/build/maven/maven-deploy.xml b/src/build/maven/maven-deploy.xml index ac0f8f745b..7ab54f81c3 100644 --- a/src/build/maven/maven-deploy.xml +++ b/src/build/maven/maven-deploy.xml @@ -112,6 +112,7 @@ + @@ -172,6 +173,7 @@ + @@ -239,6 +241,7 @@ + diff --git a/src/build/maven/scala-actors-migration.pom b/src/build/maven/scala-actors-migration.pom new file mode 100644 index 0000000000..93fc34ece9 --- /dev/null +++ b/src/build/maven/scala-actors-migration.pom @@ -0,0 +1,66 @@ + + 4.0.0 + org.scala-lang + scala-actors-migration + jar + @VERSION@ + Scala Migration Kit + Migration kit that enables easy transition from the Scala Actors to Akka. + http://www.scala-lang.org/ + 2012 + + LAMP/EPFL + http://lamp.epfl.ch/ + + + + BSD-like + http://www.scala-lang.org/downloads/license.html + + repo + + + + scm:git:git://github.com/scala/scala.git + https://github.com/scala/scala.git + + + JIRA + https://issues.scala-lang.org/ + + + + org.scala-lang + scala-library + @VERSION@ + + + org.scala-lang + scala-actors + @VERSION@ + + + + + scala-tools.org + @RELEASE_REPOSITORY@ + + + scala-tools.org + @SNAPSHOT_REPOSITORY@ + false + + + + + lamp + EPFL LAMP + + + Typesafe + Typesafe, Inc. + + + diff --git a/src/build/pack.xml b/src/build/pack.xml index f96c6b9799..956beaef88 100644 --- a/src/build/pack.xml +++ b/src/build/pack.xml @@ -139,6 +139,7 @@ MAIN DISTRIBUTION PACKAGING + @@ -201,6 +202,11 @@ MAIN DISTRIBUTION PACKAGING basedir="${build-docs.dir}/continuations-plugin"> + + + + diff --git a/src/partest/scala/tools/partest/PartestTask.scala b/src/partest/scala/tools/partest/PartestTask.scala index ad2e155182..67b38d2e24 100644 --- a/src/partest/scala/tools/partest/PartestTask.scala +++ b/src/partest/scala/tools/partest/PartestTask.scala @@ -309,6 +309,16 @@ class PartestTask extends Task with CompilationPathProperty { } } getOrElse sys.error("Provided classpath does not contain a Scala actors.") + val scalaActorsMigration = { + (classpath.list map { fs => new File(fs) }) find { f => + f.getName match { + case "scala-actors-migration.jar" => true + case "actors-migration" if (f.getParentFile.getName == "classes") => true + case _ => false + } + } + } getOrElse sys.error("Provided classpath does not contain a Scala actors.") + def scalacArgsFlat: Option[Seq[String]] = scalacArgs map (_ flatMap { a => val parts = a.getParts if(parts eq null) Seq[String]() else parts.toSeq @@ -335,6 +345,7 @@ class PartestTask extends Task with CompilationPathProperty { antFileManager.LATEST_COMP = scalaCompiler.getAbsolutePath antFileManager.LATEST_PARTEST = scalaPartest.getAbsolutePath antFileManager.LATEST_ACTORS = scalaActors.getAbsolutePath + antFileManager.LATEST_ACTORS_MIGRATION = scalaActorsMigration.getAbsolutePath javacmd foreach (x => antFileManager.JAVACMD = x.getAbsolutePath) javaccmd foreach (x => antFileManager.JAVAC_CMD = x.getAbsolutePath) diff --git a/src/partest/scala/tools/partest/nest/AntRunner.scala b/src/partest/scala/tools/partest/nest/AntRunner.scala index e77385d6e9..dc83e4ea66 100644 --- a/src/partest/scala/tools/partest/nest/AntRunner.scala +++ b/src/partest/scala/tools/partest/nest/AntRunner.scala @@ -23,6 +23,7 @@ class AntRunner extends DirectRunner { var LATEST_COMP: String = _ var LATEST_PARTEST: String = _ var LATEST_ACTORS: String = _ + var LATEST_ACTORS_MIGRATION: String = _ val testRootPath: String = "test" val testRootDir: Directory = Directory(testRootPath) } diff --git a/src/partest/scala/tools/partest/nest/ConsoleFileManager.scala b/src/partest/scala/tools/partest/nest/ConsoleFileManager.scala index 8d239a84bd..b270a6b65a 100644 --- a/src/partest/scala/tools/partest/nest/ConsoleFileManager.scala +++ b/src/partest/scala/tools/partest/nest/ConsoleFileManager.scala @@ -84,6 +84,7 @@ class ConsoleFileManager extends FileManager { latestFile = testClassesDir.parent / "bin" latestLibFile = testClassesDir / "library" latestActorsFile = testClassesDir / "library" / "actors" + latestActMigFile = testClassesDir / "actors-migration" latestCompFile = testClassesDir / "compiler" latestPartestFile = testClassesDir / "partest" latestFjbgFile = testParent / "lib" / "fjbg.jar" @@ -94,6 +95,7 @@ class ConsoleFileManager extends FileManager { latestFile = dir / "bin" latestLibFile = dir / "lib/scala-library.jar" latestActorsFile = dir / "lib/scala-actors.jar" + latestActMigFile = dir / "lib/scala-actors-migration.jar" latestCompFile = dir / "lib/scala-compiler.jar" latestPartestFile = dir / "lib/scala-partest.jar" latestFjbgFile = testParent / "lib" / "fjbg.jar" @@ -104,6 +106,7 @@ class ConsoleFileManager extends FileManager { latestFile = prefixFile("build/quick/bin") latestLibFile = prefixFile("build/quick/classes/library") latestActorsFile = prefixFile("build/quick/classes/library/actors") + latestActMigFile = prefixFile("build/quick/classes/actors-migration") latestCompFile = prefixFile("build/quick/classes/compiler") latestPartestFile = prefixFile("build/quick/classes/partest") } @@ -114,6 +117,7 @@ class ConsoleFileManager extends FileManager { latestFile = prefixFileWith(p, "bin") latestLibFile = prefixFileWith(p, "lib/scala-library.jar") latestActorsFile = prefixFileWith(p, "lib/scala-actors.jar") + latestActMigFile = prefixFileWith(p, "lib/scala-actors-migration.jar") latestCompFile = prefixFileWith(p, "lib/scala-compiler.jar") latestPartestFile = prefixFileWith(p, "lib/scala-partest.jar") } @@ -123,6 +127,7 @@ class ConsoleFileManager extends FileManager { latestFile = prefixFile("dists/latest/bin") latestLibFile = prefixFile("dists/latest/lib/scala-library.jar") latestActorsFile = prefixFile("dists/latest/lib/scala-actors.jar") + latestActMigFile = prefixFile("dists/latest/lib/scala-actors-migration.jar") latestCompFile = prefixFile("dists/latest/lib/scala-compiler.jar") latestPartestFile = prefixFile("dists/latest/lib/scala-partest.jar") } @@ -132,6 +137,7 @@ class ConsoleFileManager extends FileManager { latestFile = prefixFile("build/pack/bin") latestLibFile = prefixFile("build/pack/lib/scala-library.jar") latestActorsFile = prefixFile("build/pack/lib/scala-actors.jar") + latestActMigFile = prefixFile("build/pack/lib/scala-actors-migration.jar") latestCompFile = prefixFile("build/pack/lib/scala-compiler.jar") latestPartestFile = prefixFile("build/pack/lib/scala-partest.jar") } @@ -167,16 +173,19 @@ class ConsoleFileManager extends FileManager { LATEST_COMP = latestCompFile.getAbsolutePath LATEST_PARTEST = latestPartestFile.getAbsolutePath LATEST_ACTORS = latestActorsFile.getAbsolutePath + LATEST_ACTORS_MIGRATION = latestActMigFile.getAbsolutePath } var LATEST_LIB: String = "" var LATEST_COMP: String = "" var LATEST_PARTEST: String = "" var LATEST_ACTORS: String = "" + var LATEST_ACTORS_MIGRATION: String = "" var latestFile: File = _ var latestLibFile: File = _ var latestActorsFile: File = _ + var latestActMigFile: File = _ var latestCompFile: File = _ var latestPartestFile: File = _ var latestFjbgFile: File = _ diff --git a/src/partest/scala/tools/partest/nest/DirectRunner.scala b/src/partest/scala/tools/partest/nest/DirectRunner.scala index 20f435cfbb..815c27f567 100644 --- a/src/partest/scala/tools/partest/nest/DirectRunner.scala +++ b/src/partest/scala/tools/partest/nest/DirectRunner.scala @@ -61,10 +61,10 @@ trait DirectRunner { val latestLibFile = new File(fileManager.LATEST_LIB) val latestPartestFile = new File(fileManager.LATEST_PARTEST) val latestActorsFile = new File(fileManager.LATEST_ACTORS) - + val latestActMigFile = new File(fileManager.LATEST_ACTORS_MIGRATION) val scalacheckURL = PathSettings.scalaCheck.toURL val scalaCheckParentClassLoader = ScalaClassLoader.fromURLs( - scalacheckURL :: (List(latestCompFile, latestLibFile, latestActorsFile, latestPartestFile).map(_.toURI.toURL)) + scalacheckURL :: (List(latestCompFile, latestLibFile, latestActorsFile, latestActMigFile, latestPartestFile).map(_.toURI.toURL)) ) Output.init() diff --git a/src/partest/scala/tools/partest/nest/FileManager.scala b/src/partest/scala/tools/partest/nest/FileManager.scala index 6d9e64730f..cf7160f521 100644 --- a/src/partest/scala/tools/partest/nest/FileManager.scala +++ b/src/partest/scala/tools/partest/nest/FileManager.scala @@ -63,6 +63,7 @@ trait FileManager extends FileUtil { var LATEST_COMP: String var LATEST_PARTEST: String var LATEST_ACTORS: String + var LATEST_ACTORS_MIGRATION: String var showDiff = false var updateCheck = false diff --git a/src/partest/scala/tools/partest/nest/ReflectiveRunner.scala b/src/partest/scala/tools/partest/nest/ReflectiveRunner.scala index a0511774a9..a5d5952ff7 100644 --- a/src/partest/scala/tools/partest/nest/ReflectiveRunner.scala +++ b/src/partest/scala/tools/partest/nest/ReflectiveRunner.scala @@ -48,9 +48,9 @@ class ReflectiveRunner { new ConsoleFileManager import fileManager. - { latestCompFile, latestLibFile, latestPartestFile, latestFjbgFile, latestScalapFile, latestActorsFile } + { latestCompFile, latestLibFile, latestPartestFile, latestFjbgFile, latestScalapFile, latestActorsFile, latestActMigFile } val files = - Array(latestCompFile, latestLibFile, latestPartestFile, latestFjbgFile, latestScalapFile, latestActorsFile) map (x => io.File(x)) + Array(latestCompFile, latestLibFile, latestPartestFile, latestFjbgFile, latestScalapFile, latestActorsFile, latestActMigFile) map (x => io.File(x)) val sepUrls = files map (_.toURL) var sepLoader = new URLClassLoader(sepUrls, null) diff --git a/src/partest/scala/tools/partest/nest/SBTRunner.scala b/src/partest/scala/tools/partest/nest/SBTRunner.scala index 5d994eeb37..14e2dc3df9 100644 --- a/src/partest/scala/tools/partest/nest/SBTRunner.scala +++ b/src/partest/scala/tools/partest/nest/SBTRunner.scala @@ -16,6 +16,7 @@ object SBTRunner extends DirectRunner { var LATEST_COMP: String = _ var LATEST_PARTEST: String = _ var LATEST_ACTORS: String = _ + var LATEST_ACTORS_MIGRATION: String = _ val testRootPath: String = "test" val testRootDir: Directory = Directory(testRootPath) } @@ -65,6 +66,7 @@ object SBTRunner extends DirectRunner { fileManager.LATEST_COMP = findClasspath("scala-compiler", "scala-compiler") getOrElse sys.error("No scala-compiler found! Classpath = " + fileManager.CLASSPATH) fileManager.LATEST_PARTEST = findClasspath("scala-partest", "partest") getOrElse sys.error("No scala-partest found! Classpath = " + fileManager.CLASSPATH) fileManager.LATEST_ACTORS = findClasspath("scala-actors", "actors") getOrElse sys.error("No scala-actors found! Classpath = " + fileManager.CLASSPATH) + fileManager.LATEST_ACTORS_MIGRATION = findClasspath("scala-actors-migration", "actors-migration") getOrElse sys.error("No scala-actors-migration found! Classpath = " + fileManager.CLASSPATH) // TODO - Do something useful here!!! fileManager.JAVAC_CMD = "javac" diff --git a/src/partest/scala/tools/partest/nest/Worker.scala b/src/partest/scala/tools/partest/nest/Worker.scala index 9326a8b232..40325c6375 100644 --- a/src/partest/scala/tools/partest/nest/Worker.scala +++ b/src/partest/scala/tools/partest/nest/Worker.scala @@ -56,6 +56,7 @@ class ScalaCheckFileManager(val origmanager: FileManager) extends FileManager { var LATEST_COMP: String = origmanager.LATEST_COMP var LATEST_PARTEST: String = origmanager.LATEST_PARTEST var LATEST_ACTORS: String = origmanager.LATEST_ACTORS + var LATEST_ACTORS_MIGRATION: String = origmanager.LATEST_ACTORS_MIGRATION } object Output { diff --git a/test/files/jvm/actmig-PinS.check b/test/files/jvm/actmig-PinS.check new file mode 100644 index 0000000000..97d1c5be02 --- /dev/null +++ b/test/files/jvm/actmig-PinS.check @@ -0,0 +1,18 @@ +I'm acting! +I'm acting! +I'm acting! +I'm acting! +I'm acting! +To be or not to be. +To be or not to be. +To be or not to be. +To be or not to be. +To be or not to be. +That is the question. +That is the question. +That is the question. +That is the question. +That is the question. +received message: hi there +received message: 15 +Got an Int: 12 diff --git a/test/files/jvm/actmig-PinS.scala b/test/files/jvm/actmig-PinS.scala new file mode 100644 index 0000000000..db5713dde4 --- /dev/null +++ b/test/files/jvm/actmig-PinS.scala @@ -0,0 +1,118 @@ +import scala.actors._ + +import scala.actors.Actor._ + +/* PinS, Listing 32.1: A simple actor + */ +object SillyActor extends Actor { + def act() { + for (i <- 1 to 5) { + println("I'm acting!") + //Thread.sleep(1000) + Thread.sleep(10) + } + } +} + +object SeriousActor extends Actor { + def act() { + for (i <- 1 to 5) { + println("To be or not to be.") + //Thread.sleep(1000) + Thread.sleep(10) + } + } +} + +/* PinS, Listing 32.3: An actor that calls react + */ +object NameResolver extends Actor { + import java.net.{InetAddress, UnknownHostException} + + def act() { + react { + case (name: String, actor: Actor) => + actor ! getIp(name) + act() + case "EXIT" => + println("Name resolver exiting.") + // quit + case msg => + println("Unhandled message: " + msg) + act() + } + } + + def getIp(name: String): Option[InetAddress] = { + try { + Some(InetAddress.getByName(name)) + } catch { + case _: UnknownHostException => None + } + } + +} + +object Test extends App { + + /* PinS, Listing 32.2: An actor that calls receive + */ + def makeEchoActor(): Actor = actor { + while (true) { + receive { + case 'stop => + exit() + case msg => + println("received message: " + msg) + } + } + } + + /* PinS, page 696 + */ + def makeIntActor(): Actor = actor { + receive { + case x: Int => // I only want Ints + println("Got an Int: " + x) + } + } + + + actor { + self.trapExit = true + self.link(SillyActor) + SillyActor.start() + + react { + case Exit(SillyActor, _) => + self.link(SeriousActor) + SeriousActor.start() + react { + case Exit(SeriousActor, _) => + // PinS, page 694 + val seriousActor2 = actor { + for (i <- 1 to 5) + println("That is the question.") + //Thread.sleep(1000) + Thread.sleep(10) + } + + Thread.sleep(200) + val echoActor = makeEchoActor() + self.link(echoActor) + echoActor ! "hi there" + echoActor ! 15 + echoActor ! 'stop + + react { + case Exit(_, _) => + val intActor = makeIntActor() + intActor ! "hello" + intActor ! math.Pi + // only the following send leads to output + intActor ! 12 + } + } + } + } +} diff --git a/test/files/jvm/actmig-PinS_1.check b/test/files/jvm/actmig-PinS_1.check new file mode 100644 index 0000000000..97d1c5be02 --- /dev/null +++ b/test/files/jvm/actmig-PinS_1.check @@ -0,0 +1,18 @@ +I'm acting! +I'm acting! +I'm acting! +I'm acting! +I'm acting! +To be or not to be. +To be or not to be. +To be or not to be. +To be or not to be. +To be or not to be. +That is the question. +That is the question. +That is the question. +That is the question. +That is the question. +received message: hi there +received message: 15 +Got an Int: 12 diff --git a/test/files/jvm/actmig-PinS_1.scala b/test/files/jvm/actmig-PinS_1.scala new file mode 100644 index 0000000000..d203526513 --- /dev/null +++ b/test/files/jvm/actmig-PinS_1.scala @@ -0,0 +1,130 @@ +import scala.actors._ + +object SillyActor { + val ref = MigrationSystem.actorOf(Props(() => new SillyActor, "akka.actor.default-stash-dispatcher")) +} + +/* PinS, Listing 32.1: A simple actor + */ +class SillyActor extends Actor { + def act() { + for (i <- 1 to 5) { + println("I'm acting!") + Thread.sleep(10) + } + } +} + +object SeriousActor { + val ref = MigrationSystem.actorOf(Props(() => new SeriousActor, "akka.actor.default-stash-dispatcher")) +} + +class SeriousActor extends Actor { + def act() { + for (i <- 1 to 5) { + println("To be or not to be.") + //Thread.sleep(1000) + Thread.sleep(10) + } + } +} + +/* PinS, Listing 32.3: An actor that calls react + */ +object NameResolver extends Actor { + import java.net.{ InetAddress, UnknownHostException } + + def act() { + react { + case (name: String, actor: Actor) => + actor ! getIp(name) + act() + case "EXIT" => + println("Name resolver exiting.") + // quit + case msg => + println("Unhandled message: " + msg) + act() + } + } + + def getIp(name: String): Option[InetAddress] = { + try { + Some(InetAddress.getByName(name)) + } catch { + case _: UnknownHostException => None + } + } + +} + +object Test extends App { + + /* PinS, Listing 32.2: An actor that calls receive + */ + def makeEchoActor(): ActorRef = MigrationSystem.actorOf(Props(() => new Actor { + def act() { + while (true) { + receive { + case 'stop => + exit() + case msg => + println("received message: " + msg) + } + } + } + }, "akka.actor.default-stash-dispatcher")) + + /* PinS, page 696 + */ + def makeIntActor(): ActorRef = MigrationSystem.actorOf(Props(() => new Actor { + def act() { + receive { + case x: Int => // I only want Ints + println("Got an Int: " + x) + } + } + }, "akka.actor.default-stash-dispatcher")) + + MigrationSystem.actorOf(Props(() => new Actor { + def act() { + trapExit = true + link(SillyActor.ref) + react { + case Exit(_: SillyActor, _) => + link(SeriousActor.ref) + react { + case Exit(_: SeriousActor, _) => + // PinS, page 694 + val seriousActor2 = MigrationSystem.actorOf(Props(() => + new Actor { + def act() { + for (i <- 1 to 5) { + println("That is the question.") + //Thread.sleep(1000) + Thread.sleep(10) + } + } + } + , "akka.actor.default-stash-dispatcher")) + + Thread.sleep(200) + val echoActor = makeEchoActor() + link(echoActor) + echoActor ! "hi there" + echoActor ! 15 + echoActor ! 'stop + + react { + case Exit(_, _) => + val intActor = makeIntActor() + intActor ! "hello" + intActor ! math.Pi + // only the following send leads to output + intActor ! 12 + } + } + } + } + }, "akka.actor.default-stash-dispatcher")) +} diff --git a/test/files/jvm/actmig-PinS_2.check b/test/files/jvm/actmig-PinS_2.check new file mode 100644 index 0000000000..97d1c5be02 --- /dev/null +++ b/test/files/jvm/actmig-PinS_2.check @@ -0,0 +1,18 @@ +I'm acting! +I'm acting! +I'm acting! +I'm acting! +I'm acting! +To be or not to be. +To be or not to be. +To be or not to be. +To be or not to be. +To be or not to be. +That is the question. +That is the question. +That is the question. +That is the question. +That is the question. +received message: hi there +received message: 15 +Got an Int: 12 diff --git a/test/files/jvm/actmig-PinS_2.scala b/test/files/jvm/actmig-PinS_2.scala new file mode 100644 index 0000000000..9f52cca369 --- /dev/null +++ b/test/files/jvm/actmig-PinS_2.scala @@ -0,0 +1,150 @@ +import scala.actors.{ MigrationSystem, StashingActor, ActorRef, Props, Exit } + +object SillyActor { + val ref = MigrationSystem.actorOf(Props(() => new SillyActor, "default-stash-dispatcher")) +} + +/* PinS, Listing 32.1: A simple actor + */ +class SillyActor extends StashingActor { + + def receive = { case _ => println("Nop") } + + override def act() { + for (i <- 1 to 5) { + println("I'm acting!") + Thread.sleep(10) + } + } +} + +object SeriousActor { + val ref = MigrationSystem.actorOf(Props(() => new SeriousActor, "default-stash-dispatcher")) +} + +class SeriousActor extends StashingActor { + def receive = { case _ => println("Nop") } + override def act() { + for (i <- 1 to 5) { + println("To be or not to be.") + Thread.sleep(10) + } + } +} + +/* PinS, Listing 32.3: An actor that calls react + */ +object NameResolver { + val ref = MigrationSystem.actorOf(Props(() => new NameResolver, "default-stash-dispatcher")) +} + +class NameResolver extends StashingActor { + import java.net.{ InetAddress, UnknownHostException } + + def receive = { case _ => println("Nop") } + + override def act() { + react { + case (name: String, actor: ActorRef) => + actor ! getIp(name) + act() + case "EXIT" => + println("Name resolver exiting.") + // quit + case msg => + println("Unhandled message: " + msg) + act() + } + } + + def getIp(name: String): Option[InetAddress] = { + try { + Some(InetAddress.getByName(name)) + } catch { + case _: UnknownHostException => None + } + } + +} + +object Test extends App { + + /* PinS, Listing 32.2: An actor that calls receive + */ + def makeEchoActor(): ActorRef = MigrationSystem.actorOf(Props(() => + new StashingActor { + def receive = { case _ => println("Nop") } + + override def act() { + loop { + react { + case 'stop => + exit() + case msg => + println("received message: " + msg) + } + } + } + }, "default-stash-dispatcher")) + + /* PinS, page 696 + */ + def makeIntActor(): ActorRef = MigrationSystem.actorOf(Props(() =>new StashingActor { + + def receive = { case _ => println("Nop") } + + override def act() { + react { + case x: Int => // I only want Ints + println("Got an Int: " + x) + } + } + }, "default-stash-dispatcher")) + + MigrationSystem.actorOf(Props(() => new StashingActor { + + def receive = { case _ => println("Nop") } + + override def act() { + trapExit = true + link(SillyActor.ref) + react { + case Exit(_: SillyActor, _) => + link(SeriousActor.ref) + react { + case Exit(_: SeriousActor, _) => + // PinS, page 694 + val seriousActor2 = MigrationSystem.actorOf(Props(() =>{ + new StashingActor { + + def receive = { case _ => println("Nop") } + + override def act() { + for (i <- 1 to 5) { + println("That is the question.") + Thread.sleep(10) + } + } + } + }, "default-stash-dispatcher")) + + Thread.sleep(200) + val echoActor = makeEchoActor() + link(echoActor) + echoActor ! "hi there" + echoActor ! 15 + echoActor ! 'stop + + react { + case Exit(_, _) => + val intActor = makeIntActor() + intActor ! "hello" + intActor ! math.Pi + // only the following send leads to output + intActor ! 12 + } + } + } + } + }, "default-stash-dispatcher")) +} diff --git a/test/files/jvm/actmig-PinS_3.check b/test/files/jvm/actmig-PinS_3.check new file mode 100644 index 0000000000..bdbdf8a692 --- /dev/null +++ b/test/files/jvm/actmig-PinS_3.check @@ -0,0 +1,19 @@ +I'm acting! +I'm acting! +I'm acting! +I'm acting! +I'm acting! +Post stop +To be or not to be. +To be or not to be. +To be or not to be. +To be or not to be. +To be or not to be. +That is the question. +That is the question. +That is the question. +That is the question. +That is the question. +received message: hi there +received message: 15 +Got an Int: 12 diff --git a/test/files/jvm/actmig-PinS_3.scala b/test/files/jvm/actmig-PinS_3.scala new file mode 100644 index 0000000000..047bf53c32 --- /dev/null +++ b/test/files/jvm/actmig-PinS_3.scala @@ -0,0 +1,159 @@ +import scala.actors.{ MigrationSystem, StashingActor, ActorRef, Terminated, Props } + +object SillyActor { + val ref = MigrationSystem.actorOf(Props(() => new SillyActor, "default-stash-dispatcher")) +} + +/* PinS, Listing 32.1: A simple actor + */ +class SillyActor extends StashingActor { + def receive = { case _ => println("Why are you not dead"); context.stop(self) } + + override def preStart() { + for (i <- 1 to 5) { + println("I'm acting!") + Thread.sleep(10) + } + context.stop(self) + } + + override def postStop() { + println("Post stop") + } +} + +object SeriousActor { + val ref = MigrationSystem.actorOf(Props(() => new SeriousActor, "default-stash-dispatcher")) +} + +class SeriousActor extends StashingActor { + def receive = { case _ => println("Nop") } + override def preStart() { + for (i <- 1 to 5) { + println("To be or not to be.") + //Thread.sleep(1000) + Thread.sleep(10) + } + context.stop(self) + } +} + +/* PinS, Listing 32.3: An actor that calls react + */ +object NameResolver { + val ref = MigrationSystem.actorOf(Props(() => new NameResolver, "default-stash-dispatcher")) +} + +class NameResolver extends StashingActor { + import java.net.{ InetAddress, UnknownHostException } + + def receive = { + case (name: String, actor: ActorRef) => + actor ! getIp(name) + case "EXIT" => + println("Name resolver exiting.") + context.stop(self) // quit + case msg => + println("Unhandled message: " + msg) + } + + def getIp(name: String): Option[InetAddress] = { + try { + Some(InetAddress.getByName(name)) + } catch { + case _: UnknownHostException => None + } + } + +} + +object Test extends App { + + /* PinS, Listing 32.2: An actor that calls receive + */ + def makeEchoActor(): ActorRef = MigrationSystem.actorOf(Props(() => new StashingActor { + + def receive = { // how to handle receive + case 'stop => + context.stop(self) + case msg => + println("received message: " + msg) + } + }, "default-stash-dispatcher")) + + /* PinS, page 696 + */ + def makeIntActor(): ActorRef = MigrationSystem.actorOf(Props(() => new StashingActor { + + def receive = { + case x: Int => // I only want Ints + unstashAll() + println("Got an Int: " + x) + context.stop(self) + case _ => stash() + } + }, "default-stash-dispatcher")) + + MigrationSystem.actorOf(Props(() => new StashingActor { + val silly = SillyActor.ref + + override def preStart() { + context.watch(SillyActor.ref) + } + + def receive = { + case Terminated(`silly`) => + unstashAll() + val serious = SeriousActor.ref + context.watch(SeriousActor.ref) + context.become { + case Terminated(`serious`) => + // PinS, page 694 + val seriousActor2 = MigrationSystem.actorOf(Props(() => { + new StashingActor { + + def receive = { case _ => context.stop(self) } + + override def preStart() = { + for (i <- 1 to 5) { + println("That is the question.") + //Thread.sleep(1000) + Thread.sleep(10) + } + context.stop(self) + } + } + }, "default-stash-dispatcher")) + + Thread.sleep(200) + val echoActor = makeEchoActor() + context.watch(echoActor) + echoActor ! "hi there" + echoActor ! 15 + echoActor ! 'stop + + context.become { + case Terminated(_) => + unstashAll() + val intActor = makeIntActor() + intActor ! "hello" + intActor ! math.Pi + // only the following send leads to output + intActor ! 12 + context.unbecome() + context.unbecome() + context.stop(self) + case m => + println("Stash 1 " + m) + stash(m) + } + case m => + println("Stash 2 " + m) + stash(m) + } + case m => + println("Stash 3 " + m) + stash(m) + } + }, "default-stash-dispatcher")) +} diff --git a/test/files/jvm/actmig-hierarchy.check b/test/files/jvm/actmig-hierarchy.check new file mode 100644 index 0000000000..317e9677c3 --- /dev/null +++ b/test/files/jvm/actmig-hierarchy.check @@ -0,0 +1,2 @@ +hello +hello diff --git a/test/files/jvm/actmig-hierarchy.scala b/test/files/jvm/actmig-hierarchy.scala new file mode 100644 index 0000000000..7277329d85 --- /dev/null +++ b/test/files/jvm/actmig-hierarchy.scala @@ -0,0 +1,44 @@ +import scala.actors._ + + +class ReactorActor extends Reactor[String] { + def act() { + var cond = true + loopWhile(cond) { + react { + case x if x == "hello1" => println(x.dropRight(1)) + case "exit" => cond = false + } + } + } +} + +class ReplyActor extends ReplyReactor { + def act() { + var cond = true + loopWhile(cond) { + react { + case "hello" => println("hello") + case "exit" => cond = false; + } + } + } +} + + + +object Test { + + def main(args: Array[String]) { + val reactorActor = new ReactorActor + val replyActor = new ReplyActor + reactorActor.start() + replyActor.start() + + reactorActor ! "hello1" + replyActor ! "hello" + + reactorActor ! "exit" + replyActor ! "exit" + } +} diff --git a/test/files/jvm/actmig-hierarchy_1.check b/test/files/jvm/actmig-hierarchy_1.check new file mode 100644 index 0000000000..317e9677c3 --- /dev/null +++ b/test/files/jvm/actmig-hierarchy_1.check @@ -0,0 +1,2 @@ +hello +hello diff --git a/test/files/jvm/actmig-hierarchy_1.scala b/test/files/jvm/actmig-hierarchy_1.scala new file mode 100644 index 0000000000..559ebe7b11 --- /dev/null +++ b/test/files/jvm/actmig-hierarchy_1.scala @@ -0,0 +1,41 @@ +import scala.actors._ + +class ReactorActor extends Actor { + def act() { + var cond = true + loopWhile(cond) { + react { + case x: String if x == "hello1" => println(x.dropRight(1)) + case "exit" => cond = false + } + } + } +} + +class ReplyActor extends Actor { + def act() { + var cond = true + loopWhile(cond) { + react { + case "hello" => println("hello") + case "exit" => cond = false; + } + } + } +} + +object Test { + + def main(args: Array[String]) { + val reactorActor = new ReactorActor + val replyActor = new ReplyActor + reactorActor.start() + replyActor.start() + + reactorActor ! "hello1" + replyActor ! "hello" + + reactorActor ! "exit" + replyActor ! "exit" + } +} diff --git a/test/files/jvm/actmig-instantiation.check b/test/files/jvm/actmig-instantiation.check new file mode 100644 index 0000000000..4c13d5c0a1 --- /dev/null +++ b/test/files/jvm/actmig-instantiation.check @@ -0,0 +1,8 @@ +OK error: java.lang.RuntimeException: In order to create StashingActor one must use actorOf. +OK error: java.lang.RuntimeException: Only one actor can be created per actorOf call. +0 +100 +200 +300 +400 +500 diff --git a/test/files/jvm/actmig-instantiation.scala b/test/files/jvm/actmig-instantiation.scala new file mode 100644 index 0000000000..4170dbd3ad --- /dev/null +++ b/test/files/jvm/actmig-instantiation.scala @@ -0,0 +1,91 @@ +import scala.actors.MigrationSystem._ +import scala.actors.Actor._ +import scala.actors.{ Actor, StashingActor, ActorRef, Props, MigrationSystem, PoisonPill } +import java.util.concurrent.{ TimeUnit, CountDownLatch } +import scala.collection.mutable.ArrayBuffer + +class TestStashingActor extends StashingActor { + + def receive = { case v: Int => Test.append(v); Test.latch.countDown() } + +} + +object Test { + val NUMBER_OF_TESTS = 5 + + // used for sorting non-deterministic output + val buff = ArrayBuffer[Int](0) + val latch = new CountDownLatch(NUMBER_OF_TESTS) + val toStop = ArrayBuffer[ActorRef]() + + def append(v: Int) = synchronized { + buff += v + } + + def main(args: Array[String]) = { + // plain scala actor + val a1 = actor { + react { case v: Int => Test.append(v); Test.latch.countDown() } + } + a1 ! 100 + + // simple instantiation + val a2 = MigrationSystem.actorOf(Props(() => new TestStashingActor, "akka.actor.default-stash-dispatcher")) + a2 ! 200 + toStop += a2 + + // actor of with scala actor + val a3 = actorOf(Props(() => actor { + react { case v: Int => Test.append(v); Test.latch.countDown() } + }, "akka.actor.default-stash-dispatcher")) + a3 ! 300 + + // using the manifest + val a4 = actorOf(Props(() => new TestStashingActor, "akka.actor.default-stash-dispatcher")) + a4 ! 400 + toStop += a4 + + // deterministic part of a test + // creation without actorOf + try { + val a3 = new TestStashingActor + a3 ! -1 + } catch { + case e => println("OK error: " + e) + } + + // actorOf double creation + try { + val a3 = actorOf(Props(() => { + new TestStashingActor + new TestStashingActor + }, "akka.actor.default-stash-dispatcher")) + a3 ! -1 + } catch { + case e => println("OK error: " + e) + } + + // actorOf nesting + try { + val a5 = actorOf(Props(() => { + val a6 = actorOf(Props(() => new TestStashingActor, "akka.actor.default-stash-dispatcher")) + toStop += a6 + new TestStashingActor + }, "akka.actor.default-stash-dispatcher")) + + a5 ! 500 + toStop += a5 + } catch { + case e => println("Should not throw an exception: " + e) + } + + // output + latch.await(200, TimeUnit.MILLISECONDS) + if (latch.getCount() > 0) { + println("Error: Tasks have not finished!!!") + } + + buff.sorted.foreach(println) + toStop.foreach(_ ! PoisonPill) + } +} diff --git a/test/files/jvm/actmig-loop-react.check b/test/files/jvm/actmig-loop-react.check new file mode 100644 index 0000000000..7b955aa9b9 --- /dev/null +++ b/test/files/jvm/actmig-loop-react.check @@ -0,0 +1,16 @@ +do task +do task +do task +do task +working +scala got exception +working +akka got exception +do task 1 +do string I am a String +do task 42 +after react +Terminated +do task 1 +do string I am a String +do task 42 diff --git a/test/files/jvm/actmig-loop-react.scala b/test/files/jvm/actmig-loop-react.scala new file mode 100644 index 0000000000..8452f9766a --- /dev/null +++ b/test/files/jvm/actmig-loop-react.scala @@ -0,0 +1,170 @@ +import scala.actors.MigrationSystem._ +import scala.actors.Actor._ +import scala.actors.{ Actor, StashingActor, ActorRef, Props, MigrationSystem, PoisonPill } +import java.util.concurrent.{ TimeUnit, CountDownLatch } +import scala.collection.mutable.ArrayBuffer + +object Test { + + def testLoopWithConditionReact() = { + // Snippet showing composition of receives + // Loop with Condition Snippet - before + val myActor = actor { + var c = true + loopWhile(c) { + react { + case x: Int => + // do task + println("do task") + if (x == 42) c = false + } + } + } + + myActor.start() + myActor ! 1 + myActor ! 42 + + // Loop with Condition Snippet - migrated + val myAkkaActor = MigrationSystem.actorOf(Props(() => new StashingActor { + + def receive = { + case x: Int => + // do task + println("do task") + if (x == 42) context.stop(self) + } + }, "default-stashing-dispatcher")) + myAkkaActor ! 1 + myAkkaActor ! 42 + } + + def testNestedReact() = { + // Snippet showing composition of receives + // Loop with Condition Snippet - before + val myActor = actor { + var c = true + loopWhile(c) { + react { + case x: Int => + // do task + println("do task " + x) + if (x == 42) c = false + else + react { + case y: String => + println("do string " + y) + } + println("after react") + } + } + } + myActor.start() + + myActor ! 1 + myActor ! "I am a String" + myActor ! 42 + + Thread.sleep(100) + println(myActor.getState) + + // Loop with Condition Snippet - migrated + val myAkkaActor = MigrationSystem.actorOf(Props(() => new StashingActor { + + def receive = { + case x: Int => + // do task + println("do task " + x) + if (x == 42) + context.stop(self) + else + context.become(({ + case y: String => + println("do string " + y) + }: Receive).andThen(x => { + unstashAll() + context.unbecome() + }).orElse { case x => stash() }) + } + }, "default-stashing-dispatcher")) + + myAkkaActor ! 1 + myAkkaActor ! "I am a String" + myAkkaActor ! 42 + + } + + def exceptionHandling() = { + // Stashing actor with act and exception handler + val myActor = MigrationSystem.actorOf(Props(() => new StashingActor { + + def receive = { case _ => println("Dummy method.") } + override def act() = { + loop { + react { + case "fail" => + throw new Exception("failed") + case "work" => + println("working") + case "die" => + exit() + } + } + } + + override def exceptionHandler = { + case x: Exception => println("scala got exception") + } + + }, "default-stashing-dispatcher")) + + myActor ! "work" + myActor ! "fail" + myActor ! "die" + + Thread.sleep(100) + // Stashing actor in Akka style + val myAkkaActor = MigrationSystem.actorOf(Props(() => new StashingActor { + def receive = PFCatch({ + case "fail" => + throw new Exception("failed") + case "work" => + println("working") + case "die" => + context.stop(self) + }, { case x: Exception => println("akka got exception") }) + }, "default-stashing-dispatcher")) + + myAkkaActor ! "work" + myAkkaActor ! "fail" + myAkkaActor ! "die" + } + + def main(args: Array[String]) = { + testLoopWithConditionReact() + Thread.sleep(100) + exceptionHandling() + Thread.sleep(100) + testNestedReact() + } + +} + +// As per Jim Mcbeath blog (http://jim-mcbeath.blogspot.com/2008/07/actor-exceptions.html) +class PFCatch(f: PartialFunction[Any, Unit], handler: PartialFunction[Exception, Unit]) + extends PartialFunction[Any, Unit] { + + def apply(x: Any) = { + try { + f(x) + } catch { + case e: Exception if handler.isDefinedAt(e) => handler(e) + } + } + + def isDefinedAt(x: Any) = f.isDefinedAt(x) +} + +object PFCatch { + def apply(f: PartialFunction[Any, Unit], handler: PartialFunction[Exception, Unit]) = new PFCatch(f, handler) +} diff --git a/test/files/jvm/actmig-public-methods.check b/test/files/jvm/actmig-public-methods.check new file mode 100644 index 0000000000..bb6530c926 --- /dev/null +++ b/test/files/jvm/actmig-public-methods.check @@ -0,0 +1,6 @@ +None +Some(bang qmark after 1) +bang +bang qmark after 0 +bang qmark in future after 0 +typed bang qmark in future after 0 diff --git a/test/files/jvm/actmig-public-methods.scala b/test/files/jvm/actmig-public-methods.scala new file mode 100644 index 0000000000..c4c8560122 --- /dev/null +++ b/test/files/jvm/actmig-public-methods.scala @@ -0,0 +1,69 @@ +import scala.collection.mutable.ArrayBuffer +import scala.actors.Actor._ +import scala.actors._ +import scala.actors.MigrationSystem +import scala.util.continuations._ +import java.util.concurrent.{ TimeUnit, CountDownLatch } + +object Test { + val NUMBER_OF_TESTS = 6 + + // used for sorting non-deterministic output + val buff = ArrayBuffer[String]() + val latch = new CountDownLatch(NUMBER_OF_TESTS) + val toStop = ArrayBuffer[Actor]() + + def append(v: String) = synchronized { + buff += v + } + + def main(args: Array[String]) = { + + val respActor = actor { + loop { + react { + case (x: String, time: Long) => + Thread.sleep(time) + reply(x + " after " + time) + case str: String => + append(str) + latch.countDown() + case _ => exit() + } + } + } + + toStop += respActor + + respActor ! ("bang") + + val res1 = respActor !? (("bang qmark", 0L)) + append(res1.toString) + latch.countDown() + + val res2 = respActor !? (200, ("bang qmark", 1L)) + append(res2.toString) + latch.countDown() + + val res21 = respActor !? (1, ("bang qmark", 200L)) + append(res21.toString) + latch.countDown() + + val fut1 = respActor !! (("bang qmark in future", 0L)) + append(fut1().toString()) + latch.countDown() + + val fut2 = respActor !! (("typed bang qmark in future", 0L), { case x: String => x }) + append(fut2()) + latch.countDown() + + // output + latch.await(10, TimeUnit.MILLISECONDS) + if (latch.getCount() > 0) { + println("Error: Tasks have not finished!!!") + } + + buff.sorted.foreach(println) + toStop.foreach(_ ! 'stop) + } +} diff --git a/test/files/jvm/actmig-public-methods_1.check b/test/files/jvm/actmig-public-methods_1.check new file mode 100644 index 0000000000..bb6530c926 --- /dev/null +++ b/test/files/jvm/actmig-public-methods_1.check @@ -0,0 +1,6 @@ +None +Some(bang qmark after 1) +bang +bang qmark after 0 +bang qmark in future after 0 +typed bang qmark in future after 0 diff --git a/test/files/jvm/actmig-public-methods_1.scala b/test/files/jvm/actmig-public-methods_1.scala new file mode 100644 index 0000000000..41798c4c37 --- /dev/null +++ b/test/files/jvm/actmig-public-methods_1.scala @@ -0,0 +1,88 @@ +import scala.collection.mutable.ArrayBuffer +import scala.actors.Actor._ +import scala.actors._ +import scala.util._ +import java.util.concurrent.{ TimeUnit, CountDownLatch } +import scala.concurrent.util.Duration +import scala.actors.pattern._ + +object Test { + val NUMBER_OF_TESTS = 6 + + // used for sorting non-deterministic output + val buff = ArrayBuffer[String]() + val latch = new CountDownLatch(NUMBER_OF_TESTS) + val toStop = ArrayBuffer[ActorRef]() + + def append(v: String) = synchronized { + buff += v + } + + def main(args: Array[String]) = { + + val respActor = MigrationSystem.actorOf(Props(() => actor { + loop { + react { + case (x: String, time: Long) => + Thread.sleep(time) + reply(x + " after " + time) + case str: String => + append(str) + latch.countDown() + case x => + exit() + } + } + }, "akka.actor.default-stash-dispatcher")) + + toStop += respActor + + respActor ! "bang" + + implicit val timeout = Timeout(Duration(200, TimeUnit.MILLISECONDS)) + val msg = ("bang qmark", 0L) + val res1 = respActor.?(msg)(Timeout(Duration.Inf)) + append(res1().toString) + latch.countDown() + + val msg1 = ("bang qmark", 1L) + val res2 = respActor.?(msg1)(Timeout(Duration(200, TimeUnit.MILLISECONDS))) + append((res2() match { + case x: AskTimeoutException => None + case v => Some(v) + }).toString) + latch.countDown() + + // this one should time out + val msg11 = ("bang qmark", 200L) + val res21 = respActor.?(msg11)(Timeout(Duration(1, TimeUnit.MILLISECONDS))) + append((res21() match { + case x: AskTimeoutException => None + case v => Some(v) + }).toString) + latch.countDown() + + val msg2 = ("bang qmark in future", 0L) + val fut1 = respActor.?(msg2)(Duration.Inf) + append(fut1().toString()) + latch.countDown() + + val handler: PartialFunction[Any, String] = { + case x: String => x.toString + } + + val msg3 = ("typed bang qmark in future", 0L) + val fut2 = (respActor.?(msg3)(Duration.Inf)) + append(Futures.future { handler.apply(fut2()) }().toString) + latch.countDown() + + // output + latch.await(10, TimeUnit.MILLISECONDS) + if (latch.getCount() > 0) { + println("Error: Tasks have not finished!!!") + } + + buff.sorted.foreach(println) + toStop.foreach(_ ! PoisonPill) + } +} diff --git a/test/files/jvm/actmig-react-receive.check b/test/files/jvm/actmig-react-receive.check new file mode 100644 index 0000000000..cc2a426e68 --- /dev/null +++ b/test/files/jvm/actmig-react-receive.check @@ -0,0 +1,16 @@ +do before +do task +do after +do before +do task +do after +do before +do task +do in between +do string +do after +do before +do task +do in between +do string +do after diff --git a/test/files/jvm/actmig-react-receive.scala b/test/files/jvm/actmig-react-receive.scala new file mode 100644 index 0000000000..fbc10a8d52 --- /dev/null +++ b/test/files/jvm/actmig-react-receive.scala @@ -0,0 +1,104 @@ +import scala.actors.MigrationSystem._ +import scala.actors.Actor._ +import scala.actors.{ Actor, StashingActor, ActorRef, Props, MigrationSystem, PoisonPill } +import java.util.concurrent.{ TimeUnit, CountDownLatch } +import scala.collection.mutable.ArrayBuffer + +object Test { + + def testComposition() = { + // Snippet showing composition of receives + // React Snippet - before + val myActor = actor { + // do before + println("do before") + receive { + case x: Int => + // do task + println("do task") + } + println("do in between") + receive { + case x: String => + // do string now + println("do string") + } + println("do after") + } + myActor.start() + myActor ! 1 + myActor ! "1" + Thread.sleep(200) + // React Snippet - migrated + val myAkkaActor = MigrationSystem.actorOf(Props(() => new StashingActor { + override def preStart() = { + println("do before") + } + + def receive = ({ + case x: Int => + // do task + println("do task") + }: Receive) andThen { v => + context.become { + case x: String => + //do string + println("do string") + context.stop(self) + } + println("do in between") + } + + override def postStop() = { + println("do after") + } + + }, "default-stashing-dispatcher")) + myAkkaActor ! 1 + myAkkaActor ! "1" + Thread.sleep(200) + } + + def main(args: Array[String]) = { + // React Snippet - before + val myActor = actor { + // do before + println("do before") + receive { + case x: Int => + // do task + println("do task") + } + println("do after") + } + myActor.start() + myActor ! 1 + + Thread.sleep(200) + + // React Snippet - migrated + val myAkkaActor = MigrationSystem.actorOf(Props(() => new StashingActor { + override def preStart() = { + println("do before") + } + + def receive = { + case x: Int => + // do task + println("do task") + context.stop(self) + } + + override def postStop() = { + println("do after") + } + + }, "default-stashing-dispatcher")) + myAkkaActor ! 1 + + Thread.sleep(200) + // Starting composition test + testComposition() + + } +} -- cgit v1.2.3