aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@databricks.com>2017-02-10 14:47:25 -0800
committerWenchen Fan <wenchen@databricks.com>2017-02-10 14:47:25 -0800
commit226d38840c8d3f40639715d755df6fb4fee2715f (patch)
treee5a82c33aa97953868d0c60df837078ddaa5fbcb /sql
parentd785217b791882e075ad537852d49d78fc1ca31b (diff)
downloadspark-226d38840c8d3f40639715d755df6fb4fee2715f.tar.gz
spark-226d38840c8d3f40639715d755df6fb4fee2715f.tar.bz2
spark-226d38840c8d3f40639715d755df6fb4fee2715f.zip
[SPARK-19548][SQL] Support Hive UDFs which return typed Lists/Maps
## What changes were proposed in this pull request? This PR adds support for Hive UDFs that return fully typed java Lists or Maps, for example `List<String>` or `Map<String, Integer>`. It is also allowed to nest these structures, for example `Map<String, List<Integer>>`. Raw collections or collections using wildcards are still not supported, and cannot be supported due to the lack of type information. ## How was this patch tested? Modified existing tests in `HiveUDFSuite`, and I have added test cases for raw collection and collection using wildcards. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #16886 from hvanhovell/SPARK-19548.
Diffstat (limited to 'sql')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala38
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala2
-rw-r--r--sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java31
-rw-r--r--sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java31
-rw-r--r--sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToIntIntMap.java18
-rw-r--r--sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java8
-rw-r--r--sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListMapStringListInt.java34
-rw-r--r--sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java6
-rw-r--r--sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToStringIntMap.java19
-rw-r--r--sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFWildcardList.java31
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala89
11 files changed, 250 insertions, 57 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 21b729bf29..6f5b923cd4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive
+import java.lang.reflect.{ParameterizedType, Type, WildcardType}
+
import scala.collection.JavaConverters._
import org.apache.hadoop.{io => hadoopIo}
@@ -178,7 +180,7 @@ import org.apache.spark.unsafe.types.UTF8String
*/
private[hive] trait HiveInspectors {
- def javaClassToDataType(clz: Class[_]): DataType = clz match {
+ def javaTypeToDataType(clz: Type): DataType = clz match {
// writable
case c: Class[_] if c == classOf[hadoopIo.DoubleWritable] => DoubleType
case c: Class[_] if c == classOf[hiveIo.DoubleWritable] => DoubleType
@@ -218,26 +220,42 @@ private[hive] trait HiveInspectors {
case c: Class[_] if c == java.lang.Float.TYPE => FloatType
case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
- case c: Class[_] if c.isArray => ArrayType(javaClassToDataType(c.getComponentType))
+ case c: Class[_] if c.isArray => ArrayType(javaTypeToDataType(c.getComponentType))
// Hive seems to return this for struct types?
case c: Class[_] if c == classOf[java.lang.Object] => NullType
- // java list type unsupported
- case c: Class[_] if c == classOf[java.util.List[_]] =>
+ case p: ParameterizedType if isSubClassOf(p.getRawType, classOf[java.util.List[_]]) =>
+ val Array(elementType) = p.getActualTypeArguments
+ ArrayType(javaTypeToDataType(elementType))
+
+ case p: ParameterizedType if isSubClassOf(p.getRawType, classOf[java.util.Map[_, _]]) =>
+ val Array(keyType, valueType) = p.getActualTypeArguments
+ MapType(javaTypeToDataType(keyType), javaTypeToDataType(valueType))
+
+ // raw java list type unsupported
+ case c: Class[_] if isSubClassOf(c, classOf[java.util.List[_]]) =>
throw new AnalysisException(
- "List type in java is unsupported because " +
- "JVM type erasure makes spark fail to catch a component type in List<>")
+ "Raw list type in java is unsupported because Spark cannot infer the element type.")
- // java map type unsupported
- case c: Class[_] if c == classOf[java.util.Map[_, _]] =>
+ // raw java map type unsupported
+ case c: Class[_] if isSubClassOf(c, classOf[java.util.Map[_, _]]) =>
throw new AnalysisException(
- "Map type in java is unsupported because " +
- "JVM type erasure makes spark fail to catch key and value types in Map<>")
+ "Raw map type in java is unsupported because Spark cannot infer key and value types.")
+
+ case _: WildcardType =>
+ throw new AnalysisException(
+ "Collection types with wildcards (e.g. List<?> or Map<?, ?>) are unsupported because " +
+ "Spark cannot infer the data type for these type parameters.")
case c => throw new AnalysisException(s"Unsupported java type $c")
}
+ private def isSubClassOf(t: Type, parent: Class[_]): Boolean = t match {
+ case cls: Class[_] => parent.isAssignableFrom(cls)
+ case _ => false
+ }
+
private def withNullSafe(f: Any => Any): Any => Any = {
input => if (input == null) null else f(input)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 4590197548..506949cb68 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -70,7 +70,7 @@ private[hive] case class HiveSimpleUDF(
@transient
private lazy val conversionHelper = new ConversionHelper(method, arguments)
- override lazy val dataType = javaClassToDataType(method.getReturnType)
+ override lazy val dataType = javaTypeToDataType(method.getGenericReturnType)
@transient
private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java
new file mode 100644
index 0000000000..6adb1657bf
--- /dev/null
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * UDF that returns a raw (non-parameterized) java List.
+ */
+public class UDFRawList extends UDF {
+ public List evaluate(Object o) {
+ return Collections.singletonList("data1");
+ }
+}
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java
new file mode 100644
index 0000000000..4731b6eee8
--- /dev/null
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * UDF that returns a raw (non-parameterized) java Map.
+ */
+public class UDFRawMap extends UDF {
+ public Map evaluate(Object o) {
+ return Collections.singletonMap("a", "1");
+ }
+}
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToIntIntMap.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToIntIntMap.java
index b3e8bcbbd8..91b9673a09 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToIntIntMap.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToIntIntMap.java
@@ -23,13 +23,13 @@ import java.util.HashMap;
import java.util.Map;
public class UDFToIntIntMap extends UDF {
- public Map<Integer, Integer> evaluate(Object o) {
- return new HashMap<Integer, Integer>() {
- {
- put(1, 1);
- put(2, 1);
- put(3, 1);
- }
- };
- }
+ public Map<Integer, Integer> evaluate(Object o) {
+ return new HashMap<Integer, Integer>() {
+ {
+ put(1, 1);
+ put(2, 1);
+ put(3, 1);
+ }
+ };
+ }
}
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java
index 67576a72f1..66fc8c09fd 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java
@@ -19,11 +19,11 @@ package org.apache.spark.sql.hive.execution;
import org.apache.hadoop.hive.ql.exec.UDF;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
public class UDFToListInt extends UDF {
- public List<Integer> evaluate(Object o) {
- return Arrays.asList(1, 2, 3);
- }
+ public ArrayList<Integer> evaluate(Object o) {
+ return new ArrayList<>(Arrays.asList(1, 2, 3));
+ }
}
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListMapStringListInt.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListMapStringListInt.java
new file mode 100644
index 0000000000..d16f27221d
--- /dev/null
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListMapStringListInt.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+
+import java.util.*;
+
+/**
+ * UDF that returns a nested list of maps that uses a string as its key and a list of ints as its
+ * values.
+ */
+public class UDFToListMapStringListInt extends UDF {
+ public List<Map<String, List<Integer>>> evaluate(Object o) {
+ final Map<String, List<Integer>> map = new HashMap<>();
+ map.put("a", Arrays.asList(1, 2));
+ map.put("b", Arrays.asList(3, 4));
+ return Collections.singletonList(map);
+ }
+}
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java
index f02395cbba..5185b47a56 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
import java.util.List;
public class UDFToListString extends UDF {
- public List<String> evaluate(Object o) {
- return Arrays.asList("data1", "data2", "data3");
- }
+ public List<String> evaluate(Object o) {
+ return Arrays.asList("data1", "data2", "data3");
+ }
}
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToStringIntMap.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToStringIntMap.java
index 9eea5c9a88..b7ca60e036 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToStringIntMap.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToStringIntMap.java
@@ -20,16 +20,15 @@ package org.apache.spark.sql.hive.execution;
import org.apache.hadoop.hive.ql.exec.UDF;
import java.util.HashMap;
-import java.util.Map;
public class UDFToStringIntMap extends UDF {
- public Map<String, Integer> evaluate(Object o) {
- return new HashMap<String, Integer>() {
- {
- put("key1", 1);
- put("key2", 2);
- put("key3", 3);
- }
- };
- }
+ public HashMap<String, Integer> evaluate(Object o) {
+ return new HashMap<String, Integer>() {
+ {
+ put("key1", 1);
+ put("key2", 2);
+ put("key3", 3);
+ }
+ };
+ }
}
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFWildcardList.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFWildcardList.java
new file mode 100644
index 0000000000..717e1117b9
--- /dev/null
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFWildcardList.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * UDF that returns a java List parameterized with a wildcard type (List&lt;?&gt;).
+ */
+public class UDFWildcardList extends UDF {
+ public List<?> evaluate(Object o) {
+ return Collections.singletonList("data1");
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 58909ab9ea..ef6883839d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -214,11 +214,9 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
testData.createOrReplaceTempView("inputTable")
sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'")
- val errMsg = intercept[AnalysisException] {
- sql("SELECT testUDFToListString(s) FROM inputTable")
- }
- assert(errMsg.getMessage contains "List type in java is unsupported because " +
- "JVM type erasure makes spark fail to catch a component type in List<>;")
+ checkAnswer(
+ sql("SELECT testUDFToListString(s) FROM inputTable"),
+ Seq(Row(Seq("data1", "data2", "data3"))))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString")
hiveContext.reset()
@@ -229,11 +227,9 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
testData.createOrReplaceTempView("inputTable")
sql(s"CREATE TEMPORARY FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'")
- val errMsg = intercept[AnalysisException] {
- sql("SELECT testUDFToListInt(s) FROM inputTable")
- }
- assert(errMsg.getMessage contains "List type in java is unsupported because " +
- "JVM type erasure makes spark fail to catch a component type in List<>;")
+ checkAnswer(
+ sql("SELECT testUDFToListInt(s) FROM inputTable"),
+ Seq(Row(Seq(1, 2, 3))))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListInt")
hiveContext.reset()
@@ -245,11 +241,9 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
sql(s"CREATE TEMPORARY FUNCTION testUDFToStringIntMap " +
s"AS '${classOf[UDFToStringIntMap].getName}'")
- val errMsg = intercept[AnalysisException] {
- sql("SELECT testUDFToStringIntMap(s) FROM inputTable")
- }
- assert(errMsg.getMessage contains "Map type in java is unsupported because " +
- "JVM type erasure makes spark fail to catch key and value types in Map<>;")
+ checkAnswer(
+ sql("SELECT testUDFToStringIntMap(s) FROM inputTable"),
+ Seq(Row(Map("key1" -> 1, "key2" -> 2, "key3" -> 3))))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToStringIntMap")
hiveContext.reset()
@@ -261,16 +255,71 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
sql(s"CREATE TEMPORARY FUNCTION testUDFToIntIntMap " +
s"AS '${classOf[UDFToIntIntMap].getName}'")
- val errMsg = intercept[AnalysisException] {
- sql("SELECT testUDFToIntIntMap(s) FROM inputTable")
- }
- assert(errMsg.getMessage contains "Map type in java is unsupported because " +
- "JVM type erasure makes spark fail to catch key and value types in Map<>;")
+ checkAnswer(
+ sql("SELECT testUDFToIntIntMap(s) FROM inputTable"),
+ Seq(Row(Map(1 -> 1, 2 -> 1, 3 -> 1))))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToIntIntMap")
hiveContext.reset()
}
+ test("UDFToListMapStringListInt") {
+ val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
+ testData.createOrReplaceTempView("inputTable")
+
+ sql(s"CREATE TEMPORARY FUNCTION testUDFToListMapStringListInt " +
+ s"AS '${classOf[UDFToListMapStringListInt].getName}'")
+ checkAnswer(
+ sql("SELECT testUDFToListMapStringListInt(s) FROM inputTable"),
+ Seq(Row(Seq(Map("a" -> Seq(1, 2), "b" -> Seq(3, 4))))))
+
+ sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListMapStringListInt")
+ hiveContext.reset()
+ }
+
+ test("UDFRawList") {
+ val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
+ testData.createOrReplaceTempView("inputTable")
+
+ sql(s"CREATE TEMPORARY FUNCTION testUDFRawList " +
+ s"AS '${classOf[UDFRawList].getName}'")
+ val err = intercept[AnalysisException](sql("SELECT testUDFRawList(s) FROM inputTable"))
+ assert(err.getMessage.contains(
+ "Raw list type in java is unsupported because Spark cannot infer the element type."))
+
+ sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFRawList")
+ hiveContext.reset()
+ }
+
+ test("UDFRawMap") {
+ val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
+ testData.createOrReplaceTempView("inputTable")
+
+ sql(s"CREATE TEMPORARY FUNCTION testUDFRawMap " +
+ s"AS '${classOf[UDFRawMap].getName}'")
+ val err = intercept[AnalysisException](sql("SELECT testUDFRawMap(s) FROM inputTable"))
+ assert(err.getMessage.contains(
+ "Raw map type in java is unsupported because Spark cannot infer key and value types."))
+
+ sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFRawMap")
+ hiveContext.reset()
+ }
+
+ test("UDFWildcardList") {
+ val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
+ testData.createOrReplaceTempView("inputTable")
+
+ sql(s"CREATE TEMPORARY FUNCTION testUDFWildcardList " +
+ s"AS '${classOf[UDFWildcardList].getName}'")
+ val err = intercept[AnalysisException](sql("SELECT testUDFWildcardList(s) FROM inputTable"))
+ assert(err.getMessage.contains(
+ "Collection types with wildcards (e.g. List<?> or Map<?, ?>) are unsupported " +
+ "because Spark cannot infer the data type for these type parameters."))
+
+ sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFWildcardList")
+ hiveContext.reset()
+ }
+
test("UDFListListInt") {
val testData = spark.sparkContext.parallelize(
ListListIntCaseClass(Nil) ::