From 5adbb39482631998dbfe4a1da88f6e75b30fb5ac Mon Sep 17 00:00:00 2001 From: wangfei Date: Tue, 3 Feb 2015 12:16:31 -0800 Subject: [SPARK-5383][SQL] Support alias for udtfs Add support for alias of udtfs, such as ``` select stack(2, key, value, key, value) as (a, b) from src limit 5; select a, b from (select stack(2, key, value, key, value) as (a, b) from src) t limit 5 ``` Author: wangfei Author: scwf Author: Fei Wang Closes #4186 from scwf/multi-alias-names and squashes the following commits: c35e922 [wangfei] fix conflicts adc8311 [wangfei] minor format fix 2783aed [wangfei] convert it to a Generate instead of leaving it inside of a Project clause a87668a [wangfei] minor improvement b25d9b3 [wangfei] resolve conflicts d38f041 [wangfei] style fix 8cfcebf [wangfei] minor improvement 12a239e [wangfei] fix test case 050177f [wangfei] added extendedCheckRules 3d69329 [wangfei] added CheckMultiAlias to analyzer 324150d [wangfei] added multi alias node 74f5a81 [Fei Wang] imports order fix 5bc3f59 [scwf] style fix 3daec28 [scwf] support alias for udfs with multi output columns --- .../spark/sql/catalyst/analysis/Analyzer.scala | 5 +-- .../spark/sql/catalyst/analysis/unresolved.scala | 38 ++++++++++++++++++++++ .../catalyst/expressions/namedExpressions.scala | 2 +- .../org/apache/spark/sql/hive/HiveContext.scala | 1 + .../scala/org/apache/spark/sql/hive/HiveQl.scala | 16 ++++++--- .../scala/org/apache/spark/sql/hive/hiveUdfs.scala | 19 ++++++++++- .../spark/sql/hive/execution/HiveQuerySuite.scala | 12 +++++++ 7 files changed, 85 insertions(+), 8 deletions(-) (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index cefd70acf3..ae7f7b9feb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -69,8 +69,9 @@ class Analyzer(catalog: Catalog, typeCoercionRules ++ extendedRules : _*), Batch("Check Analysis", Once, - CheckResolution, - CheckAggregation), + CheckResolution :: + CheckAggregation :: + Nil: _*), Batch("AnalysisOperators", fixedPoint, EliminateAnalysisOperators) ) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 6606028918..f35921e2a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -128,6 +128,44 @@ case class UnresolvedStar(table: Option[String]) extends Star { override def toString = table.map(_ + ".").getOrElse("") + "*" } +/** + * Used to assign new names to Generator's output, such as hive udtf. + * For example the SQL expression "stack(2, key, value, key, value) as (a, b)" could be represented + * as follows: + * MultiAlias(stack_function, Seq(a, b)) + + * @param child the computation being performed + * @param names the names to be associated with each output of computing [[child]]. + */ +case class MultiAlias(child: Expression, names: Seq[String]) + extends Attribute with trees.UnaryNode[Expression] { + + override def name = throw new UnresolvedException(this, "name") + + override def exprId = throw new UnresolvedException(this, "exprId") + + override def dataType = throw new UnresolvedException(this, "dataType") + + override def nullable = throw new UnresolvedException(this, "nullable") + + override def qualifiers = throw new UnresolvedException(this, "qualifiers") + + override lazy val resolved = false + + override def newInstance = this + + override def withNullability(newNullability: Boolean) = this + + override def withQualifiers(newQualifiers: Seq[String]) = this + + override def withName(newName: String) = this + + override def eval(input: Row = null): EvaluatedType = + throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}") + + override def toString: String = s"$child AS $names" + +} /** * Represents all the resolved input attributes to a given relational operator. This is used diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index f388cd5972..e6ab1fd8d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -75,7 +75,7 @@ abstract class Attribute extends NamedExpression { /** * Used to assign a new name to a computation. * For example the SQL expression "1 + 1 AS a" could be represented as follows: - * Alias(Add(Literal(1), Literal(1), "a")() + * Alias(Add(Literal(1), Literal(1)), "a")() * * Note that exprId and qualifiers are in a separate parameter list because * we only pattern match on child and name. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 50f266a4bc..922e61f0be 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -323,6 +323,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { catalog.CreateTables :: catalog.PreInsertionCasts :: ExtractPythonUdfs :: + ResolveUdtfsAlias :: Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index ab305e1f82..74ca0d4ed5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive import java.sql.Date +import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.Context @@ -968,14 +969,21 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C } protected def selExprNodeToExpr(node: Node): Option[Expression] = node match { - case Token("TOK_SELEXPR", - e :: Nil) => + case Token("TOK_SELEXPR", e :: Nil) => Some(nodeToExpr(e)) - case Token("TOK_SELEXPR", - e :: Token(alias, Nil) :: Nil) => + case Token("TOK_SELEXPR", e :: Token(alias, Nil) :: Nil) => Some(Alias(nodeToExpr(e), cleanIdentifier(alias))()) + case Token("TOK_SELEXPR", e :: aliasChildren) => + var aliasNames = ArrayBuffer[String]() + aliasChildren.foreach { _ match { + case Token(name, Nil) => aliasNames += cleanIdentifier(name) + case _ => + } + } + Some(MultiAlias(nodeToExpr(e), aliasNames)) + /* Hints are ignored */ case Token("TOK_HINTLIST", _) => None diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index 76d2140372..34c21c1176 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -33,8 +33,11 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._ import org.apache.spark.Logging import org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.{Generate, Project, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.types._ -import org.apache.spark.util.Utils.getContextOrSparkClassLoader +import org.apache.spark.sql.catalyst.analysis.MultiAlias +import org.apache.spark.sql.catalyst.errors.TreeNodeException /* Implicit conversions */ import scala.collection.JavaConversions._ @@ -321,6 +324,20 @@ private[hive] case class HiveGenericUdtf( override def toString = s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})" } +/** + * Resolve Udtfs Alias. + */ +private[spark] object ResolveUdtfsAlias extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan) = plan transform { + case p @ Project(projectList, _) + if projectList.exists(_.isInstanceOf[MultiAlias]) && projectList.size != 1 => + throw new TreeNodeException(p, "only single Generator supported for SELECT clause") + + case Project(Seq(MultiAlias(udtf @ HiveGenericUdtf(_, _, _), names)), child) => + Generate(udtf.copy(aliasNames = names), join = false, outer = false, None, child) + } +} + private[hive] case class HiveUdafFunction( funcWrapper: HiveFunctionWrapper, exprs: Seq[Expression], diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 4c53b10ba9..8e84d279fe 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -583,6 +583,18 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { assert(sql("select key from src having key > 490").collect().size < 100) } + test("SPARK-5383 alias for udfs with multi output columns") { + assert( + sql("select stack(2, key, value, key, value) as (a, b) from src limit 5") + .collect() + .size == 5) + + assert( + sql("select a, b from (select stack(2, key, value, key, value) as (a, b) from src) t limit 5") + .collect() + .size == 5) + } + test("SPARK-5367: resolve star expression in udf") { assert(sql("select concat(*) from src limit 5").collect().size == 5) assert(sql("select array(*) from src limit 5").collect().size == 5) -- cgit v1.2.3