From ce0d59af0470a445170c74c161b8f971b6f2a5b3 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Thu, 28 May 2009 08:56:39 +0000 Subject: Added OutputChannelActor. --- src/actors/scala/actors/Actor.scala | 126 ++++----------------- src/actors/scala/actors/ActorGC.scala | 16 +-- src/actors/scala/actors/DelegatingScheduler.scala | 6 +- src/actors/scala/actors/IScheduler.scala | 6 +- .../scala/actors/SimpleExecutorScheduler.scala | 8 +- .../scala/actors/SingleThreadedScheduler.scala | 6 +- 6 files changed, 45 insertions(+), 123 deletions(-) (limited to 'src/actors') diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 60c3c9a676..a0c9e1d888 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -27,7 +27,7 @@ import java.util.concurrent.ExecutionException */ object Actor { - private[actors] val tl = new ThreadLocal[Actor] + private[actors] val tl = new ThreadLocal[OutputChannelActor] // timer thread runs as daemon private[actors] val timer = new Timer(true) @@ -41,7 +41,12 @@ object Actor { */ def self: Actor = self(Scheduler) - private[actors] def self(sched: IScheduler): Actor = { + private[actors] def self(sched: IScheduler): Actor = + rawSelf(sched).asInstanceOf[Actor] + + private[actors] def rawSelf: OutputChannelActor = rawSelf(Scheduler) + + private[actors] def rawSelf(sched: IScheduler): OutputChannelActor = { val s = tl.get if (s eq null) { val r = new ActorProxy(currentThread, sched) @@ -185,7 +190,7 @@ object Actor { * @return this function never returns */ def react(f: PartialFunction[Any, Unit]): Nothing = - self.react(f) + rawSelf.react(f) /** * Lightweight variant of receiveWithin. @@ -202,9 +207,9 @@ object Actor { self.reactWithin(msec)(f) def eventloop(f: PartialFunction[Any, Unit]): Nothing = - self.react(new RecursiveProxyHandler(self, f)) + rawSelf.react(new RecursiveProxyHandler(rawSelf, f)) - private class RecursiveProxyHandler(a: Actor, f: PartialFunction[Any, Unit]) + private class RecursiveProxyHandler(a: OutputChannelActor, f: PartialFunction[Any, Unit]) extends PartialFunction[Any, Unit] { def isDefinedAt(m: Any): Boolean = true // events are immediately removed from the mailbox @@ -217,26 +222,26 @@ object Actor { /** * Returns the actor which sent the last received message. */ - def sender: OutputChannel[Any] = self.sender + def sender: OutputChannel[Any] = rawSelf.sender /** * Send msg to the actor waiting in a call to * !?. */ - def reply(msg: Any): Unit = self.reply(msg) + def reply(msg: Any): Unit = rawSelf.reply(msg) /** * Send () to the actor waiting in a call to * !?. */ - def reply(): Unit = self.reply(()) + def reply(): Unit = rawSelf.reply(()) /** * Returns the number of messages in self's mailbox * * @return the number of messages in self's mailbox */ - def mailboxSize: Int = self.mailboxSize + def mailboxSize: Int = rawSelf.mailboxSize /** *

@@ -265,7 +270,7 @@ object Actor { } implicit def mkBody[a](body: => a) = new Body[a] { - def andThen[b](other: => b): Unit = self.seq(body, other) + def andThen[b](other: => b): Unit = rawSelf.seq(body, other) } /** @@ -369,30 +374,7 @@ object Actor { * @author Philipp Haller */ @serializable -trait Actor extends AbstractActor { - - /* The actor's mailbox. */ - private val mailbox = new MessageQueue - - /* A list of the current senders. The head of the list is - * the sender of the message that was received last. - */ - private var senders: List[OutputChannel[Any]] = Nil - - /* If the actor waits in a react, continuation holds the - * message handler that react was called with. - */ - private var continuation: PartialFunction[Any, Unit] = null - - /* Whenever this Actor executes on some thread, waitingFor is - * guaranteed to be equal to waitingForNone. - * - * In other words, whenever waitingFor is not equal to - * waitingForNone, this Actor is guaranteed not to execute on some - * thread. - */ - private val waitingForNone = (m: Any) => false - private var waitingFor: Any => Boolean = waitingForNone +trait Actor extends OutputChannelActor with AbstractActor { /* The following two fields are only used when the actor * suspends by blocking its underlying thread, for example, @@ -412,20 +394,6 @@ trait Actor extends AbstractActor { */ private var onTimeout: Option[TimerTask] = None - protected[actors] def exceptionHandler: PartialFunction[Exception, Unit] = Map() - - protected[actors] def scheduler: IScheduler = - Scheduler - - /** - * Returns the number of messages in this actor's mailbox - * - * @return the number of messages in this actor's mailbox - */ - def mailboxSize: Int = synchronized { - mailbox.size - } - /** * Sends msg to this actor (asynchronous) supplying * explicit reply destination. @@ -433,7 +401,7 @@ trait Actor extends AbstractActor { * @param msg the message to send * @param replyTo the reply destination */ - def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized { + override def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized { if (waitingFor(msg)) { waitingFor = waitingForNone @@ -541,7 +509,7 @@ trait Actor extends AbstractActor { * * @param f a partial function with message patterns and actions */ - def react(f: PartialFunction[Any, Unit]): Nothing = { + override def react(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.self(scheduler) == this, "react on channel belonging to other actor") this.synchronized { if (shouldExit) exit() // links @@ -602,25 +570,17 @@ trait Actor extends AbstractActor { } } - /** - * The behavior of an actor is specified by implementing this - * abstract method. Note that the preferred way to create actors - * is through the actor method - * defined in object Actor. - */ - def act(): Unit - /** * Sends msg to this actor (asynchronous). */ - def !(msg: Any) { - send(msg, Actor.self(scheduler)) + override def !(msg: Any) { + send(msg, Actor.rawSelf(scheduler)) } /** * Forwards msg to this actor (asynchronous). */ - def forward(msg: Any) { + override def forward(msg: Any) { send(msg, Actor.sender) } @@ -812,13 +772,6 @@ trait Actor extends AbstractActor { } } - /** - * Replies with msg to the sender. - */ - def reply(msg: Any) { - sender ! msg - } - /** * Receives the next message from this actor's mailbox. */ @@ -826,12 +779,8 @@ trait Actor extends AbstractActor { case x => x } - def sender: OutputChannel[Any] = senders.head - - def receiver: Actor = this - // guarded by lock of this - protected def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = + protected override def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = if ((f eq null) && (continuation eq null)) { // do nothing (timeout is handled instead) } @@ -887,7 +836,7 @@ trait Actor extends AbstractActor { /** * Starts this actor. */ - def start(): Actor = synchronized { + override def start(): Actor = synchronized { // Reset various flags. // // Note that we do *not* reset `trapExit`. The reason is that @@ -906,29 +855,6 @@ trait Actor extends AbstractActor { this } - /* This closure is used to implement control-flow operations - * built on top of `seq`. Note that the only invocation of - * `kill` is supposed to be inside `Reaction.run`. - */ - private[actors] var kill: () => Unit = - () => { exit() } - - private def seq[a, b](first: => a, next: => b): Unit = { - val s = Actor.self(scheduler) - val killNext = s.kill - s.kill = () => { - s.kill = killNext - - // to avoid stack overflow: - // instead of directly executing `next`, - // schedule as continuation - scheduleActor({ case _ => next }, 1) - throw new SuspendActorException - } - first - throw new KillActorException - } - private[actors] var links: List[AbstractActor] = Nil /** @@ -1007,7 +933,7 @@ trait Actor extends AbstractActor { /** * Terminates with exit reason 'normal. */ - protected[actors] def exit(): Nothing = { + protected[actors] override def exit(): Nothing = { // links if (!links.isEmpty) exitLinked() @@ -1054,10 +980,6 @@ trait Actor extends AbstractActor { } } - private[actors] def terminated() { - scheduler.terminated(this) - } - /* Requires qualified private, because RemoteActor must * register a termination handler. */ diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala index a5160d14e2..a0aba1980e 100644 --- a/src/actors/scala/actors/ActorGC.scala +++ b/src/actors/scala/actors/ActorGC.scala @@ -27,22 +27,22 @@ import scala.collection.mutable.{HashMap, HashSet} trait ActorGC extends IScheduler { private var pendingReactions = 0 - private val termHandlers = new HashMap[Actor, () => Unit] + private val termHandlers = new HashMap[OutputChannelActor, () => Unit] /** Actors are added to refQ in newActor. */ - private val refQ = new ReferenceQueue[Actor] + private val refQ = new ReferenceQueue[OutputChannelActor] /** * This is a set of references to all the actors registered with * this ActorGC. It is maintained so that the WeakReferences will not be GC'd * before the actors to which they point. */ - private val refSet = new HashSet[Reference[t] forSome { type t <: Actor }] + private val refSet = new HashSet[Reference[t] forSome { type t <: OutputChannelActor }] /** newActor is invoked whenever a new actor is started. */ - def newActor(a: Actor) = synchronized { + def newActor(a: OutputChannelActor) = synchronized { // registers a reference to the actor with the ReferenceQueue - val wr = new WeakReference[Actor](a, refQ) + val wr = new WeakReference[OutputChannelActor](a, refQ) refSet += wr pendingReactions += 1 } @@ -70,13 +70,13 @@ trait ActorGC extends IScheduler { pendingReactions <= 0 } - def onTerminate(a: Actor)(f: => Unit) = synchronized { + def onTerminate(a: OutputChannelActor)(f: => Unit) = synchronized { termHandlers += (a -> (() => f)) } /* Called only from Reaction. */ - def terminated(a: Actor) = synchronized { + def terminated(a: OutputChannelActor) = synchronized { // execute registered termination handler (if any) termHandlers.get(a) match { case Some(handler) => @@ -88,7 +88,7 @@ trait ActorGC extends IScheduler { } // find the weak reference that points to the terminated actor, if any - refSet.find((ref: Reference[t] forSome { type t <: Actor }) => ref.get() == a) match { + refSet.find((ref: Reference[t] forSome { type t <: OutputChannelActor }) => ref.get() == a) match { case Some(r) => // invoking clear will not cause r to be enqueued r.clear() diff --git a/src/actors/scala/actors/DelegatingScheduler.scala b/src/actors/scala/actors/DelegatingScheduler.scala index 782ef0da42..b5a0db9c1a 100644 --- a/src/actors/scala/actors/DelegatingScheduler.scala +++ b/src/actors/scala/actors/DelegatingScheduler.scala @@ -43,9 +43,9 @@ trait DelegatingScheduler extends IScheduler { } } - def newActor(actor: Actor) = impl.newActor(actor) + def newActor(actor: OutputChannelActor) = impl.newActor(actor) - def terminated(actor: Actor) = impl.terminated(actor) + def terminated(actor: OutputChannelActor) = impl.terminated(actor) - def onTerminate(actor: Actor)(f: => Unit) = impl.onTerminate(actor)(f) + def onTerminate(actor: OutputChannelActor)(f: => Unit) = impl.onTerminate(actor)(f) } diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala index dc2e6961fa..42530a8381 100644 --- a/src/actors/scala/actors/IScheduler.scala +++ b/src/actors/scala/actors/IScheduler.scala @@ -47,14 +47,14 @@ trait IScheduler { * * @param a the actor to be registered */ - def newActor(a: Actor): Unit + def newActor(a: OutputChannelActor): Unit /** Unregisters an actor from this scheduler, because it * has terminated. * * @param a the actor to be registered */ - def terminated(a: Actor): Unit + def terminated(a: OutputChannelActor): Unit /** Registers a closure to be executed when the specified * actor terminates. @@ -62,5 +62,5 @@ trait IScheduler { * @param a the actor * @param f the closure to be registered */ - def onTerminate(a: Actor)(f: => Unit): Unit + def onTerminate(a: OutputChannelActor)(f: => Unit): Unit } diff --git a/src/actors/scala/actors/SimpleExecutorScheduler.scala b/src/actors/scala/actors/SimpleExecutorScheduler.scala index 4c07ec5c5a..e83cb5cfb9 100644 --- a/src/actors/scala/actors/SimpleExecutorScheduler.scala +++ b/src/actors/scala/actors/SimpleExecutorScheduler.scala @@ -29,7 +29,7 @@ class SimpleExecutorScheduler(protected var executor: ExecutorService) extends I /* Maintains at most one closure per actor that is executed * when the actor terminates. */ - protected val termHandlers = new HashMap[Actor, () => Unit] + protected val termHandlers = new HashMap[OutputChannelActor, () => Unit] /* This constructor (and the var above) is currently only used to work * around a bug in scaladoc, which cannot deal with early initializers @@ -76,9 +76,9 @@ class SimpleExecutorScheduler(protected var executor: ExecutorService) extends I def isActive = (executor ne null) && !executor.isShutdown() - def newActor(a: Actor) {} + def newActor(a: OutputChannelActor) {} - def terminated(a: Actor) { + def terminated(a: OutputChannelActor) { // obtain termination handler (if any) val todo = synchronized { termHandlers.get(a) match { @@ -100,7 +100,7 @@ class SimpleExecutorScheduler(protected var executor: ExecutorService) extends I * @param a the actor * @param f the closure to be registered */ - def onTerminate(a: Actor)(block: => Unit) = synchronized { + def onTerminate(a: OutputChannelActor)(block: => Unit) = synchronized { termHandlers += (a -> (() => block)) } } diff --git a/src/actors/scala/actors/SingleThreadedScheduler.scala b/src/actors/scala/actors/SingleThreadedScheduler.scala index fa41d02736..c861ae9ea1 100644 --- a/src/actors/scala/actors/SingleThreadedScheduler.scala +++ b/src/actors/scala/actors/SingleThreadedScheduler.scala @@ -30,9 +30,9 @@ class SingleThreadedScheduler extends IScheduler { def shutdown() {} - def newActor(actor: Actor) {} - def terminated(actor: Actor) {} - def onTerminate(actor: Actor)(f: => Unit) {} + def newActor(actor: OutputChannelActor) {} + def terminated(actor: OutputChannelActor) {} + def onTerminate(actor: OutputChannelActor)(f: => Unit) {} def isActive = true } -- cgit v1.2.3