aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorwangfei <wangfei1@huawei.com>2015-02-03 12:16:31 -0800
committerMichael Armbrust <michael@databricks.com>2015-02-03 12:16:31 -0800
commit5adbb39482631998dbfe4a1da88f6e75b30fb5ac (patch)
tree851a86986f171d0164e0741fb667656a0fdf9c67 /sql/hive
parentca7a6cdff004eb4605fd223e127b4a46a0a214e7 (diff)
downloadspark-5adbb39482631998dbfe4a1da88f6e75b30fb5ac.tar.gz
spark-5adbb39482631998dbfe4a1da88f6e75b30fb5ac.tar.bz2
spark-5adbb39482631998dbfe4a1da88f6e75b30fb5ac.zip
[SPARK-5383][SQL] Support alias for udtfs
Add support for alias of udtfs, such as ``` select stack(2, key, value, key, value) as (a, b) from src limit 5; select a, b from (select stack(2, key, value, key, value) as (a, b) from src) t limit 5 ``` Author: wangfei <wangfei1@huawei.com> Author: scwf <wangfei1@huawei.com> Author: Fei Wang <wangfei1@huawei.com> Closes #4186 from scwf/multi-alias-names and squashes the following commits: c35e922 [wangfei] fix conflicts adc8311 [wangfei] minor format fix 2783aed [wangfei] convert it to a Generate instead of leaving it inside of a Project clause a87668a [wangfei] minor improvement b25d9b3 [wangfei] resolve conflicts d38f041 [wangfei] style fix 8cfcebf [wangfei] minor improvement 12a239e [wangfei] fix test case 050177f [wangfei] added extendedCheckRules 3d69329 [wangfei] added CheckMultiAlias to analyzer 324150d [wangfei] added multi alias node 74f5a81 [Fei Wang] imports order fix 5bc3f59 [scwf] style fix 3daec28 [scwf] support alias for udfs with multi output columns
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala1
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala16
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala19
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala12
4 files changed, 43 insertions, 5 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 50f266a4bc..922e61f0be 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -323,6 +323,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.CreateTables ::
catalog.PreInsertionCasts ::
ExtractPythonUdfs ::
+ ResolveUdtfsAlias ::
Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index ab305e1f82..74ca0d4ed5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive
import java.sql.Date
+import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Context
@@ -968,14 +969,21 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
}
protected def selExprNodeToExpr(node: Node): Option[Expression] = node match {
- case Token("TOK_SELEXPR",
- e :: Nil) =>
+ case Token("TOK_SELEXPR", e :: Nil) =>
Some(nodeToExpr(e))
- case Token("TOK_SELEXPR",
- e :: Token(alias, Nil) :: Nil) =>
+ case Token("TOK_SELEXPR", e :: Token(alias, Nil) :: Nil) =>
Some(Alias(nodeToExpr(e), cleanIdentifier(alias))())
+ case Token("TOK_SELEXPR", e :: aliasChildren) =>
+ var aliasNames = ArrayBuffer[String]()
+ aliasChildren.foreach { _ match {
+ case Token(name, Nil) => aliasNames += cleanIdentifier(name)
+ case _ =>
+ }
+ }
+ Some(MultiAlias(nodeToExpr(e), aliasNames))
+
/* Hints are ignored */
case Token("TOK_HINTLIST", _) => None
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index 76d2140372..34c21c1176 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -33,8 +33,11 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{Generate, Project, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils.getContextOrSparkClassLoader
+import org.apache.spark.sql.catalyst.analysis.MultiAlias
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
/* Implicit conversions */
import scala.collection.JavaConversions._
@@ -321,6 +324,20 @@ private[hive] case class HiveGenericUdtf(
override def toString = s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
}
+/**
+ * Resolve Udtfs Alias.
+ */
+private[spark] object ResolveUdtfsAlias extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan) = plan transform {
+ case p @ Project(projectList, _)
+ if projectList.exists(_.isInstanceOf[MultiAlias]) && projectList.size != 1 =>
+ throw new TreeNodeException(p, "only single Generator supported for SELECT clause")
+
+ case Project(Seq(MultiAlias(udtf @ HiveGenericUdtf(_, _, _), names)), child) =>
+ Generate(udtf.copy(aliasNames = names), join = false, outer = false, None, child)
+ }
+}
+
private[hive] case class HiveUdafFunction(
funcWrapper: HiveFunctionWrapper,
exprs: Seq[Expression],
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 4c53b10ba9..8e84d279fe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -583,6 +583,18 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
assert(sql("select key from src having key > 490").collect().size < 100)
}
+ test("SPARK-5383 alias for udfs with multi output columns") {
+ assert(
+ sql("select stack(2, key, value, key, value) as (a, b) from src limit 5")
+ .collect()
+ .size == 5)
+
+ assert(
+ sql("select a, b from (select stack(2, key, value, key, value) as (a, b) from src) t limit 5")
+ .collect()
+ .size == 5)
+ }
+
test("SPARK-5367: resolve star expression in udf") {
assert(sql("select concat(*) from src limit 5").collect().size == 5)
assert(sql("select array(*) from src limit 5").collect().size == 5)