author    Cheng Hao <hao.cheng@intel.com>     2015-05-14 00:14:59 +0800
committer Cheng Lian <lian@databricks.com>    2015-05-14 00:14:59 +0800
commit    0da254fb2903c01e059fa7d0dc81df5740312b35 (patch)
tree      c99831581e4987626b950f0df96ce36a26e0ee53 /sql/hive/src
parent    aa6ba3f2166edcc8bcda3abc70482fa8605e83b7 (diff)
download  spark-0da254fb2903c01e059fa7d0dc81df5740312b35.tar.gz
          spark-0da254fb2903c01e059fa7d0dc81df5740312b35.tar.bz2
          spark-0da254fb2903c01e059fa7d0dc81df5740312b35.zip
[SPARK-6734] [SQL] Add UDTF.close support in Generate
Some third-party UDTF extensions generate additional rows in the GenericUDTF.close() method, which is supported and documented by Hive: https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF

However, Spark SQL ignores GenericUDTF.close(), which causes bugs when porting jobs from Hive to Spark SQL.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #5383 from chenghao-intel/udtf_close and squashes the following commits:

98b4e4b [Cheng Hao] Support UDTF.close
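For reference, the function exercised by the new tests (GenericUDTFCount2, shipped precompiled in TestUDTF.jar below) follows the wiki example linked above: it buffers during process() and only forwards rows from close(). A minimal Scala sketch of such a UDTF, assuming the standard Hive GenericUDTF API; the "col1" column name is illustrative, and forwarding the count twice is inferred from the golden outputs below:

import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, StructObjectInspector}

class GenericUDTFCount2 extends GenericUDTF {
  private var count = 0

  override def initialize(args: Array[ObjectInspector]): StructObjectInspector =
    // Declare a single int output column.
    ObjectInspectorFactory.getStandardStructObjectInspector(
      java.util.Arrays.asList("col1"),
      java.util.Arrays.asList[ObjectInspector](
        PrimitiveObjectInspectorFactory.javaIntObjectInspector))

  override def process(args: Array[AnyRef]): Unit = {
    count += 1 // buffer only; nothing is forwarded per input row
  }

  override def close(): Unit = {
    // All output happens here -- exactly the rows Spark SQL used to drop.
    forward(Array[AnyRef](Int.box(count)))
    forward(Array[AnyRef](Int.box(count)))
  }
}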
Diffstat (limited to 'sql/hive/src')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala                                        | 18
-rw-r--r--  sql/hive/src/test/resources/TestUDTF.jar                                                                | bin 0 -> 1328 bytes
-rw-r--r--  sql/hive/src/test/resources/golden/Test UDTF.close in Lateral Views-0-ac5c96224a534f07b49462ad76620678  | 2
-rw-r--r--  sql/hive/src/test/resources/golden/Test UDTF.close in SELECT-0-517f834fef35b896ec64399f42b2a151         | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala                        | 21
5 files changed, 40 insertions, 3 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index fd0b6f0585..bc6b3a2d58 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -483,7 +483,11 @@ private[hive] case class HiveGenericUdtf(
extends Generator with HiveInspectors {
@transient
- protected lazy val function: GenericUDTF = funcWrapper.createFunction()
+ protected lazy val function: GenericUDTF = {
+ val fun: GenericUDTF = funcWrapper.createFunction()
+ fun.setCollector(collector)
+ fun
+ }
@transient
protected lazy val inputInspectors = children.map(toInspector)
@@ -494,6 +498,9 @@ private[hive] case class HiveGenericUdtf(
@transient
protected lazy val udtInput = new Array[AnyRef](children.length)
+ @transient
+ protected lazy val collector = new UDTFCollector
+
lazy val elementTypes = outputInspector.getAllStructFieldRefs.map {
field => (inspectorToDataType(field.getFieldObjectInspector), true)
}
@@ -502,8 +509,7 @@ private[hive] case class HiveGenericUdtf(
outputInspector // Make sure initialized.
val inputProjection = new InterpretedProjection(children)
- val collector = new UDTFCollector
- function.setCollector(collector)
+
function.process(wrap(inputProjection(input), inputInspectors, udtInput))
collector.collectRows()
}
@@ -525,6 +531,12 @@ private[hive] case class HiveGenericUdtf(
}
}
+ override def terminate(): TraversableOnce[Row] = {
+ outputInspector // Make sure initialized.
+ function.close()
+ collector.collectRows()
+ }
+
override def toString: String = {
s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
}
diff --git a/sql/hive/src/test/resources/TestUDTF.jar b/sql/hive/src/test/resources/TestUDTF.jar
new file mode 100644
index 0000000000..514f2d5d26
--- /dev/null
+++ b/sql/hive/src/test/resources/TestUDTF.jar
Binary files differ
diff --git a/sql/hive/src/test/resources/golden/Test UDTF.close in Lateral Views-0-ac5c96224a534f07b49462ad76620678 b/sql/hive/src/test/resources/golden/Test UDTF.close in Lateral Views-0-ac5c96224a534f07b49462ad76620678
new file mode 100644
index 0000000000..946e72fc87
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/Test UDTF.close in Lateral Views-0-ac5c96224a534f07b49462ad76620678
@@ -0,0 +1,2 @@
+97 500
+97 500
diff --git a/sql/hive/src/test/resources/golden/Test UDTF.close in SELECT-0-517f834fef35b896ec64399f42b2a151 b/sql/hive/src/test/resources/golden/Test UDTF.close in SELECT-0-517f834fef35b896ec64399f42b2a151
new file mode 100644
index 0000000000..a5c8806279
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/Test UDTF.close in SELECT-0-517f834fef35b896ec64399f42b2a151
@@ -0,0 +1,2 @@
+3
+3
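Both golden outputs consist entirely of rows forwarded from close(): the SELECT test feeds three rows through udtf_count2 and observes the count 3 twice, while the LATERAL VIEW test feeds all 500 rows of src and observes 500 twice. Before this patch, both queries would have surfaced none of these rows.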
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 2c9c08a9f3..089a57e25c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.hive.execution
import java.io.File
import java.util.{Locale, TimeZone}
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorFactory, StructObjectInspector, ObjectInspector}
import org.scalatest.BeforeAndAfter
import scala.util.Try
@@ -51,14 +54,32 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
// Add Locale setting
Locale.setDefault(Locale.US)
+ sql(s"ADD JAR ${TestHive.getHiveFile("TestUDTF.jar").getCanonicalPath()}")
+ // The function source code can be found at:
+ // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
+ sql(
+ """
+ |CREATE TEMPORARY FUNCTION udtf_count2
+ |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+ """.stripMargin)
}
override def afterAll() {
TestHive.cacheTables = false
TimeZone.setDefault(originalTimeZone)
Locale.setDefault(originalLocale)
+ sql("DROP TEMPORARY FUNCTION udtf_count2")
}
+ createQueryTest("Test UDTF.close in Lateral Views",
+ """
+ |SELECT key, cc
+ |FROM src LATERAL VIEW udtf_count2(value) dd AS cc
+ """.stripMargin, false) // false means we have to keep the temp function in the registry
+
+ createQueryTest("Test UDTF.close in SELECT",
+ "SELECT udtf_count2(a) FROM (SELECT 1 AS a FROM src LIMIT 3) table", false)
+
test("SPARK-4908: concurrent hive native commands") {
(1 to 100).par.map { _ =>
sql("USE default")