summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-05-28 09:14:39 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-05-28 09:14:39 +0000
commit09e9e88d00262d69ee629426d7da441234b695fd (patch)
treeffce6581e4cec3855550e60cc1df7a2413161e15
parentc8ef95caee7a4cb0123cd73de1ca3e79459da160 (diff)
downloadscala-09e9e88d00262d69ee629426d7da441234b695fd.tar.gz
scala-09e9e88d00262d69ee629426d7da441234b695fd.tar.bz2
scala-09e9e88d00262d69ee629426d7da441234b695fd.zip
Now for real: added OutputChannelActor.
-rw-r--r--src/actors/scala/actors/Actor.scala2
-rw-r--r--src/actors/scala/actors/LightReaction.scala75
-rw-r--r--src/actors/scala/actors/OutputChannelActor.scala153
3 files changed, 229 insertions, 1 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index a0c9e1d888..ddeeca8c9f 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -780,7 +780,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
}
// guarded by lock of this
- protected override def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) =
+ private def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) =
if ((f eq null) && (continuation eq null)) {
// do nothing (timeout is handled instead)
}
diff --git a/src/actors/scala/actors/LightReaction.scala b/src/actors/scala/actors/LightReaction.scala
new file mode 100644
index 0000000000..c05d400f8b
--- /dev/null
+++ b/src/actors/scala/actors/LightReaction.scala
@@ -0,0 +1,75 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id: Reaction.scala 17862 2009-05-27 23:27:05Z phaller $
+
+
+package scala.actors
+
+import java.lang.Runnable
+
+/** <p>
+ * The abstract class <code>LightReaction</code> associates
+ * an instance of an <code>OutputChannelActor</code> with a
+ * <a class="java/lang/Runnable" href="" target="contentFrame">
+ * <code>java.lang.Runnable</code></a>.
+ * </p>
+ *
+ * @author Philipp Haller
+ */
+class LightReaction extends Runnable {
+
+ private[actors] var a: OutputChannelActor = _
+ private var f: PartialFunction[Any, Unit] = _
+ private var msg: Any = _
+
+ def this(a: OutputChannelActor, f: PartialFunction[Any, Unit], msg: Any) = {
+ this()
+ this.a = a
+ this.f = f
+ this.msg = msg
+ }
+
+ def this(a: OutputChannelActor) = this(a, null, null)
+
+ def run() {
+ val saved = Actor.tl.get
+ Actor.tl.set(a)
+ try {
+ try {
+ try {
+ if (f == null)
+ a.act()
+ else
+ f(msg)
+ } catch {
+ case e: Exception if (a.exceptionHandler.isDefinedAt(e)) =>
+ a.exceptionHandler(e)
+ }
+ } catch {
+ case _: KillActorException =>
+ }
+ a.kill()
+ }
+ catch {
+ case _: SuspendActorException => {
+ // do nothing (continuation is already saved)
+ }
+ case t: Throwable => {
+ Debug.info(a+": caught "+t)
+ a.terminated()
+ }
+ } finally {
+ Actor.tl.set(saved)
+ this.a = null
+ this.f = null
+ this.msg = null
+ }
+ }
+
+}
diff --git a/src/actors/scala/actors/OutputChannelActor.scala b/src/actors/scala/actors/OutputChannelActor.scala
new file mode 100644
index 0000000000..dc90898ccd
--- /dev/null
+++ b/src/actors/scala/actors/OutputChannelActor.scala
@@ -0,0 +1,153 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+package scala.actors
+
+trait OutputChannelActor extends OutputChannel[Any] {
+
+ protected var ignoreSender: Boolean = false
+
+ /* The actor's mailbox. */
+ protected val mailbox = new MessageQueue
+
+ /* A list of the current senders. The head of the list is
+ * the sender of the message that was received last.
+ */
+ protected var senders: List[OutputChannel[Any]] =
+ if (ignoreSender) List(null)
+ else Nil
+
+ /* If the actor waits in a react, continuation holds the
+ * message handler that react was called with.
+ */
+ protected var continuation: PartialFunction[Any, Unit] = null
+
+ /* Whenever this Actor executes on some thread, waitingFor is
+ * guaranteed to be equal to waitingForNone.
+ *
+ * In other words, whenever waitingFor is not equal to
+ * waitingForNone, this Actor is guaranteed not to execute on some
+ * thread.
+ */
+ protected val waitingForNone = (m: Any) => false
+ protected var waitingFor: Any => Boolean = waitingForNone
+
+ /**
+ * The behavior of an actor is specified by implementing this
+ * abstract method.
+ */
+ def act(): Unit
+
+ protected[actors] def exceptionHandler: PartialFunction[Exception, Unit] = Map()
+
+ protected[actors] def scheduler: IScheduler =
+ Scheduler
+
+ def mailboxSize: Int = synchronized {
+ mailbox.size
+ }
+
+ def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {
+ if (waitingFor(msg)) {
+ waitingFor = waitingForNone
+
+ if (!ignoreSender)
+ senders = List(replyTo)
+
+ // assert continuation != null
+ scheduler.execute(new LightReaction(this, continuation, msg))
+ } else {
+ mailbox.append(msg, replyTo)
+ }
+ }
+
+ def !(msg: Any) {
+ send(msg, if (ignoreSender) null else Actor.rawSelf(scheduler))
+ }
+
+ def forward(msg: Any) {
+ send(msg, if (ignoreSender) null else Actor.sender)
+ }
+
+ def receiver: Actor = this.asInstanceOf[Actor]
+
+ def react(f: PartialFunction[Any, Unit]): Nothing = {
+ assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
+ this.synchronized {
+ val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
+ if (null eq qel) {
+ waitingFor = f.isDefinedAt
+ continuation = f
+ } else {
+ if (!ignoreSender)
+ senders = List(qel.session)
+ scheduleActor(f, qel.msg)
+ }
+ throw new SuspendActorException
+ }
+ }
+
+ def sender: OutputChannel[Any] = senders.head
+
+ /**
+ * Replies with <code>msg</code> to the sender.
+ */
+ def reply(msg: Any) {
+ sender ! msg
+ }
+
+ private def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = {
+ val task = new LightReaction(this,
+ if (f eq null) continuation else f,
+ msg)
+ scheduler execute task
+ }
+
+ def start(): OutputChannelActor = synchronized {
+ scheduler execute {
+ scheduler.newActor(OutputChannelActor.this)
+ (new LightReaction(OutputChannelActor.this)).run()
+ }
+ this
+ }
+
+ /* This closure is used to implement control-flow operations
+ * built on top of `seq`. Note that the only invocation of
+ * `kill` is supposed to be inside `Reaction.run`.
+ */
+ private[actors] var kill: () => Unit =
+ () => { exit() }
+
+ private[actors] def seq[a, b](first: => a, next: => b): Unit = {
+ val s = Actor.rawSelf(scheduler)
+ val killNext = s.kill
+ s.kill = () => {
+ s.kill = killNext
+
+ // to avoid stack overflow:
+ // instead of directly executing `next`,
+ // schedule as continuation
+ scheduleActor({ case _ => next }, 1)
+ throw new SuspendActorException
+ }
+ first
+ throw new KillActorException
+ }
+
+ protected[actors] def exit(): Nothing = {
+ terminated()
+ throw new SuspendActorException
+ }
+
+ protected[actors] def terminated() {
+ scheduler.terminated(this)
+ }
+
+}