diff options
author | Philipp Haller <hallerp@gmail.com> | 2009-10-26 17:21:52 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2009-10-26 17:21:52 +0000 |
commit | 0b16c1266296e93339f56054f1b5e3f5908d3e55 (patch) | |
tree | f8c744d294678cc75f87eb13e2ffbcfd83a13851 /src/actors | |
parent | 180c140953263e11b3656652d6a2ac22c5c327d6 (diff) | |
download | scala-0b16c1266296e93339f56054f1b5e3f5908d3e55.tar.gz scala-0b16c1266296e93339f56054f1b5e3f5908d3e55.tar.bz2 scala-0b16c1266296e93339f56054f1b5e3f5908d3e55.zip |
First half of fix for #1518.
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 10 | ||||
-rw-r--r-- | src/actors/scala/actors/MessageQueue.scala | 18 | ||||
-rw-r--r-- | src/actors/scala/actors/Reactor.scala | 2 | ||||
-rw-r--r-- | src/actors/scala/actors/ReplyReactor.scala | 32 |
4 files changed, 47 insertions, 15 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 2adf3f5ffd..553019932d 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -439,7 +439,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { var done = false while (!done) { - val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) + val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => f.isDefinedAt(m)) if (null eq qel) { synchronized { // in mean time new stuff might have arrived @@ -484,7 +484,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { } // first, remove spurious TIMEOUT message from mailbox if any - mailbox.extractFirst((m: Any) => m == TIMEOUT) + mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT) val receiveTimeout = () => { if (f.isDefinedAt(TIMEOUT)) { @@ -496,7 +496,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { var done = false while (!done) { - val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) + val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => f.isDefinedAt(m)) if (null eq qel) { val todo = synchronized { // in mean time new stuff might have arrived @@ -580,7 +580,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { } // first, remove spurious TIMEOUT message from mailbox if any - mailbox.extractFirst((m: Any) => m == TIMEOUT) + mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT) val receiveTimeout = () => { if (f.isDefinedAt(TIMEOUT)) { @@ -592,7 +592,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { var done = false while (!done) { - val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) + val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => f.isDefinedAt(m)) if (null eq qel) { val todo = synchronized { // in mean time new stuff might have arrived diff --git a/src/actors/scala/actors/MessageQueue.scala b/src/actors/scala/actors/MessageQueue.scala index 540a992640..469b24c1c1 100644 --- a/src/actors/scala/actors/MessageQueue.scala +++ b/src/actors/scala/actors/MessageQueue.scala @@ -92,30 +92,30 @@ class MessageQueue(protected val label: String) { /** Removes the n-th message that satisfies the predicate <code>p</code>. */ - def remove(n: Int)(p: Any => Boolean): Option[(Any, OutputChannel[Any])] = + def remove(n: Int)(p: (Any, OutputChannel[Any]) => Boolean): Option[(Any, OutputChannel[Any])] = removeInternal(n)(p) map (x => (x.msg, x.session)) /** Extracts the first message that satisfies the predicate <code>p</code> * or <code>null</code> if <code>p</code> fails for all of them. */ - def extractFirst(p: Any => Boolean): MessageQueueElement = + def extractFirst(p: (Any, OutputChannel[Any]) => Boolean): MessageQueueElement = removeInternal(0)(p) orNull - private def removeInternal(n: Int)(p: Any => Boolean): Option[MessageQueueElement] = { + private def removeInternal(n: Int)(p: (Any, OutputChannel[Any]) => Boolean): Option[MessageQueueElement] = { var pos = 0 def foundMsg(x: MessageQueueElement) = { changeSize(-1) Some(x) } - def test(msg: Any): Boolean = - p(msg) && (pos == n || { pos += 1 ; false }) + def test(msg: Any, session: OutputChannel[Any]): Boolean = + p(msg, session) && (pos == n || { pos += 1 ; false }) if (isEmpty) // early return return None // special handling if returning the head - if (test(first.msg)) { + if (test(first.msg, first.session)) { val res = first first = first.next if (res eq last) @@ -128,7 +128,7 @@ class MessageQueue(protected val label: String) { var prev = first while (curr != null) { - if (test(curr.msg)) { + if (test(curr.msg, curr.session)) { prev.next = curr.next if (curr eq last) last = prev @@ -161,12 +161,12 @@ private[actors] trait MessageQueueTracer extends MessageQueue printQueue("GET %s" format res) res } - override def remove(n: Int)(p: Any => Boolean): Option[(Any, OutputChannel[Any])] = { + override def remove(n: Int)(p: (Any, OutputChannel[Any]) => Boolean): Option[(Any, OutputChannel[Any])] = { val res = super.remove(n)(p) printQueue("REMOVE %s" format res) res } - override def extractFirst(p: Any => Boolean): MessageQueueElement = { + override def extractFirst(p: (Any, OutputChannel[Any]) => Boolean): MessageQueueElement = { val res = super.extractFirst(p) printQueue("EXTRACT_FIRST %s" format res) res diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala index 4a1f5ac051..7fda8678c1 100644 --- a/src/actors/scala/actors/Reactor.scala +++ b/src/actors/scala/actors/Reactor.scala @@ -135,7 +135,7 @@ trait Reactor extends OutputChannel[Any] { var tmpMbox = startMbox var done = false while (!done) { - val qel = tmpMbox.extractFirst(handlesMessage) + val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => handlesMessage(msg)) if (tmpMbox ne mailbox) tmpMbox.foreach((m, s) => mailbox.append(m, s)) if (null eq qel) { diff --git a/src/actors/scala/actors/ReplyReactor.scala b/src/actors/scala/actors/ReplyReactor.scala index 762ceef4d6..03c97ebdae 100644 --- a/src/actors/scala/actors/ReplyReactor.scala +++ b/src/actors/scala/actors/ReplyReactor.scala @@ -61,4 +61,36 @@ trait ReplyReactor extends Reactor with ReplyableReactor { scheduleActor(continuation, item._1) } + // assume continuation != null + private[actors] override def searchMailbox(startMbox: MessageQueue, + handlesMessage: Any => Boolean, + resumeOnSameThread: Boolean) { + var tmpMbox = startMbox + var done = false + while (!done) { + val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => { + senders = List(replyTo) + handlesMessage(msg) + }) + if (tmpMbox ne mailbox) + tmpMbox.foreach((m, s) => mailbox.append(m, s)) + if (null eq qel) { + synchronized { + // in mean time new stuff might have arrived + if (!sendBuffer.isEmpty) { + tmpMbox = new MessageQueue("Temp") + drainSendBuffer(tmpMbox) + // keep going + } else { + waitingFor = handlesMessage + done = true + } + } + } else { + resumeReceiver((qel.msg, qel.session), resumeOnSameThread) + done = true + } + } + } + } |