aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorDilip Biswal <dbiswal@us.ibm.com>2015-11-26 11:31:28 -0800
committerMichael Armbrust <michael@databricks.com>2015-11-26 11:31:28 -0800
commitbc16a67562560c732833260cbc34825f7e9dcb8f (patch)
tree19eaba747238fa585db4c65a049d7964ca6e0a97 /sql/catalyst
parent001f0528a851ac314b390e65eb0583f89e69a949 (diff)
downloadspark-bc16a67562560c732833260cbc34825f7e9dcb8f.tar.gz
spark-bc16a67562560c732833260cbc34825f7e9dcb8f.tar.bz2
spark-bc16a67562560c732833260cbc34825f7e9dcb8f.zip
[SPARK-11863][SQL] Unable to resolve order by if it contains mixture of aliases and real columns
this is based on https://github.com/apache/spark/pull/9844, with some bug fixes and clean up. The problem is that a normal operator should be resolved based on its child, but the `Sort` operator can also be resolved based on its grandchild. So we have 3 rules that can resolve `Sort`: `ResolveReferences`, `ResolveSortReferences` (if the grandchild is `Project`) and `ResolveAggregateFunctions` (if the grandchild is `Aggregate`). For example, given `select c1 as a , c2 as b from tab group by c1, c2 order by a, c2`, we need to resolve `a` and `c2` for `Sort`. First, `a` will be resolved in `ResolveReferences` based on its child, and when we reach `ResolveAggregateFunctions`, we will try to resolve both `a` and `c2` based on its grandchild, but fail because `a` is not a legal aggregate expression. Whoever merges this PR, please give the credit to dilipbiswal. Author: Dilip Biswal <dbiswal@us.ibm.com> Author: Wenchen Fan <wenchen@databricks.com> Closes #9961 from cloud-fan/sort.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala13
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala18
2 files changed, 28 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 47962ebe6e..94ffbbb2e5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -630,7 +630,8 @@ class Analyzer(
// Try resolving the ordering as though it is in the aggregate clause.
try {
- val aliasedOrdering = sortOrder.map(o => Alias(o.child, "aggOrder")())
+ val unresolvedSortOrders = sortOrder.filter(s => !s.resolved || containsAggregate(s))
+ val aliasedOrdering = unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")())
val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering)
val resolvedAggregate: Aggregate = execute(aggregatedOrdering).asInstanceOf[Aggregate]
val resolvedAliasedOrdering: Seq[Alias] =
@@ -663,13 +664,19 @@ class Analyzer(
}
}
+ val sortOrdersMap = unresolvedSortOrders
+ .map(new TreeNodeRef(_))
+ .zip(evaluatedOrderings)
+ .toMap
+ val finalSortOrders = sortOrder.map(s => sortOrdersMap.getOrElse(new TreeNodeRef(s), s))
+
// Since we don't rely on sort.resolved as the stop condition for this rule,
// we need to check this and prevent applying this rule multiple times
- if (sortOrder == evaluatedOrderings) {
+ if (sortOrder == finalSortOrders) {
sort
} else {
Project(aggregate.output,
- Sort(evaluatedOrderings, global,
+ Sort(finalSortOrders, global,
aggregate.copy(aggregateExpressions = originalAggExprs ++ needsPushDown)))
}
} catch {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index e051069951..aeeca802d8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -220,6 +220,24 @@ class AnalysisSuite extends AnalysisTest {
// checkUDF(udf4, expected4)
}
+ test("SPARK-11863 mixture of aliases and real columns in order by clause - tpcds 19,55,71") {
+ val a = testRelation2.output(0)
+ val c = testRelation2.output(2)
+ val alias1 = a.as("a1")
+ val alias2 = c.as("a2")
+ val alias3 = count(a).as("a3")
+
+ val plan = testRelation2
+ .groupBy('a, 'c)('a.as("a1"), 'c.as("a2"), count('a).as("a3"))
+ .orderBy('a1.asc, 'c.asc)
+
+ val expected = testRelation2
+ .groupBy(a, c)(alias1, alias2, alias3)
+ .orderBy(alias1.toAttribute.asc, alias2.toAttribute.asc)
+ .select(alias1.toAttribute, alias2.toAttribute, alias3.toAttribute)
+ checkAnalysis(plan, expected)
+ }
+
test("analyzer should replace current_timestamp with literals") {
val in = Project(Seq(Alias(CurrentTimestamp(), "a")(), Alias(CurrentTimestamp(), "b")()),
LocalRelation())