aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorpetermaxlee <petermaxlee@gmail.com>2016-06-28 21:07:52 -0700
committerReynold Xin <rxin@databricks.com>2016-06-28 21:07:52 -0700
commit153c2f9ac12846367a09684fd875c496d350a603 (patch)
treee00b3c58a3a3b9d9b4c3ecce6765cafe90c18ba5 /sql
parent0df5ce1bc1387a58b33cd185008f4022bd3dcc69 (diff)
downloadspark-153c2f9ac12846367a09684fd875c496d350a603.tar.gz
spark-153c2f9ac12846367a09684fd875c496d350a603.tar.bz2
spark-153c2f9ac12846367a09684fd875c496d350a603.zip
[SPARK-16271][SQL] Implement Hive's UDFXPathUtil
## What changes were proposed in this pull request?

This patch ports Hive's UDFXPathUtil over to Spark, which can be used to implement xpath functionality in Spark in the near future.

## How was this patch tested?

Added two new test suites, UDFXPathUtilSuite and ReusableStringReaderSuite. They have been ported over from Hive (but rewritten in Scala in order to leverage ScalaTest).

Author: petermaxlee <petermaxlee@gmail.com>

Closes #13961 from petermaxlee/xpath.
Diffstat (limited to 'sql')
-rw-r--r-- sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java | 192
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/ReusableStringReaderSuite.scala | 103
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala | 99
3 files changed, 394 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java
new file mode 100644
index 0000000000..01a11f9bdc
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.xml;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+
+import javax.xml.namespace.QName;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+
+/**
+ * Utility class for all XPath UDFs. Each UDF instance should keep an instance of this class.
+ *
+ * This is based on Hive's UDFXPathUtil implementation.
+ */
+public class UDFXPathUtil {
+ private XPath xpath = XPathFactory.newInstance().newXPath();
+ private ReusableStringReader reader = new ReusableStringReader();
+ private InputSource inputSource = new InputSource(reader);
+ private XPathExpression expression = null;
+ private String oldPath = null;
+
+ public Object eval(String xml, String path, QName qname) {
+ if (xml == null || path == null || qname == null) {
+ return null;
+ }
+
+ if (xml.length() == 0 || path.length() == 0) {
+ return null;
+ }
+
+ if (!path.equals(oldPath)) {
+ try {
+ expression = xpath.compile(path);
+ } catch (XPathExpressionException e) {
+ expression = null;
+ }
+ oldPath = path;
+ }
+
+ if (expression == null) {
+ return null;
+ }
+
+ reader.set(xml);
+
+ try {
+ return expression.evaluate(inputSource, qname);
+ } catch (XPathExpressionException e) {
+ throw new RuntimeException ("Invalid expression '" + oldPath + "'", e);
+ }
+ }
+
+ public Boolean evalBoolean(String xml, String path) {
+ return (Boolean) eval(xml, path, XPathConstants.BOOLEAN);
+ }
+
+ public String evalString(String xml, String path) {
+ return (String) eval(xml, path, XPathConstants.STRING);
+ }
+
+ public Double evalNumber(String xml, String path) {
+ return (Double) eval(xml, path, XPathConstants.NUMBER);
+ }
+
+ public Node evalNode(String xml, String path) {
+ return (Node) eval(xml, path, XPathConstants.NODE);
+ }
+
+ public NodeList evalNodeList(String xml, String path) {
+ return (NodeList) eval(xml, path, XPathConstants.NODESET);
+ }
+
+ /**
+ * Reusable, non-threadsafe version of {@link StringReader}.
+ */
+ public static class ReusableStringReader extends Reader {
+
+ private String str = null;
+ private int length = -1;
+ private int next = 0;
+ private int mark = 0;
+
+ public ReusableStringReader() {
+ }
+
+ public void set(String s) {
+ this.str = s;
+ this.length = s.length();
+ this.mark = 0;
+ this.next = 0;
+ }
+
+ /** Check to make sure that the stream has not been closed */
+ private void ensureOpen() throws IOException {
+ if (str == null)
+ throw new IOException("Stream closed");
+ }
+
+ @Override
+ public int read() throws IOException {
+ ensureOpen();
+ if (next >= length)
+ return -1;
+ return str.charAt(next++);
+ }
+
+ @Override
+ public int read(char cbuf[], int off, int len) throws IOException {
+ ensureOpen();
+ if ((off < 0) || (off > cbuf.length) || (len < 0)
+ || ((off + len) > cbuf.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return 0;
+ }
+ if (next >= length)
+ return -1;
+ int n = Math.min(length - next, len);
+ str.getChars(next, next + n, cbuf, off);
+ next += n;
+ return n;
+ }
+
+ @Override
+ public long skip(long ns) throws IOException {
+ ensureOpen();
+ if (next >= length)
+ return 0;
+ // Bound skip by beginning and end of the source
+ long n = Math.min(length - next, ns);
+ n = Math.max(-next, n);
+ next += n;
+ return n;
+ }
+
+ @Override
+ public boolean ready() throws IOException {
+ ensureOpen();
+ return true;
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ @Override
+ public void mark(int readAheadLimit) throws IOException {
+ if (readAheadLimit < 0) {
+ throw new IllegalArgumentException("Read-ahead limit < 0");
+ }
+ ensureOpen();
+ mark = next;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ ensureOpen();
+ next = mark;
+ }
+
+ @Override
+ public void close() {
+ str = null;
+ }
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/ReusableStringReaderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/ReusableStringReaderSuite.scala
new file mode 100644
index 0000000000..e06d209c47
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/ReusableStringReaderSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.xml
+
+import java.io.IOException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.xml.UDFXPathUtil.ReusableStringReader
+
/**
 * Unit tests for [[UDFXPathUtil.ReusableStringReader]].
 *
 * Loosely based on Hive's TestReusableStringReader.java.
 */
class ReusableStringReaderSuite extends SparkFunSuite {

  private val fox = "Quick brown fox jumps over the lazy dog."

  test("empty reader") {
    val reader = new ReusableStringReader

    // A reader that was never `set` behaves like a closed stream.
    intercept[IOException] {
      reader.read()
    }

    intercept[IOException] {
      reader.ready()
    }

    reader.close()
  }

  test("mark reset") {
    val reader = new ReusableStringReader

    if (reader.markSupported()) {
      reader.set(fox)
      assert(reader.ready())

      val cc = new Array[Char](6)
      var read = reader.read(cc)
      assert(read == 6)
      assert("Quick " == new String(cc))

      reader.mark(100)

      read = reader.read(cc)
      assert(read == 6)
      assert("brown " == new String(cc))

      // reset() rewinds to the mark, so the same chunk is read again.
      reader.reset()
      read = reader.read(cc)
      assert(read == 6)
      assert("brown " == new String(cc))
    }
    reader.close()
  }

  test("skip") {
    val reader = new ReusableStringReader
    reader.set(fox)

    // skip past the end of the data: skip() is bounded by the source length
    var skipped = reader.skip(fox.length() + 1)
    assert(fox.length() == skipped)
    assert(-1 == reader.read())

    reader.set(fox) // reset the data
    val cc = new Array[Char](6)
    var read = reader.read(cc)
    assert(read == 6)
    assert("Quick " == new String(cc))

    // skip some piece of data:
    skipped = reader.skip(30)
    assert(skipped == 30)
    read = reader.read(cc)
    assert(read == 4)
    assert("dog." == new String(cc, 0, read))

    // skip when already at EOF:
    skipped = reader.skip(300)
    assert(skipped == 0, skipped)
    assert(reader.read() == -1)

    reader.close()
  }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala
new file mode 100644
index 0000000000..a5614f8384
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.xml
+
+import javax.xml.xpath.XPathConstants.STRING
+
+import org.w3c.dom.Node
+import org.w3c.dom.NodeList
+
+import org.apache.spark.SparkFunSuite
+
/**
 * Unit tests for [[UDFXPathUtil]]. Loosely based on Hive's TestUDFXPathUtil.java.
 */
class UDFXPathUtilSuite extends SparkFunSuite {

  private lazy val util = new UDFXPathUtil

  // Shared XML fixtures, hoisted so each document is written exactly once.
  private val simpleDoc = "<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>"
  private val booleanDoc = "<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>c2</c></a>"
  private val attributeDoc = "<a><b>true</b><b k=\"foo\">FALSE</b><b>b3</b><c>c1</c><c>c2</c></a>"
  private val numberDoc = "<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>-77</c></a>"

  test("illegal arguments") {
    // null args
    assert(util.eval(null, "a/text()", STRING) == null)
    assert(util.eval(simpleDoc, null, STRING) == null)
    assert(util.eval(simpleDoc, "a/text()", null) == null)

    // empty String args
    assert(util.eval("", "a/text()", STRING) == null)
    assert(util.eval(simpleDoc, "", STRING) == null)

    // wrong expression: compilation failure yields null, not an exception
    assert(util.eval(simpleDoc, "a/text(", STRING) == null)
  }

  test("generic eval") {
    assert(util.eval(simpleDoc, "a/c[2]/text()", STRING) == "c2")
  }

  test("boolean eval") {
    assert(util.evalBoolean(booleanDoc, "a/b[1]/text()") == true)
    // a/b[4] does not exist, so the boolean value of the empty node set is false
    assert(util.evalBoolean(booleanDoc, "a/b[4]") == false)
  }

  test("string eval") {
    assert(util.evalString(booleanDoc, "a/b[3]/text()") == "b3")
    // missing node evaluates to the empty string, not null
    assert(util.evalString(booleanDoc, "a/b[4]/text()") == "")
    assert(util.evalString(attributeDoc, "a/b[2]/@k") == "foo")
  }

  test("number eval") {
    assert(util.evalNumber(numberDoc, "a/c[2]") == -77.0d)
    // non-numeric content evaluates to NaN
    assert(util.evalNumber(attributeDoc, "a/b[2]/@k").isNaN)
  }

  test("node eval") {
    val ret = util.evalNode(numberDoc, "a/c[2]")
    assert(ret != null && ret.isInstanceOf[Node])
  }

  test("node list eval") {
    val ret = util.evalNodeList(numberDoc, "a/*")
    assert(ret != null && ret.isInstanceOf[NodeList])
    assert(ret.asInstanceOf[NodeList].getLength == 5)
  }
}