diff options
author | Cheng Hao <hao.cheng@intel.com> | 2015-05-14 00:14:59 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2015-05-14 00:14:59 +0800 |
commit | 0da254fb2903c01e059fa7d0dc81df5740312b35 (patch) | |
tree | c99831581e4987626b950f0df96ce36a26e0ee53 /sql/hive/src | |
parent | aa6ba3f2166edcc8bcda3abc70482fa8605e83b7 (diff) | |
download | spark-0da254fb2903c01e059fa7d0dc81df5740312b35.tar.gz spark-0da254fb2903c01e059fa7d0dc81df5740312b35.tar.bz2 spark-0da254fb2903c01e059fa7d0dc81df5740312b35.zip |
[SPARK-6734] [SQL] Add UDTF.close support in Generate
Some third-party UDTF extensions generate additional rows in the "GenericUDTF.close()" method, which is supported / documented by Hive.
https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
However, Spark SQL ignores the "GenericUDTF.close()", and it causes bug while porting job from Hive to Spark SQL.
Author: Cheng Hao <hao.cheng@intel.com>
Closes #5383 from chenghao-intel/udtf_close and squashes the following commits:
98b4e4b [Cheng Hao] Support UDTF.close
Diffstat (limited to 'sql/hive/src')
5 files changed, 40 insertions, 3 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index fd0b6f0585..bc6b3a2d58 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -483,7 +483,11 @@ private[hive] case class HiveGenericUdtf( extends Generator with HiveInspectors { @transient - protected lazy val function: GenericUDTF = funcWrapper.createFunction() + protected lazy val function: GenericUDTF = { + val fun: GenericUDTF = funcWrapper.createFunction() + fun.setCollector(collector) + fun + } @transient protected lazy val inputInspectors = children.map(toInspector) @@ -494,6 +498,9 @@ private[hive] case class HiveGenericUdtf( @transient protected lazy val udtInput = new Array[AnyRef](children.length) + @transient + protected lazy val collector = new UDTFCollector + lazy val elementTypes = outputInspector.getAllStructFieldRefs.map { field => (inspectorToDataType(field.getFieldObjectInspector), true) } @@ -502,8 +509,7 @@ private[hive] case class HiveGenericUdtf( outputInspector // Make sure initialized. val inputProjection = new InterpretedProjection(children) - val collector = new UDTFCollector - function.setCollector(collector) + function.process(wrap(inputProjection(input), inputInspectors, udtInput)) collector.collectRows() } @@ -525,6 +531,12 @@ private[hive] case class HiveGenericUdtf( } } + override def terminate(): TraversableOnce[Row] = { + outputInspector // Make sure initialized. + function.close() + collector.collectRows() + } + override def toString: String = { s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})" } diff --git a/sql/hive/src/test/resources/TestUDTF.jar b/sql/hive/src/test/resources/TestUDTF.jar Binary files differnew file mode 100644 index 0000000000..514f2d5d26 --- /dev/null +++ b/sql/hive/src/test/resources/TestUDTF.jar diff --git a/sql/hive/src/test/resources/golden/Test UDTF.close in Lateral Views-0-ac5c96224a534f07b49462ad76620678 b/sql/hive/src/test/resources/golden/Test UDTF.close in Lateral Views-0-ac5c96224a534f07b49462ad76620678 new file mode 100644 index 0000000000..946e72fc87 --- /dev/null +++ b/sql/hive/src/test/resources/golden/Test UDTF.close in Lateral Views-0-ac5c96224a534f07b49462ad76620678 @@ -0,0 +1,2 @@ +97 500 +97 500 diff --git a/sql/hive/src/test/resources/golden/Test UDTF.close in SELECT-0-517f834fef35b896ec64399f42b2a151 b/sql/hive/src/test/resources/golden/Test UDTF.close in SELECT-0-517f834fef35b896ec64399f42b2a151 new file mode 100644 index 0000000000..a5c8806279 --- /dev/null +++ b/sql/hive/src/test/resources/golden/Test UDTF.close in SELECT-0-517f834fef35b896ec64399f42b2a151 @@ -0,0 +1,2 @@ +3 +3 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 2c9c08a9f3..089a57e25c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -20,6 +20,9 @@ package org.apache.spark.sql.hive.execution import java.io.File import java.util.{Locale, TimeZone} +import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorFactory, StructObjectInspector, ObjectInspector} import org.scalatest.BeforeAndAfter import scala.util.Try @@ -51,14 +54,32 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) // Add Locale setting Locale.setDefault(Locale.US) + sql(s"ADD JAR ${TestHive.getHiveFile("TestUDTF.jar").getCanonicalPath()}") + // The function source code can be found at: + // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF + sql( + """ + |CREATE TEMPORARY FUNCTION udtf_count2 + |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2' + """.stripMargin) } override def afterAll() { TestHive.cacheTables = false TimeZone.setDefault(originalTimeZone) Locale.setDefault(originalLocale) + sql("DROP TEMPORARY FUNCTION udtf_count2") } + createQueryTest("Test UDTF.close in Lateral Views", + """ + |SELECT key, cc + |FROM src LATERAL VIEW udtf_count2(value) dd AS cc + """.stripMargin, false) // false mean we have to keep the temp function in registry + + createQueryTest("Test UDTF.close in SELECT", + "SELECT udtf_count2(a) FROM (SELECT 1 AS a FROM src LIMIT 3) table", false) + test("SPARK-4908: concurrent hive native commands") { (1 to 100).par.map { _ => sql("USE default") |