summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
authorPaul Phillips <paulp@improving.org>2009-04-28 20:14:12 +0000
committerPaul Phillips <paulp@improving.org>2009-04-28 20:14:12 +0000
commit912da5d2ea6993d7393f340b411fba12fe545894 (patch)
tree162b685e3ae60f3f3440eaeeab526ee483d055dc /src/library
parentae897e4d285782753670b082f62d7f14936a4bef (diff)
downloadscala-912da5d2ea6993d7393f340b411fba12fe545894.tar.gz
scala-912da5d2ea6993d7393f340b411fba12fe545894.tar.bz2
scala-912da5d2ea6993d7393f340b411fba12fe545894.zip
Rewrote XML pull parser using exciting modern t...
Rewrote XML pull parser using exciting modern techniques.
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/xml/pull/XMLEventReader.scala213
1 files changed, 118 insertions, 95 deletions
diff --git a/src/library/scala/xml/pull/XMLEventReader.scala b/src/library/scala/xml/pull/XMLEventReader.scala
index e2e874e191..a0ab9667d0 100644
--- a/src/library/scala/xml/pull/XMLEventReader.scala
+++ b/src/library/scala/xml/pull/XMLEventReader.scala
@@ -8,139 +8,162 @@
// $Id$
-
package scala.xml.pull
-
-import java.lang.{Runnable, Thread}
+import java.util.concurrent.LinkedBlockingQueue
+import java.nio.channels.ClosedChannelException
import scala.io.Source
-import scala.xml.parsing.{ExternalSources, MarkupHandler, MarkupParser}
+import scala.xml.parsing.{ ExternalSources, MarkupHandler, MarkupParser }
/** <p>
* A pull parser that offers to view an XML document as a series of events.
- * Please note that this API might change. Here's how to use this class
+ * Example usage:
* </p><pre>
- * <b>import</b> scala.xml._
* <b>import</b> scala.xml.pull._
* <b>import</b> scala.io.Source
*
* <b>object</b> reader {
* <b>val</b> src = Source.fromString("<hello><world/></hello>")
- * <b>val</b> er = new XMLEventReader().initialize(src)
+ * <b>val</b> er = new XMLEventReader(src)
*
* <b>def</b> main(args: Array[String]) {
- * Console.println(er.next)
- * Console.println(er.next)
+ * while (er.hasNext)
+ * Console.println(er.next)
* }
* }
* </pre>
*
* @author Burak Emir
+ * @author Paul Phillips
*/
-class XMLEventReader extends Iterator[XMLEvent] {
-
- var src:Source = null
- def getSource = this.src
- def initialize(src: Source): this.type = {
- this.src = src
- this.parserThread = new Thread(new Parser())
- this.parserThread.start()
- this
- }
-
- // -- this part of the class is for communication with the thread
- var xmlEvent: XMLEvent = null
- var continue: Boolean = true
-
- def myresume = synchronized {
- while (continue) {
- wait()
- }
- continue = true
- notify()
- }
- def getAndClearEvent: XMLEvent = synchronized {
- while (xmlEvent eq null) {
- wait()
- }
- val r = xmlEvent
- xmlEvent = null
- r
- }
- def setEvent(e: XMLEvent) {
- xmlEvent = e
- }
-
- def doNotify(): NodeSeq = synchronized {
- XMLEventReader.this.continue = false
- notify()
- while (!XMLEventReader.this.continue) {
- try { wait() } catch {
- case _: java.lang.InterruptedException => /* ignore */
- }
- }
- NodeSeq.Empty
- }
-
- // iterator methods
-
- def next: XMLEvent = {
- myresume
- val r = getAndClearEvent
- r
- }
-
- def hasNext = true
- // After calling stop, one must call initialize to be able to get new events.
+class XMLEventReader(src: Source) extends ProducerConsumerIterator[XMLEvent]
+{
+ // We implement a pull parser as an iterator, but since we may be operating on
+ // a stream (e.g. XML over a network) there may be arbitrarily long periods when
+ // the queue is empty. Fortunately the ProducerConsumerIterator is ideally
+ // suited to this task, possibly because it was written for use by this class.
+
+ // to override as necessary
+ val preserveWS = true
+
+ override val MaxQueueSize = 1000
+ protected case object POISON extends XMLEvent
+ val EndOfStream = POISON
+
+ // thread machinery
+ private[this] val parser = new Parser(src)
+ private[this] val parserThread = new Thread(parser, "XMLEventReader")
+ parserThread.start
+ // enqueueing the poison object is the reliable way to cause the
+ // iterator to terminate; hasNext will return false once it sees it.
+ // Calling interrupt() on the parserThread is the only way we can get
+ // it to stop producing tokens since it's lost deep in document() -
+ // we cross our fingers the interrupt() gets to its target, but if it
+ // fails for whatever reason the iterator correctness is not impacted,
+ // only performance (because it will finish the entire XML document,
+ // or at least as much as it can fit in the queue.)
def stop = {
- continue = true;
- parserThread.interrupt();
- parserThread = null;
+ parser.setEvent(POISON)
+ parserThread.interrupt()
}
- var parserThread: Thread = null
+ private class Parser(val input: Source) extends MarkupHandler with MarkupParser with ExternalSources with Runnable {
+ val preserveWS = XMLEventReader.this.preserveWS
- class Parser extends MarkupHandler with MarkupParser with ExternalSources with Runnable {
-
- val preserveWS = true
- lazy val input = XMLEventReader.this.getSource
- // document must contain one element - avoid spurious syntax error
- final val ignore_node = <ignore/>
+ // this is Parser's way to add to the queue - the odd return type
+ // is to conform to MarkupHandler's interface
+ def setEvent(es: XMLEvent*): NodeSeq = {
+ es foreach produce
+ NodeSeq.Empty
+ }
override def elemStart(pos: Int, pre: String, label: String, attrs: MetaData, scope: NamespaceBinding) {
- setEvent(EvElemStart(pre, label, attrs, scope)); doNotify
+ setEvent(EvElemStart(pre, label, attrs, scope))
}
-
override def elemEnd(pos: Int, pre: String, label: String) {
- setEvent(EvElemEnd(pre, label)); doNotify
+ setEvent(EvElemEnd(pre, label))
}
- final def elem(pos: Int, pre: String, label: String, attrs: MetaData, pscope: NamespaceBinding, nodes: NodeSeq): NodeSeq =
- ignore_node
+ // this is a dummy to satisfy MarkupHandler's API
+ final def elem(pos: Int, pre: String, label: String, attrs: MetaData, pscope: NamespaceBinding, nodes: NodeSeq): NodeSeq = <ignore/>
- def procInstr(pos: Int, target: String, txt: String): NodeSeq = {
- setEvent(EvProcInstr(target, txt)); doNotify
- }
+ def procInstr(pos: Int, target: String, txt: String) = setEvent(EvProcInstr(target, txt))
+ def comment(pos: Int, txt: String) = setEvent(EvComment(txt))
+ def entityRef(pos: Int, n: String) = setEvent(EvEntityRef(n))
+ def text(pos: Int, txt:String) = setEvent(EvText(txt))
- def comment(pos: Int, txt: String): NodeSeq = {
- setEvent(EvComment(txt)); doNotify
+ override def run() {
+ curInput = input
+ try {
+ this.nextch
+ this.document()
+ }
+ catch {
+ case _: InterruptedException => Thread.currentThread.interrupt()
+ case _: ClosedChannelException =>
+ }
+
+ setEvent(POISON)
}
+ }
+}
- def entityRef(pos: Int, n: String): NodeSeq = {
- setEvent(EvEntityRef(n)); doNotify
+// An iterator designed for one or more producers to generate
+// elements, and a single consumer to iterate. Iteration will continue
+// until closeIterator() is called, after which point producers
+// calling produce() will receive interruptions.
+//
+// Since hasNext may block indefinitely if nobody is producing,
+// there is also an available() method which will return true if
+// the next call hasNext is guaranteed not to block.
+//
+// This is not thread-safe for multiple consumers!
+trait ProducerConsumerIterator[T >: Null] extends Iterator[T]
+{
+ // abstract - iterator-specific distinguished object for marking eos
+ val EndOfStream: T
+
+ // defaults to unbounded - override to positive Int if desired
+ val MaxQueueSize = -1
+
+ private[this] lazy val queue =
+ if (MaxQueueSize < 0) new LinkedBlockingQueue[T]()
+ else new LinkedBlockingQueue[T](MaxQueueSize)
+ private[this] var buffer: T = _
+ private def fillBuffer() = {
+ buffer = try queue.take catch {
+ case _: InterruptedException => Thread.currentThread.interrupt() ; EndOfStream
+ case _: ClosedChannelException => EndOfStream
}
+ isElement(buffer)
+ }
+ private def isElement(x: T) = x != null && x != EndOfStream
+ private def eos() = buffer == EndOfStream
+
+ // public producer interface - this is the only method producers call, so
+ // LinkedBlockingQueue's synchronization is all we need.
+ def produce(x: T): Unit = if (!eos) try queue put x catch {
+ case _: InterruptedException => Thread.currentThread.interrupt()
+ case _: ClosedChannelException =>
+ }
- def text(pos: Int, txt:String): NodeSeq = {
- setEvent(EvText(txt)); doNotify
- }
+ // consumer/iterator interface - we need not synchronize access to buffer
+ // because we required there to be only one consumer.
+ def hasNext() = !eos && (buffer != null || fillBuffer)
+ def next() = {
+ if (eos) throw new NoSuchElementException("ProducerConsumerIterator")
+ if (buffer == null) fillBuffer
- override def run() {
- curInput = input
- this.nextch
- doNotify()
- this.document()
- }
+ drainBuffer
+ }
+ def available() = isElement(buffer) || isElement(queue.peek)
+
+ private def drainBuffer() = {
+ assert(!eos)
+ val res = buffer
+ buffer = null
+ res
}
}