commit 5adbb39482631998dbfe4a1da88f6e75b30fb5ac (patch)
Author:    wangfei <wangfei1@huawei.com>    2015-02-03 12:16:31 -0800
Committer: Michael Armbrust <michael@databricks.com>    2015-02-03 12:16:31 -0800
tree       851a86986f171d0164e0741fb667656a0fdf9c67 /sql
parent     ca7a6cdff004eb4605fd223e127b4a46a0a214e7 (diff)
[SPARK-5383][SQL] Support alias for udtfs
Add support for aliases of UDTFs, such as:

```
select stack(2, key, value, key, value) as (a, b) from src limit 5;
select a, b from (select stack(2, key, value, key, value) as (a, b) from src) t limit 5
```

Author: wangfei <wangfei1@huawei.com>
Author: scwf <wangfei1@huawei.com>
Author: Fei Wang <wangfei1@huawei.com>

Closes #4186 from scwf/multi-alias-names and squashes the following commits:

c35e922 [wangfei] fix conflicts
adc8311 [wangfei] minor format fix
2783aed [wangfei] convert it to a Generate instead of leaving it inside of a Project clause
a87668a [wangfei] minor improvement
b25d9b3 [wangfei] resolve conflicts
d38f041 [wangfei] style fix
8cfcebf [wangfei] minor improvement
12a239e [wangfei] fix test case
050177f [wangfei] added extendedCheckRules
3d69329 [wangfei] added CheckMultiAlias to analyzer
324150d [wangfei] added multi alias node
74f5a81 [Fei Wang] imports order fix
5bc3f59 [scwf] style fix
3daec28 [scwf] support alias for udfs with multi output columns
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala            |  5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala          | 38
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                          |  1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala                               | 16
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala                             | 19
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala             | 12
7 files changed, 85 insertions, 8 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index cefd70acf3..ae7f7b9feb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -69,8 +69,9 @@ class Analyzer(catalog: Catalog,
typeCoercionRules ++
extendedRules : _*),
Batch("Check Analysis", Once,
- CheckResolution,
- CheckAggregation),
+ CheckResolution ::
+ CheckAggregation ::
+ Nil: _*),
Batch("AnalysisOperators", fixedPoint,
EliminateAnalysisOperators)
)
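
The hunk above turns the batch's bare varargs into a `List` expanded with `: _*`, presumably so that extra check rules can be appended later (the squashed commits mention an `extendedCheckRules` hook). A minimal, self-contained sketch of that varargs pattern, with rule names as placeholder strings rather than Catalyst rules:

```scala
// Sketch of the `List ... : _*` pattern used in the hunk above.
// `extendedCheckRules` is a stand-in for a context-specific extension hook.
object BatchListDemo {
  def batch(name: String, rules: String*): String =
    s"Batch($name: ${rules.mkString(", ")})"

  def main(args: Array[String]): Unit = {
    val extendedCheckRules = List("CheckMultiAlias")  // hypothetical extra rule
    println(batch("Check Analysis",
      "CheckResolution" :: "CheckAggregation" :: extendedCheckRules: _*))
    // Batch(Check Analysis: CheckResolution, CheckAggregation, CheckMultiAlias)
  }
}
```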
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 6606028918..f35921e2a7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -128,6 +128,44 @@ case class UnresolvedStar(table: Option[String]) extends Star {
override def toString = table.map(_ + ".").getOrElse("") + "*"
}
+/**
+ * Used to assign new names to Generator's output, such as hive udtf.
+ * For example the SQL expression "stack(2, key, value, key, value) as (a, b)" could be represented
+ * as follows:
+ * MultiAlias(stack_function, Seq(a, b))
+ *
+ * @param child the computation being performed
+ * @param names the names to be associated with each output of computing [[child]].
+ */
+case class MultiAlias(child: Expression, names: Seq[String])
+ extends Attribute with trees.UnaryNode[Expression] {
+
+ override def name = throw new UnresolvedException(this, "name")
+
+ override def exprId = throw new UnresolvedException(this, "exprId")
+
+ override def dataType = throw new UnresolvedException(this, "dataType")
+
+ override def nullable = throw new UnresolvedException(this, "nullable")
+
+ override def qualifiers = throw new UnresolvedException(this, "qualifiers")
+
+ override lazy val resolved = false
+
+ override def newInstance = this
+
+ override def withNullability(newNullability: Boolean) = this
+
+ override def withQualifiers(newQualifiers: Seq[String]) = this
+
+ override def withName(newName: String) = this
+
+ override def eval(input: Row = null): EvaluatedType =
+ throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
+
+ override def toString: String = s"$child AS $names"
+
+}
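
For orientation, this is how the new node models the SQL from the commit message: the generator call is the `child`, and the parenthesized alias list becomes `names`. A simplified, self-contained sketch (toy types, not Catalyst's real `Expression` hierarchy):

```scala
// Toy model of the MultiAlias idea: one child expression, one name per
// generator output column.
object MultiAliasSketch {
  case class FunctionCall(name: String, args: Seq[String])
  case class MultiAlias(child: FunctionCall, names: Seq[String])

  def main(args: Array[String]): Unit = {
    // "stack(2, key, value, key, value) as (a, b)" becomes:
    val aliased = MultiAlias(
      FunctionCall("stack", Seq("2", "key", "value", "key", "value")),
      Seq("a", "b"))
    println(aliased)
  }
}
```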
/**
* Represents all the resolved input attributes to a given relational operator. This is used
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index f388cd5972..e6ab1fd8d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -75,7 +75,7 @@ abstract class Attribute extends NamedExpression {
/**
* Used to assign a new name to a computation.
* For example the SQL expression "1 + 1 AS a" could be represented as follows:
- * Alias(Add(Literal(1), Literal(1), "a")()
+ * Alias(Add(Literal(1), Literal(1)), "a")()
*
* Note that exprId and qualifiers are in a separate parameter list because
* we only pattern match on child and name.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 50f266a4bc..922e61f0be 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -323,6 +323,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.CreateTables ::
catalog.PreInsertionCasts ::
ExtractPythonUdfs ::
+ ResolveUdtfsAlias ::
Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index ab305e1f82..74ca0d4ed5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive
import java.sql.Date
+import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Context
@@ -968,14 +969,21 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
}
protected def selExprNodeToExpr(node: Node): Option[Expression] = node match {
- case Token("TOK_SELEXPR",
- e :: Nil) =>
+ case Token("TOK_SELEXPR", e :: Nil) =>
Some(nodeToExpr(e))
- case Token("TOK_SELEXPR",
- e :: Token(alias, Nil) :: Nil) =>
+ case Token("TOK_SELEXPR", e :: Token(alias, Nil) :: Nil) =>
Some(Alias(nodeToExpr(e), cleanIdentifier(alias))())
+ case Token("TOK_SELEXPR", e :: aliasChildren) =>
+ var aliasNames = ArrayBuffer[String]()
+ aliasChildren.foreach { _ match {
+ case Token(name, Nil) => aliasNames += cleanIdentifier(name)
+ case _ =>
+ }
+ }
+ Some(MultiAlias(nodeToExpr(e), aliasNames))
+
/* Hints are ignored */
case Token("TOK_HINTLIST", _) => None
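
The mutable `ArrayBuffer` plus `foreach { _ match { ... } }` in the hunk above works, but the same extraction can be written with `collect`. A sketch under simplified assumptions: `Token` is modeled as a plain case class (in HiveQl.scala it is an extractor over Hive's AST nodes), and `cleanIdentifier` is omitted:

```scala
// Functional equivalent of the alias-name extraction in the hunk above.
object AliasNamesSketch {
  case class Token(name: String, children: Seq[Token])

  def aliasNames(aliasChildren: Seq[Token]): Seq[String] =
    aliasChildren.collect { case Token(name, Seq()) => name }  // keep leaf tokens

  def main(args: Array[String]): Unit =
    println(aliasNames(Seq(Token("a", Seq()), Token("b", Seq()))))  // List(a, b)
}
```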
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index 76d2140372..34c21c1176 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -33,8 +33,11 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{Generate, Project, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils.getContextOrSparkClassLoader
+import org.apache.spark.sql.catalyst.analysis.MultiAlias
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
/* Implicit conversions */
import scala.collection.JavaConversions._
@@ -321,6 +324,20 @@ private[hive] case class HiveGenericUdtf(
override def toString = s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
}
+/**
+ * Resolve Udtfs Alias.
+ */
+private[spark] object ResolveUdtfsAlias extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan) = plan transform {
+ case p @ Project(projectList, _)
+ if projectList.exists(_.isInstanceOf[MultiAlias]) && projectList.size != 1 =>
+ throw new TreeNodeException(p, "only single Generator supported for SELECT clause")
+
+ case Project(Seq(MultiAlias(udtf @ HiveGenericUdtf(_, _, _), names)), child) =>
+ Generate(udtf.copy(aliasNames = names), join = false, outer = false, None, child)
+ }
+}
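
The effect of the rule, on the commit's own example: a single `MultiAlias` projection over a Hive UDTF becomes a `Generate` node carrying the alias names. A toy, self-contained model of that transform (names and plan shapes are simplified stand-ins for Catalyst's):

```scala
object ResolveUdtfsAliasSketch {
  sealed trait Plan
  case object Src extends Plan
  case class Project(exprs: Seq[Any], child: Plan) extends Plan
  case class Generate(udtf: String, names: Seq[String], child: Plan) extends Plan
  case class MultiAlias(udtf: String, names: Seq[String])

  // Mirrors the second case of the rule above.
  def resolve(plan: Plan): Plan = plan match {
    case Project(Seq(MultiAlias(udtf, names)), child) => Generate(udtf, names, child)
    case other => other
  }

  def main(args: Array[String]): Unit =
    println(resolve(Project(Seq(MultiAlias("stack", Seq("a", "b"))), Src)))
    // Generate(stack,List(a, b),Src)
}
```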
+
private[hive] case class HiveUdafFunction(
funcWrapper: HiveFunctionWrapper,
exprs: Seq[Expression],
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 4c53b10ba9..8e84d279fe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -583,6 +583,18 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
assert(sql("select key from src having key > 490").collect().size < 100)
}
+ test("SPARK-5383 alias for udfs with multi output columns") {
+ assert(
+ sql("select stack(2, key, value, key, value) as (a, b) from src limit 5")
+ .collect()
+ .size == 5)
+
+ assert(
+ sql("select a, b from (select stack(2, key, value, key, value) as (a, b) from src) t limit 5")
+ .collect()
+ .size == 5)
+ }
+
test("SPARK-5367: resolve star expression in udf") {
assert(sql("select concat(*) from src limit 5").collect().size == 5)
assert(sql("select array(*) from src limit 5").collect().size == 5)