summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-05-28 08:56:39 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-05-28 08:56:39 +0000
commitce0d59af0470a445170c74c161b8f971b6f2a5b3 (patch)
tree45d4dfda93f7950e79398af6ff4997c6ec19942a /src/actors
parentea519396afa0ba0e6195f46841febba2bb66ecc7 (diff)
downloadscala-ce0d59af0470a445170c74c161b8f971b6f2a5b3.tar.gz
scala-ce0d59af0470a445170c74c161b8f971b6f2a5b3.tar.bz2
scala-ce0d59af0470a445170c74c161b8f971b6f2a5b3.zip
Added OutputChannelActor.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala126
-rw-r--r--src/actors/scala/actors/ActorGC.scala16
-rw-r--r--src/actors/scala/actors/DelegatingScheduler.scala6
-rw-r--r--src/actors/scala/actors/IScheduler.scala6
-rw-r--r--src/actors/scala/actors/SimpleExecutorScheduler.scala8
-rw-r--r--src/actors/scala/actors/SingleThreadedScheduler.scala6
6 files changed, 45 insertions, 123 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 60c3c9a676..a0c9e1d888 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -27,7 +27,7 @@ import java.util.concurrent.ExecutionException
*/
object Actor {
- private[actors] val tl = new ThreadLocal[Actor]
+ private[actors] val tl = new ThreadLocal[OutputChannelActor]
// timer thread runs as daemon
private[actors] val timer = new Timer(true)
@@ -41,7 +41,12 @@ object Actor {
*/
def self: Actor = self(Scheduler)
- private[actors] def self(sched: IScheduler): Actor = {
+ private[actors] def self(sched: IScheduler): Actor =
+ rawSelf(sched).asInstanceOf[Actor]
+
+ private[actors] def rawSelf: OutputChannelActor = rawSelf(Scheduler)
+
+ private[actors] def rawSelf(sched: IScheduler): OutputChannelActor = {
val s = tl.get
if (s eq null) {
val r = new ActorProxy(currentThread, sched)
@@ -185,7 +190,7 @@ object Actor {
* @return this function never returns
*/
def react(f: PartialFunction[Any, Unit]): Nothing =
- self.react(f)
+ rawSelf.react(f)
/**
* Lightweight variant of <code>receiveWithin</code>.
@@ -202,9 +207,9 @@ object Actor {
self.reactWithin(msec)(f)
def eventloop(f: PartialFunction[Any, Unit]): Nothing =
- self.react(new RecursiveProxyHandler(self, f))
+ rawSelf.react(new RecursiveProxyHandler(rawSelf, f))
- private class RecursiveProxyHandler(a: Actor, f: PartialFunction[Any, Unit])
+ private class RecursiveProxyHandler(a: OutputChannelActor, f: PartialFunction[Any, Unit])
extends PartialFunction[Any, Unit] {
def isDefinedAt(m: Any): Boolean =
true // events are immediately removed from the mailbox
@@ -217,26 +222,26 @@ object Actor {
/**
* Returns the actor which sent the last received message.
*/
- def sender: OutputChannel[Any] = self.sender
+ def sender: OutputChannel[Any] = rawSelf.sender
/**
* Send <code>msg</code> to the actor waiting in a call to
* <code>!?</code>.
*/
- def reply(msg: Any): Unit = self.reply(msg)
+ def reply(msg: Any): Unit = rawSelf.reply(msg)
/**
* Send <code>()</code> to the actor waiting in a call to
* <code>!?</code>.
*/
- def reply(): Unit = self.reply(())
+ def reply(): Unit = rawSelf.reply(())
/**
* Returns the number of messages in <code>self</code>'s mailbox
*
* @return the number of messages in <code>self</code>'s mailbox
*/
- def mailboxSize: Int = self.mailboxSize
+ def mailboxSize: Int = rawSelf.mailboxSize
/**
* <p>
@@ -265,7 +270,7 @@ object Actor {
}
implicit def mkBody[a](body: => a) = new Body[a] {
- def andThen[b](other: => b): Unit = self.seq(body, other)
+ def andThen[b](other: => b): Unit = rawSelf.seq(body, other)
}
/**
@@ -369,30 +374,7 @@ object Actor {
* @author Philipp Haller
*/
@serializable
-trait Actor extends AbstractActor {
-
- /* The actor's mailbox. */
- private val mailbox = new MessageQueue
-
- /* A list of the current senders. The head of the list is
- * the sender of the message that was received last.
- */
- private var senders: List[OutputChannel[Any]] = Nil
-
- /* If the actor waits in a react, continuation holds the
- * message handler that react was called with.
- */
- private var continuation: PartialFunction[Any, Unit] = null
-
- /* Whenever this Actor executes on some thread, waitingFor is
- * guaranteed to be equal to waitingForNone.
- *
- * In other words, whenever waitingFor is not equal to
- * waitingForNone, this Actor is guaranteed not to execute on some
- * thread.
- */
- private val waitingForNone = (m: Any) => false
- private var waitingFor: Any => Boolean = waitingForNone
+trait Actor extends OutputChannelActor with AbstractActor {
/* The following two fields are only used when the actor
* suspends by blocking its underlying thread, for example,
@@ -412,20 +394,6 @@ trait Actor extends AbstractActor {
*/
private var onTimeout: Option[TimerTask] = None
- protected[actors] def exceptionHandler: PartialFunction[Exception, Unit] = Map()
-
- protected[actors] def scheduler: IScheduler =
- Scheduler
-
- /**
- * Returns the number of messages in this actor's mailbox
- *
- * @return the number of messages in this actor's mailbox
- */
- def mailboxSize: Int = synchronized {
- mailbox.size
- }
-
/**
* Sends <code>msg</code> to this actor (asynchronous) supplying
* explicit reply destination.
@@ -433,7 +401,7 @@ trait Actor extends AbstractActor {
* @param msg the message to send
* @param replyTo the reply destination
*/
- def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {
+ override def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {
if (waitingFor(msg)) {
waitingFor = waitingForNone
@@ -541,7 +509,7 @@ trait Actor extends AbstractActor {
*
* @param f a partial function with message patterns and actions
*/
- def react(f: PartialFunction[Any, Unit]): Nothing = {
+ override def react(f: PartialFunction[Any, Unit]): Nothing = {
assert(Actor.self(scheduler) == this, "react on channel belonging to other actor")
this.synchronized {
if (shouldExit) exit() // links
@@ -603,24 +571,16 @@ trait Actor extends AbstractActor {
}
/**
- * The behavior of an actor is specified by implementing this
- * abstract method. Note that the preferred way to create actors
- * is through the <code>actor</code> method
- * defined in object <code>Actor</code>.
- */
- def act(): Unit
-
- /**
* Sends <code>msg</code> to this actor (asynchronous).
*/
- def !(msg: Any) {
- send(msg, Actor.self(scheduler))
+ override def !(msg: Any) {
+ send(msg, Actor.rawSelf(scheduler))
}
/**
* Forwards <code>msg</code> to this actor (asynchronous).
*/
- def forward(msg: Any) {
+ override def forward(msg: Any) {
send(msg, Actor.sender)
}
@@ -813,25 +773,14 @@ trait Actor extends AbstractActor {
}
/**
- * Replies with <code>msg</code> to the sender.
- */
- def reply(msg: Any) {
- sender ! msg
- }
-
- /**
* Receives the next message from this actor's mailbox.
*/
def ? : Any = receive {
case x => x
}
- def sender: OutputChannel[Any] = senders.head
-
- def receiver: Actor = this
-
// guarded by lock of this
- protected def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) =
+ protected override def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) =
if ((f eq null) && (continuation eq null)) {
// do nothing (timeout is handled instead)
}
@@ -887,7 +836,7 @@ trait Actor extends AbstractActor {
/**
* Starts this actor.
*/
- def start(): Actor = synchronized {
+ override def start(): Actor = synchronized {
// Reset various flags.
//
// Note that we do *not* reset `trapExit`. The reason is that
@@ -906,29 +855,6 @@ trait Actor extends AbstractActor {
this
}
- /* This closure is used to implement control-flow operations
- * built on top of `seq`. Note that the only invocation of
- * `kill` is supposed to be inside `Reaction.run`.
- */
- private[actors] var kill: () => Unit =
- () => { exit() }
-
- private def seq[a, b](first: => a, next: => b): Unit = {
- val s = Actor.self(scheduler)
- val killNext = s.kill
- s.kill = () => {
- s.kill = killNext
-
- // to avoid stack overflow:
- // instead of directly executing `next`,
- // schedule as continuation
- scheduleActor({ case _ => next }, 1)
- throw new SuspendActorException
- }
- first
- throw new KillActorException
- }
-
private[actors] var links: List[AbstractActor] = Nil
/**
@@ -1007,7 +933,7 @@ trait Actor extends AbstractActor {
/**
* Terminates with exit reason <code>'normal</code>.
*/
- protected[actors] def exit(): Nothing = {
+ protected[actors] override def exit(): Nothing = {
// links
if (!links.isEmpty)
exitLinked()
@@ -1054,10 +980,6 @@ trait Actor extends AbstractActor {
}
}
- private[actors] def terminated() {
- scheduler.terminated(this)
- }
-
/* Requires qualified private, because <code>RemoteActor</code> must
* register a termination handler.
*/
diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala
index a5160d14e2..a0aba1980e 100644
--- a/src/actors/scala/actors/ActorGC.scala
+++ b/src/actors/scala/actors/ActorGC.scala
@@ -27,22 +27,22 @@ import scala.collection.mutable.{HashMap, HashSet}
trait ActorGC extends IScheduler {
private var pendingReactions = 0
- private val termHandlers = new HashMap[Actor, () => Unit]
+ private val termHandlers = new HashMap[OutputChannelActor, () => Unit]
/** Actors are added to refQ in newActor. */
- private val refQ = new ReferenceQueue[Actor]
+ private val refQ = new ReferenceQueue[OutputChannelActor]
/**
* This is a set of references to all the actors registered with
* this ActorGC. It is maintained so that the WeakReferences will not be GC'd
* before the actors to which they point.
*/
- private val refSet = new HashSet[Reference[t] forSome { type t <: Actor }]
+ private val refSet = new HashSet[Reference[t] forSome { type t <: OutputChannelActor }]
/** newActor is invoked whenever a new actor is started. */
- def newActor(a: Actor) = synchronized {
+ def newActor(a: OutputChannelActor) = synchronized {
// registers a reference to the actor with the ReferenceQueue
- val wr = new WeakReference[Actor](a, refQ)
+ val wr = new WeakReference[OutputChannelActor](a, refQ)
refSet += wr
pendingReactions += 1
}
@@ -70,13 +70,13 @@ trait ActorGC extends IScheduler {
pendingReactions <= 0
}
- def onTerminate(a: Actor)(f: => Unit) = synchronized {
+ def onTerminate(a: OutputChannelActor)(f: => Unit) = synchronized {
termHandlers += (a -> (() => f))
}
/* Called only from <code>Reaction</code>.
*/
- def terminated(a: Actor) = synchronized {
+ def terminated(a: OutputChannelActor) = synchronized {
// execute registered termination handler (if any)
termHandlers.get(a) match {
case Some(handler) =>
@@ -88,7 +88,7 @@ trait ActorGC extends IScheduler {
}
// find the weak reference that points to the terminated actor, if any
- refSet.find((ref: Reference[t] forSome { type t <: Actor }) => ref.get() == a) match {
+ refSet.find((ref: Reference[t] forSome { type t <: OutputChannelActor }) => ref.get() == a) match {
case Some(r) =>
// invoking clear will not cause r to be enqueued
r.clear()
diff --git a/src/actors/scala/actors/DelegatingScheduler.scala b/src/actors/scala/actors/DelegatingScheduler.scala
index 782ef0da42..b5a0db9c1a 100644
--- a/src/actors/scala/actors/DelegatingScheduler.scala
+++ b/src/actors/scala/actors/DelegatingScheduler.scala
@@ -43,9 +43,9 @@ trait DelegatingScheduler extends IScheduler {
}
}
- def newActor(actor: Actor) = impl.newActor(actor)
+ def newActor(actor: OutputChannelActor) = impl.newActor(actor)
- def terminated(actor: Actor) = impl.terminated(actor)
+ def terminated(actor: OutputChannelActor) = impl.terminated(actor)
- def onTerminate(actor: Actor)(f: => Unit) = impl.onTerminate(actor)(f)
+ def onTerminate(actor: OutputChannelActor)(f: => Unit) = impl.onTerminate(actor)(f)
}
diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala
index dc2e6961fa..42530a8381 100644
--- a/src/actors/scala/actors/IScheduler.scala
+++ b/src/actors/scala/actors/IScheduler.scala
@@ -47,14 +47,14 @@ trait IScheduler {
*
* @param a the actor to be registered
*/
- def newActor(a: Actor): Unit
+ def newActor(a: OutputChannelActor): Unit
/** Unregisters an actor from this scheduler, because it
* has terminated.
*
* @param a the actor to be registered
*/
- def terminated(a: Actor): Unit
+ def terminated(a: OutputChannelActor): Unit
/** Registers a closure to be executed when the specified
* actor terminates.
@@ -62,5 +62,5 @@ trait IScheduler {
* @param a the actor
* @param f the closure to be registered
*/
- def onTerminate(a: Actor)(f: => Unit): Unit
+ def onTerminate(a: OutputChannelActor)(f: => Unit): Unit
}
diff --git a/src/actors/scala/actors/SimpleExecutorScheduler.scala b/src/actors/scala/actors/SimpleExecutorScheduler.scala
index 4c07ec5c5a..e83cb5cfb9 100644
--- a/src/actors/scala/actors/SimpleExecutorScheduler.scala
+++ b/src/actors/scala/actors/SimpleExecutorScheduler.scala
@@ -29,7 +29,7 @@ class SimpleExecutorScheduler(protected var executor: ExecutorService) extends I
/* Maintains at most one closure per actor that is executed
* when the actor terminates.
*/
- protected val termHandlers = new HashMap[Actor, () => Unit]
+ protected val termHandlers = new HashMap[OutputChannelActor, () => Unit]
/* This constructor (and the var above) is currently only used to work
* around a bug in scaladoc, which cannot deal with early initializers
@@ -76,9 +76,9 @@ class SimpleExecutorScheduler(protected var executor: ExecutorService) extends I
def isActive =
(executor ne null) && !executor.isShutdown()
- def newActor(a: Actor) {}
+ def newActor(a: OutputChannelActor) {}
- def terminated(a: Actor) {
+ def terminated(a: OutputChannelActor) {
// obtain termination handler (if any)
val todo = synchronized {
termHandlers.get(a) match {
@@ -100,7 +100,7 @@ class SimpleExecutorScheduler(protected var executor: ExecutorService) extends I
* @param a the actor
* @param f the closure to be registered
*/
- def onTerminate(a: Actor)(block: => Unit) = synchronized {
+ def onTerminate(a: OutputChannelActor)(block: => Unit) = synchronized {
termHandlers += (a -> (() => block))
}
}
diff --git a/src/actors/scala/actors/SingleThreadedScheduler.scala b/src/actors/scala/actors/SingleThreadedScheduler.scala
index fa41d02736..c861ae9ea1 100644
--- a/src/actors/scala/actors/SingleThreadedScheduler.scala
+++ b/src/actors/scala/actors/SingleThreadedScheduler.scala
@@ -30,9 +30,9 @@ class SingleThreadedScheduler extends IScheduler {
def shutdown() {}
- def newActor(actor: Actor) {}
- def terminated(actor: Actor) {}
- def onTerminate(actor: Actor)(f: => Unit) {}
+ def newActor(actor: OutputChannelActor) {}
+ def terminated(actor: OutputChannelActor) {}
+ def onTerminate(actor: OutputChannelActor)(f: => Unit) {}
def isActive = true
}