From 09e9e88d00262d69ee629426d7da441234b695fd Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Thu, 28 May 2009 09:14:39 +0000 Subject: Now for real: added OutputChannelActor. --- src/actors/scala/actors/Actor.scala | 2 +- src/actors/scala/actors/LightReaction.scala | 75 +++++++++++ src/actors/scala/actors/OutputChannelActor.scala | 153 +++++++++++++++++++++++ 3 files changed, 229 insertions(+), 1 deletion(-) create mode 100644 src/actors/scala/actors/LightReaction.scala create mode 100644 src/actors/scala/actors/OutputChannelActor.scala (limited to 'src') diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index a0c9e1d888..ddeeca8c9f 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -780,7 +780,7 @@ trait Actor extends OutputChannelActor with AbstractActor { } // guarded by lock of this - protected override def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = + private def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = if ((f eq null) && (continuation eq null)) { // do nothing (timeout is handled instead) } diff --git a/src/actors/scala/actors/LightReaction.scala b/src/actors/scala/actors/LightReaction.scala new file mode 100644 index 0000000000..c05d400f8b --- /dev/null +++ b/src/actors/scala/actors/LightReaction.scala @@ -0,0 +1,75 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id: Reaction.scala 17862 2009-05-27 23:27:05Z phaller $ + + +package scala.actors + +import java.lang.Runnable + +/**

+ * The abstract class LightReaction associates + * an instance of an OutputChannelActor with a + * + * java.lang.Runnable. + *

+ * + * @author Philipp Haller + */ +class LightReaction extends Runnable { + + private[actors] var a: OutputChannelActor = _ + private var f: PartialFunction[Any, Unit] = _ + private var msg: Any = _ + + def this(a: OutputChannelActor, f: PartialFunction[Any, Unit], msg: Any) = { + this() + this.a = a + this.f = f + this.msg = msg + } + + def this(a: OutputChannelActor) = this(a, null, null) + + def run() { + val saved = Actor.tl.get + Actor.tl.set(a) + try { + try { + try { + if (f == null) + a.act() + else + f(msg) + } catch { + case e: Exception if (a.exceptionHandler.isDefinedAt(e)) => + a.exceptionHandler(e) + } + } catch { + case _: KillActorException => + } + a.kill() + } + catch { + case _: SuspendActorException => { + // do nothing (continuation is already saved) + } + case t: Throwable => { + Debug.info(a+": caught "+t) + a.terminated() + } + } finally { + Actor.tl.set(saved) + this.a = null + this.f = null + this.msg = null + } + } + +} diff --git a/src/actors/scala/actors/OutputChannelActor.scala b/src/actors/scala/actors/OutputChannelActor.scala new file mode 100644 index 0000000000..dc90898ccd --- /dev/null +++ b/src/actors/scala/actors/OutputChannelActor.scala @@ -0,0 +1,153 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors + +trait OutputChannelActor extends OutputChannel[Any] { + + protected var ignoreSender: Boolean = false + + /* The actor's mailbox. */ + protected val mailbox = new MessageQueue + + /* A list of the current senders. The head of the list is + * the sender of the message that was received last. + */ + protected var senders: List[OutputChannel[Any]] = + if (ignoreSender) List(null) + else Nil + + /* If the actor waits in a react, continuation holds the + * message handler that react was called with. + */ + protected var continuation: PartialFunction[Any, Unit] = null + + /* Whenever this Actor executes on some thread, waitingFor is + * guaranteed to be equal to waitingForNone. + * + * In other words, whenever waitingFor is not equal to + * waitingForNone, this Actor is guaranteed not to execute on some + * thread. + */ + protected val waitingForNone = (m: Any) => false + protected var waitingFor: Any => Boolean = waitingForNone + + /** + * The behavior of an actor is specified by implementing this + * abstract method. + */ + def act(): Unit + + protected[actors] def exceptionHandler: PartialFunction[Exception, Unit] = Map() + + protected[actors] def scheduler: IScheduler = + Scheduler + + def mailboxSize: Int = synchronized { + mailbox.size + } + + def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized { + if (waitingFor(msg)) { + waitingFor = waitingForNone + + if (!ignoreSender) + senders = List(replyTo) + + // assert continuation != null + scheduler.execute(new LightReaction(this, continuation, msg)) + } else { + mailbox.append(msg, replyTo) + } + } + + def !(msg: Any) { + send(msg, if (ignoreSender) null else Actor.rawSelf(scheduler)) + } + + def forward(msg: Any) { + send(msg, if (ignoreSender) null else Actor.sender) + } + + def receiver: Actor = this.asInstanceOf[Actor] + + def react(f: PartialFunction[Any, Unit]): Nothing = { + assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor") + this.synchronized { + val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) + if (null eq qel) { + waitingFor = f.isDefinedAt + continuation = f + } else { + if (!ignoreSender) + senders = List(qel.session) + scheduleActor(f, qel.msg) + } + throw new SuspendActorException + } + } + + def sender: OutputChannel[Any] = senders.head + + /** + * Replies with msg to the sender. + */ + def reply(msg: Any) { + sender ! msg + } + + private def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = { + val task = new LightReaction(this, + if (f eq null) continuation else f, + msg) + scheduler execute task + } + + def start(): OutputChannelActor = synchronized { + scheduler execute { + scheduler.newActor(OutputChannelActor.this) + (new LightReaction(OutputChannelActor.this)).run() + } + this + } + + /* This closure is used to implement control-flow operations + * built on top of `seq`. Note that the only invocation of + * `kill` is supposed to be inside `Reaction.run`. + */ + private[actors] var kill: () => Unit = + () => { exit() } + + private[actors] def seq[a, b](first: => a, next: => b): Unit = { + val s = Actor.rawSelf(scheduler) + val killNext = s.kill + s.kill = () => { + s.kill = killNext + + // to avoid stack overflow: + // instead of directly executing `next`, + // schedule as continuation + scheduleActor({ case _ => next }, 1) + throw new SuspendActorException + } + first + throw new KillActorException + } + + protected[actors] def exit(): Nothing = { + terminated() + throw new SuspendActorException + } + + protected[actors] def terminated() { + scheduler.terminated(this) + } + +} -- cgit v1.2.3