From b1b11f7221b216f2d8b80bc34c2fca31d34a2c4a Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 10 Oct 2006 14:07:52 +0000 Subject: --- src/actors/scala/actors/Actor.scala | 105 +++++++++++++--------- src/actors/scala/actors/ActorProxy.scala | 29 +++++++ src/actors/scala/actors/Channel.scala | 14 ++- src/actors/scala/actors/Reaction.scala | 92 ++++++++++++++++++++ src/actors/scala/actors/Reactor.scala | 139 +++++++++++++++++++++++++++++- src/actors/scala/actors/Scheduler.scala | 21 +++-- src/actors/scala/actors/TimerThread.scala | 7 +- 7 files changed, 346 insertions(+), 61 deletions(-) create mode 100644 src/actors/scala/actors/ActorProxy.scala create mode 100644 src/actors/scala/actors/Reaction.scala (limited to 'src/actors') diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index b1f8bcde23..cf60bd95e2 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -18,6 +18,7 @@ import scala.collection.mutable.HashSet * receive, react, reply, * etc. * + * @version Beta2 * @author Philipp Haller */ object Actor { @@ -40,7 +41,7 @@ object Actor { } def actor(body: => Unit): Actor = synchronized { - val actor = new Reactor { + val actor = new Actor { def act() = body } actor.start() @@ -52,14 +53,16 @@ object Actor { * channel which can be used for typed communication with other * actors. */ +/* def actor[a](ch: Channel[a])(body: => Unit): Actor = synchronized { - val actor = new Reactor { + val actor = new Actor { def act() = body } ch.receiver = actor actor.start() actor } +*/ /** * Receives a message from the mailbox of @@ -228,17 +231,21 @@ object Actor { } /** - * This trait defines commonalities between thread-based and - * event-based actors. + * This class provides (together with Channel) an + * implementation of event-based actors. * + * The main ideas of our approach are explained in the paper
+ * Event-Based Programming without Inversion of Control, Philipp Haller, Martin Odersky Proc. JMLC 2006 + * + * @version Beta2 * @author Philipp Haller */ trait Actor extends OutputChannel[Any] { - private[actors] val in = new Channel[Any] in.receiver = this private var rc: Channel[Any] = null + private[actors] def reply: Channel[Any] = { if (rc == null) { rc = new Channel[Any] @@ -273,9 +280,38 @@ trait Actor extends OutputChannel[Any] { */ def !?(msg: Any): Any = in !? msg - private[actors] def sender: Actor - private[actors] def pushSender(sender: Actor): unit - private[actors] def popSender(): unit + private val lastSenders = new scala.collection.mutable.Stack[Actor] + + private[actors] def sender: Actor = { + if (lastSenders.isEmpty) null + else lastSenders.top + } + + private[actors] def pushSender(s: Actor) = { lastSenders.push(s) } + private[actors] def popSender(): Unit = { lastSenders.pop } + + private[actors] var continuation: PartialFunction[Any, Unit] = null + private[actors] var timeoutPending = false + + private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = + if (f == null && continuation == null) { + // do nothing (timeout is handled instead) + } + else { + val task = new ActorTask(this, + if (f == null) continuation else f, + msg) + Scheduler.execute(task) + } + + private[actors] def tick(): Unit = + Scheduler.tick(this) + + private[actors] def defaultDetachActor: PartialFunction[Any, Unit] => Unit = + (f: PartialFunction[Any, Unit]) => { + continuation = f + throw new SuspendActorException + } private[actors] var suspendActor: () => Unit = _ private[actors] var suspendActorFor: long => Unit = _ @@ -283,12 +319,22 @@ trait Actor extends OutputChannel[Any] { private[actors] var detachActor: PartialFunction[Any, Unit] => Unit = _ private[actors] var kill: () => Unit = _ - private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) - private[actors] def tick(): Unit - private[actors] def resetActor(): unit + private[actors] def resetActor(): Unit = { + suspendActor = () => wait() + suspendActorFor = (msec: long) => wait(msec) + resumeActor = () => notify() + detachActor = defaultDetachActor + kill = () => {} + } resetActor() + /** + * Starts this reactor. + */ + def start(): Unit = + Scheduler.execute(new StartTask(this)) + private val links = new HashSet[Actor] /** @@ -304,7 +350,7 @@ trait Actor extends OutputChannel[Any] { Links self to actor defined by body. */ def link(body: => Unit): Actor = { - val actor = new Reactor { + val actor = new Actor { def act() = body } link(actor) @@ -343,7 +389,10 @@ trait Actor extends OutputChannel[Any] { call a.exit(reason) if !reason.equals("normal"). */ - def exit(reason: String): Unit + def exit(reason: String): Unit = { + exitReason = reason + Thread.currentThread().interrupt() + } private[actors] def exit(from: Actor, reason: String): Unit = { if (from == this) { @@ -382,45 +431,19 @@ trait Actor extends OutputChannel[Any] { } } + /** * Messages of this type are sent to each actor a * that is linked to an actor b whenever * b terminates and a has * trapExit set to true. * + * @version Beta2 * @author Philipp Haller */ case class Exit(from: Actor, reason: String) -/** - * This class provides a dynamic actor proxy for normal Java - * threads. - * - * @author Philipp Haller - */ -private[actors] class ActorProxy(t: Thread) extends Reactor { - def act(): Unit = {} - /** - Terminates execution of self with the following - effect on linked actors: - - For each linked actor a with - trapExit set to true, send message - Exit(self, reason) to a. - - For each linked actor a with - trapExit set to false (default), - call a.exit(reason) if - !reason.equals("normal"). - */ - override def exit(reason: String): Unit = { - exitReason = reason - t.interrupt() - } -} - - /** *

* This class is used by our efficient message queue diff --git a/src/actors/scala/actors/ActorProxy.scala b/src/actors/scala/actors/ActorProxy.scala new file mode 100644 index 0000000000..9e0ce052e9 --- /dev/null +++ b/src/actors/scala/actors/ActorProxy.scala @@ -0,0 +1,29 @@ +package scala.actors + +/** + * This class provides a dynamic actor proxy for normal Java + * threads. + * + * @version Beta2 + * @author Philipp Haller + */ +private[actors] class ActorProxy(t: Thread) extends Actor { + def act(): Unit = {} + /** + Terminates execution of self with the following + effect on linked actors: + + For each linked actor a with + trapExit set to true, send message + Exit(self, reason) to a. + + For each linked actor a with + trapExit set to false (default), + call a.exit(reason) if + !reason.equals("normal"). + */ + override def exit(reason: String): Unit = { + exitReason = reason + t.interrupt() + } +} diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala index e7c800a8f2..3638253cdc 100644 --- a/src/actors/scala/actors/Channel.scala +++ b/src/actors/scala/actors/Channel.scala @@ -27,6 +27,7 @@ class SuspendActorException extends Throwable { * actors. Only the actor creating an instance of a * Channel may receive from it. * + * @version Beta2 * @author Philipp Haller */ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { @@ -56,12 +57,9 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { waitingFor = waitingForNone waitingForSender = null - if (receiver.isInstanceOf[Reactor]) { - val myReactor = receiver.asInstanceOf[Reactor] - if (myReactor.timeoutPending) { - myReactor.timeoutPending = false - TimerThread.trashRequest(myReactor) - } + if (receiver.timeoutPending) { + receiver.timeoutPending = false + TimerThread.trashRequest(receiver) } if (isSuspended) @@ -335,8 +333,8 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { } case None => { this.synchronized { - TimerThread.requestTimeout(receiver.asInstanceOf[Reactor], f, msec) - receiver.asInstanceOf[Reactor].timeoutPending = true + TimerThread.requestTimeout(receiver, f, msec) + receiver.timeoutPending = true receiver.detachActor(f) } } diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala new file mode 100644 index 0000000000..80ecd5c8a2 --- /dev/null +++ b/src/actors/scala/actors/Reaction.scala @@ -0,0 +1,92 @@ +package scala.actors + +/** + * The abstract class Reaction associates an instance + * of an Actor with a + * java.lang.Runnable. It is also the super class of + * the different kinds of tasks used for the execution of + * event-based Actors. + * + * @version Beta2 + * @author Philipp Haller + */ +private[actors] abstract class Reaction extends Runnable { + def actor: Actor +} + +/** + * This class represents task items used to start the execution + * of Actors. + * + * @version Beta2 + * @author Philipp Haller + */ +private[actors] class StartTask(a: Actor) extends Reaction { + def actor = a + + def run(): Unit = { + val t = Thread.currentThread() + val saved = Actor.selfs.get(t).asInstanceOf[Actor] + Actor.selfs.put(t, a) + try { + a.act() + if (Thread.currentThread().isInterrupted()) + throw new InterruptedException + a.kill() + if (Thread.currentThread().isInterrupted()) + throw new InterruptedException + a.exit("normal") + } + catch { + case _: InterruptedException => + a.exitLinked() + case d: SuspendActorException => + // do nothing (continuation is already saved) + case t: Throwable => + a.exit(t.toString()) + } + finally { + Actor.selfs.put(t, saved) + } + } +} + +/** + * This class represents task items used to execute actions + * specified in arguments of react and + * reactWithin. + * + * @version Beta2 + * @author Philipp Haller + */ +private[actors] class ActorTask(a: Actor, + f: PartialFunction[Any, Unit], + msg: Any) extends Reaction { + def actor = a + + def run(): Unit = { + val t = Thread.currentThread() + val saved = Actor.selfs.get(t).asInstanceOf[Actor] + Actor.selfs.put(t, a) + try { + f(msg) + if (Thread.currentThread().isInterrupted()) + throw new InterruptedException + a.kill() + if (Thread.currentThread().isInterrupted()) + throw new InterruptedException + a.exit("normal") + } + catch { + case _: InterruptedException => + a.exitLinked() + case d: SuspendActorException => + // do nothing (continuation is already saved) + case t: Throwable => + a.exit(t.toString()) + } + finally { + Actor.selfs.put(t, saved) + } + } +} diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala index aa16eea7ac..f3569361d3 100644 --- a/src/actors/scala/actors/Reactor.scala +++ b/src/actors/scala/actors/Reactor.scala @@ -17,10 +17,50 @@ package scala.actors * The main ideas of our approach are explained in the paper
* Event-Based Programming without Inversion of Control, Philipp Haller, Martin Odersky Proc. JMLC 2006 * + * @version Beta2 * @author Philipp Haller */ trait Reactor extends Actor { + private[actors] val in = new Channel[Any] + in.receiver = this + + private var rc: Channel[Any] = null + + private[actors] def reply: Channel[Any] = { + if (rc == null) { + rc = new Channel[Any] + rc.receiver = this + } + rc + } + + private[actors] def freshReply(): Unit = { + rc = new Channel[Any] + rc.receiver = this + } + + /** + * The behavior of an actor is specified by implementing this + * abstract method. Note that the preferred way to create actors + * is through the actor and reactor + * methods defined in object Actor. + */ + def act(): Unit + + /** + * Sends msg to this actor (asynchronous). + */ + def !(msg: Any): Unit = in ! msg + + def forward(msg: Any): Unit = in forward msg + + /** + * Sends msg to this actor and awaits reply + * (synchronous). + */ + def !?(msg: Any): Any = in !? msg + private val lastSenders = new scala.collection.mutable.Stack[Actor] private[actors] def sender: Actor = { @@ -54,6 +94,12 @@ trait Reactor extends Actor { throw new SuspendActorException } + private[actors] var suspendActor: () => Unit = _ + private[actors] var suspendActorFor: long => Unit = _ + private[actors] var resumeActor: () => Unit = _ + private[actors] var detachActor: PartialFunction[Any, Unit] => Unit = _ + private[actors] var kill: () => Unit = _ + private[actors] def resetActor(): Unit = { suspendActor = () => wait() suspendActorFor = (msec: long) => wait(msec) @@ -70,14 +116,100 @@ trait Reactor extends Actor { def start(): Unit = Scheduler.execute(new StartTask(this)) + private val links = new HashSet[Actor] + + /** + Links self to actor to. + */ + def link(to: Actor): Actor = { + links += to + to.linkTo(this) + to + } + + /** + Links self to actor defined by body. + */ + def link(body: => Unit): Actor = { + val actor = new Reactor { + def act() = body + } + link(actor) + actor.start() + actor + } + + private[actors] def linkTo(to: Actor): Unit = + links += to + /** - * Terminates this reactor, thereby influencing linked actors - * (see Actor.exit). + Unlinks self from actor from. + */ + def unlink(from: Actor): Unit = { + links -= from + from.unlinkFrom(this) + } + + private[actors] def unlinkFrom(from: Actor): Unit = + links -= from + + var trapExit = false + + private[actors] var exitReason: String = "" + + /** + Terminates execution of self with the following + effect on linked actors: + + For each linked actor a with + trapExit set to true, send message + Exit(self, reason) to a. + + For each linked actor a with + trapExit set to false (default), + call a.exit(reason) if + !reason.equals("normal"). */ def exit(reason: String): Unit = { exitReason = reason Thread.currentThread().interrupt() } + + private[actors] def exit(from: Actor, reason: String): Unit = { + if (from == this) { + exit(reason) + } + else { + if (trapExit) + this ! Exit(from, reason) + else if (!reason.equals("normal")) + exit(reason) + } + } + + private[actors] def exitLinked(): Unit = + exitLinked(exitReason, new HashSet[Actor]) + + private[actors] def exitLinked(reason: String): Unit = + exitLinked(reason, new HashSet[Actor]) + + private[actors] def exitLinked(reason: String, + exitMarks: HashSet[Actor]): Unit = { + if (exitMarks contains this) { + // we are marked, do nothing + } + else { + exitMarks += this // mark this as exiting + // exit linked processes + val iter = links.elements + while (iter.hasNext) { + val linked = iter.next + unlink(linked) + linked.exit(this, reason) + } + exitMarks -= this + } + } } /** @@ -87,6 +219,7 @@ trait Reactor extends Actor { * the different kinds of tasks used for the execution of * Reactors. * + * @version Beta2 * @author Philipp Haller */ private[actors] abstract class Reaction extends Runnable { @@ -97,6 +230,7 @@ private[actors] abstract class Reaction extends Runnable { * This class represents task items used to start the execution * of Reactors. * + * @version Beta2 * @author Philipp Haller */ private[actors] class StartTask(a: Reactor) extends Reaction { @@ -134,6 +268,7 @@ private[actors] class StartTask(a: Reactor) extends Reaction { * specified in arguments of react and * reactWithin. * + * @version Beta2 * @author Philipp Haller */ private[actors] class ActorTask(a: Reactor, diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index 4899f545d8..4f50248d29 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -14,9 +14,10 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue} /** * The Scheduler object is used by - * Reactor to execute tasks of an execution of a + * Actor to execute tasks of an execution of a * reactor. * + * @version Beta2 * @author Philipp Haller */ object Scheduler { @@ -33,7 +34,7 @@ object Scheduler { sched.execute(task) } - def tick(a: Reactor) = sched.tick(a) + def tick(a: Actor) = sched.tick(a) def shutdown(): Unit = sched.shutdown() } @@ -42,17 +43,18 @@ object Scheduler { * This abstract class provides a common interface for all * schedulers used to execute reactors. * + * @version Beta2 * @author Philipp Haller */ abstract class IScheduler { def execute(task: Reaction): Unit def getTask(worker: WorkerThread): Runnable - def tick(a: Reactor): Unit + def tick(a: Actor): Unit def shutdown(): Unit val QUIT_TASK = new Reaction() { - def actor: Reactor = null + def actor: Actor = null def run(): Unit = {} override def toString() = "QUIT_TASK" } @@ -62,6 +64,7 @@ abstract class IScheduler { * This scheduler executes the tasks of a reactor on a single * thread (the current thread). * + * @version Beta2 * @author Philipp Haller */ class SingleThreadedScheduler extends IScheduler { @@ -72,7 +75,7 @@ class SingleThreadedScheduler extends IScheduler { def getTask(worker: WorkerThread): Runnable = { null } - def tick(a: Reactor): Unit = {} + def tick(a: Actor): Unit = {} def shutdown(): Unit = {} } @@ -81,6 +84,7 @@ class SingleThreadedScheduler extends IScheduler { * This scheduler creates additional threads whenever there is no * idle thread available. * + * @version Beta2 * @author Philipp Haller */ class SpareWorkerScheduler extends IScheduler { @@ -126,7 +130,7 @@ class SpareWorkerScheduler extends IScheduler { } } - def tick(a: Reactor): Unit = {} + def tick(a: Actor): Unit = {} def shutdown(): Unit = synchronized { terminating = true @@ -231,7 +235,7 @@ class TickedScheduler extends IScheduler { var ticksCnt = 0 - def tick(a: Reactor): unit = synchronized { + def tick(a: Actor): unit = synchronized { ticksCnt = ticksCnt + 1 executing.get(a) match { case None => // thread outside of scheduler; error("No worker thread associated with actor " + a) @@ -270,6 +274,9 @@ class QuitException extends Throwable { * The class WorkerThread is used by schedulers to execute * reactor tasks on multiple threads. * + * TODO: put proof of deadlock-freedom here! + * + * @version Beta2 * @author Philipp Haller */ class WorkerThread(sched: IScheduler) extends Thread { diff --git a/src/actors/scala/actors/TimerThread.scala b/src/actors/scala/actors/TimerThread.scala index 60bc2240f6..3c401292cf 100644 --- a/src/actors/scala/actors/TimerThread.scala +++ b/src/actors/scala/actors/TimerThread.scala @@ -16,13 +16,14 @@ package scala.actors * Note that the library deletes non-received TIMEOUT message if a * message is received before the time-out occurs. * + * @version Beta2 * @author Sebastien Noir * @author Philipp Haller */ object TimerThread extends AnyRef with Runnable { - case class WakedActor(actor: Reactor, f: PartialFunction[Any, Unit], time: long) + case class WakedActor(actor: Actor, f: PartialFunction[Any, Unit], time: long) extends Ordered[WakedActor] { var valid = true def compare(that: WakedActor): int = -(this.time compare that.time) @@ -33,7 +34,7 @@ object TimerThread extends AnyRef with Runnable { var lateList: List[WakedActor] = Nil - def trashRequest(a: Reactor) = synchronized { + def trashRequest(a: Actor) = synchronized { // keep in mind: killing dead people is a bad idea! queue.elements.find((wa: WakedActor) => wa.actor == a && wa.valid) match { case Some(b) => @@ -74,7 +75,7 @@ object TimerThread extends AnyRef with Runnable { } } - def requestTimeout(a: Reactor, f: PartialFunction[Any, Unit], waitMillis: long): unit = synchronized { + def requestTimeout(a: Actor, f: PartialFunction[Any, Unit], waitMillis: long): unit = synchronized { val wakeTime = now + waitMillis if (waitMillis <= 0) { a.continuation = null -- cgit v1.2.3