diff options
author | Philipp Haller <hallerp@gmail.com> | 2006-09-30 14:36:20 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2006-09-30 14:36:20 +0000 |
commit | 53cb459ecf29b9018f7c100a419a2bbfd5d4dc64 (patch) | |
tree | 3f40e5e20d8a5a16e5a221512a2cff95deab2f57 | |
parent | 60b3d90f81cf3f83440725a02afc7dc693fa9ea5 (diff) | |
download | scala-53cb459ecf29b9018f7c100a419a2bbfd5d4dc64.tar.gz scala-53cb459ecf29b9018f7c100a419a2bbfd5d4dc64.tar.bz2 scala-53cb459ecf29b9018f7c100a419a2bbfd5d4dc64.zip |
Added actor linking.
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 128 | ||||
-rw-r--r-- | src/actors/scala/actors/Channel.scala | 4 | ||||
-rw-r--r-- | src/actors/scala/actors/Reactor.scala | 36 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 3 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/NetKernel.scala | 2 |
5 files changed, 153 insertions, 20 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 7159cb890e..4397032b60 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -1,5 +1,7 @@ package scala.actors +import scala.collection.mutable.HashSet + object Actor { private[actors] val selfs = new java.util.WeakHashMap(16, 0.5f) @@ -20,10 +22,7 @@ object Actor { def actor(body: => Unit): ActorThread = synchronized { val actor = new ActorThread { - override def run(): Unit = { - body - this.kill() - } + def act() = body } actor.start() actor @@ -31,7 +30,7 @@ object Actor { def reactor(body: => Unit): Reactor = synchronized { val reactor = new Reactor { - override def run(): Unit = body + def act() = body } reactor.start() reactor @@ -120,6 +119,11 @@ object Actor { s.kill = () => { b2 } b1 } + + def link(to: Actor): Actor = self.link(to) + def link(body: => Unit): Actor = self.link(body) + def unlink(from: Actor): Unit = self.unlink(from) + def exit(reason: String): Unit = self.exit(reason) } trait Actor { @@ -141,6 +145,11 @@ trait Actor { rc.receiver = this } + /* + Specification of behavior + */ + def act(): Unit + def !(msg: Any): Unit = in ! msg def !?(msg: Any): Any = in !? msg @@ -148,10 +157,10 @@ trait Actor { private[actors] def pushSender(sender: Actor): unit private[actors] def popSender(): unit - private[actors] var kill: () => Unit = _ private[actors] var suspendActor: () => unit = _ private[actors] var suspendActorFor: long => unit = _ private[actors] var detachActor: PartialFunction[Any, unit] => unit = _ + private[actors] var kill: () => Unit = _ private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) @@ -159,11 +168,113 @@ trait Actor { private[actors] def resetActor(): unit resetActor() + + private val links = new HashSet[Actor] + + def link(to: Actor): Actor = { + links += to + to.linkTo(this) + to + } + + def link(body: => Unit): Actor = { + val actor = new ActorThread { + def act() = body + } + link(actor) + actor.start() + actor + } + + private[actors] def linkTo(to: Actor): Unit = + links += to + + def unlink(from: Actor): Unit = { + links -= from + from.unlinkFrom(this) + } + + private[actors] def unlinkFrom(from: Actor): Unit = + links -= from + + var trapExit = false + + private[actors] var exitReason: String = "" + + def exit(reason: String): Unit + + private[actors] def exit(from: Actor, reason: String): Unit = { + if (from == this) { + exit(reason) + } + else { + if (trapExit) + this ! Exit(from, reason) + else if (!reason.equals("normal")) + exit(reason) + } + } + + private[actors] def exitLinked(): Unit = + exitLinked(exitReason, new HashSet[Actor]) + + private[actors] def exitLinked(reason: String): Unit = + exitLinked(reason, new HashSet[Actor]) + + private[actors] def exitLinked(reason: String, + exitMarks: HashSet[Actor]): Unit = { + if (exitMarks contains this) { + // we are marked, do nothing + } + else { + exitMarks += this // mark this as exiting + // exit linked processes + val iter = links.elements + while (iter.hasNext) { + val linked = iter.next + unlink(linked) + linked.exit(this, reason) + } + exitMarks -= this + } + } } -class ActorThread extends Thread with ThreadedActor +case class Exit(from: Actor, reason: String) + + +abstract class ActorThread extends Thread with ThreadedActor { + override def run(): Unit = { + try { + act() + if (isInterrupted()) + throw new InterruptedException + kill() + if (isInterrupted()) + throw new InterruptedException + exit("normal") + } + catch { + case ie: InterruptedException => + exitLinked() + case t: Throwable => + exitLinked(t.toString()) + } + } -class ActorProxy(t: Thread) extends ThreadedActor + def exit(reason: String): Unit = { + exitReason = reason + interrupt() + } +} + +class ActorProxy(t: Thread) extends ThreadedActor { + def act(): Unit = {} + def exit(reason: String): Unit = { + exitReason = reason + Thread.currentThread().interrupt() + } +} object RemoteActor { @@ -193,6 +304,7 @@ object RemoteActor { def select(node: Node, name: Symbol): Actor = new Reactor { + def act(): Unit = {} override def !(msg: Any): Unit = msg match { case a: AnyRef => { // establish remotely accessible diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala index 445d08b6ba..659e6d2f5d 100644 --- a/src/actors/scala/actors/Channel.scala +++ b/src/actors/scala/actors/Channel.scala @@ -5,6 +5,10 @@ import Actor._ case object TIMEOUT class SuspendActorException extends Throwable { + /* + For efficiency reasons we do not fill in + the execution stack trace. + */ override def fillInStackTrace(): Throwable = { this } diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala index b0cfdf62b4..939bd4a166 100644 --- a/src/actors/scala/actors/Reactor.scala +++ b/src/actors/scala/actors/Reactor.scala @@ -11,9 +11,6 @@ trait Reactor extends Actor { private[actors] var continuation: PartialFunction[Any, Unit] = null private[actors] var timeoutPending = false - //def resumeActor = () => Unit = () => { - //} - def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = { if (f == null && continuation == null) { // do nothing (timeout is handled instead) @@ -41,11 +38,14 @@ trait Reactor extends Actor { resetActor() - def run(): Unit = {} - def start(): Unit = { Scheduler.execute(new StartTask(this)) } + + def exit(reason: String): Unit = { + exitReason = reason + Thread.currentThread().interrupt() + } } abstract class Reaction extends Runnable { @@ -60,12 +60,22 @@ class StartTask(a: Reactor) extends Reaction { val saved = Actor.selfs.get(t).asInstanceOf[Actor] Actor.selfs.put(t, a) try { - a.run() + a.act() + if (Thread.currentThread().isInterrupted()) + throw new InterruptedException a.kill() + if (Thread.currentThread().isInterrupted()) + throw new InterruptedException + a.exit("normal") } catch { - case d: SuspendActorException => + case _: InterruptedException => a.exitLinked() + case d: SuspendActorException => { // do nothing (continuation is already saved) + } + case t: Throwable => { + a.exit(t.toString()) + } } finally { Actor.selfs.put(t, saved) @@ -84,11 +94,21 @@ class ActorTask(a: Reactor, Actor.selfs.put(t, a) try { f(msg) + if (Thread.currentThread().isInterrupted()) + throw new InterruptedException a.kill() + if (Thread.currentThread().isInterrupted()) + throw new InterruptedException + a.exit("normal") } catch { - case d: SuspendActorException => + case _: InterruptedException => a.exitLinked() + case d: SuspendActorException => { // do nothing (continuation is already saved) + } + case t: Throwable => { + a.exit(t.toString()) + } } finally { Actor.selfs.put(t, saved) diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index 24875d6090..2f8bdd1243 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -199,13 +199,10 @@ class SpareWorkerScheduler extends IScheduler { init() def execute(task: Reaction): Unit = synchronized { - //Console.println("received new task " + task) - if (!terminating) { if (idle.length == 0) { tasks += task // create new worker - //Console.println("create new worker thread") maxWorkers = maxWorkers + 1 val newWorker = new WorkerThread(this) workers += newWorker diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala index 12e9e5a304..90af3d563c 100644 --- a/src/actors/scala/actors/remote/NetKernel.scala +++ b/src/actors/scala/actors/remote/NetKernel.scala @@ -37,7 +37,7 @@ class NetKernel(service: Service) { case Some(a) => { val msg = service.serializer.deserialize(data) val senderProxy = new Reactor { - override def run() = { a ! msg } + def act() = { a ! msg } override def !(msg: Any): Unit = { msg match { case refmsg: AnyRef => { |