diff options
author | Philipp Haller <hallerp@gmail.com> | 2010-04-29 08:51:32 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2010-04-29 08:51:32 +0000 |
commit | 8f2d31cbcdd9a99195a554d9c9063a79c38b9444 (patch) | |
tree | 1c61dae9b841cc57fcd7dfa8fdb52be34d1946c2 | |
parent | 54e1e31679f4e5ca78238a092b0b857190e8bfde (diff) | |
download | scala-8f2d31cbcdd9a99195a554d9c9063a79c38b9444.tar.gz scala-8f2d31cbcdd9a99195a554d9c9063a79c38b9444.tar.bz2 scala-8f2d31cbcdd9a99195a554d9c9063a79c38b9444.zip |
Closes #3369. Review by plocinic.
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 39 |
1 files changed, 39 insertions, 0 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index baf4c3d92e..4e245f27cf 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -408,6 +408,43 @@ trait Actor extends AbstractActor with ReplyReactor with ActorCanReply with Inpu } } else super.startSearch(msg, replyTo, handler) + // we override this method to check `shouldExit` before suspending + private[actors] override def searchMailbox(startMbox: MQueue[Any], + handler: PartialFunction[Any, Any], + resumeOnSameThread: Boolean) { + var tmpMbox = startMbox + var done = false + while (!done) { + val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => { + senders = List(replyTo) + handler.isDefinedAt(msg) + }) + if (tmpMbox ne mailbox) + tmpMbox.foreach((m, s) => mailbox.append(m, s)) + if (null eq qel) { + synchronized { + // in mean time new stuff might have arrived + if (!sendBuffer.isEmpty) { + tmpMbox = new MQueue[Any]("Temp") + drainSendBuffer(tmpMbox) + // keep going + } else { + // very important to check for `shouldExit` at this point + // since linked actors might have set it after we checked + // last time (e.g., at the beginning of `react`) + if (shouldExit) exit() + waitingFor = handler + // see Reactor.searchMailbox + throw Actor.suspendException + } + } + } else { + resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread) + done = true + } + } + } + private[actors] override def makeReaction(fun: () => Unit, handler: PartialFunction[Any, Any], msg: Any): Runnable = new ActorTask(this, fun, handler, msg) @@ -754,6 +791,8 @@ trait Actor extends AbstractActor with ReplyReactor with ActorCanReply with Inpu if (isSuspended) resumeActor() else if (waitingFor ne Reactor.waitingForNone) { + waitingFor = Reactor.waitingForNone + // it doesn't matter what partial function we are passing here scheduleActor(waitingFor, null) /* Here we should not throw a SuspendActorControl, since the current method is called from an actor that |