aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@databricks.com>2016-09-22 14:29:27 -0700
committerHerman van Hovell <hvanhovell@databricks.com>2016-09-22 14:29:27 -0700
commit0d634875026ccf1eaf984996e9460d7673561f80 (patch)
tree3faf8e6530d10c3767e2a9ca1c5adb4f12e8ec7b /sql/core
parent3cdae0ff2f45643df7bc198cb48623526c7eb1a6 (diff)
downloadspark-0d634875026ccf1eaf984996e9460d7673561f80.tar.gz
spark-0d634875026ccf1eaf984996e9460d7673561f80.tar.bz2
spark-0d634875026ccf1eaf984996e9460d7673561f80.zip
[SPARK-17616][SQL] Support a single distinct aggregate combined with a non-partial aggregate
## What changes were proposed in this pull request? We currently cannot execute an aggregate that contains a single distinct aggregate function and an one or more non-partially plannable aggregate functions, for example: ```sql select grp, collect_list(col1), count(distinct col2) from tbl_a group by 1 ``` This is a regression from Spark 1.6. This is caused by the fact that the single distinct aggregation code path assumes that all aggregates can be planned in two phases (is partially aggregatable). This PR works around this issue by triggering the `RewriteDistinctAggregates` in such cases (this is similar to the approach taken in 1.6). ## How was this patch tested? Created `RewriteDistinctAggregatesSuite` which checks if the aggregates with distinct aggregate functions get rewritten into two `Aggregates` and an `Expand`. Added a regression test to `DataFrameAggregateSuite`. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #15187 from hvanhovell/SPARK-17616.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala8
1 files changed, 8 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 427390a90f..0e172bee4f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -493,4 +493,12 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
Row(new java.math.BigDecimal(2.0), new java.math.BigDecimal(1.5)),
Row(new java.math.BigDecimal(3.0), new java.math.BigDecimal(1.5))))
}
+
+ test("SPARK-17616: distinct aggregate combined with a non-partial aggregate") {
+ val df = Seq((1, 3, "a"), (1, 2, "b"), (3, 4, "c"), (3, 4, "c"), (3, 5, "d"))
+ .toDF("x", "y", "z")
+ checkAnswer(
+ df.groupBy($"x").agg(countDistinct($"y"), sort_array(collect_list($"z"))),
+ Seq(Row(1, 2, Seq("a", "b")), Row(3, 2, Seq("c", "c", "d"))))
+ }
}