diff options
author | Takeshi YAMAMURO <linguin.m.s@gmail.com> | 2016-08-30 16:43:47 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2016-08-30 16:43:47 +0800 |
commit | 94922d79e9f90fac3777db0974ccf7566b8ac3b3 (patch) | |
tree | 858f11e3b3df7644384ee5e5568799693d4e5f4e /common/network-yarn/src/main/java | |
parent | 8fb445d9bdead6f0ff2bd9879145fe688b3bdc80 (diff) | |
download | spark-94922d79e9f90fac3777db0974ccf7566b8ac3b3.tar.gz spark-94922d79e9f90fac3777db0974ccf7566b8ac3b3.tar.bz2 spark-94922d79e9f90fac3777db0974ccf7566b8ac3b3.zip |
[SPARK-17289][SQL] Fix a bug to satisfy sort requirements in partial aggregations
## What changes were proposed in this pull request?
Partial aggregations are generated in `EnsureRequirements`, but the planner fails to
check if partial aggregation satisfies sort requirements.
For the following query:
```
val df2 = (0 to 1000).map(x => (x % 2, x.toString)).toDF("a", "b").createOrReplaceTempView("t2")
spark.sql("select max(b) from t2 group by a").explain(true)
```
Now, the SortAggregator won't insert Sort operator before partial aggregation, this will break sort-based partial aggregation.
```
== Physical Plan ==
SortAggregate(key=[a#5], functions=[max(b#6)], output=[max(b)#17])
+- *Sort [a#5 ASC], false, 0
+- Exchange hashpartitioning(a#5, 200)
+- SortAggregate(key=[a#5], functions=[partial_max(b#6)], output=[a#5, max#19])
+- LocalTableScan [a#5, b#6]
```
Actually, a correct plan is:
```
== Physical Plan ==
SortAggregate(key=[a#5], functions=[max(b#6)], output=[max(b)#17])
+- *Sort [a#5 ASC], false, 0
+- Exchange hashpartitioning(a#5, 200)
+- SortAggregate(key=[a#5], functions=[partial_max(b#6)], output=[a#5, max#19])
+- *Sort [a#5 ASC], false, 0
+- LocalTableScan [a#5, b#6]
```
## How was this patch tested?
Added tests in `PlannerSuite`.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes #14865 from maropu/SPARK-17289.
Diffstat (limited to 'common/network-yarn/src/main/java')
0 files changed, 0 insertions, 0 deletions