summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-10-26 17:21:52 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-10-26 17:21:52 +0000
commit0b16c1266296e93339f56054f1b5e3f5908d3e55 (patch)
treef8c744d294678cc75f87eb13e2ffbcfd83a13851 /src/actors
parent180c140953263e11b3656652d6a2ac22c5c327d6 (diff)
downloadscala-0b16c1266296e93339f56054f1b5e3f5908d3e55.tar.gz
scala-0b16c1266296e93339f56054f1b5e3f5908d3e55.tar.bz2
scala-0b16c1266296e93339f56054f1b5e3f5908d3e55.zip
First half of fix for #1518.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala10
-rw-r--r--src/actors/scala/actors/MessageQueue.scala18
-rw-r--r--src/actors/scala/actors/Reactor.scala2
-rw-r--r--src/actors/scala/actors/ReplyReactor.scala32
4 files changed, 47 insertions, 15 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 2adf3f5ffd..553019932d 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -439,7 +439,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
var done = false
while (!done) {
- val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
+ val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => f.isDefinedAt(m))
if (null eq qel) {
synchronized {
// in mean time new stuff might have arrived
@@ -484,7 +484,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
}
// first, remove spurious TIMEOUT message from mailbox if any
- mailbox.extractFirst((m: Any) => m == TIMEOUT)
+ mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT)
val receiveTimeout = () => {
if (f.isDefinedAt(TIMEOUT)) {
@@ -496,7 +496,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
var done = false
while (!done) {
- val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
+ val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => f.isDefinedAt(m))
if (null eq qel) {
val todo = synchronized {
// in mean time new stuff might have arrived
@@ -580,7 +580,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
}
// first, remove spurious TIMEOUT message from mailbox if any
- mailbox.extractFirst((m: Any) => m == TIMEOUT)
+ mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT)
val receiveTimeout = () => {
if (f.isDefinedAt(TIMEOUT)) {
@@ -592,7 +592,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
var done = false
while (!done) {
- val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
+ val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => f.isDefinedAt(m))
if (null eq qel) {
val todo = synchronized {
// in mean time new stuff might have arrived
diff --git a/src/actors/scala/actors/MessageQueue.scala b/src/actors/scala/actors/MessageQueue.scala
index 540a992640..469b24c1c1 100644
--- a/src/actors/scala/actors/MessageQueue.scala
+++ b/src/actors/scala/actors/MessageQueue.scala
@@ -92,30 +92,30 @@ class MessageQueue(protected val label: String) {
/** Removes the n-th message that satisfies the predicate <code>p</code>.
*/
- def remove(n: Int)(p: Any => Boolean): Option[(Any, OutputChannel[Any])] =
+ def remove(n: Int)(p: (Any, OutputChannel[Any]) => Boolean): Option[(Any, OutputChannel[Any])] =
removeInternal(n)(p) map (x => (x.msg, x.session))
/** Extracts the first message that satisfies the predicate <code>p</code>
* or <code>null</code> if <code>p</code> fails for all of them.
*/
- def extractFirst(p: Any => Boolean): MessageQueueElement =
+ def extractFirst(p: (Any, OutputChannel[Any]) => Boolean): MessageQueueElement =
removeInternal(0)(p) orNull
- private def removeInternal(n: Int)(p: Any => Boolean): Option[MessageQueueElement] = {
+ private def removeInternal(n: Int)(p: (Any, OutputChannel[Any]) => Boolean): Option[MessageQueueElement] = {
var pos = 0
def foundMsg(x: MessageQueueElement) = {
changeSize(-1)
Some(x)
}
- def test(msg: Any): Boolean =
- p(msg) && (pos == n || { pos += 1 ; false })
+ def test(msg: Any, session: OutputChannel[Any]): Boolean =
+ p(msg, session) && (pos == n || { pos += 1 ; false })
if (isEmpty) // early return
return None
// special handling if returning the head
- if (test(first.msg)) {
+ if (test(first.msg, first.session)) {
val res = first
first = first.next
if (res eq last)
@@ -128,7 +128,7 @@ class MessageQueue(protected val label: String) {
var prev = first
while (curr != null) {
- if (test(curr.msg)) {
+ if (test(curr.msg, curr.session)) {
prev.next = curr.next
if (curr eq last)
last = prev
@@ -161,12 +161,12 @@ private[actors] trait MessageQueueTracer extends MessageQueue
printQueue("GET %s" format res)
res
}
- override def remove(n: Int)(p: Any => Boolean): Option[(Any, OutputChannel[Any])] = {
+ override def remove(n: Int)(p: (Any, OutputChannel[Any]) => Boolean): Option[(Any, OutputChannel[Any])] = {
val res = super.remove(n)(p)
printQueue("REMOVE %s" format res)
res
}
- override def extractFirst(p: Any => Boolean): MessageQueueElement = {
+ override def extractFirst(p: (Any, OutputChannel[Any]) => Boolean): MessageQueueElement = {
val res = super.extractFirst(p)
printQueue("EXTRACT_FIRST %s" format res)
res
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index 4a1f5ac051..7fda8678c1 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -135,7 +135,7 @@ trait Reactor extends OutputChannel[Any] {
var tmpMbox = startMbox
var done = false
while (!done) {
- val qel = tmpMbox.extractFirst(handlesMessage)
+ val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => handlesMessage(msg))
if (tmpMbox ne mailbox)
tmpMbox.foreach((m, s) => mailbox.append(m, s))
if (null eq qel) {
diff --git a/src/actors/scala/actors/ReplyReactor.scala b/src/actors/scala/actors/ReplyReactor.scala
index 762ceef4d6..03c97ebdae 100644
--- a/src/actors/scala/actors/ReplyReactor.scala
+++ b/src/actors/scala/actors/ReplyReactor.scala
@@ -61,4 +61,36 @@ trait ReplyReactor extends Reactor with ReplyableReactor {
scheduleActor(continuation, item._1)
}
+ // assume continuation != null
+ private[actors] override def searchMailbox(startMbox: MessageQueue,
+ handlesMessage: Any => Boolean,
+ resumeOnSameThread: Boolean) {
+ var tmpMbox = startMbox
+ var done = false
+ while (!done) {
+ val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => {
+ senders = List(replyTo)
+ handlesMessage(msg)
+ })
+ if (tmpMbox ne mailbox)
+ tmpMbox.foreach((m, s) => mailbox.append(m, s))
+ if (null eq qel) {
+ synchronized {
+ // in mean time new stuff might have arrived
+ if (!sendBuffer.isEmpty) {
+ tmpMbox = new MessageQueue("Temp")
+ drainSendBuffer(tmpMbox)
+ // keep going
+ } else {
+ waitingFor = handlesMessage
+ done = true
+ }
+ }
+ } else {
+ resumeReceiver((qel.msg, qel.session), resumeOnSameThread)
+ done = true
+ }
+ }
+ }
+
}