summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-09-30 14:36:20 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-09-30 14:36:20 +0000
commit53cb459ecf29b9018f7c100a419a2bbfd5d4dc64 (patch)
tree3f40e5e20d8a5a16e5a221512a2cff95deab2f57
parent60b3d90f81cf3f83440725a02afc7dc693fa9ea5 (diff)
downloadscala-53cb459ecf29b9018f7c100a419a2bbfd5d4dc64.tar.gz
scala-53cb459ecf29b9018f7c100a419a2bbfd5d4dc64.tar.bz2
scala-53cb459ecf29b9018f7c100a419a2bbfd5d4dc64.zip
Added actor linking.
-rw-r--r--src/actors/scala/actors/Actor.scala128
-rw-r--r--src/actors/scala/actors/Channel.scala4
-rw-r--r--src/actors/scala/actors/Reactor.scala36
-rw-r--r--src/actors/scala/actors/Scheduler.scala3
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala2
5 files changed, 153 insertions, 20 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 7159cb890e..4397032b60 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -1,5 +1,7 @@
package scala.actors
+import scala.collection.mutable.HashSet
+
object Actor {
private[actors] val selfs = new java.util.WeakHashMap(16, 0.5f)
@@ -20,10 +22,7 @@ object Actor {
def actor(body: => Unit): ActorThread = synchronized {
val actor = new ActorThread {
- override def run(): Unit = {
- body
- this.kill()
- }
+ def act() = body
}
actor.start()
actor
@@ -31,7 +30,7 @@ object Actor {
def reactor(body: => Unit): Reactor = synchronized {
val reactor = new Reactor {
- override def run(): Unit = body
+ def act() = body
}
reactor.start()
reactor
@@ -120,6 +119,11 @@ object Actor {
s.kill = () => { b2 }
b1
}
+
+ def link(to: Actor): Actor = self.link(to)
+ def link(body: => Unit): Actor = self.link(body)
+ def unlink(from: Actor): Unit = self.unlink(from)
+ def exit(reason: String): Unit = self.exit(reason)
}
trait Actor {
@@ -141,6 +145,11 @@ trait Actor {
rc.receiver = this
}
+ /*
+ Specification of behavior
+ */
+ def act(): Unit
+
def !(msg: Any): Unit = in ! msg
def !?(msg: Any): Any = in !? msg
@@ -148,10 +157,10 @@ trait Actor {
private[actors] def pushSender(sender: Actor): unit
private[actors] def popSender(): unit
- private[actors] var kill: () => Unit = _
private[actors] var suspendActor: () => unit = _
private[actors] var suspendActorFor: long => unit = _
private[actors] var detachActor: PartialFunction[Any, unit] => unit = _
+ private[actors] var kill: () => Unit = _
private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any)
@@ -159,11 +168,113 @@ trait Actor {
private[actors] def resetActor(): unit
resetActor()
+
+ private val links = new HashSet[Actor]
+
+ def link(to: Actor): Actor = {
+ links += to
+ to.linkTo(this)
+ to
+ }
+
+ def link(body: => Unit): Actor = {
+ val actor = new ActorThread {
+ def act() = body
+ }
+ link(actor)
+ actor.start()
+ actor
+ }
+
+ private[actors] def linkTo(to: Actor): Unit =
+ links += to
+
+ def unlink(from: Actor): Unit = {
+ links -= from
+ from.unlinkFrom(this)
+ }
+
+ private[actors] def unlinkFrom(from: Actor): Unit =
+ links -= from
+
+ var trapExit = false
+
+ private[actors] var exitReason: String = ""
+
+ def exit(reason: String): Unit
+
+ private[actors] def exit(from: Actor, reason: String): Unit = {
+ if (from == this) {
+ exit(reason)
+ }
+ else {
+ if (trapExit)
+ this ! Exit(from, reason)
+ else if (!reason.equals("normal"))
+ exit(reason)
+ }
+ }
+
+ private[actors] def exitLinked(): Unit =
+ exitLinked(exitReason, new HashSet[Actor])
+
+ private[actors] def exitLinked(reason: String): Unit =
+ exitLinked(reason, new HashSet[Actor])
+
+ private[actors] def exitLinked(reason: String,
+ exitMarks: HashSet[Actor]): Unit = {
+ if (exitMarks contains this) {
+ // we are marked, do nothing
+ }
+ else {
+ exitMarks += this // mark this as exiting
+ // exit linked processes
+ val iter = links.elements
+ while (iter.hasNext) {
+ val linked = iter.next
+ unlink(linked)
+ linked.exit(this, reason)
+ }
+ exitMarks -= this
+ }
+ }
}
-class ActorThread extends Thread with ThreadedActor
+case class Exit(from: Actor, reason: String)
+
+
+abstract class ActorThread extends Thread with ThreadedActor {
+ override def run(): Unit = {
+ try {
+ act()
+ if (isInterrupted())
+ throw new InterruptedException
+ kill()
+ if (isInterrupted())
+ throw new InterruptedException
+ exit("normal")
+ }
+ catch {
+ case ie: InterruptedException =>
+ exitLinked()
+ case t: Throwable =>
+ exitLinked(t.toString())
+ }
+ }
-class ActorProxy(t: Thread) extends ThreadedActor
+ def exit(reason: String): Unit = {
+ exitReason = reason
+ interrupt()
+ }
+}
+
+class ActorProxy(t: Thread) extends ThreadedActor {
+ def act(): Unit = {}
+ def exit(reason: String): Unit = {
+ exitReason = reason
+ Thread.currentThread().interrupt()
+ }
+}
object RemoteActor {
@@ -193,6 +304,7 @@ object RemoteActor {
def select(node: Node, name: Symbol): Actor =
new Reactor {
+ def act(): Unit = {}
override def !(msg: Any): Unit = msg match {
case a: AnyRef => {
// establish remotely accessible
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index 445d08b6ba..659e6d2f5d 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -5,6 +5,10 @@ import Actor._
case object TIMEOUT
class SuspendActorException extends Throwable {
+ /*
+ For efficiency reasons we do not fill in
+ the execution stack trace.
+ */
override def fillInStackTrace(): Throwable = {
this
}
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index b0cfdf62b4..939bd4a166 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -11,9 +11,6 @@ trait Reactor extends Actor {
private[actors] var continuation: PartialFunction[Any, Unit] = null
private[actors] var timeoutPending = false
- //def resumeActor = () => Unit = () => {
- //}
-
def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = {
if (f == null && continuation == null) {
// do nothing (timeout is handled instead)
@@ -41,11 +38,14 @@ trait Reactor extends Actor {
resetActor()
- def run(): Unit = {}
-
def start(): Unit = {
Scheduler.execute(new StartTask(this))
}
+
+ def exit(reason: String): Unit = {
+ exitReason = reason
+ Thread.currentThread().interrupt()
+ }
}
abstract class Reaction extends Runnable {
@@ -60,12 +60,22 @@ class StartTask(a: Reactor) extends Reaction {
val saved = Actor.selfs.get(t).asInstanceOf[Actor]
Actor.selfs.put(t, a)
try {
- a.run()
+ a.act()
+ if (Thread.currentThread().isInterrupted())
+ throw new InterruptedException
a.kill()
+ if (Thread.currentThread().isInterrupted())
+ throw new InterruptedException
+ a.exit("normal")
}
catch {
- case d: SuspendActorException =>
+ case _: InterruptedException => a.exitLinked()
+ case d: SuspendActorException => {
// do nothing (continuation is already saved)
+ }
+ case t: Throwable => {
+ a.exit(t.toString())
+ }
}
finally {
Actor.selfs.put(t, saved)
@@ -84,11 +94,21 @@ class ActorTask(a: Reactor,
Actor.selfs.put(t, a)
try {
f(msg)
+ if (Thread.currentThread().isInterrupted())
+ throw new InterruptedException
a.kill()
+ if (Thread.currentThread().isInterrupted())
+ throw new InterruptedException
+ a.exit("normal")
}
catch {
- case d: SuspendActorException =>
+ case _: InterruptedException => a.exitLinked()
+ case d: SuspendActorException => {
// do nothing (continuation is already saved)
+ }
+ case t: Throwable => {
+ a.exit(t.toString())
+ }
}
finally {
Actor.selfs.put(t, saved)
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 24875d6090..2f8bdd1243 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -199,13 +199,10 @@ class SpareWorkerScheduler extends IScheduler {
init()
def execute(task: Reaction): Unit = synchronized {
- //Console.println("received new task " + task)
-
if (!terminating) {
if (idle.length == 0) {
tasks += task
// create new worker
- //Console.println("create new worker thread")
maxWorkers = maxWorkers + 1
val newWorker = new WorkerThread(this)
workers += newWorker
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
index 12e9e5a304..90af3d563c 100644
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ b/src/actors/scala/actors/remote/NetKernel.scala
@@ -37,7 +37,7 @@ class NetKernel(service: Service) {
case Some(a) => {
val msg = service.serializer.deserialize(data)
val senderProxy = new Reactor {
- override def run() = { a ! msg }
+ def act() = { a ! msg }
override def !(msg: Any): Unit = {
msg match {
case refmsg: AnyRef => {