diff options
author | Wenchen Fan <wenchen@databricks.com> | 2017-01-04 12:46:30 +0100 |
---|---|---|
committer | Herman van Hovell <hvanhovell@databricks.com> | 2017-01-04 12:46:30 +0100 |
commit | 101556d0fa704deca0f4a2e5070906d4af2c861b (patch) | |
tree | 4e597be920b0f6ae7247ac486b22a0875f12b5c5 /sql/core | |
parent | fe1c895e16c475a6f271ce600a42a8d0dc7986e5 (diff) | |
download | spark-101556d0fa704deca0f4a2e5070906d4af2c861b.tar.gz spark-101556d0fa704deca0f4a2e5070906d4af2c861b.tar.bz2 spark-101556d0fa704deca0f4a2e5070906d4af2c861b.zip |
[SPARK-19060][SQL] remove the supportsPartial flag in AggregateFunction
## What changes were proposed in this pull request?
Now all aggregation functions support partial aggregate, we can remove the `supportsPartual` flag in `AggregateFunction`
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #16461 from cloud-fan/partial.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 13 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala | 20 |
2 files changed, 1 insertions, 32 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 81cd5ef340..28808f8e3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -262,18 +262,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } val aggregateOperator = - if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) { - if (functionsWithDistinct.nonEmpty) { - sys.error("Distinct columns cannot exist in Aggregate operator containing " + - "aggregate functions which don't support partial aggregation.") - } else { - aggregate.AggUtils.planAggregateWithoutPartial( - groupingExpressions, - aggregateExpressions, - resultExpressions, - planLater(child)) - } - } else if (functionsWithDistinct.isEmpty) { + if (functionsWithDistinct.isEmpty) { aggregate.AggUtils.planAggregateWithoutDistinct( groupingExpressions, aggregateExpressions, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala index 8b8ccf4239..aa789af6f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala @@ -27,26 +27,6 @@ import org.apache.spark.sql.internal.SQLConf * Utility functions used by the query planner to convert our plan to new aggregation code path. */ object AggUtils { - - def planAggregateWithoutPartial( - groupingExpressions: Seq[NamedExpression], - aggregateExpressions: Seq[AggregateExpression], - resultExpressions: Seq[NamedExpression], - child: SparkPlan): Seq[SparkPlan] = { - - val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete)) - val completeAggregateAttributes = completeAggregateExpressions.map(_.resultAttribute) - SortAggregateExec( - requiredChildDistributionExpressions = Some(groupingExpressions), - groupingExpressions = groupingExpressions, - aggregateExpressions = completeAggregateExpressions, - aggregateAttributes = completeAggregateAttributes, - initialInputBufferOffset = 0, - resultExpressions = resultExpressions, - child = child - ) :: Nil - } - private def createAggregate( requiredChildDistributionExpressions: Option[Seq[Expression]] = None, groupingExpressions: Seq[NamedExpression] = Nil, |