diff options
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/ActorRef.scala | 119 | ||||
-rw-r--r-- | src/actors/scala/actors/ActorTask.scala | 1 | ||||
-rw-r--r-- | src/actors/scala/actors/InternalActor.scala | 53 | ||||
-rw-r--r-- | src/actors/scala/actors/MQueue.scala | 14 | ||||
-rw-r--r-- | src/actors/scala/actors/Reactor.scala | 10 |
5 files changed, 186 insertions, 11 deletions
diff --git a/src/actors/scala/actors/ActorRef.scala b/src/actors/scala/actors/ActorRef.scala new file mode 100644 index 0000000000..8f70b13e01 --- /dev/null +++ b/src/actors/scala/actors/ActorRef.scala @@ -0,0 +1,119 @@ +package scala.actors + +import java.util.concurrent.TimeoutException +import scala.concurrent.util.Duration + +/** + * Trait used for migration of Scala actors to Akka. + */ +@deprecated("ActorRef ought to be used only with the Actor Migration Kit.") +trait ActorRef { + + /** + * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. + * <p/> + * + * If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument. + * <p/> + * + * This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable, + * if invoked from within an Actor. If not then no sender is available. + * <pre> + * actor ! message + * </pre> + * <p/> + */ + def !(message: Any)(implicit sender: ActorRef = null): Unit + + /** + * Sends a message asynchronously, returning a future which may eventually hold the reply. + */ + private[actors] def ?(message: Any, timeout: Duration): Future[Any] + + /** + * Forwards the message and passes the original sender actor as the sender. + * <p/> + * Works with '!' and '?'. + */ + def forward(message: Any) + + private[actors] def localActor: AbstractActor + +} + +private[actors] class OutputChannelRef(val actor: OutputChannel[Any]) extends ActorRef { + + override private[actors] def ?(message: Any, timeout: Duration): Future[Any] = + throw new UnsupportedOperationException("Output channel does not support ?") + + /** + * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. + * <p/> + * + * <p/> + * <pre> + * actor ! message + * </pre> + * <p/> + */ + def !(message: Any)(implicit sender: ActorRef = null): Unit = + if (sender != null) + actor.send(message, sender.localActor) + else + actor ! message + + override def equals(that: Any) = + that.isInstanceOf[OutputChannelRef] && that.asInstanceOf[OutputChannelRef].actor == this.actor + + private[actors] override def localActor: AbstractActor = + throw new UnsupportedOperationException("Output channel does not have an instance of the actor") + + def forward(message: Any): Unit = throw new UnsupportedOperationException("OutputChannel does not support forward.") + +} + +private[actors] class ReactorRef(override val actor: Reactor[Any]) extends OutputChannelRef(actor) { + + /** + * Forwards the message and passes the original sender actor as the sender. + * <p/> + * Works with '!' and '?'. + */ + override def forward(message: Any) = actor.forward(message) + +} + +private[actors] final class InternalActorRef(override val actor: InternalActor) extends ReactorRef(actor) { + + /** + * Sends a message asynchronously, returning a future which may eventually hold the reply. + */ + override private[actors] def ?(message: Any, timeout: Duration): Future[Any] = + Futures.future { + val dur = if (timeout.isFinite()) timeout.toMillis else (java.lang.Long.MAX_VALUE >> 2) + actor !? (dur, message) match { + case Some(x) => x + case None => new AskTimeoutException("? operation timed out.") + } + } + + override def !(message: Any)(implicit sender: ActorRef = null): Unit = + if (message == PoisonPill) + actor.stop('normal) + else if (sender != null) + actor.send(message, sender.localActor) + else + actor ! message + + private[actors] override def localActor: InternalActor = this.actor +} + +/** + * This is what is used to complete a Future that is returned from an ask/? call, + * when it times out. + */ +class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException { + def this(message: String) = this(message, null: Throwable) +} + +object PoisonPill diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala index bb04302238..045b00f5f2 100644 --- a/src/actors/scala/actors/ActorTask.scala +++ b/src/actors/scala/actors/ActorTask.scala @@ -51,7 +51,6 @@ private[actors] class ActorTask(actor: InternalActor, super.terminateExecution(e) () => {} } - actor.internalPostStop res } diff --git a/src/actors/scala/actors/InternalActor.scala b/src/actors/scala/actors/InternalActor.scala index c94da5b9fd..cb66021d1c 100644 --- a/src/actors/scala/actors/InternalActor.scala +++ b/src/actors/scala/actors/InternalActor.scala @@ -153,7 +153,7 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac val matches = f.isDefinedAt(m) senders = senders.tail matches - }) + }) if (null eq qel) { val todo = synchronized { // in mean time new stuff might have arrived @@ -317,6 +317,35 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac } /** + * Links <code>self</code> to actor <code>to</code>. + * + * @param to the actor to link to + * @return the parameter actor + */ + def link(to: ActorRef): ActorRef = { + this.link(to.localActor) + to + } + + /** + * Unidirectional linking. For migration purposes only + */ + private[actors] def watch(subject: ActorRef): ActorRef = { + assert(Actor.self(scheduler) == this, "link called on actor different from self") + subject.localActor linkTo this + subject + } + + /** + * Unidirectional linking. For migration purposes only + */ + private[actors] def unwatch(subject: ActorRef): ActorRef = { + assert(Actor.self(scheduler) == this, "link called on actor different from self") + subject.localActor unlinkFrom this + subject + } + + /** * Links <code>self</code> to the actor defined by <code>body</code>. * * @param body the body of the actor to link to @@ -346,17 +375,24 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac from unlinkFrom this } + /** + * Unlinks <code>self</code> from actor <code>from</code>. + */ + def unlink(from: ActorRef) { + unlink(from.localActor) + } + private[actors] def unlinkFrom(from: AbstractActor) = synchronized { links = links.filterNot(from.==) } - @volatile + @volatile private[actors] var _trapExit = false - + def trapExit = _trapExit - + def trapExit_=(value: Boolean) = _trapExit = value - + // guarded by this private var exitReason: AnyRef = 'normal // guarded by this @@ -445,12 +481,11 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac scheduler.onTerminate(this) { f } } - private[actors] def internalPostStop() = {} - private[actors] def stop(reason: AnyRef): Unit = { + private[actors] def stop(reason: AnyRef): Unit = { synchronized { shouldExit = true - exitReason = reason + exitReason = reason // resume this Actor in a way that // causes it to exit // (because shouldExit == true) @@ -464,7 +499,7 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac /* Here we should not throw a SuspendActorControl, since the current method is called from an actor that is in the process of exiting. - + Therefore, the contract for scheduleActor is that it never throws a SuspendActorControl. */ diff --git a/src/actors/scala/actors/MQueue.scala b/src/actors/scala/actors/MQueue.scala index 65427d68c5..4a148d2cb3 100644 --- a/src/actors/scala/actors/MQueue.scala +++ b/src/actors/scala/actors/MQueue.scala @@ -25,6 +25,20 @@ private[actors] class MQueue[Msg >: Null](protected val label: String) { _size += diff } + def prepend(other: MQueue[Msg]) { + if (!other.isEmpty) { + other.last.next = first + first = other.first + } + } + + def clear() { + first = null + last = null + _size = 0 + } + + def append(msg: Msg, session: OutputChannel[Any]) { changeSize(1) // size always increases by 1 val el = new MQueueElement(msg, session) diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala index 206a97d97c..7a8d738758 100644 --- a/src/actors/scala/actors/Reactor.scala +++ b/src/actors/scala/actors/Reactor.scala @@ -214,11 +214,16 @@ trait Reactor[Msg >: Null] extends OutputChannel[Msg] with Combinators { scheduler executeFromActor makeReaction(null, handler, msg) } + private[actors] def preAct() = {} + // guarded by this private[actors] def dostart() { _state = Actor.State.Runnable scheduler newActor this - scheduler execute makeReaction(() => act(), null, null) + scheduler execute makeReaction(() => { + preAct() + act() + }, null, null) } /** @@ -285,12 +290,15 @@ trait Reactor[Msg >: Null] extends OutputChannel[Msg] with Combinators { throw Actor.suspendException } + private[actors] def internalPostStop() = {} + private[actors] def terminated() { synchronized { _state = Actor.State.Terminated // reset waitingFor, otherwise getState returns Suspended waitingFor = Reactor.waitingForNone } + internalPostStop() scheduler.terminated(this) } |