diff options
-rw-r--r-- | src/library/scala/xml/pull/XMLEventReader.scala | 213 | ||||
-rw-r--r-- | test/files/jvm/xmlpull.scala | 20 |
2 files changed, 128 insertions, 105 deletions
diff --git a/src/library/scala/xml/pull/XMLEventReader.scala b/src/library/scala/xml/pull/XMLEventReader.scala index e2e874e191..a0ab9667d0 100644 --- a/src/library/scala/xml/pull/XMLEventReader.scala +++ b/src/library/scala/xml/pull/XMLEventReader.scala @@ -8,139 +8,162 @@ // $Id$ - package scala.xml.pull - -import java.lang.{Runnable, Thread} +import java.util.concurrent.LinkedBlockingQueue +import java.nio.channels.ClosedChannelException import scala.io.Source -import scala.xml.parsing.{ExternalSources, MarkupHandler, MarkupParser} +import scala.xml.parsing.{ ExternalSources, MarkupHandler, MarkupParser } /** <p> * A pull parser that offers to view an XML document as a series of events. - * Please note that this API might change. Here's how to use this class + * Example usage: * </p><pre> - * <b>import</b> scala.xml._ * <b>import</b> scala.xml.pull._ * <b>import</b> scala.io.Source * * <b>object</b> reader { * <b>val</b> src = Source.fromString("<hello><world/></hello>") - * <b>val</b> er = new XMLEventReader().initialize(src) + * <b>val</b> er = new XMLEventReader(src) * * <b>def</b> main(args: Array[String]) { - * Console.println(er.next) - * Console.println(er.next) + * while (er.hasNext) + * Console.println(er.next) * } * } * </pre> * * @author Burak Emir + * @author Paul Phillips */ -class XMLEventReader extends Iterator[XMLEvent] { - - var src:Source = null - def getSource = this.src - def initialize(src: Source): this.type = { - this.src = src - this.parserThread = new Thread(new Parser()) - this.parserThread.start() - this - } - - // -- this part of the class is for communication with the thread - var xmlEvent: XMLEvent = null - var continue: Boolean = true - - def myresume = synchronized { - while (continue) { - wait() - } - continue = true - notify() - } - def getAndClearEvent: XMLEvent = synchronized { - while (xmlEvent eq null) { - wait() - } - val r = xmlEvent - xmlEvent = null - r - } - def setEvent(e: XMLEvent) { - xmlEvent = e - } - - def doNotify(): NodeSeq = synchronized { - XMLEventReader.this.continue = false - notify() - while (!XMLEventReader.this.continue) { - try { wait() } catch { - case _: java.lang.InterruptedException => /* ignore */ - } - } - NodeSeq.Empty - } - - // iterator methods - - def next: XMLEvent = { - myresume - val r = getAndClearEvent - r - } - - def hasNext = true - // After calling stop, one must call initialize to be able to get new events. +class XMLEventReader(src: Source) extends ProducerConsumerIterator[XMLEvent] +{ + // We implement a pull parser as an iterator, but since we may be operating on + // a stream (e.g. XML over a network) there may be arbitrarily long periods when + // the queue is empty. Fortunately the ProducerConsumerIterator is ideally + // suited to this task, possibly because it was written for use by this class. + + // to override as necessary + val preserveWS = true + + override val MaxQueueSize = 1000 + protected case object POISON extends XMLEvent + val EndOfStream = POISON + + // thread machinery + private[this] val parser = new Parser(src) + private[this] val parserThread = new Thread(parser, "XMLEventReader") + parserThread.start + // enqueueing the poison object is the reliable way to cause the + // iterator to terminate; hasNext will return false once it sees it. + // Calling interrupt() on the parserThread is the only way we can get + // it to stop producing tokens since it's lost deep in document() - + // we cross our fingers the interrupt() gets to its target, but if it + // fails for whatever reason the iterator correctness is not impacted, + // only performance (because it will finish the entire XML document, + // or at least as much as it can fit in the queue.) def stop = { - continue = true; - parserThread.interrupt(); - parserThread = null; + parser.setEvent(POISON) + parserThread.interrupt() } - var parserThread: Thread = null + private class Parser(val input: Source) extends MarkupHandler with MarkupParser with ExternalSources with Runnable { + val preserveWS = XMLEventReader.this.preserveWS - class Parser extends MarkupHandler with MarkupParser with ExternalSources with Runnable { - - val preserveWS = true - lazy val input = XMLEventReader.this.getSource - // document must contain one element - avoid spurious syntax error - final val ignore_node = <ignore/> + // this is Parser's way to add to the queue - the odd return type + // is to conform to MarkupHandler's interface + def setEvent(es: XMLEvent*): NodeSeq = { + es foreach produce + NodeSeq.Empty + } override def elemStart(pos: Int, pre: String, label: String, attrs: MetaData, scope: NamespaceBinding) { - setEvent(EvElemStart(pre, label, attrs, scope)); doNotify + setEvent(EvElemStart(pre, label, attrs, scope)) } - override def elemEnd(pos: Int, pre: String, label: String) { - setEvent(EvElemEnd(pre, label)); doNotify + setEvent(EvElemEnd(pre, label)) } - final def elem(pos: Int, pre: String, label: String, attrs: MetaData, pscope: NamespaceBinding, nodes: NodeSeq): NodeSeq = - ignore_node + // this is a dummy to satisfy MarkupHandler's API + final def elem(pos: Int, pre: String, label: String, attrs: MetaData, pscope: NamespaceBinding, nodes: NodeSeq): NodeSeq = <ignore/> - def procInstr(pos: Int, target: String, txt: String): NodeSeq = { - setEvent(EvProcInstr(target, txt)); doNotify - } + def procInstr(pos: Int, target: String, txt: String) = setEvent(EvProcInstr(target, txt)) + def comment(pos: Int, txt: String) = setEvent(EvComment(txt)) + def entityRef(pos: Int, n: String) = setEvent(EvEntityRef(n)) + def text(pos: Int, txt:String) = setEvent(EvText(txt)) - def comment(pos: Int, txt: String): NodeSeq = { - setEvent(EvComment(txt)); doNotify + override def run() { + curInput = input + try { + this.nextch + this.document() + } + catch { + case _: InterruptedException => Thread.currentThread.interrupt() + case _: ClosedChannelException => + } + + setEvent(POISON) } + } +} - def entityRef(pos: Int, n: String): NodeSeq = { - setEvent(EvEntityRef(n)); doNotify +// An iterator designed for one or more producers to generate +// elements, and a single consumer to iterate. Iteration will continue +// until closeIterator() is called, after which point producers +// calling produce() will receive interruptions. +// +// Since hasNext may block indefinitely if nobody is producing, +// there is also an available() method which will return true if +// the next call hasNext is guaranteed not to block. +// +// This is not thread-safe for multiple consumers! +trait ProducerConsumerIterator[T >: Null] extends Iterator[T] +{ + // abstract - iterator-specific distinguished object for marking eos + val EndOfStream: T + + // defaults to unbounded - override to positive Int if desired + val MaxQueueSize = -1 + + private[this] lazy val queue = + if (MaxQueueSize < 0) new LinkedBlockingQueue[T]() + else new LinkedBlockingQueue[T](MaxQueueSize) + private[this] var buffer: T = _ + private def fillBuffer() = { + buffer = try queue.take catch { + case _: InterruptedException => Thread.currentThread.interrupt() ; EndOfStream + case _: ClosedChannelException => EndOfStream } + isElement(buffer) + } + private def isElement(x: T) = x != null && x != EndOfStream + private def eos() = buffer == EndOfStream + + // public producer interface - this is the only method producers call, so + // LinkedBlockingQueue's synchronization is all we need. + def produce(x: T): Unit = if (!eos) try queue put x catch { + case _: InterruptedException => Thread.currentThread.interrupt() + case _: ClosedChannelException => + } - def text(pos: Int, txt:String): NodeSeq = { - setEvent(EvText(txt)); doNotify - } + // consumer/iterator interface - we need not synchronize access to buffer + // because we required there to be only one consumer. + def hasNext() = !eos && (buffer != null || fillBuffer) + def next() = { + if (eos) throw new NoSuchElementException("ProducerConsumerIterator") + if (buffer == null) fillBuffer - override def run() { - curInput = input - this.nextch - doNotify() - this.document() - } + drainBuffer + } + def available() = isElement(buffer) || isElement(queue.peek) + + private def drainBuffer() = { + assert(!eos) + val res = buffer + buffer = null + res } } diff --git a/test/files/jvm/xmlpull.scala b/test/files/jvm/xmlpull.scala index 1f0e6a8497..d2bb72a071 100644 --- a/test/files/jvm/xmlpull.scala +++ b/test/files/jvm/xmlpull.scala @@ -7,25 +7,25 @@ object Test { val src = Source.fromString("<hello><world/>!</hello>") def main(args: Array[String]) { - var er = new XMLEventReader().initialize(src) + var er = new XMLEventReader(src) er.next match { case EvElemStart(_, "hello", _, _) => //println("1") - } - er.next match { + } + er.next match { case EvElemStart(_, "world", _, _) => //println("2") } - er.next match { + er.next match { case EvElemEnd(_, "world") => //println("3") } - er.next match { + er.next match { case EvText("!") => //println("4") - } + } er.next match { case EvElemEnd(_, "hello") => //println("5") - } - // you get the picture... - er.stop // allow thread to be garbage-collected - //println("6") + } + // you get the picture... + er.stop // allow thread to be garbage-collected + //println("6") } } |