From 226d38840c8d3f40639715d755df6fb4fee2715f Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Fri, 10 Feb 2017 14:47:25 -0800
Subject: [SPARK-19548][SQL] Support Hive UDFs which return typed Lists/Maps

## What changes were proposed in this pull request?

This PR adds support for Hive UDFs that return fully typed java Lists or Maps, for example `List<String>` or `Map<String, Integer>`. It is also allowed to nest these structures, for example `Map<String, List<Integer>>`. Raw collections or collections using wildcards are still not supported, and cannot be supported due to the lack of type information.

## How was this patch tested?

Modified existing tests in `HiveUDFSuite`, and added test cases for a raw collection and a collection using wildcards.

Author: Herman van Hovell

Closes #16886 from hvanhovell/SPARK-19548.
---
 .../org/apache/spark/sql/hive/HiveInspectors.scala | 38 ++++++---
 .../scala/org/apache/spark/sql/hive/hiveUDFs.scala |  2 +-
 .../spark/sql/hive/execution/UDFRawList.java       | 31 ++++++++
 .../apache/spark/sql/hive/execution/UDFRawMap.java | 31 ++++++++
 .../spark/sql/hive/execution/UDFToIntIntMap.java   | 18 ++---
 .../spark/sql/hive/execution/UDFToListInt.java     |  8 +-
 .../hive/execution/UDFToListMapStringListInt.java  | 34 +++++++++
 .../spark/sql/hive/execution/UDFToListString.java  |  6 +-
 .../sql/hive/execution/UDFToStringIntMap.java      | 19 +++--
 .../spark/sql/hive/execution/UDFWildcardList.java  | 31 ++++++++
 .../spark/sql/hive/execution/HiveUDFSuite.scala    | 89 +++++++++++++++++-----
 11 files changed, 250 insertions(+), 57 deletions(-)
 create mode 100644 sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java
 create mode 100644 sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java
 create mode 100644 sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListMapStringListInt.java
 create mode 100644 sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFWildcardList.java

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 21b729bf29..6f5b923cd4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import java.lang.reflect.{ParameterizedType, Type, WildcardType}
+
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.{io => hadoopIo}
@@ -178,7 +180,7 @@ import org.apache.spark.unsafe.types.UTF8String
  */
 private[hive] trait HiveInspectors {
 
-  def javaClassToDataType(clz: Class[_]): DataType = clz match {
+  def javaTypeToDataType(clz: Type): DataType = clz match {
     // writable
     case c: Class[_] if c == classOf[hadoopIo.DoubleWritable] => DoubleType
     case c: Class[_] if c == classOf[hiveIo.DoubleWritable] => DoubleType
@@ -218,26 +220,42 @@ private[hive] trait HiveInspectors {
     case c: Class[_] if c == java.lang.Float.TYPE => FloatType
     case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
 
-    case c: Class[_] if c.isArray => ArrayType(javaClassToDataType(c.getComponentType))
+    case c: Class[_] if c.isArray => ArrayType(javaTypeToDataType(c.getComponentType))
 
     // Hive seems to return this for struct types?
     case c: Class[_] if c == classOf[java.lang.Object] => NullType
 
-    // java list type unsupported
-    case c: Class[_] if c == classOf[java.util.List[_]] =>
+    case p: ParameterizedType if isSubClassOf(p.getRawType, classOf[java.util.List[_]]) =>
+      val Array(elementType) = p.getActualTypeArguments
+      ArrayType(javaTypeToDataType(elementType))
+
+    case p: ParameterizedType if isSubClassOf(p.getRawType, classOf[java.util.Map[_, _]]) =>
+      val Array(keyType, valueType) = p.getActualTypeArguments
+      MapType(javaTypeToDataType(keyType), javaTypeToDataType(valueType))
+
+    // raw java list type unsupported
+    case c: Class[_] if isSubClassOf(c, classOf[java.util.List[_]]) =>
       throw new AnalysisException(
-        "List type in java is unsupported because " +
-          "JVM type erasure makes spark fail to catch a component type in List<>")
+        "Raw list type in java is unsupported because Spark cannot infer the element type.")
 
-    // java map type unsupported
-    case c: Class[_] if c == classOf[java.util.Map[_, _]] =>
+    // raw java map type unsupported
+    case c: Class[_] if isSubClassOf(c, classOf[java.util.Map[_, _]]) =>
       throw new AnalysisException(
-        "Map type in java is unsupported because " +
-          "JVM type erasure makes spark fail to catch key and value types in Map<>")
+        "Raw map type in java is unsupported because Spark cannot infer key and value types.")
+
+    case _: WildcardType =>
+      throw new AnalysisException(
+        "Collection types with wildcards (e.g. List<?> or Map<?, ?>) are unsupported because " +
+          "Spark cannot infer the data type for these type parameters.")
 
     case c => throw new AnalysisException(s"Unsupported java type $c")
   }
 
+  private def isSubClassOf(t: Type, parent: Class[_]): Boolean = t match {
+    case cls: Class[_] => parent.isAssignableFrom(cls)
+    case _ => false
+  }
+
   private def withNullSafe(f: Any => Any): Any => Any = {
     input => if (input == null) null else f(input)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 4590197548..506949cb68 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -70,7 +70,7 @@ private[hive] case class HiveSimpleUDF(
   @transient
   private lazy val conversionHelper = new ConversionHelper(method, arguments)
 
-  override lazy val dataType = javaClassToDataType(method.getReturnType)
+  override lazy val dataType = javaTypeToDataType(method.getGenericReturnType)
 
   @transient
   private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java
new file mode 100644
index 0000000000..6adb1657bf
--- /dev/null
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * UDF that returns a raw (non-parameterized) java List.
+ */
+public class UDFRawList extends UDF {
+  public List evaluate(Object o) {
+    return Collections.singletonList("data1");
+  }
+}
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java
new file mode 100644
index 0000000000..4731b6eee8
--- /dev/null
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * UDF that returns a raw (non-parameterized) java Map.
+ */
+public class UDFRawMap extends UDF {
+  public Map evaluate(Object o) {
+    return Collections.singletonMap("a", "1");
+  }
+}
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToIntIntMap.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToIntIntMap.java
index b3e8bcbbd8..91b9673a09 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToIntIntMap.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToIntIntMap.java
@@ -23,13 +23,13 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class UDFToIntIntMap extends UDF {
-    public Map<Integer, Integer> evaluate(Object o) {
-        return new HashMap<Integer, Integer>() {
-            {
-                put(1, 1);
-                put(2, 1);
-                put(3, 1);
-            }
-        };
-    }
+  public Map<Integer, Integer> evaluate(Object o) {
+    return new HashMap<Integer, Integer>() {
+      {
+        put(1, 1);
+        put(2, 1);
+        put(3, 1);
+      }
+    };
+  }
 }
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java
index 67576a72f1..66fc8c09fd 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java
@@ -19,11 +19,11 @@ package org.apache.spark.sql.hive.execution;
 
 import org.apache.hadoop.hive.ql.exec.UDF;
 
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 
 public class UDFToListInt extends UDF {
-    public List<Integer> evaluate(Object o) {
-        return Arrays.asList(1, 2, 3);
-    }
+  public ArrayList<Integer> evaluate(Object o) {
+    return new ArrayList<>(Arrays.asList(1, 2, 3));
+  }
 }
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListMapStringListInt.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListMapStringListInt.java
new file mode 100644
index 0000000000..d16f27221d
--- /dev/null
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListMapStringListInt.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+
+import java.util.*;
+
+/**
+ * UDF that returns a nested list of maps that uses a string as its key and a list of ints as its
+ * values.
+ */
+public class UDFToListMapStringListInt extends UDF {
+  public List<Map<String, List<Integer>>> evaluate(Object o) {
+    final Map<String, List<Integer>> map = new HashMap<>();
+    map.put("a", Arrays.asList(1, 2));
+    map.put("b", Arrays.asList(3, 4));
+    return Collections.singletonList(map);
+  }
+}
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java
index f02395cbba..5185b47a56 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 
 public class UDFToListString extends UDF {
-    public List<String> evaluate(Object o) {
-        return Arrays.asList("data1", "data2", "data3");
-    }
+  public List<String> evaluate(Object o) {
+    return Arrays.asList("data1", "data2", "data3");
+  }
 }
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToStringIntMap.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToStringIntMap.java
index 9eea5c9a88..b7ca60e036 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToStringIntMap.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToStringIntMap.java
@@ -20,16 +20,15 @@ package org.apache.spark.sql.hive.execution;
 import org.apache.hadoop.hive.ql.exec.UDF;
 
 import java.util.HashMap;
-import java.util.Map;
 
 public class UDFToStringIntMap extends UDF {
-    public Map<String, Integer> evaluate(Object o) {
-        return new HashMap<String, Integer>() {
-            {
-                put("key1", 1);
-                put("key2", 2);
-                put("key3", 3);
-            }
-        };
-    }
+  public HashMap<String, Integer> evaluate(Object o) {
+    return new HashMap<String, Integer>() {
+      {
+        put("key1", 1);
+        put("key2", 2);
+        put("key3", 3);
+      }
+    };
+  }
 }
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFWildcardList.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFWildcardList.java
new file mode 100644
index 0000000000..717e1117b9
--- /dev/null
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFWildcardList.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * UDF that returns a java List declared with a wildcard type parameter.
+ */
+public class UDFWildcardList extends UDF {
+  public List<?> evaluate(Object o) {
+    return Collections.singletonList("data1");
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 58909ab9ea..ef6883839d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -214,11 +214,9 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     testData.createOrReplaceTempView("inputTable")
 
     sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'")
-    val errMsg = intercept[AnalysisException] {
-      sql("SELECT testUDFToListString(s) FROM inputTable")
-    }
-    assert(errMsg.getMessage contains "List type in java is unsupported because " +
-      "JVM type erasure makes spark fail to catch a component type in List<>;")
+    checkAnswer(
+      sql("SELECT testUDFToListString(s) FROM inputTable"),
+      Seq(Row(Seq("data1", "data2", "data3"))))
 
     sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString")
     hiveContext.reset()
@@ -229,11 +227,9 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     testData.createOrReplaceTempView("inputTable")
 
     sql(s"CREATE TEMPORARY FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'")
-    val errMsg = intercept[AnalysisException] {
-      sql("SELECT testUDFToListInt(s) FROM inputTable")
-    }
-    assert(errMsg.getMessage contains "List type in java is unsupported because " +
-      "JVM type erasure makes spark fail to catch a component type in List<>;")
+    checkAnswer(
+      sql("SELECT testUDFToListInt(s) FROM inputTable"),
+      Seq(Row(Seq(1, 2, 3))))
 
     sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListInt")
     hiveContext.reset()
@@ -245,11 +241,9 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
 
     sql(s"CREATE TEMPORARY FUNCTION testUDFToStringIntMap " +
       s"AS '${classOf[UDFToStringIntMap].getName}'")
-    val errMsg = intercept[AnalysisException] {
-      sql("SELECT testUDFToStringIntMap(s) FROM inputTable")
-    }
-    assert(errMsg.getMessage contains "Map type in java is unsupported because " +
-      "JVM type erasure makes spark fail to catch key and value types in Map<>;")
+    checkAnswer(
+      sql("SELECT testUDFToStringIntMap(s) FROM inputTable"),
+      Seq(Row(Map("key1" -> 1, "key2" -> 2, "key3" -> 3))))
 
     sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToStringIntMap")
     hiveContext.reset()
@@ -261,16 +255,71 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
 
     sql(s"CREATE TEMPORARY FUNCTION testUDFToIntIntMap " +
       s"AS '${classOf[UDFToIntIntMap].getName}'")
-    val errMsg = intercept[AnalysisException] {
-      sql("SELECT testUDFToIntIntMap(s) FROM inputTable")
-    }
-    assert(errMsg.getMessage contains "Map type in java is unsupported because " +
-      "JVM type erasure makes spark fail to catch key and value types in Map<>;")
+    checkAnswer(
+      sql("SELECT testUDFToIntIntMap(s) FROM inputTable"),
+      Seq(Row(Map(1 -> 1, 2 -> 1, 3 -> 1))))
 
     sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToIntIntMap")
     hiveContext.reset()
   }
 
+  test("UDFToListMapStringListInt") {
+    val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
+    testData.createOrReplaceTempView("inputTable")
+
+    sql(s"CREATE TEMPORARY FUNCTION testUDFToListMapStringListInt " +
+      s"AS '${classOf[UDFToListMapStringListInt].getName}'")
+    checkAnswer(
+      sql("SELECT testUDFToListMapStringListInt(s) FROM inputTable"),
+      Seq(Row(Seq(Map("a" -> Seq(1, 2), "b" -> Seq(3, 4))))))
+
+    sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListMapStringListInt")
+    hiveContext.reset()
+  }
+
+  test("UDFRawList") {
+    val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
+    testData.createOrReplaceTempView("inputTable")
+
+    sql(s"CREATE TEMPORARY FUNCTION testUDFRawList " +
+      s"AS '${classOf[UDFRawList].getName}'")
+    val err = intercept[AnalysisException](sql("SELECT testUDFRawList(s) FROM inputTable"))
+    assert(err.getMessage.contains(
+      "Raw list type in java is unsupported because Spark cannot infer the element type."))
+
+    sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFRawList")
+    hiveContext.reset()
+  }
+
+  test("UDFRawMap") {
+    val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
+    testData.createOrReplaceTempView("inputTable")
+
+    sql(s"CREATE TEMPORARY FUNCTION testUDFRawMap " +
+      s"AS '${classOf[UDFRawMap].getName}'")
+    val err = intercept[AnalysisException](sql("SELECT testUDFRawMap(s) FROM inputTable"))
+    assert(err.getMessage.contains(
+      "Raw map type in java is unsupported because Spark cannot infer key and value types."))
+
+    sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFRawMap")
+    hiveContext.reset()
+  }
+
+  test("UDFWildcardList") {
+    val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
+    testData.createOrReplaceTempView("inputTable")
+
+    sql(s"CREATE TEMPORARY FUNCTION testUDFWildcardList " +
+      s"AS '${classOf[UDFWildcardList].getName}'")
+    val err = intercept[AnalysisException](sql("SELECT testUDFWildcardList(s) FROM inputTable"))
+    assert(err.getMessage.contains(
+      "Collection types with wildcards (e.g. List<?> or Map<?, ?>) are unsupported " +
+        "because Spark cannot infer the data type for these type parameters."))
+
+    sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFWildcardList")
+    hiveContext.reset()
+  }
+
   test("UDFListListInt") {
     val testData = spark.sparkContext.parallelize(
       ListListIntCaseClass(Nil) ::
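
The following standalone sketch is not part of the patch; it only illustrates the reflection detail the change relies on. `Method.getReturnType` yields the erased raw class (e.g. `java.util.List`), while `Method.getGenericReturnType` yields a `ParameterizedType` carrying the declared type arguments, which is what the new `javaTypeToDataType` pattern-matches on. The `TypedReturns` class and the `describe` helper below are invented for this example and only roughly mirror the patched code.

```scala
import java.lang.reflect.{ParameterizedType, Type}

// Hypothetical class with typed collection return values, standing in for a Hive UDF's evaluate method.
class TypedReturns {
  def listOfStrings(): java.util.List[String] = java.util.Collections.emptyList()
  def nestedMap(): java.util.Map[String, java.util.List[Integer]] = java.util.Collections.emptyMap()
}

object GenericReturnTypeDemo {
  // Rough analogue of the patched javaTypeToDataType: render a reflected Type
  // as a Spark-SQL-like type string instead of building a real DataType.
  def describe(t: Type): String = t match {
    case p: ParameterizedType
        if classOf[java.util.List[_]].isAssignableFrom(p.getRawType.asInstanceOf[Class[_]]) =>
      val Array(elementType) = p.getActualTypeArguments
      s"ArrayType(${describe(elementType)})"
    case p: ParameterizedType
        if classOf[java.util.Map[_, _]].isAssignableFrom(p.getRawType.asInstanceOf[Class[_]]) =>
      val Array(keyType, valueType) = p.getActualTypeArguments
      s"MapType(${describe(keyType)}, ${describe(valueType)})"
    case c: Class[_] if c == classOf[String] => "StringType"
    case c: Class[_] if c == classOf[java.lang.Integer] => "IntegerType"
    case other => s"<unsupported: $other>"
  }

  def main(args: Array[String]): Unit = {
    val m1 = classOf[TypedReturns].getMethod("listOfStrings")
    println(m1.getReturnType)                     // interface java.util.List (erased, element type lost)
    println(m1.getGenericReturnType)              // java.util.List<java.lang.String>
    println(describe(m1.getGenericReturnType))    // ArrayType(StringType)

    val m2 = classOf[TypedReturns].getMethod("nestedMap")
    println(describe(m2.getGenericReturnType))    // MapType(StringType, ArrayType(IntegerType))
  }
}
```

A raw `List` or a wildcard such as `List<?>` carries no usable type argument at this level, which is why the patch raises an `AnalysisException` for those return types instead of guessing an element type.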