aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala6
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala1
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala13
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala20
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala2
7 files changed, 3 insertions, 48 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index ccd4ae6c2d..80c25d0b0f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -174,12 +174,6 @@ abstract class AggregateFunction extends Expression {
def inputAggBufferAttributes: Seq[AttributeReference]
/**
- * Indicates if this function supports partial aggregation.
- * Currently Hive UDAF is the only one that doesn't support partial aggregation.
- */
- def supportsPartial: Boolean = true
-
- /**
* Result of the aggregate function when the input is empty. This is currently only used for the
* proper rewriting of distinct aggregate functions.
*/
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index c0d6a6b92b..13115f4728 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -436,7 +436,6 @@ abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowF
override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
override def dataType: DataType = IntegerType
override def nullable: Boolean = true
- override def supportsPartial: Boolean = false
override lazy val mergeExpressions =
throw new UnsupportedOperationException("Window Functions do not support merging.")
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
index cd8912f793..3b27cd2ffe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
@@ -131,11 +131,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}
}
- // Check if the aggregates contains functions that do not support partial aggregation.
- val existsNonPartial = aggExpressions.exists(!_.aggregateFunction.supportsPartial)
-
- // Aggregation strategy can handle queries with a single distinct group and partial aggregates.
- if (distinctAggGroups.size > 1 || (distinctAggGroups.size == 1 && existsNonPartial)) {
+ // Aggregation strategy can handle queries with a single distinct group.
+ if (distinctAggGroups.size > 1) {
// Create the attributes for the grouping id and the group by clause.
val gid = AttributeReference("gid", IntegerType, nullable = false)(isGenerated = true)
val groupByMap = a.groupingExpressions.collect {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 81cd5ef340..28808f8e3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -262,18 +262,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
val aggregateOperator =
- if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) {
- if (functionsWithDistinct.nonEmpty) {
- sys.error("Distinct columns cannot exist in Aggregate operator containing " +
- "aggregate functions which don't support partial aggregation.")
- } else {
- aggregate.AggUtils.planAggregateWithoutPartial(
- groupingExpressions,
- aggregateExpressions,
- resultExpressions,
- planLater(child))
- }
- } else if (functionsWithDistinct.isEmpty) {
+ if (functionsWithDistinct.isEmpty) {
aggregate.AggUtils.planAggregateWithoutDistinct(
groupingExpressions,
aggregateExpressions,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
index 8b8ccf4239..aa789af6f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
@@ -27,26 +27,6 @@ import org.apache.spark.sql.internal.SQLConf
* Utility functions used by the query planner to convert our plan to new aggregation code path.
*/
object AggUtils {
-
- def planAggregateWithoutPartial(
- groupingExpressions: Seq[NamedExpression],
- aggregateExpressions: Seq[AggregateExpression],
- resultExpressions: Seq[NamedExpression],
- child: SparkPlan): Seq[SparkPlan] = {
-
- val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete))
- val completeAggregateAttributes = completeAggregateExpressions.map(_.resultAttribute)
- SortAggregateExec(
- requiredChildDistributionExpressions = Some(groupingExpressions),
- groupingExpressions = groupingExpressions,
- aggregateExpressions = completeAggregateExpressions,
- aggregateAttributes = completeAggregateAttributes,
- initialInputBufferOffset = 0,
- resultExpressions = resultExpressions,
- child = child
- ) :: Nil
- }
-
private def createAggregate(
requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
groupingExpressions: Seq[NamedExpression] = Nil,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index fcefd69272..4590197548 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -380,8 +380,6 @@ private[hive] case class HiveUDAFFunction(
override def nullable: Boolean = true
- override def supportsPartial: Boolean = true
-
override lazy val dataType: DataType = inspectorToDataType(returnInspector)
override def prettyName: String = name
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala
index aaf1db65a6..31b2430176 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala
@@ -42,8 +42,6 @@ case class TestingTypedCount(
override def nullable: Boolean = false
- override val supportsPartial: Boolean = true
-
override def createAggregationBuffer(): State = TestingTypedCount.State(0L)
override def update(buffer: State, input: InternalRow): State = {