From 66f0679169bd8d5dc749c2288777c5a217ae3d43 Mon Sep 17 00:00:00 2001 From: Vojin Jovanovic Date: Mon, 19 Mar 2012 22:25:26 +0100 Subject: Prepared actors hierarchy for migration. Internal nodes added so methods relevant to akka can be overridden. Review by: @phaller --- src/actors/scala/actors/Actor.scala | 484 +------------------- src/actors/scala/actors/ActorCanReply.scala | 2 +- src/actors/scala/actors/ActorTask.scala | 9 +- src/actors/scala/actors/Channel.scala | 2 +- src/actors/scala/actors/Combinators.scala | 2 +- src/actors/scala/actors/InternalActor.scala | 509 +++++++++++++++++++++ src/actors/scala/actors/InternalReplyReactor.scala | 161 +++++++ src/actors/scala/actors/OutputChannel.scala | 2 +- src/actors/scala/actors/ReactChannel.scala | 2 +- src/actors/scala/actors/Reactor.scala | 2 +- src/actors/scala/actors/ReactorCanReply.scala | 2 +- src/actors/scala/actors/ReplyReactor.scala | 165 +------ src/actors/scala/actors/ReplyReactorTask.scala | 4 +- src/actors/scala/actors/UncaughtException.scala | 2 +- 14 files changed, 702 insertions(+), 646 deletions(-) create mode 100644 src/actors/scala/actors/InternalActor.scala create mode 100644 src/actors/scala/actors/InternalReplyReactor.scala (limited to 'src') diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index b746f68268..aab533ae8d 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -46,7 +46,7 @@ object Actor extends Combinators { Terminated = Value } - private[actors] val tl = new ThreadLocal[ReplyReactor] + private[actors] val tl = new ThreadLocal[InternalReplyReactor] // timer thread runs as daemon private[actors] val timer = new Timer(true) @@ -59,15 +59,15 @@ object Actor extends Combinators { * * @return returns the currently executing actor. */ - def self: Actor = self(Scheduler) + def self: Actor = self(Scheduler).asInstanceOf[Actor] - private[actors] def self(sched: IScheduler): Actor = - rawSelf(sched).asInstanceOf[Actor] + private[actors] def self(sched: IScheduler): InternalActor = + rawSelf(sched).asInstanceOf[InternalActor] - private[actors] def rawSelf: ReplyReactor = + private[actors] def rawSelf: InternalReplyReactor = rawSelf(Scheduler) - private[actors] def rawSelf(sched: IScheduler): ReplyReactor = { + private[actors] def rawSelf(sched: IScheduler): InternalReplyReactor = { val s = tl.get if (s eq null) { val r = new ActorProxy(Thread.currentThread, sched) @@ -245,7 +245,7 @@ object Actor extends Combinators { def eventloop(f: PartialFunction[Any, Unit]): Nothing = rawSelf.react(new RecursiveProxyHandler(rawSelf, f)) - private class RecursiveProxyHandler(a: ReplyReactor, f: PartialFunction[Any, Unit]) + private class RecursiveProxyHandler(a: InternalReplyReactor, f: PartialFunction[Any, Unit]) extends scala.runtime.AbstractPartialFunction[Any, Unit] { def _isDefinedAt(m: Any): Boolean = true // events are immediately removed from the mailbox @@ -259,7 +259,7 @@ object Actor extends Combinators { * Returns the actor which sent the last received message. */ def sender: OutputChannel[Any] = - rawSelf.sender + rawSelf.internalSender /** * Sends `msg` to the actor waiting in a call to `!?`. @@ -302,7 +302,7 @@ object Actor extends Combinators { def andThen[b](other: => b): Unit } - implicit def mkBody[a](body: => a) = new Body[a] { + implicit def mkBody[a](body: => a) = new InternalActor.Body[a] { def andThen[b](other: => b): Unit = rawSelf.seq(body, other) } @@ -397,476 +397,12 @@ object Actor extends Combinators { * @define channel actor's mailbox */ @SerialVersionUID(-781154067877019505L) -trait Actor extends AbstractActor with ReplyReactor with ActorCanReply with InputChannel[Any] with Serializable { - - /* The following two fields are only used when the actor - * suspends by blocking its underlying thread, for example, - * when waiting in a receive or synchronous send. - */ - @volatile - private var isSuspended = false - - /* This field is used to communicate the received message from - * the invocation of send to the place where the thread of - * the receiving actor resumes inside receive/receiveWithin. - */ - @volatile - private var received: Option[Any] = None - - protected[actors] override def scheduler: IScheduler = Scheduler - - private[actors] override def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: PartialFunction[Any, Any]) = - if (isSuspended) { - () => synchronized { - mailbox.append(msg, replyTo) - resumeActor() - } - } else super.startSearch(msg, replyTo, handler) - - // we override this method to check `shouldExit` before suspending - private[actors] override def searchMailbox(startMbox: MQueue[Any], - handler: PartialFunction[Any, Any], - resumeOnSameThread: Boolean) { - var tmpMbox = startMbox - var done = false - while (!done) { - val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => { - senders = List(replyTo) - handler.isDefinedAt(msg) - }) - if (tmpMbox ne mailbox) - tmpMbox.foreach((m, s) => mailbox.append(m, s)) - if (null eq qel) { - synchronized { - // in mean time new stuff might have arrived - if (!sendBuffer.isEmpty) { - tmpMbox = new MQueue[Any]("Temp") - drainSendBuffer(tmpMbox) - // keep going - } else { - // very important to check for `shouldExit` at this point - // since linked actors might have set it after we checked - // last time (e.g., at the beginning of `react`) - if (shouldExit) exit() - waitingFor = handler - // see Reactor.searchMailbox - throw Actor.suspendException - } - } - } else { - resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread) - done = true - } - } - } - - private[actors] override def makeReaction(fun: () => Unit, handler: PartialFunction[Any, Any], msg: Any): Runnable = - new ActorTask(this, fun, handler, msg) - - /** See the companion object's `receive` method. */ - def receive[R](f: PartialFunction[Any, R]): R = { - assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor") - - synchronized { - if (shouldExit) exit() // links - drainSendBuffer(mailbox) - } - - var done = false - while (!done) { - val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => { - senders = replyTo :: senders - val matches = f.isDefinedAt(m) - senders = senders.tail - matches - }) - if (null eq qel) { - synchronized { - // in mean time new stuff might have arrived - if (!sendBuffer.isEmpty) { - drainSendBuffer(mailbox) - // keep going - } else { - waitingFor = f - isSuspended = true - scheduler.managedBlock(blocker) - drainSendBuffer(mailbox) - // keep going - } - } - } else { - received = Some(qel.msg) - senders = qel.session :: senders - done = true - } - } - - val result = f(received.get) - received = None - senders = senders.tail - result - } - - /** See the companion object's `receiveWithin` method. */ - def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = { - assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor") - - synchronized { - if (shouldExit) exit() // links - drainSendBuffer(mailbox) - } - - // first, remove spurious TIMEOUT message from mailbox if any - mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT) - - val receiveTimeout = () => { - if (f.isDefinedAt(TIMEOUT)) { - received = Some(TIMEOUT) - senders = this :: senders - } else - sys.error("unhandled timeout") - } - - var done = false - while (!done) { - val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => { - senders = replyTo :: senders - val matches = f.isDefinedAt(m) - senders = senders.tail - matches - }) - if (null eq qel) { - val todo = synchronized { - // in mean time new stuff might have arrived - if (!sendBuffer.isEmpty) { - drainSendBuffer(mailbox) - // keep going - () => {} - } else if (msec == 0L) { - done = true - receiveTimeout - } else { - if (onTimeout.isEmpty) { - if (!f.isDefinedAt(TIMEOUT)) - sys.error("unhandled timeout") - - val thisActor = this - onTimeout = Some(new TimerTask { - def run() { - thisActor.send(TIMEOUT, thisActor) - } - }) - Actor.timer.schedule(onTimeout.get, msec) - } - - // It is possible that !onTimeout.isEmpty, but TIMEOUT is not yet in mailbox - // See SI-4759 - waitingFor = f - received = None - isSuspended = true - scheduler.managedBlock(blocker) - drainSendBuffer(mailbox) - // keep going - () => {} - } - } - todo() - } else { - synchronized { - if (!onTimeout.isEmpty) { - onTimeout.get.cancel() - onTimeout = None - } - } - received = Some(qel.msg) - senders = qel.session :: senders - done = true - } - } - - val result = f(received.get) - received = None - senders = senders.tail - result - } - - /** See the companion object's `react` method. */ - override def react(handler: PartialFunction[Any, Unit]): Nothing = { - synchronized { - if (shouldExit) exit() - } - super.react(handler) - } - - /** See the companion object's `reactWithin` method. */ - override def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = { - synchronized { - if (shouldExit) exit() - } - super.reactWithin(msec)(handler) - } - - /** Receives the next message from the mailbox */ - def ? : Any = receive { - case x => x - } - - // guarded by lock of this - // never throws SuspendActorControl - private[actors] override def scheduleActor(f: PartialFunction[Any, Any], msg: Any) = - if (f eq null) { - // do nothing (timeout is handled instead) - } - else { - val task = new ActorTask(this, null, f, msg) - scheduler executeFromActor task - } - - /* Used for notifying scheduler when blocking inside receive/receiveWithin. */ - private object blocker extends scala.concurrent.ManagedBlocker { - def block() = { - Actor.this.suspendActor() - true - } - def isReleasable = - !Actor.this.isSuspended - } - - private def suspendActor() = synchronized { - while (isSuspended) { - try { - wait() - } catch { - case _: InterruptedException => - } - } - // links: check if we should exit - if (shouldExit) exit() - } - - private def resumeActor() { - isSuspended = false - notify() - } - - private[actors] override def exiting = synchronized { - _state == Actor.State.Terminated - } - - // guarded by this - private[actors] override def dostart() { - // Reset various flags. - // - // Note that we do *not* reset `trapExit`. The reason is that - // users should be able to set the field in the constructor - // and before `act` is called. - exitReason = 'normal - shouldExit = false - - super.dostart() - } +trait Actor extends InternalActor with ReplyReactor { override def start(): Actor = synchronized { super.start() this } - /** State of this actor */ - override def getState: Actor.State.Value = synchronized { - if (isSuspended) { - if (onTimeout.isEmpty) - Actor.State.Blocked - else - Actor.State.TimedBlocked - } else - super.getState - } - - // guarded by this - private[actors] var links: List[AbstractActor] = Nil - - /** - * Links `self` to actor `to`. - * - * @param to the actor to link to - * @return the parameter actor - */ - def link(to: AbstractActor): AbstractActor = { - assert(Actor.self(scheduler) == this, "link called on actor different from self") - this linkTo to - to linkTo this - to - } - - /** - * Links `self` to the actor defined by `body`. - * - * @param body the body of the actor to link to - * @return the parameter actor - */ - def link(body: => Unit): Actor = { - assert(Actor.self(scheduler) == this, "link called on actor different from self") - val a = new Actor { - def act() = body - override final val scheduler: IScheduler = Actor.this.scheduler - } - link(a) - a.start() - a - } - - private[actors] def linkTo(to: AbstractActor) = synchronized { - links = to :: links - } - - /** - * Unlinks `self` from actor `from`. - */ - def unlink(from: AbstractActor) { - assert(Actor.self(scheduler) == this, "unlink called on actor different from self") - this unlinkFrom from - from unlinkFrom this - } - - private[actors] def unlinkFrom(from: AbstractActor) = synchronized { - links = links.filterNot(from.==) - } - - @volatile - var trapExit = false - // guarded by this - private var exitReason: AnyRef = 'normal - // guarded by this - private[actors] var shouldExit = false - - /** - * Terminates execution of `self` with the following effect on - * linked actors: - * - * For each linked actor `a` with `trapExit` set to `'''true'''`, - * send message `Exit(self, reason)` to `a`. - * - * For each linked actor `a` with `trapExit` set to `'''false'''` - * (default), call `a.exit(reason)` if `reason != 'normal`. - */ - protected[actors] def exit(reason: AnyRef): Nothing = { - synchronized { - exitReason = reason - } - exit() - } - - /** - * Terminates with exit reason `'normal`. - */ - protected[actors] override def exit(): Nothing = { - val todo = synchronized { - if (!links.isEmpty) - exitLinked() - else - () => {} - } - todo() - super.exit() - } - - // Assume !links.isEmpty - // guarded by this - private[actors] def exitLinked(): () => Unit = { - _state = Actor.State.Terminated - // reset waitingFor, otherwise getState returns Suspended - waitingFor = Reactor.waitingForNone - // remove this from links - val mylinks = links.filterNot(this.==) - // unlink actors - mylinks.foreach(unlinkFrom(_)) - // return closure that locks linked actors - () => { - mylinks.foreach((linked: AbstractActor) => { - linked.synchronized { - if (!linked.exiting) { - linked.unlinkFrom(this) - linked.exit(this, exitReason) - } - } - }) - } - } - - // Assume !links.isEmpty - // guarded by this - private[actors] def exitLinked(reason: AnyRef): () => Unit = { - exitReason = reason - exitLinked() - } - - // Assume !this.exiting - private[actors] def exit(from: AbstractActor, reason: AnyRef) { - if (trapExit) { - this ! Exit(from, reason) - } - else if (reason != 'normal) - synchronized { - shouldExit = true - exitReason = reason - // resume this Actor in a way that - // causes it to exit - // (because shouldExit == true) - if (isSuspended) - resumeActor() - else if (waitingFor ne Reactor.waitingForNone) { - waitingFor = Reactor.waitingForNone - // it doesn't matter what partial function we are passing here - scheduleActor(waitingFor, null) - /* Here we should not throw a SuspendActorControl, - since the current method is called from an actor that - is in the process of exiting. - - Therefore, the contract for scheduleActor is that - it never throws a SuspendActorControl. - */ - } - } - } - - /** Requires qualified private, because `RemoteActor` must - * register a termination handler. - */ - private[actors] def onTerminate(f: => Unit) { - scheduler.onTerminate(this) { f } - } } - -/** - * Used as the timeout pattern in - * - * receiveWithin and - * - * reactWithin. - * - * @example {{{ - * receiveWithin(500) { - * case (x, y) => ... - * case TIMEOUT => ... - * } - * }}} - * - * @author Philipp Haller - */ -case object TIMEOUT - - -/** Sent to an actor with `trapExit` set to `'''true'''` whenever one of its - * linked actors terminates. - * - * @param from the actor that terminated - * @param reason the reason that caused the actor to terminate - */ -case class Exit(from: AbstractActor, reason: AnyRef) - -/** Manages control flow of actor executions. - * - * @author Philipp Haller - */ -private[actors] class SuspendActorControl extends ControlThrowable diff --git a/src/actors/scala/actors/ActorCanReply.scala b/src/actors/scala/actors/ActorCanReply.scala index b307aafa57..d92fb183c0 100644 --- a/src/actors/scala/actors/ActorCanReply.scala +++ b/src/actors/scala/actors/ActorCanReply.scala @@ -18,7 +18,7 @@ import scala.concurrent.SyncVar * @author Philipp Haller */ private[actors] trait ActorCanReply extends ReactorCanReply { - this: AbstractActor with ReplyReactor => + this: AbstractActor with InternalReplyReactor => override def !?(msg: Any): Any = { val replyCh = new Channel[Any](Actor.self(scheduler)) diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala index 090d0448f0..bb04302238 100644 --- a/src/actors/scala/actors/ActorTask.scala +++ b/src/actors/scala/actors/ActorTask.scala @@ -17,7 +17,7 @@ package scala.actors * changes to the underlying var invisible.) I can't figure out what's supposed * to happen, so I renamed the constructor parameter to at least be less confusing. */ -private[actors] class ActorTask(actor: Actor, +private[actors] class ActorTask(actor: InternalActor, fun: () => Unit, handler: PartialFunction[Any, Any], initialMsg: Any) @@ -32,7 +32,7 @@ private[actors] class ActorTask(actor: Actor, } protected override def terminateExecution(e: Throwable) { - val senderInfo = try { Some(actor.sender) } catch { + val senderInfo = try { Some(actor.internalSender) } catch { case _: Exception => None } // !!! If this is supposed to be setting the current contents of the @@ -45,13 +45,16 @@ private[actors] class ActorTask(actor: Actor, e) val todo = actor.synchronized { - if (!actor.links.isEmpty) + val res = if (!actor.links.isEmpty) actor.exitLinked(uncaught) else { super.terminateExecution(e) () => {} } + actor.internalPostStop + res } + todo() } diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala index 62331239e8..36cee66b42 100644 --- a/src/actors/scala/actors/Channel.scala +++ b/src/actors/scala/actors/Channel.scala @@ -34,7 +34,7 @@ case class ! [a](ch: Channel[a], msg: a) * @define actor channel * @define channel channel */ -class Channel[Msg](val receiver: Actor) extends InputChannel[Msg] with OutputChannel[Msg] with CanReply[Msg, Any] { +class Channel[Msg](val receiver: InternalActor) extends InputChannel[Msg] with OutputChannel[Msg] with CanReply[Msg, Any] { type Future[+P] = scala.actors.Future[P] diff --git a/src/actors/scala/actors/Combinators.scala b/src/actors/scala/actors/Combinators.scala index 5276c7843e..c1a9095614 100644 --- a/src/actors/scala/actors/Combinators.scala +++ b/src/actors/scala/actors/Combinators.scala @@ -16,7 +16,7 @@ private[actors] trait Combinators { * Enables the composition of suspendable closures using `andThen`, * `loop`, `loopWhile`, etc. */ - implicit def mkBody[a](body: => a): Actor.Body[a] + implicit def mkBody[a](body: => a): InternalActor.Body[a] /** * Repeatedly executes `body`. diff --git a/src/actors/scala/actors/InternalActor.scala b/src/actors/scala/actors/InternalActor.scala new file mode 100644 index 0000000000..c94da5b9fd --- /dev/null +++ b/src/actors/scala/actors/InternalActor.scala @@ -0,0 +1,509 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ +package scala.actors +import java.util.TimerTask +import scala.util.control.ControlThrowable + +private[actors] object InternalActor { + private[actors] trait Body[a] { + def andThen[b](other: => b): Unit + } +} + +private[actors] trait InternalActor extends AbstractActor with InternalReplyReactor with ActorCanReply with InputChannel[Any] with Serializable { + + /* The following two fields are only used when the actor + * suspends by blocking its underlying thread, for example, + * when waiting in a receive or synchronous send. + */ + @volatile + private[actors] var isSuspended = false + + /* This field is used to communicate the received message from + * the invocation of send to the place where the thread of + * the receiving actor resumes inside receive/receiveWithin. + */ + @volatile + private var received: Option[Any] = None + + protected[actors] override def scheduler: IScheduler = Scheduler + + private[actors] override def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: PartialFunction[Any, Any]) = + if (isSuspended) { + () => + synchronized { + mailbox.append(msg, replyTo) + resumeActor() + } + } else super.startSearch(msg, replyTo, handler) + + // we override this method to check `shouldExit` before suspending + private[actors] override def searchMailbox(startMbox: MQueue[Any], + handler: PartialFunction[Any, Any], + resumeOnSameThread: Boolean) { + var tmpMbox = startMbox + var done = false + while (!done) { + val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => { + senders = List(replyTo) + handler.isDefinedAt(msg) + }) + if (tmpMbox ne mailbox) + tmpMbox.foreach((m, s) => mailbox.append(m, s)) + if (null eq qel) { + synchronized { + // in mean time new stuff might have arrived + if (!sendBuffer.isEmpty) { + tmpMbox = new MQueue[Any]("Temp") + drainSendBuffer(tmpMbox) + // keep going + } else { + // very important to check for `shouldExit` at this point + // since linked actors might have set it after we checked + // last time (e.g., at the beginning of `react`) + if (shouldExit) exit() + waitingFor = handler + // see Reactor.searchMailbox + throw Actor.suspendException + } + } + } else { + resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread) + done = true + } + } + } + + private[actors] override def makeReaction(fun: () => Unit, handler: PartialFunction[Any, Any], msg: Any): Runnable = + new ActorTask(this, fun, handler, msg) + + /** See the companion object's `receive` method. */ + def receive[R](f: PartialFunction[Any, R]): R = { + assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor") + + synchronized { + if (shouldExit) exit() // links + drainSendBuffer(mailbox) + } + + var done = false + while (!done) { + val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => { + senders = replyTo :: senders + val matches = f.isDefinedAt(m) + senders = senders.tail + matches + }) + if (null eq qel) { + synchronized { + // in mean time new stuff might have arrived + if (!sendBuffer.isEmpty) { + drainSendBuffer(mailbox) + // keep going + } else { + waitingFor = f + isSuspended = true + scheduler.managedBlock(blocker) + drainSendBuffer(mailbox) + // keep going + } + } + } else { + received = Some(qel.msg) + senders = qel.session :: senders + done = true + } + } + + val result = f(received.get) + received = None + senders = senders.tail + result + } + + /** See the companion object's `receiveWithin` method. */ + def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = { + assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor") + + synchronized { + if (shouldExit) exit() // links + drainSendBuffer(mailbox) + } + + // first, remove spurious TIMEOUT message from mailbox if any + mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT) + + val receiveTimeout = () => { + if (f.isDefinedAt(TIMEOUT)) { + received = Some(TIMEOUT) + senders = this :: senders + } else + sys.error("unhandled timeout") + } + + var done = false + while (!done) { + val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => { + senders = replyTo :: senders + val matches = f.isDefinedAt(m) + senders = senders.tail + matches + }) + if (null eq qel) { + val todo = synchronized { + // in mean time new stuff might have arrived + if (!sendBuffer.isEmpty) { + drainSendBuffer(mailbox) + // keep going + () => {} + } else if (msec == 0L) { + done = true + receiveTimeout + } else { + if (onTimeout.isEmpty) { + if (!f.isDefinedAt(TIMEOUT)) + sys.error("unhandled timeout") + + val thisActor = this + onTimeout = Some(new TimerTask { + def run() { + thisActor.send(TIMEOUT, thisActor) + } + }) + Actor.timer.schedule(onTimeout.get, msec) + } + + // It is possible that !onTimeout.isEmpty, but TIMEOUT is not yet in mailbox + // See SI-4759 + waitingFor = f + received = None + isSuspended = true + scheduler.managedBlock(blocker) + drainSendBuffer(mailbox) + // keep going + () => {} + } + } + todo() + } else { + synchronized { + if (!onTimeout.isEmpty) { + onTimeout.get.cancel() + onTimeout = None + } + } + received = Some(qel.msg) + senders = qel.session :: senders + done = true + } + } + + val result = f(received.get) + received = None + senders = senders.tail + result + } + + /** See the companion object's `react` method. */ + override def react(handler: PartialFunction[Any, Unit]): Nothing = { + synchronized { + if (shouldExit) exit() + } + super.react(handler) + } + + /** See the companion object's `reactWithin` method. */ + override def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = { + synchronized { + if (shouldExit) exit() + } + super.reactWithin(msec)(handler) + } + + /** Receives the next message from the mailbox */ + def ? : Any = receive { + case x => x + } + + // guarded by lock of this + // never throws SuspendActorControl + private[actors] override def scheduleActor(f: PartialFunction[Any, Any], msg: Any) = + if (f eq null) { + // do nothing (timeout is handled instead) + } else { + val task = new ActorTask(this, null, f, msg) + scheduler executeFromActor task + } + + /* Used for notifying scheduler when blocking inside receive/receiveWithin. */ + private object blocker extends scala.concurrent.ManagedBlocker { + def block() = { + InternalActor.this.suspendActor() + true + } + def isReleasable = + !InternalActor.this.isSuspended + } + + private def suspendActor() = synchronized { + while (isSuspended) { + try { + wait() + } catch { + case _: InterruptedException => + } + } + // links: check if we should exit + if (shouldExit) exit() + } + + private def resumeActor() { + isSuspended = false + notify() + } + + private[actors] override def exiting = synchronized { + _state == Actor.State.Terminated + } + + // guarded by this + private[actors] override def dostart() { + // Reset various flags. + // + // Note that we do *not* reset `trapExit`. The reason is that + // users should be able to set the field in the constructor + // and before `act` is called. + exitReason = 'normal + shouldExit = false + + super.dostart() + } + + override def start(): InternalActor = synchronized { + super.start() + this + } + + /** State of this actor */ + override def getState: Actor.State.Value = synchronized { + if (isSuspended) { + if (onTimeout.isEmpty) + Actor.State.Blocked + else + Actor.State.TimedBlocked + } else + super.getState + } + + // guarded by this + private[actors] var links: List[AbstractActor] = Nil + + /** + * Links self to actor to. + * + * @param to the actor to link to + * @return the parameter actor + */ + def link(to: AbstractActor): AbstractActor = { + assert(Actor.self(scheduler) == this, "link called on actor different from self") + this linkTo to + to linkTo this + to + } + + /** + * Links self to the actor defined by body. + * + * @param body the body of the actor to link to + * @return the parameter actor + */ + def link(body: => Unit): Actor = { + assert(Actor.self(scheduler) == this, "link called on actor different from self") + val a = new Actor { + def act() = body + override final val scheduler: IScheduler = InternalActor.this.scheduler + } + link(a) + a.start() + a + } + + private[actors] def linkTo(to: AbstractActor) = synchronized { + links = to :: links + } + + /** + * Unlinks self from actor from. + */ + def unlink(from: AbstractActor) { + assert(Actor.self(scheduler) == this, "unlink called on actor different from self") + this unlinkFrom from + from unlinkFrom this + } + + private[actors] def unlinkFrom(from: AbstractActor) = synchronized { + links = links.filterNot(from.==) + } + + @volatile + private[actors] var _trapExit = false + + def trapExit = _trapExit + + def trapExit_=(value: Boolean) = _trapExit = value + + // guarded by this + private var exitReason: AnyRef = 'normal + // guarded by this + private[actors] var shouldExit = false + + /** + *

+ * Terminates execution of self with the following + * effect on linked actors: + *

+ *

+ * For each linked actor a with + * trapExit set to true, send message + * Exit(self, reason) to a. + *

+ *

+ * For each linked actor a with + * trapExit set to false (default), + * call a.exit(reason) if + * reason != 'normal. + *

+ */ + protected[actors] def exit(reason: AnyRef): Nothing = { + synchronized { + exitReason = reason + } + exit() + } + + /** + * Terminates with exit reason 'normal. + */ + protected[actors] override def exit(): Nothing = { + val todo = synchronized { + if (!links.isEmpty) + exitLinked() + else + () => {} + } + todo() + super.exit() + } + + // Assume !links.isEmpty + // guarded by this + private[actors] def exitLinked(): () => Unit = { + _state = Actor.State.Terminated + // reset waitingFor, otherwise getState returns Suspended + waitingFor = Reactor.waitingForNone + // remove this from links + val mylinks = links.filterNot(this.==) + // unlink actors + mylinks.foreach(unlinkFrom(_)) + // return closure that locks linked actors + () => { + mylinks.foreach((linked: AbstractActor) => { + linked.synchronized { + if (!linked.exiting) { + linked.unlinkFrom(this) + linked.exit(this, exitReason) + } + } + }) + } + } + + // Assume !links.isEmpty + // guarded by this + private[actors] def exitLinked(reason: AnyRef): () => Unit = { + exitReason = reason + exitLinked() + } + + // Assume !this.exiting + private[actors] def exit(from: AbstractActor, reason: AnyRef) { + if (trapExit) { + this ! Exit(from, reason) + } else if (reason != 'normal) + stop(reason) + } + + /* Requires qualified private, because RemoteActor must + * register a termination handler. + */ + private[actors] def onTerminate(f: => Unit) { + scheduler.onTerminate(this) { f } + } + + private[actors] def internalPostStop() = {} + + private[actors] def stop(reason: AnyRef): Unit = { + synchronized { + shouldExit = true + exitReason = reason + // resume this Actor in a way that + // causes it to exit + // (because shouldExit == true) + if (isSuspended) + resumeActor() + else if (waitingFor ne Reactor.waitingForNone) { + waitingFor = Reactor.waitingForNone + // it doesn't matter what partial function we are passing here + val task = new ActorTask(this, null, waitingFor, null) + scheduler execute task + /* Here we should not throw a SuspendActorControl, + since the current method is called from an actor that + is in the process of exiting. + + Therefore, the contract for scheduleActor is that + it never throws a SuspendActorControl. + */ + } + } + } +} + +/** + * Used as the timeout pattern in + * + * receiveWithin and + * + * reactWithin. + * + * @example {{{ + * receiveWithin(500) { + * case (x, y) => ... + * case TIMEOUT => ... + * } + * }}} + * + * @author Philipp Haller + */ +case object TIMEOUT + +/** + * Sent to an actor + * with `trapExit` set to `true` whenever one of its linked actors + * terminates. + * + * @param from the actor that terminated + * @param reason the reason that caused the actor to terminate + */ +case class Exit(from: AbstractActor, reason: AnyRef) + +/** + * Manages control flow of actor executions. + * + * @author Philipp Haller + */ +private[actors] class SuspendActorControl extends ControlThrowable diff --git a/src/actors/scala/actors/InternalReplyReactor.scala b/src/actors/scala/actors/InternalReplyReactor.scala new file mode 100644 index 0000000000..38295138d4 --- /dev/null +++ b/src/actors/scala/actors/InternalReplyReactor.scala @@ -0,0 +1,161 @@ +package scala.actors + +import java.util.{TimerTask} + +/** + * Extends the [[scala.actors.Reactor]] + * trait with methods to reply to the sender of a message. + * Sending a message to a ReplyReactor implicitly + * passes a reference to the sender together with the message. + * + * @author Philipp Haller + * + * @define actor `ReplyReactor` + */ +trait InternalReplyReactor extends Reactor[Any] with ReactorCanReply { + + /* A list of the current senders. The head of the list is + * the sender of the message that was received last. + */ + @volatile + private[actors] var senders: List[OutputChannel[Any]] = List() + + /* This option holds a TimerTask when the actor waits in a + * reactWithin. The TimerTask is cancelled when the actor + * resumes. + * + * guarded by this + */ + private[actors] var onTimeout: Option[TimerTask] = None + + /** + * Returns the $actor which sent the last received message. + */ + protected[actors] def internalSender: OutputChannel[Any] = senders.head + + /** + * Replies with msg to the sender. + */ + protected[actors] def reply(msg: Any) { + internalSender ! msg + } + + override def !(msg: Any) { + send(msg, Actor.rawSelf(scheduler)) + } + + override def forward(msg: Any) { + send(msg, Actor.sender) + } + + private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) { + synchronized { + if (!onTimeout.isEmpty) { + onTimeout.get.cancel() + onTimeout = None + } + } + senders = List(item._2) + super.resumeReceiver(item, handler, onSameThread) + } + + private[actors] override def searchMailbox(startMbox: MQueue[Any], + handler: PartialFunction[Any, Any], + resumeOnSameThread: Boolean) { + var tmpMbox = startMbox + var done = false + while (!done) { + val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => { + senders = List(replyTo) + handler.isDefinedAt(msg) + }) + if (tmpMbox ne mailbox) + tmpMbox.foreach((m, s) => mailbox.append(m, s)) + if (null eq qel) { + synchronized { + // in mean time new stuff might have arrived + if (!sendBuffer.isEmpty) { + tmpMbox = new MQueue[Any]("Temp") + drainSendBuffer(tmpMbox) + // keep going + } else { + waitingFor = handler + // see Reactor.searchMailbox + throw Actor.suspendException + } + } + } else { + resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread) + done = true + } + } + } + + private[actors] override def makeReaction(fun: () => Unit, handler: PartialFunction[Any, Any], msg: Any): Runnable = + new ReplyReactorTask(this, fun, handler, msg) + + protected[actors] override def react(handler: PartialFunction[Any, Unit]): Nothing = { + assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor") + super.react(handler) + } + + + /** + * Receives a message from this $actor's mailbox within a certain + * time span. + * + * This method never returns. Therefore, the rest of the computation + * has to be contained in the actions of the partial function. + * + * @param msec the time span before timeout + * @param handler a partial function with message patterns and actions + */ + protected[actors] def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = { + assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor") + + synchronized { drainSendBuffer(mailbox) } + + // first, remove spurious TIMEOUT message from mailbox if any + mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT) + + while (true) { + val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => { + senders = List(replyTo) + handler isDefinedAt m + }) + if (null eq qel) { + synchronized { + // in mean time new messages might have arrived + if (!sendBuffer.isEmpty) { + drainSendBuffer(mailbox) + // keep going + } else if (msec == 0L) { + // throws Actor.suspendException + resumeReceiver((TIMEOUT, this), handler, false) + } else { + waitingFor = handler + val thisActor = this + onTimeout = Some(new TimerTask { + def run() { thisActor.send(TIMEOUT, thisActor) } + }) + Actor.timer.schedule(onTimeout.get, msec) + throw Actor.suspendException + } + } + } else + resumeReceiver((qel.msg, qel.session), handler, false) + } + throw Actor.suspendException + } + + override def getState: Actor.State.Value = synchronized { + if (waitingFor ne Reactor.waitingForNone) { + if (onTimeout.isEmpty) + Actor.State.Suspended + else + Actor.State.TimedSuspended + } else + _state + } + +} diff --git a/src/actors/scala/actors/OutputChannel.scala b/src/actors/scala/actors/OutputChannel.scala index 089b3d0981..1fba684975 100644 --- a/src/actors/scala/actors/OutputChannel.scala +++ b/src/actors/scala/actors/OutputChannel.scala @@ -43,5 +43,5 @@ trait OutputChannel[-Msg] { /** * Returns the `Actor` that is receiving from this $actor. */ - def receiver: Actor + def receiver: InternalActor } diff --git a/src/actors/scala/actors/ReactChannel.scala b/src/actors/scala/actors/ReactChannel.scala index fccde34272..81a166c1a4 100644 --- a/src/actors/scala/actors/ReactChannel.scala +++ b/src/actors/scala/actors/ReactChannel.scala @@ -12,7 +12,7 @@ package scala.actors /** * @author Philipp Haller */ -private[actors] class ReactChannel[Msg](receiver: ReplyReactor) extends InputChannel[Msg] { +private[actors] class ReactChannel[Msg](receiver: InternalReplyReactor) extends InputChannel[Msg] { private case class SendToReactor(channel: ReactChannel[Msg], msg: Msg) diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala index 7d21e9f91e..8fc7578344 100644 --- a/src/actors/scala/actors/Reactor.scala +++ b/src/actors/scala/actors/Reactor.scala @@ -253,7 +253,7 @@ trait Reactor[Msg >: Null] extends OutputChannel[Msg] with Combinators { _state } - implicit def mkBody[A](body: => A) = new Actor.Body[A] { + implicit def mkBody[A](body: => A) = new InternalActor.Body[A] { def andThen[B](other: => B): Unit = Reactor.this.seq(body, other) } diff --git a/src/actors/scala/actors/ReactorCanReply.scala b/src/actors/scala/actors/ReactorCanReply.scala index 68f9999776..dabd0832f0 100644 --- a/src/actors/scala/actors/ReactorCanReply.scala +++ b/src/actors/scala/actors/ReactorCanReply.scala @@ -16,7 +16,7 @@ package scala.actors * @author Philipp Haller */ private[actors] trait ReactorCanReply extends CanReply[Any, Any] { - _: ReplyReactor => + _: InternalReplyReactor => type Future[+P] = scala.actors.Future[P] diff --git a/src/actors/scala/actors/ReplyReactor.scala b/src/actors/scala/actors/ReplyReactor.scala index 0e5ce00c91..0ffbbd3cce 100644 --- a/src/actors/scala/actors/ReplyReactor.scala +++ b/src/actors/scala/actors/ReplyReactor.scala @@ -5,165 +5,12 @@ ** /____/\___/_/ |_/____/_/ | | ** ** |/ ** \* */ - package scala.actors -import java.util.{Timer, TimerTask} - -/** - * Extends the [[scala.actors.Reactor]] trait with methods to reply to the - * sender of a message. - * - * Sending a message to a `ReplyReactor` implicitly passes a reference to - * the sender together with the message. - * - * @author Philipp Haller - * - * @define actor `ReplyReactor` - */ -trait ReplyReactor extends Reactor[Any] with ReactorCanReply { - - /* A list of the current senders. The head of the list is - * the sender of the message that was received last. - */ - @volatile - private[actors] var senders: List[OutputChannel[Any]] = List() - - /* This option holds a TimerTask when the actor waits in a - * reactWithin. The TimerTask is cancelled when the actor - * resumes. - * - * guarded by this - */ - private[actors] var onTimeout: Option[TimerTask] = None - - /** - * Returns the $actor which sent the last received message. - */ - protected[actors] def sender: OutputChannel[Any] = senders.head - - /** - * Replies with `msg` to the sender. - */ - protected[actors] def reply(msg: Any) { - sender ! msg - } - - override def !(msg: Any) { - send(msg, Actor.rawSelf(scheduler)) - } - - override def forward(msg: Any) { - send(msg, Actor.sender) - } - - private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) { - synchronized { - if (!onTimeout.isEmpty) { - onTimeout.get.cancel() - onTimeout = None - } - } - senders = List(item._2) - super.resumeReceiver(item, handler, onSameThread) - } - - private[actors] override def searchMailbox(startMbox: MQueue[Any], - handler: PartialFunction[Any, Any], - resumeOnSameThread: Boolean) { - var tmpMbox = startMbox - var done = false - while (!done) { - val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => { - senders = List(replyTo) - handler.isDefinedAt(msg) - }) - if (tmpMbox ne mailbox) - tmpMbox.foreach((m, s) => mailbox.append(m, s)) - if (null eq qel) { - synchronized { - // in mean time new stuff might have arrived - if (!sendBuffer.isEmpty) { - tmpMbox = new MQueue[Any]("Temp") - drainSendBuffer(tmpMbox) - // keep going - } else { - waitingFor = handler - // see Reactor.searchMailbox - throw Actor.suspendException - } - } - } else { - resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread) - done = true - } - } - } - - private[actors] override def makeReaction(fun: () => Unit, handler: PartialFunction[Any, Any], msg: Any): Runnable = - new ReplyReactorTask(this, fun, handler, msg) - - protected[actors] override def react(handler: PartialFunction[Any, Unit]): Nothing = { - assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor") - super.react(handler) - } - - /** - * Receives a message from this $actor's mailbox within a certain - * time span. - * - * This method never returns. Therefore, the rest of the computation - * has to be contained in the actions of the partial function. - * - * @param msec the time span before timeout - * @param handler a partial function with message patterns and actions - */ - protected[actors] def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = { - assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor") - - synchronized { drainSendBuffer(mailbox) } - - // first, remove spurious TIMEOUT message from mailbox if any - mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT) - - while (true) { - val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => { - senders = List(replyTo) - handler isDefinedAt m - }) - if (null eq qel) { - synchronized { - // in mean time new messages might have arrived - if (!sendBuffer.isEmpty) { - drainSendBuffer(mailbox) - // keep going - } else if (msec == 0L) { - // throws Actor.suspendException - resumeReceiver((TIMEOUT, this), handler, false) - } else { - waitingFor = handler - val thisActor = this - onTimeout = Some(new TimerTask { - def run() { thisActor.send(TIMEOUT, thisActor) } - }) - Actor.timer.schedule(onTimeout.get, msec) - throw Actor.suspendException - } - } - } else - resumeReceiver((qel.msg, qel.session), handler, false) - } - throw Actor.suspendException - } - - override def getState: Actor.State.Value = synchronized { - if (waitingFor ne Reactor.waitingForNone) { - if (onTimeout.isEmpty) - Actor.State.Suspended - else - Actor.State.TimedSuspended - } else - _state - } - +@deprecated("Scala Actors are beeing removed from the standard library. Please refer to the migration guide.", "2.10") +trait ReplyReactor extends InternalReplyReactor { + + protected[actors] def sender: OutputChannel[Any] = super.internalSender + } + diff --git a/src/actors/scala/actors/ReplyReactorTask.scala b/src/actors/scala/actors/ReplyReactorTask.scala index cb63d7e000..d38eb50381 100644 --- a/src/actors/scala/actors/ReplyReactorTask.scala +++ b/src/actors/scala/actors/ReplyReactorTask.scala @@ -17,13 +17,13 @@ package scala.actors * changes to the underlying var invisible.) I can't figure out what's supposed * to happen, so I renamed the constructor parameter to at least be less confusing. */ -private[actors] class ReplyReactorTask(replyReactor: ReplyReactor, +private[actors] class ReplyReactorTask(replyReactor: InternalReplyReactor, fun: () => Unit, handler: PartialFunction[Any, Any], msg: Any) extends ReactorTask(replyReactor, fun, handler, msg) { - var saved: ReplyReactor = _ + var saved: InternalReplyReactor = _ protected override def beginExecution() { saved = Actor.tl.get diff --git a/src/actors/scala/actors/UncaughtException.scala b/src/actors/scala/actors/UncaughtException.scala index 3e6efe3b7c..a3e7f795f1 100644 --- a/src/actors/scala/actors/UncaughtException.scala +++ b/src/actors/scala/actors/UncaughtException.scala @@ -20,7 +20,7 @@ package scala.actors * @author Philipp Haller * @author Erik Engbrecht */ -case class UncaughtException(actor: Actor, +case class UncaughtException(actor: InternalActor, message: Option[Any], sender: Option[OutputChannel[Any]], thread: Thread, -- cgit v1.2.3