summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-10-10 14:07:52 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-10-10 14:07:52 +0000
commitb1b11f7221b216f2d8b80bc34c2fca31d34a2c4a (patch)
tree720d4bd10b3f25afd03761b5015ece746ff1a82d /src/actors
parentc1a76844ed9f1ab43dc6757e9c8d7866a5ccc998 (diff)
downloadscala-b1b11f7221b216f2d8b80bc34c2fca31d34a2c4a.tar.gz
scala-b1b11f7221b216f2d8b80bc34c2fca31d34a2c4a.tar.bz2
scala-b1b11f7221b216f2d8b80bc34c2fca31d34a2c4a.zip
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala105
-rw-r--r--src/actors/scala/actors/ActorProxy.scala29
-rw-r--r--src/actors/scala/actors/Channel.scala14
-rw-r--r--src/actors/scala/actors/Reaction.scala92
-rw-r--r--src/actors/scala/actors/Reactor.scala139
-rw-r--r--src/actors/scala/actors/Scheduler.scala21
-rw-r--r--src/actors/scala/actors/TimerThread.scala7
7 files changed, 346 insertions, 61 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index b1f8bcde23..cf60bd95e2 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -18,6 +18,7 @@ import scala.collection.mutable.HashSet
* <code>receive</code>, <code>react</code>, <code>reply</code>,
* etc.
*
+ * @version Beta2
* @author Philipp Haller
*/
object Actor {
@@ -40,7 +41,7 @@ object Actor {
}
def actor(body: => Unit): Actor = synchronized {
- val actor = new Reactor {
+ val actor = new Actor {
def act() = body
}
actor.start()
@@ -52,14 +53,16 @@ object Actor {
* channel which can be used for typed communication with other
* actors.
*/
+/*
def actor[a](ch: Channel[a])(body: => Unit): Actor = synchronized {
- val actor = new Reactor {
+ val actor = new Actor {
def act() = body
}
ch.receiver = actor
actor.start()
actor
}
+*/
/**
* Receives a message from the mailbox of
@@ -228,17 +231,21 @@ object Actor {
}
/**
- * This trait defines commonalities between thread-based and
- * event-based actors.
+ * This class provides (together with <code>Channel</code>) an
+ * implementation of event-based actors.
*
+ * The main ideas of our approach are explained in the paper<br>
+ * <b>Event-Based Programming without Inversion of Control</b>, Philipp Haller, Martin Odersky <i>Proc. JMLC 2006</i>
+ *
+ * @version Beta2
* @author Philipp Haller
*/
trait Actor extends OutputChannel[Any] {
-
private[actors] val in = new Channel[Any]
in.receiver = this
private var rc: Channel[Any] = null
+
private[actors] def reply: Channel[Any] = {
if (rc == null) {
rc = new Channel[Any]
@@ -273,9 +280,38 @@ trait Actor extends OutputChannel[Any] {
*/
def !?(msg: Any): Any = in !? msg
- private[actors] def sender: Actor
- private[actors] def pushSender(sender: Actor): unit
- private[actors] def popSender(): unit
+ private val lastSenders = new scala.collection.mutable.Stack[Actor]
+
+ private[actors] def sender: Actor = {
+ if (lastSenders.isEmpty) null
+ else lastSenders.top
+ }
+
+ private[actors] def pushSender(s: Actor) = { lastSenders.push(s) }
+ private[actors] def popSender(): Unit = { lastSenders.pop }
+
+ private[actors] var continuation: PartialFunction[Any, Unit] = null
+ private[actors] var timeoutPending = false
+
+ private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) =
+ if (f == null && continuation == null) {
+ // do nothing (timeout is handled instead)
+ }
+ else {
+ val task = new ActorTask(this,
+ if (f == null) continuation else f,
+ msg)
+ Scheduler.execute(task)
+ }
+
+ private[actors] def tick(): Unit =
+ Scheduler.tick(this)
+
+ private[actors] def defaultDetachActor: PartialFunction[Any, Unit] => Unit =
+ (f: PartialFunction[Any, Unit]) => {
+ continuation = f
+ throw new SuspendActorException
+ }
private[actors] var suspendActor: () => Unit = _
private[actors] var suspendActorFor: long => Unit = _
@@ -283,12 +319,22 @@ trait Actor extends OutputChannel[Any] {
private[actors] var detachActor: PartialFunction[Any, Unit] => Unit = _
private[actors] var kill: () => Unit = _
- private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any)
- private[actors] def tick(): Unit
- private[actors] def resetActor(): unit
+ private[actors] def resetActor(): Unit = {
+ suspendActor = () => wait()
+ suspendActorFor = (msec: long) => wait(msec)
+ resumeActor = () => notify()
+ detachActor = defaultDetachActor
+ kill = () => {}
+ }
resetActor()
+ /**
+ * Starts this reactor.
+ */
+ def start(): Unit =
+ Scheduler.execute(new StartTask(this))
+
private val links = new HashSet[Actor]
/**
@@ -304,7 +350,7 @@ trait Actor extends OutputChannel[Any] {
Links <code>self</code> to actor defined by <code>body</code>.
*/
def link(body: => Unit): Actor = {
- val actor = new Reactor {
+ val actor = new Actor {
def act() = body
}
link(actor)
@@ -343,7 +389,10 @@ trait Actor extends OutputChannel[Any] {
call <code>a.exit(reason)</code> if
<code>!reason.equals("normal")</code>.
*/
- def exit(reason: String): Unit
+ def exit(reason: String): Unit = {
+ exitReason = reason
+ Thread.currentThread().interrupt()
+ }
private[actors] def exit(from: Actor, reason: String): Unit = {
if (from == this) {
@@ -382,46 +431,20 @@ trait Actor extends OutputChannel[Any] {
}
}
+
/**
* Messages of this type are sent to each actor <code>a</code>
* that is linked to an actor <code>b</code> whenever
* <code>b</code> terminates and <code>a</code> has
* <code>trapExit</code> set to <code>true</code>.
*
+ * @version Beta2
* @author Philipp Haller
*/
case class Exit(from: Actor, reason: String)
/**
- * This class provides a dynamic actor proxy for normal Java
- * threads.
- *
- * @author Philipp Haller
- */
-private[actors] class ActorProxy(t: Thread) extends Reactor {
- def act(): Unit = {}
- /**
- Terminates execution of <code>self</code> with the following
- effect on linked actors:
-
- For each linked actor <code>a</code> with
- <code>trapExit</code> set to <code>true</code>, send message
- <code>Exit(self, reason)</code> to <code>a</code>.
-
- For each linked actor <code>a</code> with
- <code>trapExit</code> set to <code>false</code> (default),
- call <code>a.exit(reason)</code> if
- <code>!reason.equals("normal")</code>.
- */
- override def exit(reason: String): Unit = {
- exitReason = reason
- t.interrupt()
- }
-}
-
-
-/**
* <p>
* This class is used by our efficient message queue
* implementation.
diff --git a/src/actors/scala/actors/ActorProxy.scala b/src/actors/scala/actors/ActorProxy.scala
new file mode 100644
index 0000000000..9e0ce052e9
--- /dev/null
+++ b/src/actors/scala/actors/ActorProxy.scala
@@ -0,0 +1,29 @@
+package scala.actors
+
+/**
+ * This class provides a dynamic actor proxy for normal Java
+ * threads.
+ *
+ * @version Beta2
+ * @author Philipp Haller
+ */
+private[actors] class ActorProxy(t: Thread) extends Actor {
+ def act(): Unit = {}
+ /**
+ Terminates execution of <code>self</code> with the following
+ effect on linked actors:
+
+ For each linked actor <code>a</code> with
+ <code>trapExit</code> set to <code>true</code>, send message
+ <code>Exit(self, reason)</code> to <code>a</code>.
+
+ For each linked actor <code>a</code> with
+ <code>trapExit</code> set to <code>false</code> (default),
+ call <code>a.exit(reason)</code> if
+ <code>!reason.equals("normal")</code>.
+ */
+ override def exit(reason: String): Unit = {
+ exitReason = reason
+ t.interrupt()
+ }
+}
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index e7c800a8f2..3638253cdc 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -27,6 +27,7 @@ class SuspendActorException extends Throwable {
* actors. Only the actor creating an instance of a
* <code>Channel</code> may receive from it.
*
+ * @version Beta2
* @author Philipp Haller
*/
class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
@@ -56,12 +57,9 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
waitingFor = waitingForNone
waitingForSender = null
- if (receiver.isInstanceOf[Reactor]) {
- val myReactor = receiver.asInstanceOf[Reactor]
- if (myReactor.timeoutPending) {
- myReactor.timeoutPending = false
- TimerThread.trashRequest(myReactor)
- }
+ if (receiver.timeoutPending) {
+ receiver.timeoutPending = false
+ TimerThread.trashRequest(receiver)
}
if (isSuspended)
@@ -335,8 +333,8 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
}
case None => {
this.synchronized {
- TimerThread.requestTimeout(receiver.asInstanceOf[Reactor], f, msec)
- receiver.asInstanceOf[Reactor].timeoutPending = true
+ TimerThread.requestTimeout(receiver, f, msec)
+ receiver.timeoutPending = true
receiver.detachActor(f)
}
}
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
new file mode 100644
index 0000000000..80ecd5c8a2
--- /dev/null
+++ b/src/actors/scala/actors/Reaction.scala
@@ -0,0 +1,92 @@
+package scala.actors
+
+/**
+ * The abstract class <code>Reaction</code> associates an instance
+ * of an <code>Actor</code> with a
+ * <code>java.lang.Runnable</code>. It is also the super class of
+ * the different kinds of tasks used for the execution of
+ * event-based <code>Actor</code>s.
+ *
+ * @version Beta2
+ * @author Philipp Haller
+ */
+private[actors] abstract class Reaction extends Runnable {
+ def actor: Actor
+}
+
+/**
+ * This class represents task items used to start the execution
+ * of <code>Actor</code>s.
+ *
+ * @version Beta2
+ * @author Philipp Haller
+ */
+private[actors] class StartTask(a: Actor) extends Reaction {
+ def actor = a
+
+ def run(): Unit = {
+ val t = Thread.currentThread()
+ val saved = Actor.selfs.get(t).asInstanceOf[Actor]
+ Actor.selfs.put(t, a)
+ try {
+ a.act()
+ if (Thread.currentThread().isInterrupted())
+ throw new InterruptedException
+ a.kill()
+ if (Thread.currentThread().isInterrupted())
+ throw new InterruptedException
+ a.exit("normal")
+ }
+ catch {
+ case _: InterruptedException =>
+ a.exitLinked()
+ case d: SuspendActorException =>
+ // do nothing (continuation is already saved)
+ case t: Throwable =>
+ a.exit(t.toString())
+ }
+ finally {
+ Actor.selfs.put(t, saved)
+ }
+ }
+}
+
+/**
+ * This class represents task items used to execute actions
+ * specified in arguments of <code>react</code> and
+ * <code>reactWithin</code>.
+ *
+ * @version Beta2
+ * @author Philipp Haller
+ */
+private[actors] class ActorTask(a: Actor,
+ f: PartialFunction[Any, Unit],
+ msg: Any) extends Reaction {
+ def actor = a
+
+ def run(): Unit = {
+ val t = Thread.currentThread()
+ val saved = Actor.selfs.get(t).asInstanceOf[Actor]
+ Actor.selfs.put(t, a)
+ try {
+ f(msg)
+ if (Thread.currentThread().isInterrupted())
+ throw new InterruptedException
+ a.kill()
+ if (Thread.currentThread().isInterrupted())
+ throw new InterruptedException
+ a.exit("normal")
+ }
+ catch {
+ case _: InterruptedException =>
+ a.exitLinked()
+ case d: SuspendActorException =>
+ // do nothing (continuation is already saved)
+ case t: Throwable =>
+ a.exit(t.toString())
+ }
+ finally {
+ Actor.selfs.put(t, saved)
+ }
+ }
+}
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index aa16eea7ac..f3569361d3 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -17,10 +17,50 @@ package scala.actors
* The main ideas of our approach are explained in the paper<br>
* <b>Event-Based Programming without Inversion of Control</b>, Philipp Haller, Martin Odersky <i>Proc. JMLC 2006</i>
*
+ * @version Beta2
* @author Philipp Haller
*/
trait Reactor extends Actor {
+ private[actors] val in = new Channel[Any]
+ in.receiver = this
+
+ private var rc: Channel[Any] = null
+
+ private[actors] def reply: Channel[Any] = {
+ if (rc == null) {
+ rc = new Channel[Any]
+ rc.receiver = this
+ }
+ rc
+ }
+
+ private[actors] def freshReply(): Unit = {
+ rc = new Channel[Any]
+ rc.receiver = this
+ }
+
+ /**
+ * The behavior of an actor is specified by implementing this
+ * abstract method. Note that the preferred way to create actors
+ * is through the <code>actor</code> and <code>reactor</code>
+ * methods defined in object <code>Actor</code>.
+ */
+ def act(): Unit
+
+ /**
+ * Sends <code>msg</code> to this actor (asynchronous).
+ */
+ def !(msg: Any): Unit = in ! msg
+
+ def forward(msg: Any): Unit = in forward msg
+
+ /**
+ * Sends <code>msg</code> to this actor and awaits reply
+ * (synchronous).
+ */
+ def !?(msg: Any): Any = in !? msg
+
private val lastSenders = new scala.collection.mutable.Stack[Actor]
private[actors] def sender: Actor = {
@@ -54,6 +94,12 @@ trait Reactor extends Actor {
throw new SuspendActorException
}
+ private[actors] var suspendActor: () => Unit = _
+ private[actors] var suspendActorFor: long => Unit = _
+ private[actors] var resumeActor: () => Unit = _
+ private[actors] var detachActor: PartialFunction[Any, Unit] => Unit = _
+ private[actors] var kill: () => Unit = _
+
private[actors] def resetActor(): Unit = {
suspendActor = () => wait()
suspendActorFor = (msec: long) => wait(msec)
@@ -70,14 +116,100 @@ trait Reactor extends Actor {
def start(): Unit =
Scheduler.execute(new StartTask(this))
+ private val links = new HashSet[Actor]
+
+ /**
+ Links <code>self</code> to actor <code>to</code>.
+ */
+ def link(to: Actor): Actor = {
+ links += to
+ to.linkTo(this)
+ to
+ }
+
+ /**
+ Links <code>self</code> to actor defined by <code>body</code>.
+ */
+ def link(body: => Unit): Actor = {
+ val actor = new Reactor {
+ def act() = body
+ }
+ link(actor)
+ actor.start()
+ actor
+ }
+
+ private[actors] def linkTo(to: Actor): Unit =
+ links += to
+
/**
- * Terminates this reactor, thereby influencing linked actors
- * (see Actor.exit).
+ Unlinks <code>self</code> from actor <code>from</code>.
+ */
+ def unlink(from: Actor): Unit = {
+ links -= from
+ from.unlinkFrom(this)
+ }
+
+ private[actors] def unlinkFrom(from: Actor): Unit =
+ links -= from
+
+ var trapExit = false
+
+ private[actors] var exitReason: String = ""
+
+ /**
+ Terminates execution of <code>self</code> with the following
+ effect on linked actors:
+
+ For each linked actor <code>a</code> with
+ <code>trapExit</code> set to <code>true</code>, send message
+ <code>Exit(self, reason)</code> to <code>a</code>.
+
+ For each linked actor <code>a</code> with
+ <code>trapExit</code> set to <code>false</code> (default),
+ call <code>a.exit(reason)</code> if
+ <code>!reason.equals("normal")</code>.
*/
def exit(reason: String): Unit = {
exitReason = reason
Thread.currentThread().interrupt()
}
+
+ private[actors] def exit(from: Actor, reason: String): Unit = {
+ if (from == this) {
+ exit(reason)
+ }
+ else {
+ if (trapExit)
+ this ! Exit(from, reason)
+ else if (!reason.equals("normal"))
+ exit(reason)
+ }
+ }
+
+ private[actors] def exitLinked(): Unit =
+ exitLinked(exitReason, new HashSet[Actor])
+
+ private[actors] def exitLinked(reason: String): Unit =
+ exitLinked(reason, new HashSet[Actor])
+
+ private[actors] def exitLinked(reason: String,
+ exitMarks: HashSet[Actor]): Unit = {
+ if (exitMarks contains this) {
+ // we are marked, do nothing
+ }
+ else {
+ exitMarks += this // mark this as exiting
+ // exit linked processes
+ val iter = links.elements
+ while (iter.hasNext) {
+ val linked = iter.next
+ unlink(linked)
+ linked.exit(this, reason)
+ }
+ exitMarks -= this
+ }
+ }
}
/**
@@ -87,6 +219,7 @@ trait Reactor extends Actor {
* the different kinds of tasks used for the execution of
* <code>Reactor</code>s.
*
+ * @version Beta2
* @author Philipp Haller
*/
private[actors] abstract class Reaction extends Runnable {
@@ -97,6 +230,7 @@ private[actors] abstract class Reaction extends Runnable {
* This class represents task items used to start the execution
* of <code>Reactor</code>s.
*
+ * @version Beta2
* @author Philipp Haller
*/
private[actors] class StartTask(a: Reactor) extends Reaction {
@@ -134,6 +268,7 @@ private[actors] class StartTask(a: Reactor) extends Reaction {
* specified in arguments of <code>react</code> and
* <code>reactWithin</code>.
*
+ * @version Beta2
* @author Philipp Haller
*/
private[actors] class ActorTask(a: Reactor,
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 4899f545d8..4f50248d29 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -14,9 +14,10 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue}
/**
* The <code>Scheduler</code> object is used by
- * <code>Reactor</code> to execute tasks of an execution of a
+ * <code>Actor</code> to execute tasks of an execution of a
* reactor.
*
+ * @version Beta2
* @author Philipp Haller
*/
object Scheduler {
@@ -33,7 +34,7 @@ object Scheduler {
sched.execute(task)
}
- def tick(a: Reactor) = sched.tick(a)
+ def tick(a: Actor) = sched.tick(a)
def shutdown(): Unit = sched.shutdown()
}
@@ -42,17 +43,18 @@ object Scheduler {
* This abstract class provides a common interface for all
* schedulers used to execute reactors.
*
+ * @version Beta2
* @author Philipp Haller
*/
abstract class IScheduler {
def execute(task: Reaction): Unit
def getTask(worker: WorkerThread): Runnable
- def tick(a: Reactor): Unit
+ def tick(a: Actor): Unit
def shutdown(): Unit
val QUIT_TASK = new Reaction() {
- def actor: Reactor = null
+ def actor: Actor = null
def run(): Unit = {}
override def toString() = "QUIT_TASK"
}
@@ -62,6 +64,7 @@ abstract class IScheduler {
* This scheduler executes the tasks of a reactor on a single
* thread (the current thread).
*
+ * @version Beta2
* @author Philipp Haller
*/
class SingleThreadedScheduler extends IScheduler {
@@ -72,7 +75,7 @@ class SingleThreadedScheduler extends IScheduler {
def getTask(worker: WorkerThread): Runnable = { null }
- def tick(a: Reactor): Unit = {}
+ def tick(a: Actor): Unit = {}
def shutdown(): Unit = {}
}
@@ -81,6 +84,7 @@ class SingleThreadedScheduler extends IScheduler {
* This scheduler creates additional threads whenever there is no
* idle thread available.
*
+ * @version Beta2
* @author Philipp Haller
*/
class SpareWorkerScheduler extends IScheduler {
@@ -126,7 +130,7 @@ class SpareWorkerScheduler extends IScheduler {
}
}
- def tick(a: Reactor): Unit = {}
+ def tick(a: Actor): Unit = {}
def shutdown(): Unit = synchronized {
terminating = true
@@ -231,7 +235,7 @@ class TickedScheduler extends IScheduler {
var ticksCnt = 0
- def tick(a: Reactor): unit = synchronized {
+ def tick(a: Actor): unit = synchronized {
ticksCnt = ticksCnt + 1
executing.get(a) match {
case None => // thread outside of scheduler; error("No worker thread associated with actor " + a)
@@ -270,6 +274,9 @@ class QuitException extends Throwable {
* The class <code>WorkerThread</code> is used by schedulers to execute
* reactor tasks on multiple threads.
*
+ * TODO: put proof of deadlock-freedom here!
+ *
+ * @version Beta2
* @author Philipp Haller
*/
class WorkerThread(sched: IScheduler) extends Thread {
diff --git a/src/actors/scala/actors/TimerThread.scala b/src/actors/scala/actors/TimerThread.scala
index 60bc2240f6..3c401292cf 100644
--- a/src/actors/scala/actors/TimerThread.scala
+++ b/src/actors/scala/actors/TimerThread.scala
@@ -16,13 +16,14 @@ package scala.actors
* Note that the library deletes non-received <code>TIMEOUT</code> message if a
* message is received before the time-out occurs.
*
+ * @version Beta2
* @author Sebastien Noir
* @author Philipp Haller
*/
object TimerThread extends AnyRef with Runnable {
- case class WakedActor(actor: Reactor, f: PartialFunction[Any, Unit], time: long)
+ case class WakedActor(actor: Actor, f: PartialFunction[Any, Unit], time: long)
extends Ordered[WakedActor] {
var valid = true
def compare(that: WakedActor): int = -(this.time compare that.time)
@@ -33,7 +34,7 @@ object TimerThread extends AnyRef with Runnable {
var lateList: List[WakedActor] = Nil
- def trashRequest(a: Reactor) = synchronized {
+ def trashRequest(a: Actor) = synchronized {
// keep in mind: killing dead people is a bad idea!
queue.elements.find((wa: WakedActor) => wa.actor == a && wa.valid) match {
case Some(b) =>
@@ -74,7 +75,7 @@ object TimerThread extends AnyRef with Runnable {
}
}
- def requestTimeout(a: Reactor, f: PartialFunction[Any, Unit], waitMillis: long): unit = synchronized {
+ def requestTimeout(a: Actor, f: PartialFunction[Any, Unit], waitMillis: long): unit = synchronized {
val wakeTime = now + waitMillis
if (waitMillis <= 0) {
a.continuation = null