summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-05-31 13:18:37 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-05-31 13:18:37 +0000
commit63031aa7f0f1f32c7b998d118c832cb181b4e99e (patch)
tree2491eeb242f1a6693a461f97a04ce9c1fc28e0b4
parentd3a5b5b97ba4d55a4f95042dde2f31eaeedf4c10 (diff)
downloadscala-63031aa7f0f1f32c7b998d118c832cb181b4e99e.tar.gz
scala-63031aa7f0f1f32c7b998d118c832cb181b4e99e.tar.bz2
scala-63031aa7f0f1f32c7b998d118c832cb181b4e99e.zip
Renamed OutputChannelActor to Reactor.
Renamed Future.ch to Future.inputChannel. Exceptions are handled properly while matching messages. Tasks that execute actors no longer catch Throwable, but Exception.
-rw-r--r--src/actors/scala/actors/Actor.scala74
-rw-r--r--src/actors/scala/actors/ActorGC.scala16
-rw-r--r--src/actors/scala/actors/ActorTask.scala73
-rw-r--r--src/actors/scala/actors/DelegatingScheduler.scala6
-rw-r--r--src/actors/scala/actors/Future.scala7
-rw-r--r--src/actors/scala/actors/IScheduler.scala6
-rw-r--r--src/actors/scala/actors/LightReaction.scala59
-rw-r--r--src/actors/scala/actors/MessageQueue.scala8
-rw-r--r--src/actors/scala/actors/OutputChannelActor.scala72
-rw-r--r--src/actors/scala/actors/Reaction.scala64
-rw-r--r--src/actors/scala/actors/ReactorTask.scala64
-rw-r--r--src/actors/scala/actors/SchedulerAdapter.scala6
-rw-r--r--src/actors/scala/actors/SimpleExecutorScheduler.scala2
-rw-r--r--src/actors/scala/actors/SingleThreadedScheduler.scala6
-rw-r--r--src/actors/scala/actors/TerminationMonitor.scala8
15 files changed, 267 insertions, 204 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 55cf976c14..dfbd46a1a7 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.ExecutionException
*/
object Actor {
- private[actors] val tl = new ThreadLocal[OutputChannelActor]
+ private[actors] val tl = new ThreadLocal[Reactor]
// timer thread runs as daemon
private[actors] val timer = new Timer(true)
@@ -43,9 +43,9 @@ object Actor {
private[actors] def self(sched: IScheduler): Actor =
rawSelf(sched).asInstanceOf[Actor]
- private[actors] def rawSelf: OutputChannelActor = rawSelf(Scheduler)
+ private[actors] def rawSelf: Reactor = rawSelf(Scheduler)
- private[actors] def rawSelf(sched: IScheduler): OutputChannelActor = {
+ private[actors] def rawSelf(sched: IScheduler): Reactor = {
val s = tl.get
if (s eq null) {
val r = new ActorProxy(currentThread, sched)
@@ -208,7 +208,7 @@ object Actor {
def eventloop(f: PartialFunction[Any, Unit]): Nothing =
rawSelf.react(new RecursiveProxyHandler(rawSelf, f))
- private class RecursiveProxyHandler(a: OutputChannelActor, f: PartialFunction[Any, Unit])
+ private class RecursiveProxyHandler(a: Reactor, f: PartialFunction[Any, Unit])
extends PartialFunction[Any, Unit] {
def isDefinedAt(m: Any): Boolean =
true // events are immediately removed from the mailbox
@@ -373,7 +373,7 @@ object Actor {
* @author Philipp Haller
*/
@serializable
-trait Actor extends OutputChannelActor with AbstractActor {
+trait Actor extends Reactor with AbstractActor {
/* The following two fields are only used when the actor
* suspends by blocking its underlying thread, for example,
@@ -394,7 +394,16 @@ trait Actor extends OutputChannelActor with AbstractActor {
*/
private var onTimeout: Option[TimerTask] = None
- protected[this] override def resumeReceiver(item: (Any, OutputChannel[Any])) {
+ protected[this] override def makeReaction(block: => Unit): Runnable = {
+ if (isSuspended)
+ new Runnable {
+ def run() { block }
+ }
+ else
+ new ActorTask(this, { block })
+ }
+
+ protected[this] override def resumeReceiver(item: (Any, OutputChannel[Any]), onSameThread: Boolean) {
if (!onTimeout.isEmpty) {
onTimeout.get.cancel()
onTimeout = None
@@ -408,7 +417,10 @@ trait Actor extends OutputChannelActor with AbstractActor {
} else {
senders = List(item._2)
// assert continuation != null
- (new Reaction(this, continuation, item._1)).run()
+ if (onSameThread)
+ continuation(item._1)
+ else
+ scheduleActor(null, item._1)
}
}
@@ -423,7 +435,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
synchronized {
if (shouldExit) exit() // links
- drainSendBuffer()
+ drainSendBuffer(mailbox)
}
var done = false
@@ -433,7 +445,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
synchronized {
// in mean time new stuff might have arrived
if (!sendBuffer.isEmpty) {
- drainSendBuffer()
+ drainSendBuffer(mailbox)
// keep going
} else {
waitingFor = f.isDefinedAt
@@ -467,7 +479,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
synchronized {
if (shouldExit) exit() // links
- drainSendBuffer()
+ drainSendBuffer(mailbox)
}
// first, remove spurious TIMEOUT message from mailbox if any
@@ -488,7 +500,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
val todo = synchronized {
// in mean time new stuff might have arrived
if (!sendBuffer.isEmpty) {
- drainSendBuffer()
+ drainSendBuffer(mailbox)
// keep going
() => {}
} else if (msec == 0) {
@@ -498,13 +510,14 @@ trait Actor extends OutputChannelActor with AbstractActor {
waitingFor = f.isDefinedAt
received = None
suspendActorFor(msec)
+ done = true
if (received.isEmpty) {
// actor is not resumed because of new message
// therefore, waitingFor has not been updated, yet.
waitingFor = waitingForNone
- }
- done = true
- receiveTimeout
+ receiveTimeout
+ } else
+ () => {}
}
}
todo()
@@ -533,9 +546,10 @@ trait Actor extends OutputChannelActor with AbstractActor {
assert(Actor.self(scheduler) == this, "react on channel belonging to other actor")
synchronized {
if (shouldExit) exit() // links
- drainSendBuffer()
+ drainSendBuffer(mailbox)
}
- searchMailbox(f)
+ continuation = f
+ searchMailbox(mailbox, f.isDefinedAt, false)
throw Actor.suspendException
}
@@ -554,7 +568,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
synchronized {
if (shouldExit) exit() // links
- drainSendBuffer()
+ drainSendBuffer(mailbox)
}
// first, remove spurious TIMEOUT message from mailbox if any
@@ -575,7 +589,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
val todo = synchronized {
// in mean time new stuff might have arrived
if (!sendBuffer.isEmpty) {
- drainSendBuffer()
+ drainSendBuffer(mailbox)
// keep going
() => {}
} else if (msec == 0) {
@@ -681,16 +695,16 @@ trait Actor extends OutputChannelActor with AbstractActor {
new Future[Any](someChan) {
def apply() =
if (isSet) value.get
- else ch.receive {
+ else inputChannel.receive {
case any => value = Some(any); any
}
def respond(k: Any => Unit): Unit =
if (isSet) k(value.get)
- else ch.react {
+ else inputChannel.react {
case any => value = Some(any); k(any)
}
def isSet = value match {
- case None => ch.receiveWithin(0) {
+ case None => inputChannel.receiveWithin(0) {
case TIMEOUT => false
case any => value = Some(any); true
}
@@ -704,16 +718,16 @@ trait Actor extends OutputChannelActor with AbstractActor {
new Future[A](someChan) {
def apply() =
if (isSet) value.get.asInstanceOf[A]
- else ch.receive {
+ else inputChannel.receive {
case any => value = Some(any); any
}
def respond(k: A => Unit): Unit =
if (isSet) k(value.get.asInstanceOf[A])
- else ch.react {
+ else inputChannel.react {
case any => value = Some(any); k(any)
}
def isSet = value match {
- case None => ch.receiveWithin(0) {
+ case None => inputChannel.receiveWithin(0) {
case TIMEOUT => false
case any => value = Some(any); true
}
@@ -744,13 +758,13 @@ trait Actor extends OutputChannelActor with AbstractActor {
else
throw new ExecutionException(new Exception(reason.toString()))
}
- } else ch.receive(handleReply andThen {(x: Unit) => apply()})
+ } else inputChannel.receive(handleReply andThen {(x: Unit) => apply()})
def respond(k: Any => Unit): Unit =
if (isSet)
apply()
else
- ch.react(handleReply andThen {(x: Unit) => k(apply())})
+ inputChannel.react(handleReply andThen {(x: Unit) => k(apply())})
def isSet = (value match {
case None =>
@@ -760,7 +774,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
}
val whatToDo =
handleTimeout orElse (handleReply andThen {(x: Unit) => true})
- ch.receiveWithin(0)(whatToDo)
+ inputChannel.receiveWithin(0)(whatToDo)
case Some(_) => true
}) || !exitReason.isEmpty
}
@@ -788,16 +802,16 @@ trait Actor extends OutputChannelActor with AbstractActor {
new Future[A](ftch) {
def apply() =
if (isSet) value.get.asInstanceOf[A]
- else ch.receive {
+ else inputChannel.receive {
case any => value = Some(any); value.get.asInstanceOf[A]
}
def respond(k: A => Unit): Unit =
if (isSet) k(value.get.asInstanceOf[A])
- else ch.react {
+ else inputChannel.react {
case any => value = Some(any); k(value.get.asInstanceOf[A])
}
def isSet = value match {
- case None => ch.receiveWithin(0) {
+ case None => inputChannel.receiveWithin(0) {
case TIMEOUT => false
case any => value = Some(any); true
}
diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala
index a0aba1980e..45ddb8c067 100644
--- a/src/actors/scala/actors/ActorGC.scala
+++ b/src/actors/scala/actors/ActorGC.scala
@@ -27,22 +27,22 @@ import scala.collection.mutable.{HashMap, HashSet}
trait ActorGC extends IScheduler {
private var pendingReactions = 0
- private val termHandlers = new HashMap[OutputChannelActor, () => Unit]
+ private val termHandlers = new HashMap[Reactor, () => Unit]
/** Actors are added to refQ in newActor. */
- private val refQ = new ReferenceQueue[OutputChannelActor]
+ private val refQ = new ReferenceQueue[Reactor]
/**
* This is a set of references to all the actors registered with
* this ActorGC. It is maintained so that the WeakReferences will not be GC'd
* before the actors to which they point.
*/
- private val refSet = new HashSet[Reference[t] forSome { type t <: OutputChannelActor }]
+ private val refSet = new HashSet[Reference[t] forSome { type t <: Reactor }]
/** newActor is invoked whenever a new actor is started. */
- def newActor(a: OutputChannelActor) = synchronized {
+ def newActor(a: Reactor) = synchronized {
// registers a reference to the actor with the ReferenceQueue
- val wr = new WeakReference[OutputChannelActor](a, refQ)
+ val wr = new WeakReference[Reactor](a, refQ)
refSet += wr
pendingReactions += 1
}
@@ -70,13 +70,13 @@ trait ActorGC extends IScheduler {
pendingReactions <= 0
}
- def onTerminate(a: OutputChannelActor)(f: => Unit) = synchronized {
+ def onTerminate(a: Reactor)(f: => Unit) = synchronized {
termHandlers += (a -> (() => f))
}
/* Called only from <code>Reaction</code>.
*/
- def terminated(a: OutputChannelActor) = synchronized {
+ def terminated(a: Reactor) = synchronized {
// execute registered termination handler (if any)
termHandlers.get(a) match {
case Some(handler) =>
@@ -88,7 +88,7 @@ trait ActorGC extends IScheduler {
}
// find the weak reference that points to the terminated actor, if any
- refSet.find((ref: Reference[t] forSome { type t <: OutputChannelActor }) => ref.get() == a) match {
+ refSet.find((ref: Reference[t] forSome { type t <: Reactor }) => ref.get() == a) match {
case Some(r) =>
// invoking clear will not cause r to be enqueued
r.clear()
diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala
new file mode 100644
index 0000000000..4236785433
--- /dev/null
+++ b/src/actors/scala/actors/ActorTask.scala
@@ -0,0 +1,73 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+
+package scala.actors
+
+import java.lang.Runnable
+
+/** <p>
+ * The class <code>ActorTask</code>...
+ * </p>
+ *
+ * @author Philipp Haller
+ */
+class ActorTask extends Runnable {
+
+ private var a: Actor = null
+ private var fun: () => Unit = null
+
+ def this(a: Actor, block: => Unit) {
+ this()
+ this.a = a
+ this.fun = () => { block }
+ }
+
+ def run() {
+ val saved = Actor.tl.get
+ Actor.tl set a
+ try {
+ if (a.shouldExit) // links
+ a.exit()
+ try {
+ try {
+ fun()
+ } catch {
+ case e: Exception if (a.exceptionHandler.isDefinedAt(e)) =>
+ a.exceptionHandler(e)
+ }
+ } catch {
+ case _: KillActorException =>
+ }
+ a.kill()
+ }
+ catch {
+ case _: SuspendActorException => {
+ // do nothing
+ }
+ case t: Exception => {
+ Debug.info(a+": caught "+t)
+ a.terminated()
+ // links
+ a.synchronized {
+ if (!a.links.isEmpty)
+ a.exitLinked(t)
+ else
+ t.printStackTrace()
+ }
+ }
+ } finally {
+ Actor.tl set saved
+ this.a = null
+ this.fun = null
+ }
+ }
+
+}
diff --git a/src/actors/scala/actors/DelegatingScheduler.scala b/src/actors/scala/actors/DelegatingScheduler.scala
index b5a0db9c1a..39328c8077 100644
--- a/src/actors/scala/actors/DelegatingScheduler.scala
+++ b/src/actors/scala/actors/DelegatingScheduler.scala
@@ -43,9 +43,9 @@ trait DelegatingScheduler extends IScheduler {
}
}
- def newActor(actor: OutputChannelActor) = impl.newActor(actor)
+ def newActor(actor: Reactor) = impl.newActor(actor)
- def terminated(actor: OutputChannelActor) = impl.terminated(actor)
+ def terminated(actor: Reactor) = impl.terminated(actor)
- def onTerminate(actor: OutputChannelActor)(f: => Unit) = impl.onTerminate(actor)(f)
+ def onTerminate(actor: Reactor)(f: => Unit) = impl.onTerminate(actor)(f)
}
diff --git a/src/actors/scala/actors/Future.scala b/src/actors/scala/actors/Future.scala
index f8c2385561..ff938ab1aa 100644
--- a/src/actors/scala/actors/Future.scala
+++ b/src/actors/scala/actors/Future.scala
@@ -25,7 +25,7 @@ package scala.actors
* @author Philipp Haller
* @version 0.9.16
*/
-abstract class Future[+T](val ch: InputChannel[T]) extends Responder[T] with Function0[T] {
+abstract class Future[+T](val inputChannel: InputChannel[T]) extends Responder[T] with Function0[T] {
protected var value: Option[Any] = None
def isSet: Boolean
}
@@ -61,7 +61,8 @@ object Futures {
}
def awaitEither[a, b](ft1: Future[a], ft2: Future[b]): Any = {
- val FutCh1 = ft1.ch; val FutCh2 = ft2.ch
+ val FutCh1 = ft1.inputChannel
+ val FutCh2 = ft2.inputChannel
Actor.receive {
case FutCh1 ! arg1 => arg1
case FutCh2 ! arg2 => arg2
@@ -95,7 +96,7 @@ object Futures {
})
val partFuns = unsetFts.map((p: Pair[Int, Future[Any]]) => {
- val FutCh = p._2.ch
+ val FutCh = p._2.inputChannel
val singleCase: PartialFunction[Any, Pair[Int, Any]] = {
case FutCh ! any => Pair(p._1, any)
}
diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala
index 42530a8381..49b42d39d6 100644
--- a/src/actors/scala/actors/IScheduler.scala
+++ b/src/actors/scala/actors/IScheduler.scala
@@ -47,14 +47,14 @@ trait IScheduler {
*
* @param a the actor to be registered
*/
- def newActor(a: OutputChannelActor): Unit
+ def newActor(a: Reactor): Unit
/** Unregisters an actor from this scheduler, because it
* has terminated.
*
* @param a the actor to be registered
*/
- def terminated(a: OutputChannelActor): Unit
+ def terminated(a: Reactor): Unit
/** Registers a closure to be executed when the specified
* actor terminates.
@@ -62,5 +62,5 @@ trait IScheduler {
* @param a the actor
* @param f the closure to be registered
*/
- def onTerminate(a: OutputChannelActor)(f: => Unit): Unit
+ def onTerminate(a: Reactor)(f: => Unit): Unit
}
diff --git a/src/actors/scala/actors/LightReaction.scala b/src/actors/scala/actors/LightReaction.scala
index c05d400f8b..ad58edf6d7 100644
--- a/src/actors/scala/actors/LightReaction.scala
+++ b/src/actors/scala/actors/LightReaction.scala
@@ -15,61 +15,20 @@ import java.lang.Runnable
/** <p>
* The abstract class <code>LightReaction</code> associates
- * an instance of an <code>OutputChannelActor</code> with a
+ * an instance of a <code>Reactor</code> with a
* <a class="java/lang/Runnable" href="" target="contentFrame">
* <code>java.lang.Runnable</code></a>.
* </p>
*
* @author Philipp Haller
*/
-class LightReaction extends Runnable {
-
- private[actors] var a: OutputChannelActor = _
- private var f: PartialFunction[Any, Unit] = _
- private var msg: Any = _
-
- def this(a: OutputChannelActor, f: PartialFunction[Any, Unit], msg: Any) = {
- this()
- this.a = a
- this.f = f
- this.msg = msg
- }
-
- def this(a: OutputChannelActor) = this(a, null, null)
-
- def run() {
- val saved = Actor.tl.get
- Actor.tl.set(a)
- try {
- try {
- try {
- if (f == null)
- a.act()
- else
- f(msg)
- } catch {
- case e: Exception if (a.exceptionHandler.isDefinedAt(e)) =>
- a.exceptionHandler(e)
- }
- } catch {
- case _: KillActorException =>
- }
- a.kill()
- }
- catch {
- case _: SuspendActorException => {
- // do nothing (continuation is already saved)
- }
- case t: Throwable => {
- Debug.info(a+": caught "+t)
- a.terminated()
- }
- } finally {
- Actor.tl.set(saved)
- this.a = null
- this.f = null
- this.msg = null
- }
- }
+class LightReaction(a: Reactor, f: PartialFunction[Any, Unit], msg: Any) extends ReactorTask(a, {
+ if (f == null)
+ a.act()
+ else
+ f(msg)
+}) {
+
+ def this(a: Reactor) = this(a, null, null)
}
diff --git a/src/actors/scala/actors/MessageQueue.scala b/src/actors/scala/actors/MessageQueue.scala
index 3b730c6217..19ab904e80 100644
--- a/src/actors/scala/actors/MessageQueue.scala
+++ b/src/actors/scala/actors/MessageQueue.scala
@@ -67,6 +67,14 @@ class MessageQueue {
}
}
+ def foreach(f: (Any, OutputChannel[Any]) => Unit) {
+ var curr = first
+ while (curr != null) {
+ f(curr.msg, curr.session)
+ curr = curr.next
+ }
+ }
+
def foldLeft[B](z: B)(f: (B, Any) => B): B = {
var acc = z
var curr = first
diff --git a/src/actors/scala/actors/OutputChannelActor.scala b/src/actors/scala/actors/OutputChannelActor.scala
index 28f2948680..599dee69db 100644
--- a/src/actors/scala/actors/OutputChannelActor.scala
+++ b/src/actors/scala/actors/OutputChannelActor.scala
@@ -12,7 +12,7 @@ package scala.actors
import scala.collection.mutable.Queue
-trait OutputChannelActor extends OutputChannel[Any] {
+trait Reactor extends OutputChannel[Any] {
@volatile
protected var ignoreSender: Boolean = false
@@ -71,27 +71,11 @@ trait OutputChannelActor extends OutputChannel[Any] {
if (waitingFor ne waitingForNone) {
val savedWaitingFor = waitingFor
waitingFor = waitingForNone
- () => scheduler execute {
- var item: Option[(Any, OutputChannel[Any])] =
- synchronized { Some(msg, replyTo) }
- while (!item.isEmpty) {
- if (savedWaitingFor(item.get._1)) {
- resumeReceiver(item.get)
- item = None
- } else {
- mailbox.append(item.get._1, item.get._2)
-
- item = synchronized {
- if (!sendBuffer.isEmpty)
- Some(sendBuffer.dequeue())
- else {
- waitingFor = savedWaitingFor
- None
- }
- }
- }
- }
- }
+ () => scheduler execute (makeReaction {
+ val startMbox = new MessageQueue
+ synchronized { startMbox.append(msg, replyTo) }
+ searchMailbox(startMbox, savedWaitingFor, true)
+ })
} else {
sendBuffer.enqueue((msg, replyTo))
() => { /* do nothing */ }
@@ -100,11 +84,17 @@ trait OutputChannelActor extends OutputChannel[Any] {
todo()
}
- protected[this] def resumeReceiver(item: (Any, OutputChannel[Any])) {
+ protected[this] def makeReaction(block: => Unit): Runnable =
+ new ReactorTask(this, { block })
+
+ protected[this] def resumeReceiver(item: (Any, OutputChannel[Any]), onSameThread: Boolean) {
if (!ignoreSender)
senders = List(item._2)
// assert continuation != null
- (new LightReaction(this, continuation, item._1)).run()
+ if (onSameThread)
+ continuation(item._1)
+ else
+ scheduleActor(null, item._1)
}
def !(msg: Any) {
@@ -117,32 +107,37 @@ trait OutputChannelActor extends OutputChannel[Any] {
def receiver: Actor = this.asInstanceOf[Actor]
- protected[this] def drainSendBuffer() {
+ protected[this] def drainSendBuffer(mbox: MessageQueue) {
while (!sendBuffer.isEmpty) {
val item = sendBuffer.dequeue()
- mailbox.append(item._1, item._2)
+ mbox.append(item._1, item._2)
}
}
- protected[this] def searchMailbox(f: PartialFunction[Any, Unit]) {
+ // assume continuation has been set
+ protected[this] def searchMailbox(startMbox: MessageQueue,
+ handlesMessage: Any => Boolean,
+ resumeOnSameThread: Boolean) {
+ var tmpMbox = startMbox
var done = false
while (!done) {
- val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
+ val qel = tmpMbox.extractFirst((m: Any) => handlesMessage(m))
+ if (tmpMbox ne mailbox)
+ tmpMbox.foreach((m, s) => mailbox.append(m, s))
if (null eq qel) {
synchronized {
// in mean time new stuff might have arrived
if (!sendBuffer.isEmpty) {
- drainSendBuffer()
+ tmpMbox = new MessageQueue
+ drainSendBuffer(tmpMbox)
// keep going
} else {
- waitingFor = f.isDefinedAt
- continuation = f
+ waitingFor = handlesMessage
done = true
}
}
} else {
- senders = List(qel.session)
- scheduleActor(f, qel.msg)
+ resumeReceiver((qel.msg, qel.session), resumeOnSameThread)
done = true
}
}
@@ -150,8 +145,9 @@ trait OutputChannelActor extends OutputChannel[Any] {
protected[actors] def react(f: PartialFunction[Any, Unit]): Nothing = {
assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
- synchronized { drainSendBuffer() }
- searchMailbox(f)
+ synchronized { drainSendBuffer(mailbox) }
+ continuation = f
+ searchMailbox(mailbox, f.isDefinedAt, false)
throw Actor.suspendException
}
@@ -170,10 +166,10 @@ trait OutputChannelActor extends OutputChannel[Any] {
msg))
}
- def start(): OutputChannelActor = {
+ def start(): Reactor = {
scheduler execute {
- scheduler.newActor(OutputChannelActor.this)
- (new LightReaction(OutputChannelActor.this)).run()
+ scheduler.newActor(this)
+ (new LightReaction(this)).run()
}
this
}
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
index 9a4c67b3be..298e8d2fb1 100644
--- a/src/actors/scala/actors/Reaction.scala
+++ b/src/actors/scala/actors/Reaction.scala
@@ -31,65 +31,13 @@ private[actors] class KillActorException extends Throwable {
* @version 0.9.10
* @author Philipp Haller
*/
-class Reaction extends Runnable {
-
- private[actors] var a: Actor = _
- private var f: PartialFunction[Any, Unit] = _
- private var msg: Any = _
-
- def this(a: Actor, f: PartialFunction[Any, Unit], msg: Any) = {
- this()
- this.a = a
- this.f = f
- this.msg = msg
- }
+class Reaction(a: Actor, f: PartialFunction[Any, Unit], msg: Any) extends ActorTask(a, {
+ if (f == null)
+ a.act()
+ else
+ f(msg)
+}) {
def this(a: Actor) = this(a, null, null)
- def run() {
- val saved = Actor.tl.get.asInstanceOf[Actor]
- Actor.tl.set(a)
- try {
- if (a.shouldExit) // links
- a.exit()
- else {
- try {
- try {
- if (f == null)
- a.act()
- else
- f(msg)
- } catch {
- case e: Exception if (a.exceptionHandler.isDefinedAt(e)) =>
- a.exceptionHandler(e)
- }
- } catch {
- case _: KillActorException =>
- }
- a.kill()
- }
- }
- catch {
- case _: SuspendActorException => {
- // do nothing (continuation is already saved)
- }
- case t: Throwable => {
- Debug.info(a+": caught "+t)
- a.terminated()
- // links
- a.synchronized {
- if (!a.links.isEmpty)
- a.exitLinked(t)
- else
- t.printStackTrace()
- }
- }
- } finally {
- Actor.tl.set(saved)
- this.a = null
- this.f = null
- this.msg = null
- }
- }
-
}
diff --git a/src/actors/scala/actors/ReactorTask.scala b/src/actors/scala/actors/ReactorTask.scala
new file mode 100644
index 0000000000..7445097812
--- /dev/null
+++ b/src/actors/scala/actors/ReactorTask.scala
@@ -0,0 +1,64 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+
+package scala.actors
+
+import java.lang.Runnable
+
+/** <p>
+ * The class <code>ReactorTask</code>...
+ * </p>
+ *
+ * @author Philipp Haller
+ */
+class ReactorTask extends Runnable {
+
+ private var reactor: Reactor = null
+ private var fun: () => Unit = null
+
+ def this(reactor: Reactor, block: => Unit) {
+ this()
+ this.reactor = reactor
+ this.fun = () => { block }
+ }
+
+ def run() {
+ val saved = Actor.tl.get
+ Actor.tl set reactor
+ try {
+ try {
+ try {
+ fun()
+ } catch {
+ case e: Exception if (reactor.exceptionHandler.isDefinedAt(e)) =>
+ reactor.exceptionHandler(e)
+ }
+ } catch {
+ case _: KillActorException =>
+ }
+ reactor.kill()
+ }
+ catch {
+ case _: SuspendActorException => {
+ // do nothing (continuation is already saved)
+ }
+ case t: Exception => {
+ Debug.info(reactor+": caught "+t)
+ reactor.terminated()
+ }
+ } finally {
+ Actor.tl set saved
+ this.reactor = null
+ this.fun = null
+ }
+ }
+
+}
diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala
index f47f0cbd0f..749847d6b9 100644
--- a/src/actors/scala/actors/SchedulerAdapter.scala
+++ b/src/actors/scala/actors/SchedulerAdapter.scala
@@ -42,7 +42,7 @@ trait SchedulerAdapter extends IScheduler {
*
* @param a the actor to be registered
*/
- def newActor(a: OutputChannelActor) =
+ def newActor(a: Reactor) =
Scheduler.newActor(a)
/** Unregisters an actor from this scheduler, because it
@@ -50,7 +50,7 @@ trait SchedulerAdapter extends IScheduler {
*
* @param a the actor to be unregistered
*/
- def terminated(a: OutputChannelActor) =
+ def terminated(a: Reactor) =
Scheduler.terminated(a)
/** Registers a closure to be executed when the specified
@@ -59,6 +59,6 @@ trait SchedulerAdapter extends IScheduler {
* @param a the actor
* @param f the closure to be registered
*/
- def onTerminate(a: OutputChannelActor)(f: => Unit) =
+ def onTerminate(a: Reactor)(f: => Unit) =
Scheduler.onTerminate(a)(f)
}
diff --git a/src/actors/scala/actors/SimpleExecutorScheduler.scala b/src/actors/scala/actors/SimpleExecutorScheduler.scala
index dfa1bdaf73..1499aeb7be 100644
--- a/src/actors/scala/actors/SimpleExecutorScheduler.scala
+++ b/src/actors/scala/actors/SimpleExecutorScheduler.scala
@@ -34,7 +34,7 @@ class SimpleExecutorScheduler(protected var executor: ExecutorService,
/* Maintains per actor one closure that is executed
* when the actor terminates.
*/
- protected val termHandlers = new HashMap[OutputChannelActor, () => Unit]
+ protected val termHandlers = new HashMap[Reactor, () => Unit]
private var pendingReactions = 0
diff --git a/src/actors/scala/actors/SingleThreadedScheduler.scala b/src/actors/scala/actors/SingleThreadedScheduler.scala
index c861ae9ea1..ae75478cd7 100644
--- a/src/actors/scala/actors/SingleThreadedScheduler.scala
+++ b/src/actors/scala/actors/SingleThreadedScheduler.scala
@@ -30,9 +30,9 @@ class SingleThreadedScheduler extends IScheduler {
def shutdown() {}
- def newActor(actor: OutputChannelActor) {}
- def terminated(actor: OutputChannelActor) {}
- def onTerminate(actor: OutputChannelActor)(f: => Unit) {}
+ def newActor(actor: Reactor) {}
+ def terminated(actor: Reactor) {}
+ def onTerminate(actor: Reactor)(f: => Unit) {}
def isActive = true
}
diff --git a/src/actors/scala/actors/TerminationMonitor.scala b/src/actors/scala/actors/TerminationMonitor.scala
index b7cb748bf0..ba76d19145 100644
--- a/src/actors/scala/actors/TerminationMonitor.scala
+++ b/src/actors/scala/actors/TerminationMonitor.scala
@@ -15,10 +15,10 @@ import scala.collection.mutable.HashMap
trait TerminationMonitor extends IScheduler {
private var pendingReactions = 0
- private val termHandlers = new HashMap[OutputChannelActor, () => Unit]
+ private val termHandlers = new HashMap[Reactor, () => Unit]
/** newActor is invoked whenever a new actor is started. */
- def newActor(a: OutputChannelActor) = synchronized {
+ def newActor(a: Reactor) = synchronized {
pendingReactions += 1
}
@@ -28,11 +28,11 @@ trait TerminationMonitor extends IScheduler {
* @param a the actor
* @param f the closure to be registered
*/
- def onTerminate(a: OutputChannelActor)(f: => Unit) = synchronized {
+ def onTerminate(a: Reactor)(f: => Unit) = synchronized {
termHandlers += (a -> (() => f))
}
- def terminated(a: OutputChannelActor) = synchronized {
+ def terminated(a: Reactor) = synchronized {
// obtain termination handler (if any)
val todo = synchronized {
termHandlers.get(a) match {