diff options
author | Paul Phillips <paulp@improving.org> | 2009-09-02 19:05:18 +0000 |
---|---|---|
committer | Paul Phillips <paulp@improving.org> | 2009-09-02 19:05:18 +0000 |
commit | 6ac283c5e4a75601fd4ddba6028977952f5d3eed (patch) | |
tree | bdb15fc467cff56918cb8ef9774fc672976636e2 /src/actors | |
parent | 68c9e7c9249c61ee686ffaafe54b4baa27aa9679 (diff) | |
download | scala-6ac283c5e4a75601fd4ddba6028977952f5d3eed.tar.gz scala-6ac283c5e4a75601fd4ddba6028977952f5d3eed.tar.bz2 scala-6ac283c5e4a75601fd4ddba6028977952f5d3eed.zip |
Rewrite of the actor MessageQueue, adding a tra...
Rewrite of the actor MessageQueue, adding a tracing facility.
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/MessageQueue.scala | 202 | ||||
-rw-r--r-- | src/actors/scala/actors/Reactor.scala | 6 |
2 files changed, 85 insertions, 123 deletions
diff --git a/src/actors/scala/actors/MessageQueue.scala b/src/actors/scala/actors/MessageQueue.scala index d83b6e3eeb..4b55fab21d 100644 --- a/src/actors/scala/actors/MessageQueue.scala +++ b/src/actors/scala/actors/MessageQueue.scala @@ -18,10 +18,18 @@ package scala.actors * @author Philipp Haller */ @serializable -class MessageQueueElement { - var msg: Any = _ - var session: OutputChannel[Any] = null - var next: MessageQueueElement = null +class MessageQueueElement(var msg: Any, var session: OutputChannel[Any], var next: MessageQueueElement) { + def this() = this(null, null, null) + def this(msg: Any, session: OutputChannel[Any]) = this(msg, session, null) +} + +object MessageQueue { + // for tracing purposes + private var queueNumberAssigner = 0 + private def getQueueNumber = synchronized { + queueNumberAssigner += 1 + queueNumberAssigner + } } /** @@ -34,37 +42,33 @@ class MessageQueueElement { * @author Philipp Haller */ @serializable -class MessageQueue { - var first: MessageQueueElement = null - // last == null iff list empty - var last: MessageQueueElement = null - - def isEmpty = null eq last +class MessageQueue(label: String) { + def this() = this("Unlabelled") + private val queueNumber = MessageQueue.getQueueNumber + private var trace = false // set to true to print out all appends/removes + private var first: MessageQueueElement = null + private var last: MessageQueueElement = null // last eq null iff list is empty private var _size = 0 + def size = _size + final def isEmpty = last eq null protected def changeSize(diff: Int) = { _size += diff } - def append(msg: Any, session: OutputChannel[Any]) = { + def append(msg: Any, session: OutputChannel[Any]) { changeSize(1) // size always increases by 1 + if (trace) + printQueue("APPEND %s" format msg) - if (null eq last) { // list empty - val el = new MessageQueueElement - el.msg = msg - el.session = session - first = el - last = el - } - else { - val el = new MessageQueueElement - el.msg = msg - el.session = session - last.next = el - last = el - } + val el = new MessageQueueElement(msg, session) + + if (isEmpty) first = el + else last.next = el + + last = el } def foreach(f: (Any, OutputChannel[Any]) => Unit) { @@ -89,126 +93,84 @@ class MessageQueue { * without removing it. */ def get(n: Int)(p: Any => Boolean): Option[Any] = { - var found: Option[Any] = None var pos = 0 def test(msg: Any): Boolean = - if (p(msg)) { - if (pos == n) - true - else { - pos += 1 - false - } - } else - false + p(msg) && (pos == n || { pos += 1; false }) - if (last == null) None - else if (test(first.msg)) - Some(first.msg) - else { - var curr = first - while(curr.next != null && found.isEmpty) { - curr = curr.next - if (test(curr.msg)) - found = Some(curr.msg) + var curr = first + while (curr != null) + if (test(curr.msg)) { + if (trace) + printQueue("GET %s" format curr.msg) + + return Some(curr.msg) // early return } - found - } + else curr = curr.next + + None } /** Removes the n-th msg that satisfies the predicate. */ - def remove(n: Int)(p: Any => Boolean): Option[(Any, OutputChannel[Any])] = { - var found: Option[(Any, OutputChannel[Any])] = None + def remove(n: Int)(p: Any => Boolean): Option[(Any, OutputChannel[Any])] = + removeInternal(n)(p) map (x => (x.msg, x.session)) + + def extractFirst(p: Any => Boolean): MessageQueueElement = + removeInternal(0)(p) orNull + + private def removeInternal(n: Int)(p: Any => Boolean): Option[MessageQueueElement] = { var pos = 0 + def foundMsg(x: MessageQueueElement) = { + if (trace) + printQueue("REMOVE %s" format x.msg) + + changeSize(-1) + Some(x) + } def test(msg: Any): Boolean = - if (p(msg)) { - if (pos == n) - true - else { - pos += 1 - false - } - } else - false + p(msg) && (pos == n || { pos += 1 ; false }) + + if (isEmpty) // early return + return None - if (last == null) None - else if (test(first.msg)) { - val tmp = first - // remove first element + // special handling if returning the head + if (test(first.msg)) { + val res = first first = first.next - // might have to update last - if (tmp eq last) { + if (res eq last) last = null - } - changeSize(-1) - Some((tmp.msg, tmp.session)) - } else { - var curr = first - var prev = curr - while(curr.next != null && found.isEmpty) { - prev = curr - curr = curr.next + + foundMsg(res) + } + else { + var curr = first.next // init to element #2 + var prev = first + + while (curr != null) { if (test(curr.msg)) { - // remove curr prev.next = curr.next - // might have to update last - if (curr eq last) { + if (curr eq last) last = prev - } - changeSize(-1) - found = Some((curr.msg, curr.session)) - } - } - found - } - } - - def extractFirst(p: Any => Boolean): MessageQueueElement = { - changeSize(-1) // assume size decreases by 1 - val msg = if (null eq last) null - else { - // test first element - if (p(first.msg)) { - val tmp = first - // remove first element - first = first.next - - // might have to update last - if (tmp eq last) { - last = null + return foundMsg(curr) // early return } - - tmp - } - else { - var curr = first - var prev = curr - while(curr.next != null) { + else { prev = curr curr = curr.next - if (p(curr.msg)) { - // remove curr - prev.next = curr.next - - // might have to update last - if (curr eq last) { - last = prev - } - - return curr - } } - null } + // not found + None } + } - if (null eq msg) - changeSize(1) // correct wrong assumption + private def printQueue(msg: String) = { + def firstMsg = if (first eq null) "null" else first.msg + def lastMsg = if (last eq null) "null" else last.msg - msg + println("[%s size=%d] [%s] first = %s, last = %s".format(this, size, msg, firstMsg, lastMsg)) } -} + override def toString() = "%s:%d".format(label, queueNumber) +}
\ No newline at end of file diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala index 967dd52a86..6db795569c 100644 --- a/src/actors/scala/actors/Reactor.scala +++ b/src/actors/scala/actors/Reactor.scala @@ -20,7 +20,7 @@ import scala.collection.mutable.Queue trait Reactor extends OutputChannel[Any] { /* The actor's mailbox. */ - protected val mailbox = new MessageQueue + protected val mailbox = new MessageQueue("Reactor") protected var sendBuffer = new Queue[(Any, OutputChannel[Any])] @@ -67,7 +67,7 @@ trait Reactor extends OutputChannel[Any] { val savedWaitingFor = waitingFor waitingFor = waitingForNone () => scheduler execute (makeReaction(() => { - val startMbox = new MessageQueue + val startMbox = new MessageQueue("Start") synchronized { startMbox.append(msg, replyTo) } searchMailbox(startMbox, savedWaitingFor, true) })) @@ -121,7 +121,7 @@ trait Reactor extends OutputChannel[Any] { synchronized { // in mean time new stuff might have arrived if (!sendBuffer.isEmpty) { - tmpMbox = new MessageQueue + tmpMbox = new MessageQueue("Temp") drainSendBuffer(tmpMbox) // keep going } else { |