From e99fb0c93842d517b8a185458f405bace2bbb46b Mon Sep 17 00:00:00 2001 From: Vojin Jovanovic Date: Fri, 18 May 2012 17:53:05 +0200 Subject: Adding the Actor Migration Kit. Kit consists of: 1) The StashingActor which adopts an interface similar to Akka. 2) Props mockup for creating Akka like code 3) Pattern mockup 4) Test cases for every step in the migration. 5) MigrationSystem which will paired on the Akka side. Review of the code : @phaller Review of the build: @jsuereth --- src/actors/scala/actors/ActorRef.scala | 119 ++++++++++++++++++++++++++++ src/actors/scala/actors/ActorTask.scala | 1 - src/actors/scala/actors/InternalActor.scala | 53 ++++++++++--- src/actors/scala/actors/MQueue.scala | 14 ++++ src/actors/scala/actors/Reactor.scala | 10 ++- 5 files changed, 186 insertions(+), 11 deletions(-) create mode 100644 src/actors/scala/actors/ActorRef.scala (limited to 'src/actors') diff --git a/src/actors/scala/actors/ActorRef.scala b/src/actors/scala/actors/ActorRef.scala new file mode 100644 index 0000000000..8f70b13e01 --- /dev/null +++ b/src/actors/scala/actors/ActorRef.scala @@ -0,0 +1,119 @@ +package scala.actors + +import java.util.concurrent.TimeoutException +import scala.concurrent.util.Duration + +/** + * Trait used for migration of Scala actors to Akka. + */ +@deprecated("ActorRef ought to be used only with the Actor Migration Kit.") +trait ActorRef { + + /** + * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. + *

+ * + * If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument. + *

+ * + * This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable, + * if invoked from within an Actor. If not then no sender is available. + *

+   *   actor ! message
+   * 
+ *

+ */ + def !(message: Any)(implicit sender: ActorRef = null): Unit + + /** + * Sends a message asynchronously, returning a future which may eventually hold the reply. + */ + private[actors] def ?(message: Any, timeout: Duration): Future[Any] + + /** + * Forwards the message and passes the original sender actor as the sender. + *

+ * Works with '!' and '?'. + */ + def forward(message: Any) + + private[actors] def localActor: AbstractActor + +} + +private[actors] class OutputChannelRef(val actor: OutputChannel[Any]) extends ActorRef { + + override private[actors] def ?(message: Any, timeout: Duration): Future[Any] = + throw new UnsupportedOperationException("Output channel does not support ?") + + /** + * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. + *

+ * + *

+ *

+   *   actor ! message
+   * 
+ *

+ */ + def !(message: Any)(implicit sender: ActorRef = null): Unit = + if (sender != null) + actor.send(message, sender.localActor) + else + actor ! message + + override def equals(that: Any) = + that.isInstanceOf[OutputChannelRef] && that.asInstanceOf[OutputChannelRef].actor == this.actor + + private[actors] override def localActor: AbstractActor = + throw new UnsupportedOperationException("Output channel does not have an instance of the actor") + + def forward(message: Any): Unit = throw new UnsupportedOperationException("OutputChannel does not support forward.") + +} + +private[actors] class ReactorRef(override val actor: Reactor[Any]) extends OutputChannelRef(actor) { + + /** + * Forwards the message and passes the original sender actor as the sender. + *

+ * Works with '!' and '?'. + */ + override def forward(message: Any) = actor.forward(message) + +} + +private[actors] final class InternalActorRef(override val actor: InternalActor) extends ReactorRef(actor) { + + /** + * Sends a message asynchronously, returning a future which may eventually hold the reply. + */ + override private[actors] def ?(message: Any, timeout: Duration): Future[Any] = + Futures.future { + val dur = if (timeout.isFinite()) timeout.toMillis else (java.lang.Long.MAX_VALUE >> 2) + actor !? (dur, message) match { + case Some(x) => x + case None => new AskTimeoutException("? operation timed out.") + } + } + + override def !(message: Any)(implicit sender: ActorRef = null): Unit = + if (message == PoisonPill) + actor.stop('normal) + else if (sender != null) + actor.send(message, sender.localActor) + else + actor ! message + + private[actors] override def localActor: InternalActor = this.actor +} + +/** + * This is what is used to complete a Future that is returned from an ask/? call, + * when it times out. + */ +class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException { + def this(message: String) = this(message, null: Throwable) +} + +object PoisonPill diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala index bb04302238..045b00f5f2 100644 --- a/src/actors/scala/actors/ActorTask.scala +++ b/src/actors/scala/actors/ActorTask.scala @@ -51,7 +51,6 @@ private[actors] class ActorTask(actor: InternalActor, super.terminateExecution(e) () => {} } - actor.internalPostStop res } diff --git a/src/actors/scala/actors/InternalActor.scala b/src/actors/scala/actors/InternalActor.scala index c94da5b9fd..cb66021d1c 100644 --- a/src/actors/scala/actors/InternalActor.scala +++ b/src/actors/scala/actors/InternalActor.scala @@ -153,7 +153,7 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac val matches = f.isDefinedAt(m) senders = senders.tail matches - }) + }) if (null eq qel) { val todo = synchronized { // in mean time new stuff might have arrived @@ -316,6 +316,35 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac to } + /** + * Links self to actor to. + * + * @param to the actor to link to + * @return the parameter actor + */ + def link(to: ActorRef): ActorRef = { + this.link(to.localActor) + to + } + + /** + * Unidirectional linking. For migration purposes only + */ + private[actors] def watch(subject: ActorRef): ActorRef = { + assert(Actor.self(scheduler) == this, "link called on actor different from self") + subject.localActor linkTo this + subject + } + + /** + * Unidirectional linking. For migration purposes only + */ + private[actors] def unwatch(subject: ActorRef): ActorRef = { + assert(Actor.self(scheduler) == this, "link called on actor different from self") + subject.localActor unlinkFrom this + subject + } + /** * Links self to the actor defined by body. * @@ -346,17 +375,24 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac from unlinkFrom this } + /** + * Unlinks self from actor from. + */ + def unlink(from: ActorRef) { + unlink(from.localActor) + } + private[actors] def unlinkFrom(from: AbstractActor) = synchronized { links = links.filterNot(from.==) } - @volatile + @volatile private[actors] var _trapExit = false - + def trapExit = _trapExit - + def trapExit_=(value: Boolean) = _trapExit = value - + // guarded by this private var exitReason: AnyRef = 'normal // guarded by this @@ -445,12 +481,11 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac scheduler.onTerminate(this) { f } } - private[actors] def internalPostStop() = {} - private[actors] def stop(reason: AnyRef): Unit = { + private[actors] def stop(reason: AnyRef): Unit = { synchronized { shouldExit = true - exitReason = reason + exitReason = reason // resume this Actor in a way that // causes it to exit // (because shouldExit == true) @@ -464,7 +499,7 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac /* Here we should not throw a SuspendActorControl, since the current method is called from an actor that is in the process of exiting. - + Therefore, the contract for scheduleActor is that it never throws a SuspendActorControl. */ diff --git a/src/actors/scala/actors/MQueue.scala b/src/actors/scala/actors/MQueue.scala index 65427d68c5..4a148d2cb3 100644 --- a/src/actors/scala/actors/MQueue.scala +++ b/src/actors/scala/actors/MQueue.scala @@ -25,6 +25,20 @@ private[actors] class MQueue[Msg >: Null](protected val label: String) { _size += diff } + def prepend(other: MQueue[Msg]) { + if (!other.isEmpty) { + other.last.next = first + first = other.first + } + } + + def clear() { + first = null + last = null + _size = 0 + } + + def append(msg: Msg, session: OutputChannel[Any]) { changeSize(1) // size always increases by 1 val el = new MQueueElement(msg, session) diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala index 206a97d97c..7a8d738758 100644 --- a/src/actors/scala/actors/Reactor.scala +++ b/src/actors/scala/actors/Reactor.scala @@ -214,11 +214,16 @@ trait Reactor[Msg >: Null] extends OutputChannel[Msg] with Combinators { scheduler executeFromActor makeReaction(null, handler, msg) } + private[actors] def preAct() = {} + // guarded by this private[actors] def dostart() { _state = Actor.State.Runnable scheduler newActor this - scheduler execute makeReaction(() => act(), null, null) + scheduler execute makeReaction(() => { + preAct() + act() + }, null, null) } /** @@ -285,12 +290,15 @@ trait Reactor[Msg >: Null] extends OutputChannel[Msg] with Combinators { throw Actor.suspendException } + private[actors] def internalPostStop() = {} + private[actors] def terminated() { synchronized { _state = Actor.State.Terminated // reset waitingFor, otherwise getState returns Suspended waitingFor = Reactor.waitingForNone } + internalPostStop() scheduler.terminated(this) } -- cgit v1.2.3