diff options
author | Cheng Hao <hao.cheng@intel.com> | 2014-12-30 12:11:44 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-12-30 12:11:44 -0800 |
commit | 53f0a00b6051fb6cb52a90f91ae01bcd77e332c5 (patch) | |
tree | d1db4db01969748df0d537c345d507ff1f1abdb8 /sql/catalyst | |
parent | daac221302e0cf71a7b7bda31625134cf7b9dce1 (diff) | |
download | spark-53f0a00b6051fb6cb52a90f91ae01bcd77e332c5.tar.gz spark-53f0a00b6051fb6cb52a90f91ae01bcd77e332c5.tar.bz2 spark-53f0a00b6051fb6cb52a90f91ae01bcd77e332c5.zip |
[Spark-4512] [SQL] Unresolved Attribute Exception in Sort By
It will cause exception while do query like:
SELECT key+key FROM src sort by value;
Author: Cheng Hao <hao.cheng@intel.com>
Closes #3386 from chenghao-intel/sort and squashes the following commits:
38c78cc [Cheng Hao] revert the SortPartition in SparkStrategies
7e9dd15 [Cheng Hao] update the typo
fcd1d64 [Cheng Hao] rebase the latest master and update the SortBy unit test
Diffstat (limited to 'sql/catalyst')
4 files changed, 21 insertions, 11 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 66860a4c09..f79d4ff444 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -204,8 +204,8 @@ class SqlParser extends AbstractSparkSQLParser { ) protected lazy val sortType: Parser[LogicalPlan => LogicalPlan] = - ( ORDER ~ BY ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, l) } - | SORT ~ BY ~> ordering ^^ { case o => l: LogicalPlan => SortPartitions(o, l) } + ( ORDER ~ BY ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, true, l) } + | SORT ~ BY ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, false, l) } ) protected lazy val ordering: Parser[Seq[SortOrder]] = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 1c4088b843..72680f37a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -246,7 +246,7 @@ class Analyzer(catalog: Catalog, case p: LogicalPlan if !p.childrenResolved => p // If the projection list contains Stars, expand it. - case p@Project(projectList, child) if containsStar(projectList) => + case p @ Project(projectList, child) if containsStar(projectList) => Project( projectList.flatMap { case s: Star => s.expand(child.output, resolver) @@ -310,7 +310,8 @@ class Analyzer(catalog: Catalog, */ object ResolveSortReferences extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { - case s @ Sort(ordering, p @ Project(projectList, child)) if !s.resolved && p.resolved => + case s @ Sort(ordering, global, p @ Project(projectList, child)) + if !s.resolved && p.resolved => val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name }) val resolved = unresolved.flatMap(child.resolve(_, resolver)) val requiredAttributes = AttributeSet(resolved.collect { case a: Attribute => a }) @@ -319,13 +320,14 @@ class Analyzer(catalog: Catalog, if (missingInProject.nonEmpty) { // Add missing attributes and then project them away after the sort. Project(projectList.map(_.toAttribute), - Sort(ordering, + Sort(ordering, global, Project(projectList ++ missingInProject, child))) } else { logDebug(s"Failed to find $missingInProject in ${p.output.mkString(", ")}") s // Nothing we can do here. Return original plan. } - case s @ Sort(ordering, a @ Aggregate(grouping, aggs, child)) if !s.resolved && a.resolved => + case s @ Sort(ordering, global, a @ Aggregate(grouping, aggs, child)) + if !s.resolved && a.resolved => val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name }) // A small hack to create an object that will allow us to resolve any references that // refer to named expressions that are present in the grouping expressions. @@ -340,8 +342,7 @@ class Analyzer(catalog: Catalog, if (missingInAggs.nonEmpty) { // Add missing grouping exprs and then project them away after the sort. Project(a.output, - Sort(ordering, - Aggregate(grouping, aggs ++ missingInAggs, child))) + Sort(ordering, global, Aggregate(grouping, aggs ++ missingInAggs, child))) } else { s // Nothing we can do here. Return original plan. } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index fb252cdf51..a14e5b9ef1 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -244,9 +244,9 @@ package object dsl { condition: Option[Expression] = None) = Join(logicalPlan, otherPlan, joinType, condition) - def orderBy(sortExprs: SortOrder*) = Sort(sortExprs, logicalPlan) + def orderBy(sortExprs: SortOrder*) = Sort(sortExprs, true, logicalPlan) - def sortBy(sortExprs: SortOrder*) = SortPartitions(sortExprs, logicalPlan) + def sortBy(sortExprs: SortOrder*) = Sort(sortExprs, false, logicalPlan) def groupBy(groupingExprs: Expression*)(aggregateExprs: Expression*) = { val aliasedExprs = aggregateExprs.map { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index a9282b98ad..0b9f01cbae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -130,7 +130,16 @@ case class WriteToFile( override def output = child.output } -case class Sort(order: Seq[SortOrder], child: LogicalPlan) extends UnaryNode { +/** + * @param order The ordering expressions + * @param global True means global sorting apply for entire data set, + * False means sorting only apply within the partition. + * @param child Child logical plan + */ +case class Sort( + order: Seq[SortOrder], + global: Boolean, + child: LogicalPlan) extends UnaryNode { override def output = child.output } |