aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorCheng Hao <hao.cheng@intel.com>2014-12-30 12:11:44 -0800
committerMichael Armbrust <michael@databricks.com>2014-12-30 12:11:44 -0800
commit53f0a00b6051fb6cb52a90f91ae01bcd77e332c5 (patch)
treed1db4db01969748df0d537c345d507ff1f1abdb8 /sql/catalyst
parentdaac221302e0cf71a7b7bda31625134cf7b9dce1 (diff)
downloadspark-53f0a00b6051fb6cb52a90f91ae01bcd77e332c5.tar.gz
spark-53f0a00b6051fb6cb52a90f91ae01bcd77e332c5.tar.bz2
spark-53f0a00b6051fb6cb52a90f91ae01bcd77e332c5.zip
[Spark-4512] [SQL] Unresolved Attribute Exception in Sort By
It caused an exception when executing a query like: SELECT key+key FROM src sort by value; Author: Cheng Hao <hao.cheng@intel.com> Closes #3386 from chenghao-intel/sort and squashes the following commits: 38c78cc [Cheng Hao] revert the SortPartition in SparkStrategies 7e9dd15 [Cheng Hao] update the typo fcd1d64 [Cheng Hao] rebase the latest master and update the SortBy unit test
Diffstat (limited to 'sql/catalyst')
-rwxr-xr-xsql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala13
-rwxr-xr-xsql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala11
4 files changed, 21 insertions, 11 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 66860a4c09..f79d4ff444 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -204,8 +204,8 @@ class SqlParser extends AbstractSparkSQLParser {
)
protected lazy val sortType: Parser[LogicalPlan => LogicalPlan] =
- ( ORDER ~ BY ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, l) }
- | SORT ~ BY ~> ordering ^^ { case o => l: LogicalPlan => SortPartitions(o, l) }
+ ( ORDER ~ BY ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, true, l) }
+ | SORT ~ BY ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, false, l) }
)
protected lazy val ordering: Parser[Seq[SortOrder]] =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 1c4088b843..72680f37a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -246,7 +246,7 @@ class Analyzer(catalog: Catalog,
case p: LogicalPlan if !p.childrenResolved => p
// If the projection list contains Stars, expand it.
- case p@Project(projectList, child) if containsStar(projectList) =>
+ case p @ Project(projectList, child) if containsStar(projectList) =>
Project(
projectList.flatMap {
case s: Star => s.expand(child.output, resolver)
@@ -310,7 +310,8 @@ class Analyzer(catalog: Catalog,
*/
object ResolveSortReferences extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
- case s @ Sort(ordering, p @ Project(projectList, child)) if !s.resolved && p.resolved =>
+ case s @ Sort(ordering, global, p @ Project(projectList, child))
+ if !s.resolved && p.resolved =>
val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
val resolved = unresolved.flatMap(child.resolve(_, resolver))
val requiredAttributes = AttributeSet(resolved.collect { case a: Attribute => a })
@@ -319,13 +320,14 @@ class Analyzer(catalog: Catalog,
if (missingInProject.nonEmpty) {
// Add missing attributes and then project them away after the sort.
Project(projectList.map(_.toAttribute),
- Sort(ordering,
+ Sort(ordering, global,
Project(projectList ++ missingInProject, child)))
} else {
logDebug(s"Failed to find $missingInProject in ${p.output.mkString(", ")}")
s // Nothing we can do here. Return original plan.
}
- case s @ Sort(ordering, a @ Aggregate(grouping, aggs, child)) if !s.resolved && a.resolved =>
+ case s @ Sort(ordering, global, a @ Aggregate(grouping, aggs, child))
+ if !s.resolved && a.resolved =>
val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
// A small hack to create an object that will allow us to resolve any references that
// refer to named expressions that are present in the grouping expressions.
@@ -340,8 +342,7 @@ class Analyzer(catalog: Catalog,
if (missingInAggs.nonEmpty) {
// Add missing grouping exprs and then project them away after the sort.
Project(a.output,
- Sort(ordering,
- Aggregate(grouping, aggs ++ missingInAggs, child)))
+ Sort(ordering, global, Aggregate(grouping, aggs ++ missingInAggs, child)))
} else {
s // Nothing we can do here. Return original plan.
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index fb252cdf51..a14e5b9ef1 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -244,9 +244,9 @@ package object dsl {
condition: Option[Expression] = None) =
Join(logicalPlan, otherPlan, joinType, condition)
- def orderBy(sortExprs: SortOrder*) = Sort(sortExprs, logicalPlan)
+ def orderBy(sortExprs: SortOrder*) = Sort(sortExprs, true, logicalPlan)
- def sortBy(sortExprs: SortOrder*) = SortPartitions(sortExprs, logicalPlan)
+ def sortBy(sortExprs: SortOrder*) = Sort(sortExprs, false, logicalPlan)
def groupBy(groupingExprs: Expression*)(aggregateExprs: Expression*) = {
val aliasedExprs = aggregateExprs.map {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index a9282b98ad..0b9f01cbae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -130,7 +130,16 @@ case class WriteToFile(
override def output = child.output
}
-case class Sort(order: Seq[SortOrder], child: LogicalPlan) extends UnaryNode {
+/**
+ * @param order The ordering expressions
+ * @param global True means global sorting applies to the entire data set,
+ *               False means sorting only applies within each partition.
+ * @param child Child logical plan
+ */
+case class Sort(
+ order: Seq[SortOrder],
+ global: Boolean,
+ child: LogicalPlan) extends UnaryNode {
override def output = child.output
}