From 63031aa7f0f1f32c7b998d118c832cb181b4e99e Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Sun, 31 May 2009 13:18:37 +0000 Subject: Renamed OutputChannelActor to Reactor. Renamed Future.ch to Future.inputChannel. Exceptions are handled properly while matching messages. Tasks that execute actors no longer catch Throwable, but Exception. --- src/actors/scala/actors/Actor.scala | 74 +++++++++++++--------- src/actors/scala/actors/ActorGC.scala | 16 ++--- src/actors/scala/actors/ActorTask.scala | 73 +++++++++++++++++++++ src/actors/scala/actors/DelegatingScheduler.scala | 6 +- src/actors/scala/actors/Future.scala | 7 +- src/actors/scala/actors/IScheduler.scala | 6 +- src/actors/scala/actors/LightReaction.scala | 59 +++-------------- src/actors/scala/actors/MessageQueue.scala | 8 +++ src/actors/scala/actors/OutputChannelActor.scala | 72 ++++++++++----------- src/actors/scala/actors/Reaction.scala | 64 ++----------------- src/actors/scala/actors/ReactorTask.scala | 64 +++++++++++++++++++ src/actors/scala/actors/SchedulerAdapter.scala | 6 +- .../scala/actors/SimpleExecutorScheduler.scala | 2 +- .../scala/actors/SingleThreadedScheduler.scala | 6 +- src/actors/scala/actors/TerminationMonitor.scala | 8 +-- 15 files changed, 267 insertions(+), 204 deletions(-) create mode 100644 src/actors/scala/actors/ActorTask.scala create mode 100644 src/actors/scala/actors/ReactorTask.scala diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 55cf976c14..dfbd46a1a7 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -24,7 +24,7 @@ import java.util.concurrent.ExecutionException */ object Actor { - private[actors] val tl = new ThreadLocal[OutputChannelActor] + private[actors] val tl = new ThreadLocal[Reactor] // timer thread runs as daemon private[actors] val timer = new Timer(true) @@ -43,9 +43,9 @@ object Actor { private[actors] def self(sched: IScheduler): Actor = rawSelf(sched).asInstanceOf[Actor] - private[actors] def rawSelf: OutputChannelActor = rawSelf(Scheduler) + private[actors] def rawSelf: Reactor = rawSelf(Scheduler) - private[actors] def rawSelf(sched: IScheduler): OutputChannelActor = { + private[actors] def rawSelf(sched: IScheduler): Reactor = { val s = tl.get if (s eq null) { val r = new ActorProxy(currentThread, sched) @@ -208,7 +208,7 @@ object Actor { def eventloop(f: PartialFunction[Any, Unit]): Nothing = rawSelf.react(new RecursiveProxyHandler(rawSelf, f)) - private class RecursiveProxyHandler(a: OutputChannelActor, f: PartialFunction[Any, Unit]) + private class RecursiveProxyHandler(a: Reactor, f: PartialFunction[Any, Unit]) extends PartialFunction[Any, Unit] { def isDefinedAt(m: Any): Boolean = true // events are immediately removed from the mailbox @@ -373,7 +373,7 @@ object Actor { * @author Philipp Haller */ @serializable -trait Actor extends OutputChannelActor with AbstractActor { +trait Actor extends Reactor with AbstractActor { /* The following two fields are only used when the actor * suspends by blocking its underlying thread, for example, @@ -394,7 +394,16 @@ trait Actor extends OutputChannelActor with AbstractActor { */ private var onTimeout: Option[TimerTask] = None - protected[this] override def resumeReceiver(item: (Any, OutputChannel[Any])) { + protected[this] override def makeReaction(block: => Unit): Runnable = { + if (isSuspended) + new Runnable { + def run() { block } + } + else + new ActorTask(this, { block }) + } + + protected[this] override def resumeReceiver(item: (Any, OutputChannel[Any]), onSameThread: Boolean) { if (!onTimeout.isEmpty) { onTimeout.get.cancel() onTimeout = None @@ -408,7 +417,10 @@ trait Actor extends OutputChannelActor with AbstractActor { } else { senders = List(item._2) // assert continuation != null - (new Reaction(this, continuation, item._1)).run() + if (onSameThread) + continuation(item._1) + else + scheduleActor(null, item._1) } } @@ -423,7 +435,7 @@ trait Actor extends OutputChannelActor with AbstractActor { synchronized { if (shouldExit) exit() // links - drainSendBuffer() + drainSendBuffer(mailbox) } var done = false @@ -433,7 +445,7 @@ trait Actor extends OutputChannelActor with AbstractActor { synchronized { // in mean time new stuff might have arrived if (!sendBuffer.isEmpty) { - drainSendBuffer() + drainSendBuffer(mailbox) // keep going } else { waitingFor = f.isDefinedAt @@ -467,7 +479,7 @@ trait Actor extends OutputChannelActor with AbstractActor { synchronized { if (shouldExit) exit() // links - drainSendBuffer() + drainSendBuffer(mailbox) } // first, remove spurious TIMEOUT message from mailbox if any @@ -488,7 +500,7 @@ trait Actor extends OutputChannelActor with AbstractActor { val todo = synchronized { // in mean time new stuff might have arrived if (!sendBuffer.isEmpty) { - drainSendBuffer() + drainSendBuffer(mailbox) // keep going () => {} } else if (msec == 0) { @@ -498,13 +510,14 @@ trait Actor extends OutputChannelActor with AbstractActor { waitingFor = f.isDefinedAt received = None suspendActorFor(msec) + done = true if (received.isEmpty) { // actor is not resumed because of new message // therefore, waitingFor has not been updated, yet. waitingFor = waitingForNone - } - done = true - receiveTimeout + receiveTimeout + } else + () => {} } } todo() @@ -533,9 +546,10 @@ trait Actor extends OutputChannelActor with AbstractActor { assert(Actor.self(scheduler) == this, "react on channel belonging to other actor") synchronized { if (shouldExit) exit() // links - drainSendBuffer() + drainSendBuffer(mailbox) } - searchMailbox(f) + continuation = f + searchMailbox(mailbox, f.isDefinedAt, false) throw Actor.suspendException } @@ -554,7 +568,7 @@ trait Actor extends OutputChannelActor with AbstractActor { synchronized { if (shouldExit) exit() // links - drainSendBuffer() + drainSendBuffer(mailbox) } // first, remove spurious TIMEOUT message from mailbox if any @@ -575,7 +589,7 @@ trait Actor extends OutputChannelActor with AbstractActor { val todo = synchronized { // in mean time new stuff might have arrived if (!sendBuffer.isEmpty) { - drainSendBuffer() + drainSendBuffer(mailbox) // keep going () => {} } else if (msec == 0) { @@ -681,16 +695,16 @@ trait Actor extends OutputChannelActor with AbstractActor { new Future[Any](someChan) { def apply() = if (isSet) value.get - else ch.receive { + else inputChannel.receive { case any => value = Some(any); any } def respond(k: Any => Unit): Unit = if (isSet) k(value.get) - else ch.react { + else inputChannel.react { case any => value = Some(any); k(any) } def isSet = value match { - case None => ch.receiveWithin(0) { + case None => inputChannel.receiveWithin(0) { case TIMEOUT => false case any => value = Some(any); true } @@ -704,16 +718,16 @@ trait Actor extends OutputChannelActor with AbstractActor { new Future[A](someChan) { def apply() = if (isSet) value.get.asInstanceOf[A] - else ch.receive { + else inputChannel.receive { case any => value = Some(any); any } def respond(k: A => Unit): Unit = if (isSet) k(value.get.asInstanceOf[A]) - else ch.react { + else inputChannel.react { case any => value = Some(any); k(any) } def isSet = value match { - case None => ch.receiveWithin(0) { + case None => inputChannel.receiveWithin(0) { case TIMEOUT => false case any => value = Some(any); true } @@ -744,13 +758,13 @@ trait Actor extends OutputChannelActor with AbstractActor { else throw new ExecutionException(new Exception(reason.toString())) } - } else ch.receive(handleReply andThen {(x: Unit) => apply()}) + } else inputChannel.receive(handleReply andThen {(x: Unit) => apply()}) def respond(k: Any => Unit): Unit = if (isSet) apply() else - ch.react(handleReply andThen {(x: Unit) => k(apply())}) + inputChannel.react(handleReply andThen {(x: Unit) => k(apply())}) def isSet = (value match { case None => @@ -760,7 +774,7 @@ trait Actor extends OutputChannelActor with AbstractActor { } val whatToDo = handleTimeout orElse (handleReply andThen {(x: Unit) => true}) - ch.receiveWithin(0)(whatToDo) + inputChannel.receiveWithin(0)(whatToDo) case Some(_) => true }) || !exitReason.isEmpty } @@ -788,16 +802,16 @@ trait Actor extends OutputChannelActor with AbstractActor { new Future[A](ftch) { def apply() = if (isSet) value.get.asInstanceOf[A] - else ch.receive { + else inputChannel.receive { case any => value = Some(any); value.get.asInstanceOf[A] } def respond(k: A => Unit): Unit = if (isSet) k(value.get.asInstanceOf[A]) - else ch.react { + else inputChannel.react { case any => value = Some(any); k(value.get.asInstanceOf[A]) } def isSet = value match { - case None => ch.receiveWithin(0) { + case None => inputChannel.receiveWithin(0) { case TIMEOUT => false case any => value = Some(any); true } diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala index a0aba1980e..45ddb8c067 100644 --- a/src/actors/scala/actors/ActorGC.scala +++ b/src/actors/scala/actors/ActorGC.scala @@ -27,22 +27,22 @@ import scala.collection.mutable.{HashMap, HashSet} trait ActorGC extends IScheduler { private var pendingReactions = 0 - private val termHandlers = new HashMap[OutputChannelActor, () => Unit] + private val termHandlers = new HashMap[Reactor, () => Unit] /** Actors are added to refQ in newActor. */ - private val refQ = new ReferenceQueue[OutputChannelActor] + private val refQ = new ReferenceQueue[Reactor] /** * This is a set of references to all the actors registered with * this ActorGC. It is maintained so that the WeakReferences will not be GC'd * before the actors to which they point. */ - private val refSet = new HashSet[Reference[t] forSome { type t <: OutputChannelActor }] + private val refSet = new HashSet[Reference[t] forSome { type t <: Reactor }] /** newActor is invoked whenever a new actor is started. */ - def newActor(a: OutputChannelActor) = synchronized { + def newActor(a: Reactor) = synchronized { // registers a reference to the actor with the ReferenceQueue - val wr = new WeakReference[OutputChannelActor](a, refQ) + val wr = new WeakReference[Reactor](a, refQ) refSet += wr pendingReactions += 1 } @@ -70,13 +70,13 @@ trait ActorGC extends IScheduler { pendingReactions <= 0 } - def onTerminate(a: OutputChannelActor)(f: => Unit) = synchronized { + def onTerminate(a: Reactor)(f: => Unit) = synchronized { termHandlers += (a -> (() => f)) } /* Called only from Reaction. */ - def terminated(a: OutputChannelActor) = synchronized { + def terminated(a: Reactor) = synchronized { // execute registered termination handler (if any) termHandlers.get(a) match { case Some(handler) => @@ -88,7 +88,7 @@ trait ActorGC extends IScheduler { } // find the weak reference that points to the terminated actor, if any - refSet.find((ref: Reference[t] forSome { type t <: OutputChannelActor }) => ref.get() == a) match { + refSet.find((ref: Reference[t] forSome { type t <: Reactor }) => ref.get() == a) match { case Some(r) => // invoking clear will not cause r to be enqueued r.clear() diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala new file mode 100644 index 0000000000..4236785433 --- /dev/null +++ b/src/actors/scala/actors/ActorTask.scala @@ -0,0 +1,73 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + + +package scala.actors + +import java.lang.Runnable + +/**

+ * The class ActorTask... + *

+ * + * @author Philipp Haller + */ +class ActorTask extends Runnable { + + private var a: Actor = null + private var fun: () => Unit = null + + def this(a: Actor, block: => Unit) { + this() + this.a = a + this.fun = () => { block } + } + + def run() { + val saved = Actor.tl.get + Actor.tl set a + try { + if (a.shouldExit) // links + a.exit() + try { + try { + fun() + } catch { + case e: Exception if (a.exceptionHandler.isDefinedAt(e)) => + a.exceptionHandler(e) + } + } catch { + case _: KillActorException => + } + a.kill() + } + catch { + case _: SuspendActorException => { + // do nothing + } + case t: Exception => { + Debug.info(a+": caught "+t) + a.terminated() + // links + a.synchronized { + if (!a.links.isEmpty) + a.exitLinked(t) + else + t.printStackTrace() + } + } + } finally { + Actor.tl set saved + this.a = null + this.fun = null + } + } + +} diff --git a/src/actors/scala/actors/DelegatingScheduler.scala b/src/actors/scala/actors/DelegatingScheduler.scala index b5a0db9c1a..39328c8077 100644 --- a/src/actors/scala/actors/DelegatingScheduler.scala +++ b/src/actors/scala/actors/DelegatingScheduler.scala @@ -43,9 +43,9 @@ trait DelegatingScheduler extends IScheduler { } } - def newActor(actor: OutputChannelActor) = impl.newActor(actor) + def newActor(actor: Reactor) = impl.newActor(actor) - def terminated(actor: OutputChannelActor) = impl.terminated(actor) + def terminated(actor: Reactor) = impl.terminated(actor) - def onTerminate(actor: OutputChannelActor)(f: => Unit) = impl.onTerminate(actor)(f) + def onTerminate(actor: Reactor)(f: => Unit) = impl.onTerminate(actor)(f) } diff --git a/src/actors/scala/actors/Future.scala b/src/actors/scala/actors/Future.scala index f8c2385561..ff938ab1aa 100644 --- a/src/actors/scala/actors/Future.scala +++ b/src/actors/scala/actors/Future.scala @@ -25,7 +25,7 @@ package scala.actors * @author Philipp Haller * @version 0.9.16 */ -abstract class Future[+T](val ch: InputChannel[T]) extends Responder[T] with Function0[T] { +abstract class Future[+T](val inputChannel: InputChannel[T]) extends Responder[T] with Function0[T] { protected var value: Option[Any] = None def isSet: Boolean } @@ -61,7 +61,8 @@ object Futures { } def awaitEither[a, b](ft1: Future[a], ft2: Future[b]): Any = { - val FutCh1 = ft1.ch; val FutCh2 = ft2.ch + val FutCh1 = ft1.inputChannel + val FutCh2 = ft2.inputChannel Actor.receive { case FutCh1 ! arg1 => arg1 case FutCh2 ! arg2 => arg2 @@ -95,7 +96,7 @@ object Futures { }) val partFuns = unsetFts.map((p: Pair[Int, Future[Any]]) => { - val FutCh = p._2.ch + val FutCh = p._2.inputChannel val singleCase: PartialFunction[Any, Pair[Int, Any]] = { case FutCh ! any => Pair(p._1, any) } diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala index 42530a8381..49b42d39d6 100644 --- a/src/actors/scala/actors/IScheduler.scala +++ b/src/actors/scala/actors/IScheduler.scala @@ -47,14 +47,14 @@ trait IScheduler { * * @param a the actor to be registered */ - def newActor(a: OutputChannelActor): Unit + def newActor(a: Reactor): Unit /** Unregisters an actor from this scheduler, because it * has terminated. * * @param a the actor to be registered */ - def terminated(a: OutputChannelActor): Unit + def terminated(a: Reactor): Unit /** Registers a closure to be executed when the specified * actor terminates. @@ -62,5 +62,5 @@ trait IScheduler { * @param a the actor * @param f the closure to be registered */ - def onTerminate(a: OutputChannelActor)(f: => Unit): Unit + def onTerminate(a: Reactor)(f: => Unit): Unit } diff --git a/src/actors/scala/actors/LightReaction.scala b/src/actors/scala/actors/LightReaction.scala index c05d400f8b..ad58edf6d7 100644 --- a/src/actors/scala/actors/LightReaction.scala +++ b/src/actors/scala/actors/LightReaction.scala @@ -15,61 +15,20 @@ import java.lang.Runnable /**

* The abstract class LightReaction associates - * an instance of an OutputChannelActor with a + * an instance of a Reactor with a * * java.lang.Runnable. *

* * @author Philipp Haller */ -class LightReaction extends Runnable { - - private[actors] var a: OutputChannelActor = _ - private var f: PartialFunction[Any, Unit] = _ - private var msg: Any = _ - - def this(a: OutputChannelActor, f: PartialFunction[Any, Unit], msg: Any) = { - this() - this.a = a - this.f = f - this.msg = msg - } - - def this(a: OutputChannelActor) = this(a, null, null) - - def run() { - val saved = Actor.tl.get - Actor.tl.set(a) - try { - try { - try { - if (f == null) - a.act() - else - f(msg) - } catch { - case e: Exception if (a.exceptionHandler.isDefinedAt(e)) => - a.exceptionHandler(e) - } - } catch { - case _: KillActorException => - } - a.kill() - } - catch { - case _: SuspendActorException => { - // do nothing (continuation is already saved) - } - case t: Throwable => { - Debug.info(a+": caught "+t) - a.terminated() - } - } finally { - Actor.tl.set(saved) - this.a = null - this.f = null - this.msg = null - } - } +class LightReaction(a: Reactor, f: PartialFunction[Any, Unit], msg: Any) extends ReactorTask(a, { + if (f == null) + a.act() + else + f(msg) +}) { + + def this(a: Reactor) = this(a, null, null) } diff --git a/src/actors/scala/actors/MessageQueue.scala b/src/actors/scala/actors/MessageQueue.scala index 3b730c6217..19ab904e80 100644 --- a/src/actors/scala/actors/MessageQueue.scala +++ b/src/actors/scala/actors/MessageQueue.scala @@ -67,6 +67,14 @@ class MessageQueue { } } + def foreach(f: (Any, OutputChannel[Any]) => Unit) { + var curr = first + while (curr != null) { + f(curr.msg, curr.session) + curr = curr.next + } + } + def foldLeft[B](z: B)(f: (B, Any) => B): B = { var acc = z var curr = first diff --git a/src/actors/scala/actors/OutputChannelActor.scala b/src/actors/scala/actors/OutputChannelActor.scala index 28f2948680..599dee69db 100644 --- a/src/actors/scala/actors/OutputChannelActor.scala +++ b/src/actors/scala/actors/OutputChannelActor.scala @@ -12,7 +12,7 @@ package scala.actors import scala.collection.mutable.Queue -trait OutputChannelActor extends OutputChannel[Any] { +trait Reactor extends OutputChannel[Any] { @volatile protected var ignoreSender: Boolean = false @@ -71,27 +71,11 @@ trait OutputChannelActor extends OutputChannel[Any] { if (waitingFor ne waitingForNone) { val savedWaitingFor = waitingFor waitingFor = waitingForNone - () => scheduler execute { - var item: Option[(Any, OutputChannel[Any])] = - synchronized { Some(msg, replyTo) } - while (!item.isEmpty) { - if (savedWaitingFor(item.get._1)) { - resumeReceiver(item.get) - item = None - } else { - mailbox.append(item.get._1, item.get._2) - - item = synchronized { - if (!sendBuffer.isEmpty) - Some(sendBuffer.dequeue()) - else { - waitingFor = savedWaitingFor - None - } - } - } - } - } + () => scheduler execute (makeReaction { + val startMbox = new MessageQueue + synchronized { startMbox.append(msg, replyTo) } + searchMailbox(startMbox, savedWaitingFor, true) + }) } else { sendBuffer.enqueue((msg, replyTo)) () => { /* do nothing */ } @@ -100,11 +84,17 @@ trait OutputChannelActor extends OutputChannel[Any] { todo() } - protected[this] def resumeReceiver(item: (Any, OutputChannel[Any])) { + protected[this] def makeReaction(block: => Unit): Runnable = + new ReactorTask(this, { block }) + + protected[this] def resumeReceiver(item: (Any, OutputChannel[Any]), onSameThread: Boolean) { if (!ignoreSender) senders = List(item._2) // assert continuation != null - (new LightReaction(this, continuation, item._1)).run() + if (onSameThread) + continuation(item._1) + else + scheduleActor(null, item._1) } def !(msg: Any) { @@ -117,32 +107,37 @@ trait OutputChannelActor extends OutputChannel[Any] { def receiver: Actor = this.asInstanceOf[Actor] - protected[this] def drainSendBuffer() { + protected[this] def drainSendBuffer(mbox: MessageQueue) { while (!sendBuffer.isEmpty) { val item = sendBuffer.dequeue() - mailbox.append(item._1, item._2) + mbox.append(item._1, item._2) } } - protected[this] def searchMailbox(f: PartialFunction[Any, Unit]) { + // assume continuation has been set + protected[this] def searchMailbox(startMbox: MessageQueue, + handlesMessage: Any => Boolean, + resumeOnSameThread: Boolean) { + var tmpMbox = startMbox var done = false while (!done) { - val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) + val qel = tmpMbox.extractFirst((m: Any) => handlesMessage(m)) + if (tmpMbox ne mailbox) + tmpMbox.foreach((m, s) => mailbox.append(m, s)) if (null eq qel) { synchronized { // in mean time new stuff might have arrived if (!sendBuffer.isEmpty) { - drainSendBuffer() + tmpMbox = new MessageQueue + drainSendBuffer(tmpMbox) // keep going } else { - waitingFor = f.isDefinedAt - continuation = f + waitingFor = handlesMessage done = true } } } else { - senders = List(qel.session) - scheduleActor(f, qel.msg) + resumeReceiver((qel.msg, qel.session), resumeOnSameThread) done = true } } @@ -150,8 +145,9 @@ trait OutputChannelActor extends OutputChannel[Any] { protected[actors] def react(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor") - synchronized { drainSendBuffer() } - searchMailbox(f) + synchronized { drainSendBuffer(mailbox) } + continuation = f + searchMailbox(mailbox, f.isDefinedAt, false) throw Actor.suspendException } @@ -170,10 +166,10 @@ trait OutputChannelActor extends OutputChannel[Any] { msg)) } - def start(): OutputChannelActor = { + def start(): Reactor = { scheduler execute { - scheduler.newActor(OutputChannelActor.this) - (new LightReaction(OutputChannelActor.this)).run() + scheduler.newActor(this) + (new LightReaction(this)).run() } this } diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala index 9a4c67b3be..298e8d2fb1 100644 --- a/src/actors/scala/actors/Reaction.scala +++ b/src/actors/scala/actors/Reaction.scala @@ -31,65 +31,13 @@ private[actors] class KillActorException extends Throwable { * @version 0.9.10 * @author Philipp Haller */ -class Reaction extends Runnable { - - private[actors] var a: Actor = _ - private var f: PartialFunction[Any, Unit] = _ - private var msg: Any = _ - - def this(a: Actor, f: PartialFunction[Any, Unit], msg: Any) = { - this() - this.a = a - this.f = f - this.msg = msg - } +class Reaction(a: Actor, f: PartialFunction[Any, Unit], msg: Any) extends ActorTask(a, { + if (f == null) + a.act() + else + f(msg) +}) { def this(a: Actor) = this(a, null, null) - def run() { - val saved = Actor.tl.get.asInstanceOf[Actor] - Actor.tl.set(a) - try { - if (a.shouldExit) // links - a.exit() - else { - try { - try { - if (f == null) - a.act() - else - f(msg) - } catch { - case e: Exception if (a.exceptionHandler.isDefinedAt(e)) => - a.exceptionHandler(e) - } - } catch { - case _: KillActorException => - } - a.kill() - } - } - catch { - case _: SuspendActorException => { - // do nothing (continuation is already saved) - } - case t: Throwable => { - Debug.info(a+": caught "+t) - a.terminated() - // links - a.synchronized { - if (!a.links.isEmpty) - a.exitLinked(t) - else - t.printStackTrace() - } - } - } finally { - Actor.tl.set(saved) - this.a = null - this.f = null - this.msg = null - } - } - } diff --git a/src/actors/scala/actors/ReactorTask.scala b/src/actors/scala/actors/ReactorTask.scala new file mode 100644 index 0000000000..7445097812 --- /dev/null +++ b/src/actors/scala/actors/ReactorTask.scala @@ -0,0 +1,64 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + + +package scala.actors + +import java.lang.Runnable + +/**

+ * The class ReactorTask... + *

+ * + * @author Philipp Haller + */ +class ReactorTask extends Runnable { + + private var reactor: Reactor = null + private var fun: () => Unit = null + + def this(reactor: Reactor, block: => Unit) { + this() + this.reactor = reactor + this.fun = () => { block } + } + + def run() { + val saved = Actor.tl.get + Actor.tl set reactor + try { + try { + try { + fun() + } catch { + case e: Exception if (reactor.exceptionHandler.isDefinedAt(e)) => + reactor.exceptionHandler(e) + } + } catch { + case _: KillActorException => + } + reactor.kill() + } + catch { + case _: SuspendActorException => { + // do nothing (continuation is already saved) + } + case t: Exception => { + Debug.info(reactor+": caught "+t) + reactor.terminated() + } + } finally { + Actor.tl set saved + this.reactor = null + this.fun = null + } + } + +} diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala index f47f0cbd0f..749847d6b9 100644 --- a/src/actors/scala/actors/SchedulerAdapter.scala +++ b/src/actors/scala/actors/SchedulerAdapter.scala @@ -42,7 +42,7 @@ trait SchedulerAdapter extends IScheduler { * * @param a the actor to be registered */ - def newActor(a: OutputChannelActor) = + def newActor(a: Reactor) = Scheduler.newActor(a) /** Unregisters an actor from this scheduler, because it @@ -50,7 +50,7 @@ trait SchedulerAdapter extends IScheduler { * * @param a the actor to be unregistered */ - def terminated(a: OutputChannelActor) = + def terminated(a: Reactor) = Scheduler.terminated(a) /** Registers a closure to be executed when the specified @@ -59,6 +59,6 @@ trait SchedulerAdapter extends IScheduler { * @param a the actor * @param f the closure to be registered */ - def onTerminate(a: OutputChannelActor)(f: => Unit) = + def onTerminate(a: Reactor)(f: => Unit) = Scheduler.onTerminate(a)(f) } diff --git a/src/actors/scala/actors/SimpleExecutorScheduler.scala b/src/actors/scala/actors/SimpleExecutorScheduler.scala index dfa1bdaf73..1499aeb7be 100644 --- a/src/actors/scala/actors/SimpleExecutorScheduler.scala +++ b/src/actors/scala/actors/SimpleExecutorScheduler.scala @@ -34,7 +34,7 @@ class SimpleExecutorScheduler(protected var executor: ExecutorService, /* Maintains per actor one closure that is executed * when the actor terminates. */ - protected val termHandlers = new HashMap[OutputChannelActor, () => Unit] + protected val termHandlers = new HashMap[Reactor, () => Unit] private var pendingReactions = 0 diff --git a/src/actors/scala/actors/SingleThreadedScheduler.scala b/src/actors/scala/actors/SingleThreadedScheduler.scala index c861ae9ea1..ae75478cd7 100644 --- a/src/actors/scala/actors/SingleThreadedScheduler.scala +++ b/src/actors/scala/actors/SingleThreadedScheduler.scala @@ -30,9 +30,9 @@ class SingleThreadedScheduler extends IScheduler { def shutdown() {} - def newActor(actor: OutputChannelActor) {} - def terminated(actor: OutputChannelActor) {} - def onTerminate(actor: OutputChannelActor)(f: => Unit) {} + def newActor(actor: Reactor) {} + def terminated(actor: Reactor) {} + def onTerminate(actor: Reactor)(f: => Unit) {} def isActive = true } diff --git a/src/actors/scala/actors/TerminationMonitor.scala b/src/actors/scala/actors/TerminationMonitor.scala index b7cb748bf0..ba76d19145 100644 --- a/src/actors/scala/actors/TerminationMonitor.scala +++ b/src/actors/scala/actors/TerminationMonitor.scala @@ -15,10 +15,10 @@ import scala.collection.mutable.HashMap trait TerminationMonitor extends IScheduler { private var pendingReactions = 0 - private val termHandlers = new HashMap[OutputChannelActor, () => Unit] + private val termHandlers = new HashMap[Reactor, () => Unit] /** newActor is invoked whenever a new actor is started. */ - def newActor(a: OutputChannelActor) = synchronized { + def newActor(a: Reactor) = synchronized { pendingReactions += 1 } @@ -28,11 +28,11 @@ trait TerminationMonitor extends IScheduler { * @param a the actor * @param f the closure to be registered */ - def onTerminate(a: OutputChannelActor)(f: => Unit) = synchronized { + def onTerminate(a: Reactor)(f: => Unit) = synchronized { termHandlers += (a -> (() => f)) } - def terminated(a: OutputChannelActor) = synchronized { + def terminated(a: Reactor) = synchronized { // obtain termination handler (if any) val todo = synchronized { termHandlers.get(a) match { -- cgit v1.2.3