author     Cheng Hao <hao.cheng@intel.com>              2015-04-21 15:11:15 -0700
committer  Michael Armbrust <michael@databricks.com>    2015-04-21 15:11:15 -0700
commit     7662ec23bb6c4d4fe4c857b6928eaed0a97d3c04 (patch)
tree       d9d0b5821fa04de211f0072d860b57cf95c69a74 /sql/hive
parent     2a24bf92e6d36e876bad6a8b4e0ff12c407ebb8a (diff)
[SPARK-5817] [SQL] Fix bug of udtf with column names
There is a bug when running a query like:

```sql
select d from (select explode(array(1,1)) d from src limit 1) t
```

which throws an exception like:

```
org.apache.spark.sql.AnalysisException: cannot resolve 'd' given input columns _c0; line 1 pos 7
	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$apply$3$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:48)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$apply$3$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:45)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:250)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:250)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:50)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:249)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:103)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:117)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:116)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
```

Fixing this requires refactoring the UDTF code. The major changes are:

* Simplify UDTF development: a UDTF no longer manages its output attribute names; instead, `logical.Generate` handles that properly.
* A UDTF is asked only for its output schema (data types) during logical plan analysis.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #4602 from chenghao-intel/explode_bug and squashes the following commits:

c2a5132 [Cheng Hao] add back resolved for Alias
556e982 [Cheng Hao] revert the unncessary change
002c361 [Cheng Hao] change the rule of resolved for Generate
04ae500 [Cheng Hao] add qualifier only for generator output
5ee5d2c [Cheng Hao] prepend the new qualifier
d2e8b43 [Cheng Hao] Update the code as feedback
ca5e7f4 [Cheng Hao] shrink the commits
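To make the reported failure concrete, here is a minimal reproduction sketch against the Spark 1.x API (hedged: it assumes a local SparkContext can be created and that the Hive test table `src` from the test harness exists; the object name is arbitrary):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object UdtfAliasCheck {
  def main(args: Array[String]): Unit = {
    // Hedged sketch, Spark 1.x-era API; assumes the Hive test table `src` exists.
    val sc = new SparkContext(
      new SparkConf().setAppName("udtf-alias-check").setMaster("local[*]"))
    val hiveContext = new HiveContext(sc)

    // Before this patch the outer SELECT failed with
    // "cannot resolve 'd' given input columns _c0"; with the patch the alias `d`
    // is attached to the generator output by logical.Generate and resolves.
    hiveContext.sql(
      "SELECT d FROM (SELECT explode(array(1, 1)) d FROM src LIMIT 1) t")
      .collect()
      .foreach(println)   // expected output: [1]

    sc.stop()
  }
}
```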
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala   1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala   37
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala   38
-rw-r--r--  sql/hive/src/test/resources/golden/Specify the udtf output-0-d1f244bce64f22b34ad5bf9fd360b632   1
-rw-r--r--  sql/hive/src/test/resources/golden/insert table with generator with column name-0-7ac701cf43e73e9e416888e4df694348   0
-rw-r--r--  sql/hive/src/test/resources/golden/insert table with generator with column name-1-5cdf9d51fc0e105e365d82e7611e37f3   0
-rw-r--r--  sql/hive/src/test/resources/golden/insert table with generator with column name-2-f963396461294e06cb7cafe22a1419e4   3
-rw-r--r--  sql/hive/src/test/resources/golden/insert table with generator with multiple column names-0-46bdb27b3359dc81d8c246b9f69d4b82   0
-rw-r--r--  sql/hive/src/test/resources/golden/insert table with generator with multiple column names-1-cdf6989f3b055257f1692c3bbd80dc73   0
-rw-r--r--  sql/hive/src/test/resources/golden/insert table with generator with multiple column names-2-ab3954b69d7a991bc801a509c3166cc5   3
-rw-r--r--  sql/hive/src/test/resources/golden/insert table with generator without column name-0-7ac701cf43e73e9e416888e4df694348   0
-rw-r--r--  sql/hive/src/test/resources/golden/insert table with generator without column name-1-26599718c322ff4f9740040c066d8292   0
-rw-r--r--  sql/hive/src/test/resources/golden/insert table with generator without column name-2-f963396461294e06cb7cafe22a1419e4   3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala   40
14 files changed, 71 insertions, 55 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 7c6a7df2bd..c4a73b3004 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -249,7 +249,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.CreateTables ::
catalog.PreInsertionCasts ::
ExtractPythonUdfs ::
- ResolveUdtfsAlias ::
sources.PreInsertCastAndRename ::
Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index fd305eb480..85061f2277 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -725,12 +725,14 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
val alias =
getClause("TOK_TABALIAS", clauses).getChildren.head.asInstanceOf[ASTNode].getText
- Generate(
- nodesToGenerator(clauses),
- join = true,
- outer = false,
- Some(alias.toLowerCase),
- withWhere)
+ val (generator, attributes) = nodesToGenerator(clauses)
+ Generate(
+ generator,
+ join = true,
+ outer = false,
+ Some(alias.toLowerCase),
+ attributes.map(UnresolvedAttribute(_)),
+ withWhere)
}.getOrElse(withWhere)
// The projection of the query can either be a normal projection, an aggregation
@@ -833,12 +835,14 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
val alias = getClause("TOK_TABALIAS", clauses).getChildren.head.asInstanceOf[ASTNode].getText
- Generate(
- nodesToGenerator(clauses),
- join = true,
- outer = isOuter.nonEmpty,
- Some(alias.toLowerCase),
- nodeToRelation(relationClause))
+ val (generator, attributes) = nodesToGenerator(clauses)
+ Generate(
+ generator,
+ join = true,
+ outer = isOuter.nonEmpty,
+ Some(alias.toLowerCase),
+ attributes.map(UnresolvedAttribute(_)),
+ nodeToRelation(relationClause))
/* All relations, possibly with aliases or sampling clauses. */
case Token("TOK_TABREF", clauses) =>
@@ -1311,7 +1315,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
val explode = "(?i)explode".r
- def nodesToGenerator(nodes: Seq[Node]): Generator = {
+ def nodesToGenerator(nodes: Seq[Node]): (Generator, Seq[String]) = {
val function = nodes.head
val attributes = nodes.flatMap {
@@ -1321,7 +1325,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
function match {
case Token("TOK_FUNCTION", Token(explode(), Nil) :: child :: Nil) =>
- Explode(attributes, nodeToExpr(child))
+ (Explode(nodeToExpr(child)), attributes)
case Token("TOK_FUNCTION", Token(functionName, Nil) :: children) =>
val functionInfo: FunctionInfo =
@@ -1329,10 +1333,9 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
sys.error(s"Couldn't find function $functionName"))
val functionClassName = functionInfo.getFunctionClass.getName
- HiveGenericUdtf(
+ (HiveGenericUdtf(
new HiveFunctionWrapper(functionClassName),
- attributes,
- children.map(nodeToExpr))
+ children.map(nodeToExpr)), attributes)
case a: ASTNode =>
throw new NotImplementedError(
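As a hedged usage sketch of the LATERAL VIEW path changed above: the identifiers after `AS` are now parsed as plain strings and attached to `logical.Generate` as `UnresolvedAttribute`s, so they are resolved against the generator's output during analysis. This assumes the same `hiveContext` and Hive `src` table as in the earlier sketch:

```scala
// Hedged sketch; reuses the `hiveContext` and Hive `src` table assumed earlier.
// The names after AS (myKey, myVal) are the alias strings returned by
// nodesToGenerator and wrapped as UnresolvedAttribute(_) in the change above.
hiveContext.sql(
  """SELECT t.myKey, t.myVal
    |FROM src
    |LATERAL VIEW explode(map(key, value)) t AS myKey, myVal
    |LIMIT 3""".stripMargin)
  .show()
```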
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index 47305571e5..4b6f0ad75f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -66,7 +66,7 @@ private[hive] abstract class HiveFunctionRegistry
} else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
HiveUdaf(new HiveFunctionWrapper(functionClassName), children)
} else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
- HiveGenericUdtf(new HiveFunctionWrapper(functionClassName), Nil, children)
+ HiveGenericUdtf(new HiveFunctionWrapper(functionClassName), children)
} else {
sys.error(s"No handler for udf ${functionInfo.getFunctionClass}")
}
@@ -266,7 +266,6 @@ private[hive] case class HiveUdaf(
*/
private[hive] case class HiveGenericUdtf(
funcWrapper: HiveFunctionWrapper,
- aliasNames: Seq[String],
children: Seq[Expression])
extends Generator with HiveInspectors {
@@ -282,23 +281,8 @@ private[hive] case class HiveGenericUdtf(
@transient
protected lazy val udtInput = new Array[AnyRef](children.length)
- protected lazy val outputDataTypes = outputInspector.getAllStructFieldRefs.map {
- field => inspectorToDataType(field.getFieldObjectInspector)
- }
-
- override protected def makeOutput() = {
- // Use column names when given, otherwise _c1, _c2, ... _cn.
- if (aliasNames.size == outputDataTypes.size) {
- aliasNames.zip(outputDataTypes).map {
- case (attrName, attrDataType) =>
- AttributeReference(attrName, attrDataType, nullable = true)()
- }
- } else {
- outputDataTypes.zipWithIndex.map {
- case (attrDataType, i) =>
- AttributeReference(s"_c$i", attrDataType, nullable = true)()
- }
- }
+ lazy val elementTypes = outputInspector.getAllStructFieldRefs.map {
+ field => (inspectorToDataType(field.getFieldObjectInspector), true)
}
override def eval(input: Row): TraversableOnce[Row] = {
@@ -333,22 +317,6 @@ private[hive] case class HiveGenericUdtf(
}
}
-/**
- * Resolve Udtfs Alias.
- */
-private[spark] object ResolveUdtfsAlias extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case p @ Project(projectList, _)
- if projectList.exists(_.isInstanceOf[MultiAlias]) && projectList.size != 1 =>
- throw new TreeNodeException(p, "only single Generator supported for SELECT clause")
-
- case Project(Seq(Alias(udtf @ HiveGenericUdtf(_, _, _), name)), child) =>
- Generate(udtf.copy(aliasNames = Seq(name)), join = false, outer = false, None, child)
- case Project(Seq(MultiAlias(udtf @ HiveGenericUdtf(_, _, _), names)), child) =>
- Generate(udtf.copy(aliasNames = names), join = false, outer = false, None, child)
- }
-}
-
private[hive] case class HiveUdafFunction(
funcWrapper: HiveFunctionWrapper,
exprs: Seq[Expression],
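The `HiveGenericUdtf` change above reflects the new contract: a generator reports only the data types (and nullability) of its output, and attribute names are attached later by `logical.Generate`. Below is a hedged, simplified sketch of that contract; the trait and object names are stand-ins for illustration, not the exact catalyst API:

```scala
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}

// Stand-in for the Generator schema contract implied by the diff above:
// output column *types* only; names such as _c0/_c1 or user-supplied aliases
// are attached later by logical.Generate during analysis.
trait GeneratorSchemaSketch {
  def elementTypes: Seq[(DataType, Boolean)] // (data type, nullable)
}

// e.g. explode(map(key, value)) over src yields an (Int, String) pair per row.
object ExplodeMapSchema extends GeneratorSchemaSketch {
  override val elementTypes: Seq[(DataType, Boolean)] =
    Seq((IntegerType, true), (StringType, true))
}
```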
diff --git a/sql/hive/src/test/resources/golden/Specify the udtf output-0-d1f244bce64f22b34ad5bf9fd360b632 b/sql/hive/src/test/resources/golden/Specify the udtf output-0-d1f244bce64f22b34ad5bf9fd360b632
new file mode 100644
index 0000000000..d00491fd7e
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/Specify the udtf output-0-d1f244bce64f22b34ad5bf9fd360b632
@@ -0,0 +1 @@
+1
diff --git a/sql/hive/src/test/resources/golden/insert table with generator with column name-0-7ac701cf43e73e9e416888e4df694348 b/sql/hive/src/test/resources/golden/insert table with generator with column name-0-7ac701cf43e73e9e416888e4df694348
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert table with generator with column name-0-7ac701cf43e73e9e416888e4df694348
diff --git a/sql/hive/src/test/resources/golden/insert table with generator with column name-1-5cdf9d51fc0e105e365d82e7611e37f3 b/sql/hive/src/test/resources/golden/insert table with generator with column name-1-5cdf9d51fc0e105e365d82e7611e37f3
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert table with generator with column name-1-5cdf9d51fc0e105e365d82e7611e37f3
diff --git a/sql/hive/src/test/resources/golden/insert table with generator with column name-2-f963396461294e06cb7cafe22a1419e4 b/sql/hive/src/test/resources/golden/insert table with generator with column name-2-f963396461294e06cb7cafe22a1419e4
new file mode 100644
index 0000000000..01e79c32a8
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert table with generator with column name-2-f963396461294e06cb7cafe22a1419e4
@@ -0,0 +1,3 @@
+1
+2
+3
diff --git a/sql/hive/src/test/resources/golden/insert table with generator with multiple column names-0-46bdb27b3359dc81d8c246b9f69d4b82 b/sql/hive/src/test/resources/golden/insert table with generator with multiple column names-0-46bdb27b3359dc81d8c246b9f69d4b82
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert table with generator with multiple column names-0-46bdb27b3359dc81d8c246b9f69d4b82
diff --git a/sql/hive/src/test/resources/golden/insert table with generator with multiple column names-1-cdf6989f3b055257f1692c3bbd80dc73 b/sql/hive/src/test/resources/golden/insert table with generator with multiple column names-1-cdf6989f3b055257f1692c3bbd80dc73
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert table with generator with multiple column names-1-cdf6989f3b055257f1692c3bbd80dc73
diff --git a/sql/hive/src/test/resources/golden/insert table with generator with multiple column names-2-ab3954b69d7a991bc801a509c3166cc5 b/sql/hive/src/test/resources/golden/insert table with generator with multiple column names-2-ab3954b69d7a991bc801a509c3166cc5
new file mode 100644
index 0000000000..0c7520f209
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert table with generator with multiple column names-2-ab3954b69d7a991bc801a509c3166cc5
@@ -0,0 +1,3 @@
+86 val_86
+238 val_238
+311 val_311
diff --git a/sql/hive/src/test/resources/golden/insert table with generator without column name-0-7ac701cf43e73e9e416888e4df694348 b/sql/hive/src/test/resources/golden/insert table with generator without column name-0-7ac701cf43e73e9e416888e4df694348
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert table with generator without column name-0-7ac701cf43e73e9e416888e4df694348
diff --git a/sql/hive/src/test/resources/golden/insert table with generator without column name-1-26599718c322ff4f9740040c066d8292 b/sql/hive/src/test/resources/golden/insert table with generator without column name-1-26599718c322ff4f9740040c066d8292
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert table with generator without column name-1-26599718c322ff4f9740040c066d8292
diff --git a/sql/hive/src/test/resources/golden/insert table with generator without column name-2-f963396461294e06cb7cafe22a1419e4 b/sql/hive/src/test/resources/golden/insert table with generator without column name-2-f963396461294e06cb7cafe22a1419e4
new file mode 100644
index 0000000000..01e79c32a8
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert table with generator without column name-2-f963396461294e06cb7cafe22a1419e4
@@ -0,0 +1,3 @@
+1
+2
+3
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 300b1f7920..ac10b17330 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -27,7 +27,7 @@ import scala.util.Try
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.spark.{SparkFiles, SparkException}
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive._
@@ -67,6 +67,40 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
}
}
+ createQueryTest("insert table with generator with column name",
+ """
+ | CREATE TABLE gen_tmp (key Int);
+ | INSERT OVERWRITE TABLE gen_tmp
+ | SELECT explode(array(1,2,3)) AS val FROM src LIMIT 3;
+ | SELECT key FROM gen_tmp ORDER BY key ASC;
+ """.stripMargin)
+
+ createQueryTest("insert table with generator with multiple column names",
+ """
+ | CREATE TABLE gen_tmp (key Int, value String);
+ | INSERT OVERWRITE TABLE gen_tmp
+ | SELECT explode(map(key, value)) as (k1, k2) FROM src LIMIT 3;
+ | SELECT key, value FROM gen_tmp ORDER BY key, value ASC;
+ """.stripMargin)
+
+ createQueryTest("insert table with generator without column name",
+ """
+ | CREATE TABLE gen_tmp (key Int);
+ | INSERT OVERWRITE TABLE gen_tmp
+ | SELECT explode(array(1,2,3)) FROM src LIMIT 3;
+ | SELECT key FROM gen_tmp ORDER BY key ASC;
+ """.stripMargin)
+
+ test("multiple generator in projection") {
+ intercept[AnalysisException] {
+ sql("SELECT explode(map(key, value)), key FROM src").collect()
+ }
+
+ intercept[AnalysisException] {
+ sql("SELECT explode(map(key, value)) as k1, k2, key FROM src").collect()
+ }
+ }
+
createQueryTest("! operator",
"""
|SELECT a FROM (
@@ -456,7 +490,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
createQueryTest("lateral view2",
"SELECT * FROM src LATERAL VIEW explode(array(1,2)) tbl")
-
createQueryTest("lateral view3",
"FROM src SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX")
@@ -478,6 +511,9 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
createQueryTest("lateral view6",
"SELECT * FROM src LATERAL VIEW explode(map(key+3,key+4)) D as k, v")
+ createQueryTest("Specify the udtf output",
+ "SELECT d FROM (SELECT explode(array(1,1)) d FROM src LIMIT 1) t")
+
test("sampling") {
sql("SELECT * FROM src TABLESAMPLE(0.1 PERCENT) s")
sql("SELECT * FROM src TABLESAMPLE(100 PERCENT) s")