aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorYadong Qi <qiyadong2010@gmail.com>2015-06-15 12:01:52 -0700
committerMichael Armbrust <michael@databricks.com>2015-06-15 12:01:52 -0700
commit6ae21a944a0f4580b55749776223c827450b00da (patch)
treed46a873103a3dd4ca30071f5227cc6c1cc0c5616 /sql
parent56d4e8a2d0f6aab9a599cd8733e20500ffe8fc8a (diff)
downloadspark-6ae21a944a0f4580b55749776223c827450b00da.tar.gz
spark-6ae21a944a0f4580b55749776223c827450b00da.tar.bz2
spark-6ae21a944a0f4580b55749776223c827450b00da.zip
[SPARK-6583] [SQL] Support aggregate functions in ORDER BY
Add aggregates in ORDER BY clauses to the `Aggregate` operator beneath. Project these results away after the Sort. Based on work by watermen. Also Closes #5290. Author: Yadong Qi <qiyadong2010@gmail.com> Author: Michael Armbrust <michael@databricks.com> Closes #6816 from marmbrus/pr/5290 and squashes the following commits: 3226a97 [Michael Armbrust] consistent ordering eb8938d [Michael Armbrust] no vars c8b25c1 [Yadong Qi] move the test data. 7f9b736 [Yadong Qi] delete Substring case a1e87c1 [Yadong Qi] fix conflict f119849 [Yadong Qi] order by aggregated function
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala19
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala45
2 files changed, 61 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 4b7fef7126..badf903478 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types._
+import scala.collection.mutable.ArrayBuffer
/**
* A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
@@ -396,19 +397,31 @@ class Analyzer(
}
case s @ Sort(ordering, global, a @ Aggregate(grouping, aggs, child))
if !s.resolved && a.resolved =>
- val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
// A small hack to create an object that will allow us to resolve any references that
// refer to named expressions that are present in the grouping expressions.
val groupingRelation = LocalRelation(
grouping.collect { case ne: NamedExpression => ne.toAttribute }
)
- val (resolvedOrdering, missing) = resolveAndFindMissing(ordering, a, groupingRelation)
+ // Find sort attributes that are projected away so we can temporarily add them back in.
+ val (resolvedOrdering, unresolved) = resolveAndFindMissing(ordering, a, groupingRelation)
+
+ // Find aggregate expressions and evaluate them early, since they can't be evaluated in a
+ // Sort.
+ val (withAggsRemoved, aliasedAggregateList) = resolvedOrdering.map {
+ case aggOrdering if aggOrdering.collect { case a: AggregateExpression => a }.nonEmpty =>
+ val aliased = Alias(aggOrdering.child, "_aggOrdering")()
+ (aggOrdering.copy(child = aliased.toAttribute), aliased :: Nil)
+
+ case other => (other, Nil)
+ }.unzip
+
+ val missing = unresolved ++ aliasedAggregateList.flatten
if (missing.nonEmpty) {
// Add missing grouping exprs and then project them away after the sort.
Project(a.output,
- Sort(resolvedOrdering, global,
+ Sort(withAggsRemoved, global,
Aggregate(grouping, aggs ++ missing, child)))
} else {
s // Nothing we can do here. Return original plan.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index d1520b757e..a47cc30e92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1366,6 +1366,51 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
checkAnswer(sql("SELECT a.`c.b`, `b.$q`[0].`a@!.q`, `q.w`.`w.i&`[0] FROM t"), Row(1, 1, 1))
}
+ test("SPARK-6583 order by aggregated function") {
+ Seq("1" -> 3, "1" -> 4, "2" -> 7, "2" -> 8, "3" -> 5, "3" -> 6, "4" -> 1, "4" -> 2)
+ .toDF("a", "b").registerTempTable("orderByData")
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT a
+ |FROM orderByData
+ |GROUP BY a
+ |ORDER BY sum(b)
+ """.stripMargin),
+ Row("4") :: Row("1") :: Row("3") :: Row("2") :: Nil)
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT sum(b)
+ |FROM orderByData
+ |GROUP BY a
+ |ORDER BY sum(b)
+ """.stripMargin),
+ Row(3) :: Row(7) :: Row(11) :: Row(15) :: Nil)
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT a, sum(b)
+ |FROM orderByData
+ |GROUP BY a
+ |ORDER BY sum(b)
+ """.stripMargin),
+ Row("4", 3) :: Row("1", 7) :: Row("3", 11) :: Row("2", 15) :: Nil)
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT a, sum(b)
+ |FROM orderByData
+ |GROUP BY a
+ |ORDER BY sum(b) + 1
+ """.stripMargin),
+ Row("4", 3) :: Row("1", 7) :: Row("3", 11) :: Row("2", 15) :: Nil)
+ }
+
test("SPARK-7952: fix the equality check between boolean and numeric types") {
withTempTable("t") {
// numeric field i, boolean field j, result of i = j, result of i <=> j