diff options
author | Philipp Haller <hallerp@gmail.com> | 2007-01-10 10:04:32 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2007-01-10 10:04:32 +0000 |
commit | 1828ef43100374982b6313ae94b885616ccf85a1 (patch) | |
tree | 1c10ffd31919f1acf4c4979bf3f8648f2cc79785 /src/actors | |
parent | f75cbd338f81a00ab9696fb0482fd561ce1a0826 (diff) | |
download | scala-1828ef43100374982b6313ae94b885616ccf85a1.tar.gz scala-1828ef43100374982b6313ae94b885616ccf85a1.tar.bz2 scala-1828ef43100374982b6313ae94b885616ccf85a1.zip |
Fixed bug in andThen.
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 37 | ||||
-rw-r--r-- | src/actors/scala/actors/ActorProxy.scala | 6 | ||||
-rw-r--r-- | src/actors/scala/actors/Channel.scala | 153 | ||||
-rw-r--r-- | src/actors/scala/actors/Reaction.scala | 29 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 15 |
5 files changed, 171 insertions, 69 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 3e8bf190c3..478466f916 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -171,12 +171,12 @@ object Actor { private[actors] trait Body[a] { def orElse[b >: a](other: => b): b - def andThen[b >: a](other: => b): b + def andThen[b](other: => b): Nothing } implicit def mkBody[a](body: => a) = new Body[a] { def orElse[b >: a](other: => b): b = choose(body, other) - def andThen[b >: a](other: => b): b = seq(body, other) + def andThen[b](other: => b): Nothing = seq(body, other) } private[actors] def choose[a, b >: a](alt1: => a, alt2: => b): b = { @@ -223,6 +223,7 @@ object Actor { val s = self s.kill = () => { body; s.kill() } body + exit("normal") } /** @@ -232,11 +233,12 @@ object Actor { * @param first ... * @param next ... */ - def seq[a, b >: a](first: => a, next: => b): b = { + def seq[a, b](first: => a, next: => b): Nothing = { val s = self val killNext = s.kill s.kill = () => { s.kill = killNext; next; s.kill() } first + exit("normal") } /** @@ -279,7 +281,7 @@ object Actor { * <code>!reason.equals("normal")</code>. * </p> */ - def exit(reason: String): Unit = self.exit(reason) + def exit(reason: String): Nothing = self.exit(reason) } case class Request[a](msg: a) { @@ -401,7 +403,7 @@ trait Actor extends OutputChannel[Any] { try { wait() } catch { - case t: InterruptedException => + case _: InterruptedException => } } } @@ -418,7 +420,7 @@ trait Actor extends OutputChannel[Any] { fromExc = false wait(waittime) } catch { - case t: InterruptedException => { + case _: InterruptedException => { fromExc = true val now = Platform.currentTime val waited = now-ts @@ -450,23 +452,6 @@ trait Actor extends OutputChannel[Any] { def start(): Unit = Scheduler start new Reaction(this) - - /* - * Debugging support. - */ - private[actors] var name = "" - - private var childCnt = 0 - - private[actors] def nextChildName = { - val s = childCnt + name - childCnt = childCnt + 1 - s - } - - private[actors] def setName(n: String) = - name = n - private val links = new HashSet[Actor] /** @@ -528,9 +513,11 @@ trait Actor extends OutputChannel[Any] { * <code>!reason.equals("normal")</code>. * </p> */ - def exit(reason: String): Unit = { + def exit(reason: String): Nothing = { + kill() exitReason = reason - currentThread.interrupt() + //currentThread.interrupt() + throw new ExitActorException } private[actors] def exit(from: Actor, reason: String): Unit = { diff --git a/src/actors/scala/actors/ActorProxy.scala b/src/actors/scala/actors/ActorProxy.scala index 2cfd12cb0a..0cdbcde587 100644 --- a/src/actors/scala/actors/ActorProxy.scala +++ b/src/actors/scala/actors/ActorProxy.scala @@ -46,8 +46,10 @@ private[actors] class ActorProxy(t: Thread) extends Actor { * * @param reason the exit reason of the interrupted thread. */ - override def exit(reason: String): Unit = { + override def exit(reason: String): Nothing = { + kill() exitReason = reason - t.interrupt() + exitLinked() + throw new InterruptedException } } diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala index cae5f3d1b9..5b5318f615 100644 --- a/src/actors/scala/actors/Channel.scala +++ b/src/actors/scala/actors/Channel.scala @@ -47,9 +47,55 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { private[actors] var isSuspended = false + type WaitingPart = Tuple4[Msg=>boolean, // waitingFor + Option[Actor], // sender + Option[PartialFunction[Any, Unit]], + Option[Actor]] + var waitingParts: List[WaitingPart] = Nil + + var waitingPart: WaitingPart + + def waitingFor(msg: Msg, sender: Actor): boolean = { + waitingPart = waitingParts.find(p => p._1(msg) && + (if (!p._2.isEmpty) p._2.get == sender + else true)) + } + + def waitForNothing() = + waitingParts = Nil + + def resumeWaitingPart(msg: Msg, sender: Actor) = { + receiver.pushSender(sender) + waitingPart._3 match { + case Some(cont) => receiver.scheduleActor(cont, msg) + case None => // resume blocked thread + received = Some(msg) + p._4.get.resumeActor() + } + } + //private val messageQueue = new MessageQueue[Msg] private val mailbox = new scala.collection.mutable.Queue[Pair[Msg, Actor]] +/* + private def send(msg: Msg, sender: Actor) = receiver.synchronized { + receiver.tick() + if (waitingFor(msg, sender)) { + waitForNothing() + + if (receiver.timeoutPending) { + receiver.timeoutPending = false + TimerThread.trashRequest(receiver) + } + + resumeWaitingPart(msg, sender) + } else { + //messageQueue.append(msg, sender) + mailbox += Pair(msg, sender) + } + } +*/ + private def send(msg: Msg, sender: Actor) = receiver.synchronized { receiver.tick() if (waitingFor(msg) && ((waitingForSender eq null) || @@ -99,6 +145,16 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { } } + def !?(msec: long, msg: Msg): Option[Any] = { + Debug.info("rpc with timeout "+msec) + Actor.self.freshReply() + this ! msg + Actor.self.reply.receiveWithinFrom(msec)(receiver) { + case TIMEOUT => None + case x => Some(x) + } + } + /** * Forwards <code>msg</code> to <code>this</code> keeping the * last sender as sender instead of <code>self</code>. @@ -112,7 +168,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { assert(Actor.self == receiver, "receive from channel belonging to other actor") receiver.synchronized { receiver.tick() - waitingFor = f.isDefinedAt + /* val q = messageQueue.extractFirst(waitingFor) if (q != null) { @@ -136,14 +192,17 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { case None => { // acquire lock because we might call wait() this.synchronized { + waitingFor = f.isDefinedAt isSuspended = true + //val wp = Tuple4(f.isDefinedAt, None, None, Some(receiver)) + //waitingParts = wp :: waitingParts receiver.suspendActor() } } } - isSuspended = false waitingFor = waitingForNone + isSuspended = false } val result = f(received.get) receiver.popSender() @@ -154,8 +213,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { assert(Actor.self == receiver, "receive from channel belonging to other actor") receiver.synchronized { receiver.tick() - waitingFor = f.isDefinedAt - waitingForSender = r + /* var q = messageQueue.dequeueFirst((item: MessageQueueResult[Msg]) => { waitingFor(item.msg) && item.sender == r @@ -180,15 +238,17 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { case None => { // acquire lock because we might call wait() this.synchronized { + waitingFor = f.isDefinedAt + waitingForSender = r isSuspended = true receiver.suspendActor() } } } - isSuspended = false waitingFor = waitingForNone waitingForSender = null + isSuspended = false } val result = f(received.get) receiver.popSender() @@ -205,7 +265,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { assert(Actor.self == receiver, "receive from channel belonging to other actor") receiver.synchronized { receiver.tick() - waitingFor = f.isDefinedAt + /* val q = messageQueue.extractFirst(waitingFor) if (q != null) { @@ -236,6 +296,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { case None => { // acquire lock because we might call wait() this.synchronized { + waitingFor = f.isDefinedAt isSuspended = true received = None receiver.suspendActorFor(msec) @@ -255,8 +316,75 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { } } + waitingFor = waitingForNone isSuspended = false + } + val result = f(received.get) + receiver.popSender() + result + } + + def receiveWithinFrom[R](msec: long)(r: Actor)(f: PartialFunction[Any, R]): R = { + assert(Actor.self == receiver, "receive from channel belonging to other actor") + receiver.synchronized { + receiver.tick() + +/* + val q = messageQueue.extractFirst(waitingFor) + if (q != null) { + received = q.msg + receiver.pushSender(q.sender) + } + else synchronized { + waitingFor = f.isDefinedAt + waitingForSender = r + isSuspended = true + receiver.suspendActorFor(msec) + if (received eq null) + if (f.isDefinedAt(TIMEOUT)) { + isSuspended = false + val result = f(TIMEOUT) + return result + } + else + error("unhandled timeout") + } +*/ + + mailbox.dequeueFirst((p: Pair[Msg, Actor]) => { + waitingFor(p._1) && p._2 == r + }) match { + case Some(Pair(msg, sender)) => { + received = Some(msg) + receiver.pushSender(sender) + } + case None => { + // acquire lock because we might call wait() + this.synchronized { + waitingFor = f.isDefinedAt + waitingForSender = r + isSuspended = true + received = None + receiver.suspendActorFor(msec) + Debug.info("received: "+received) + if (received.isEmpty) { + Debug.info("no message received after "+msec+" millis") + if (f.isDefinedAt(TIMEOUT)) { + Debug.info("executing TIMEOUT action") + isSuspended = false + val result = f(TIMEOUT) + return result + } + else + error("unhandled timeout") + } + } + } + } + waitingFor = waitingForNone + waitingForSender = null + isSuspended = false } val result = f(received.get) receiver.popSender() @@ -271,7 +399,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { Scheduler.pendReaction receiver.synchronized { receiver.tick() - waitingFor = f.isDefinedAt + /* val q = messageQueue.extractFirst(waitingFor) if (q != null) { @@ -289,14 +417,15 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { waitingFor(p._1) }) match { case Some(Pair(msg, sender)) => { - received = Some(msg) receiver.pushSender(sender) - waitingFor = waitingForNone - receiver.scheduleActor(f, received.get) + receiver.scheduleActor(f, msg) } case None => { this.synchronized { //Scheduler.detached(receiver) + waitingFor = f.isDefinedAt + //val wp = Tuple4(f.isDefinedAt, None, Some(f), None) + //waitingParts = wp :: waitingParts receiver.detachActor(f) } } @@ -314,7 +443,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { Scheduler.pendReaction receiver.synchronized { receiver.tick() - waitingFor = f.isDefinedAt + /* val q = messageQueue.extractFirst(waitingFor) if (q != null) { @@ -324,6 +453,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { receiver.scheduleActor(f, received) } else synchronized { + waitingFor = f.isDefinedAt TimerThread.requestTimeout(receiver.asInstanceOf[Reactor], f, msec) receiver.asInstanceOf[Reactor].timeoutPending = true receiver.detachActor(f) @@ -341,6 +471,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { } case None => { this.synchronized { + waitingFor = f.isDefinedAt TimerThread.requestTimeout(receiver, f, msec) receiver.timeoutPending = true receiver.detachActor(f) diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala index 06410ff899..107b5e7847 100644 --- a/src/actors/scala/actors/Reaction.scala +++ b/src/actors/scala/actors/Reaction.scala @@ -14,6 +14,8 @@ package scala.actors import java.lang.{InterruptedException, Runnable} import java.util.logging.{Level, Logger} +class ExitActorException extends Throwable + /** * The abstract class <code>Reaction</code> associates * an instance of an <code>Actor</code> with a @@ -52,23 +54,16 @@ private[actors] class Reaction(a: Actor, Scheduler.unPendReaction a.isDetached = false try { - if (f == null) - a.act() - else - f(msg) - - if (currentThread.isInterrupted()) - throw new InterruptedException - - a.kill() - - if (currentThread.isInterrupted()) - throw new InterruptedException - - a.exit("normal") - - if (currentThread.isInterrupted()) - throw new InterruptedException + try { + if (f == null) + a.act() + else + f(msg) + a.exit("normal") + } catch { + case _: ExitActorException => + throw new InterruptedException + } } catch { case ie: InterruptedException => { diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index b6696f2793..a4b5eb3368 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -121,7 +121,7 @@ class TickedScheduler extends Thread with IScheduler { def printActorDump { var num = 0 for (val a <- alive.elements) { - Console.println("Actor `"+a.name+"' ("+num+"): "+a) + Console.println("Actor ("+num+"): "+a) if (a.isDetached) Console.println("Detached") else { @@ -143,19 +143,6 @@ class TickedScheduler extends Thread with IScheduler { def start(task: Reaction): unit = synchronized { Debug.info("Starting " + task.actor) alive += task.actor - - // determine name of actor - val creator = Actor.self - if (creator.isInstanceOf[ActorProxy]) { - // created by Java thread - // only ok, if it is the main thread - val tname = currentThread.toString() - if (tname.indexOf("main") == -1) { - // print/log warning - Console.println("Warning: Some debugging features not available if actors are created by non-main Java threads.") - } else task.actor.name = creator.nextChildName - } else task.actor.name = creator.nextChildName - execute(task) } |