From 6dbfd7ecf41297213f4ce8024d00c40808c5ac8f Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Wed, 7 Oct 2015 15:38:46 -0700
Subject: [SPARK-10982] [SQL] Rename ExpressionAggregate -> DeclarativeAggregate.

DeclarativeAggregate matches more closely with ImperativeAggregate, which we already have.

Author: Reynold Xin

Closes #9013 from rxin/SPARK-10982.
---
 .../sql/catalyst/expressions/aggregate/functions.scala   | 16 ++++++++--------
 .../sql/catalyst/expressions/aggregate/interfaces.scala  |  4 ++--
 .../sql/execution/aggregate/AggregationIterator.scala    | 16 ++++++++--------
 .../aggregate/TungstenAggregationIterator.scala          | 12 ++++++------
 .../org/apache/spark/sql/execution/aggregate/utils.scala |  8 ++++----
 5 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
index 4ad2607a85..8aad0b7dee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types._
 
-case class Average(child: Expression) extends ExpressionAggregate {
+case class Average(child: Expression) extends DeclarativeAggregate {
 
   override def children: Seq[Expression] = child :: Nil
 
@@ -88,7 +88,7 @@ case class Average(child: Expression) extends ExpressionAggregate {
   }
 }
 
-case class Count(child: Expression) extends ExpressionAggregate {
+case class Count(child: Expression) extends DeclarativeAggregate {
   override def children: Seq[Expression] = child :: Nil
 
   override def nullable: Boolean = false
@@ -118,7 +118,7 @@ case class Count(child: Expression) extends ExpressionAggregate {
   override val evaluateExpression = Cast(currentCount, LongType)
 }
 
-case class First(child: Expression) extends ExpressionAggregate {
+case class First(child: Expression) extends DeclarativeAggregate {
 
   override def children: Seq[Expression] = child :: Nil
 
@@ -152,7 +152,7 @@ case class First(child: Expression) extends ExpressionAggregate {
   override val evaluateExpression = first
 }
 
-case class Last(child: Expression) extends ExpressionAggregate {
+case class Last(child: Expression) extends DeclarativeAggregate {
 
   override def children: Seq[Expression] = child :: Nil
 
@@ -186,7 +186,7 @@ case class Last(child: Expression) extends ExpressionAggregate {
   override val evaluateExpression = last
 }
 
-case class Max(child: Expression) extends ExpressionAggregate {
+case class Max(child: Expression) extends DeclarativeAggregate {
 
   override def children: Seq[Expression] = child :: Nil
 
@@ -220,7 +220,7 @@ case class Max(child: Expression) extends ExpressionAggregate {
   override val evaluateExpression = max
 }
 
-case class Min(child: Expression) extends ExpressionAggregate {
+case class Min(child: Expression) extends DeclarativeAggregate {
 
   override def children: Seq[Expression] = child :: Nil
 
@@ -277,7 +277,7 @@ case class StddevSamp(child: Expression) extends StddevAgg(child) {
 
 // Compute standard deviation based on online algorithm specified here:
 // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
-abstract class StddevAgg(child: Expression) extends ExpressionAggregate {
+abstract class StddevAgg(child: Expression) extends DeclarativeAggregate {
 
   override def children: Seq[Expression] = child :: Nil
 
@@ -397,7 +397,7 @@ abstract class StddevAgg(child: Expression) extends ExpressionAggregate {
   }
 }
 
-case class Sum(child: Expression) extends ExpressionAggregate {
+case class Sum(child: Expression) extends DeclarativeAggregate {
 
   override def children: Seq[Expression] = child :: Nil
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index 74e15ec90b..9ba3a9c980 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -97,7 +97,7 @@ private[sql] case class AggregateExpression2(
  *
  *  - [[ImperativeAggregate]] is for aggregation functions that are specified in terms of
  *    initialize(), update(), and merge() functions that operate on Row-based aggregation buffers.
- *  - [[ExpressionAggregate]] is for aggregation functions that are specified using
+ *  - [[DeclarativeAggregate]] is for aggregation functions that are specified using
  *    Catalyst expressions.
  *
  * In both interfaces, aggregates must define the schema ([[aggBufferSchema]]) and attributes
@@ -244,7 +244,7 @@ abstract class ImperativeAggregate extends AggregateFunction2 {
  * can then use these attributes when defining `updateExpressions`, `mergeExpressions`, and
  * `evaluateExpressions`.
  */
-abstract class ExpressionAggregate
+abstract class DeclarativeAggregate
   extends AggregateFunction2
   with Serializable
   with Unevaluable {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
index 04903022e5..5f7341e88c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
@@ -127,7 +127,7 @@ abstract class AggregationIterator(
     var i = 0
     while (i < allAggregateFunctions.length) {
       allAggregateFunctions(i) match {
-        case agg: ExpressionAggregate =>
+        case agg: DeclarativeAggregate =>
         case _ => positions += i
       }
       i += 1
@@ -146,7 +146,7 @@ abstract class AggregationIterator(
   // The projection used to initialize buffer values for all expression-based aggregates.
   private[this] val expressionAggInitialProjection = {
     val initExpressions = allAggregateFunctions.flatMap {
-      case ae: ExpressionAggregate => ae.initialValues
+      case ae: DeclarativeAggregate => ae.initialValues
       // For the positions corresponding to imperative aggregate functions, we'll use special
       // no-op expressions which are ignored during projection code-generation.
       case i: ImperativeAggregate => Seq.fill(i.aggBufferAttributes.length)(NoOp)
@@ -172,7 +172,7 @@ abstract class AggregationIterator(
       // Partial-only
       case (Some(Partial), None) =>
         val updateExpressions = nonCompleteAggregateFunctions.flatMap {
-          case ae: ExpressionAggregate => ae.updateExpressions
+          case ae: DeclarativeAggregate => ae.updateExpressions
           case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
         }
         val expressionAggUpdateProjection =
@@ -204,7 +204,7 @@ abstract class AggregationIterator(
         // groupingKeyAttributes ++
         //   allAggregateFunctions.flatMap(_.cloneBufferAttributes)
         val mergeExpressions = nonCompleteAggregateFunctions.flatMap {
-          case ae: ExpressionAggregate => ae.mergeExpressions
+          case ae: DeclarativeAggregate => ae.mergeExpressions
           case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
         }
         // This projection is used to merge buffer values for all expression-based aggregates.
@@ -248,7 +248,7 @@ abstract class AggregationIterator(
           nonCompleteAggregateFunctions.flatMap(_.inputAggBufferAttributes)
         val mergeExpressions =
           nonCompleteAggregateFunctions.flatMap {
-            case ae: ExpressionAggregate => ae.mergeExpressions
+            case ae: DeclarativeAggregate => ae.mergeExpressions
            case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
          } ++ completeOffsetExpressions
        val finalExpressionAggMergeProjection =
@@ -256,7 +256,7 @@ abstract class AggregationIterator(
        val updateExpressions =
          finalOffsetExpressions ++ completeAggregateFunctions.flatMap {
-            case ae: ExpressionAggregate => ae.updateExpressions
+            case ae: DeclarativeAggregate => ae.updateExpressions
            case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
          }
        val completeExpressionAggUpdateProjection =
@@ -291,7 +291,7 @@ abstract class AggregationIterator(
        val updateExpressions =
          completeAggregateFunctions.flatMap {
-            case ae: ExpressionAggregate => ae.updateExpressions
+            case ae: DeclarativeAggregate => ae.updateExpressions
            case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
          }
        val completeExpressionAggUpdateProjection =
@@ -353,7 +353,7 @@ abstract class AggregationIterator(
    val bufferSchemata = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    val evalExpressions = allAggregateFunctions.map {
-      case ae: ExpressionAggregate => ae.evaluateExpression
+      case ae: DeclarativeAggregate => ae.evaluateExpression
      case agg: AggregateFunction2 => NoOp
    }
    val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferSchemata)()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 6a84c0af0b..a6f4c1d92f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -131,15 +131,15 @@ class TungstenAggregationIterator(
   // All aggregate functions. TungstenAggregationIterator only handles expression-based aggregate.
   // If there is any functions that is an ImperativeAggregateFunction, we throw an
   // IllegalStateException.
-  private[this] val allAggregateFunctions: Array[ExpressionAggregate] = {
+  private[this] val allAggregateFunctions: Array[DeclarativeAggregate] = {
     if (!allAggregateExpressions.forall(
-          _.aggregateFunction.isInstanceOf[ExpressionAggregate])) {
+          _.aggregateFunction.isInstanceOf[DeclarativeAggregate])) {
       throw new IllegalStateException(
         "Only ExpressionAggregateFunctions should be passed in TungstenAggregationIterator.")
     }
     allAggregateExpressions
-      .map(_.aggregateFunction.asInstanceOf[ExpressionAggregate])
+      .map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate])
       .toArray
   }
 
@@ -203,9 +203,9 @@ class TungstenAggregationIterator(
 
       // Final-Complete
       case (Some(Final), Some(Complete)) =>
-        val nonCompleteAggregateFunctions: Array[ExpressionAggregate] =
+        val nonCompleteAggregateFunctions: Array[DeclarativeAggregate] =
           allAggregateFunctions.take(nonCompleteAggregateExpressions.length)
-        val completeAggregateFunctions: Array[ExpressionAggregate] =
+        val completeAggregateFunctions: Array[DeclarativeAggregate] =
           allAggregateFunctions.takeRight(completeAggregateExpressions.length)
 
         val completeOffsetExpressions =
@@ -235,7 +235,7 @@ class TungstenAggregationIterator(
 
       // Complete-only
       case (None, Some(Complete)) =>
-        val completeAggregateFunctions: Array[ExpressionAggregate] =
+        val completeAggregateFunctions: Array[DeclarativeAggregate] =
           allAggregateFunctions.takeRight(completeAggregateExpressions.length)
 
         val updateExpressions =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
index e1d7e1bf02..e1c2d9475a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
@@ -97,7 +97,7 @@ object Utils {
     // Check if we can use TungstenAggregate.
     val usesTungstenAggregate = child.sqlContext.conf.unsafeEnabled &&
-      aggregateExpressions.forall(_.aggregateFunction.isInstanceOf[ExpressionAggregate]) &&
+      aggregateExpressions.forall(_.aggregateFunction.isInstanceOf[DeclarativeAggregate]) &&
       supportsTungstenAggregate(
         groupingExpressions,
         aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
@@ -157,7 +157,7 @@ object Utils {
           // aggregateFunctionMap contains unique aggregate functions.
           val aggregateFunction =
             aggregateFunctionMap(agg.aggregateFunction, agg.isDistinct)._1
-          aggregateFunction.asInstanceOf[ExpressionAggregate].evaluateExpression
+          aggregateFunction.asInstanceOf[DeclarativeAggregate].evaluateExpression
         case expression =>
           // We do not rely on the equality check at here since attributes may
           // different cosmetically. Instead, we use semanticEquals.
@@ -215,7 +215,7 @@ object Utils {
     val usesTungstenAggregate = child.sqlContext.conf.unsafeEnabled &&
       aggregateExpressions.forall(
-        _.aggregateFunction.isInstanceOf[ExpressionAggregate]) &&
+        _.aggregateFunction.isInstanceOf[DeclarativeAggregate]) &&
       supportsTungstenAggregate(
         groupingExpressions,
         aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
@@ -359,7 +359,7 @@ object Utils {
             // aggregate functions that have not been rewritten.
             aggregateFunctionMap(function, isDistinct)._1
           }
-          aggregateFunction.asInstanceOf[ExpressionAggregate].evaluateExpression
+          aggregateFunction.asInstanceOf[DeclarativeAggregate].evaluateExpression
         case expression =>
           // We do not rely on the equality check at here since attributes may
           // different cosmetically. Instead, we use semanticEquals.
--
cgit v1.2.3
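
The rename is easiest to read against the two contracts described in the interfaces.scala doc comment above: a DeclarativeAggregate describes its buffer handling as Catalyst expressions (initialValues, updateExpressions, mergeExpressions, evaluateExpression), while an ImperativeAggregate implements initialize(), update(), and merge() directly against a row buffer. The Scala sketch below is a deliberately simplified stand-in for that split, not the real Spark interfaces touched by this patch; ToyDeclarativeAggregate, ToyImperativeAggregate, and the toy COUNT objects are invented for illustration and only mirror the shape of the two contracts.

    // Toy model of the two aggregate contracts this patch renames and documents.
    // NOTE: simplified stand-ins for illustration only; not Spark's actual
    // DeclarativeAggregate / ImperativeAggregate classes.

    // Minimal stand-in for a Catalyst expression tree node.
    sealed trait Expr
    case class Literal(value: Long) extends Expr
    case class AttrRef(name: String) extends Expr
    case class Add(left: Expr, right: Expr) extends Expr

    // Declarative contract: the aggregation is *described* as expressions that a
    // planner can later turn into a generated projection.
    trait ToyDeclarativeAggregate {
      def initialValues: Seq[Expr]
      def updateExpressions: Seq[Expr]
      def mergeExpressions: Seq[Expr]
      def evaluateExpression: Expr
    }

    // Imperative contract: the aggregation is *executed* directly against a
    // mutable buffer through initialize/update/merge callbacks.
    trait ToyImperativeAggregate {
      def initialize(buffer: Array[Long]): Unit
      def update(buffer: Array[Long], input: Long): Unit
      def merge(buffer: Array[Long], other: Array[Long]): Unit
      def eval(buffer: Array[Long]): Long
    }

    // COUNT written declaratively: "count + 1" is data, not behavior.
    object DeclarativeCount extends ToyDeclarativeAggregate {
      private val count = AttrRef("count")
      val initialValues: Seq[Expr] = Seq(Literal(0L))
      val updateExpressions: Seq[Expr] = Seq(Add(count, Literal(1L)))
      val mergeExpressions: Seq[Expr] = Seq(Add(AttrRef("count.left"), AttrRef("count.right")))
      val evaluateExpression: Expr = count
    }

    // The same COUNT written imperatively: behavior lives in method bodies.
    object ImperativeCount extends ToyImperativeAggregate {
      def initialize(buffer: Array[Long]): Unit = buffer(0) = 0L
      def update(buffer: Array[Long], input: Long): Unit = buffer(0) += 1L
      def merge(buffer: Array[Long], other: Array[Long]): Unit = buffer(0) += other(0)
      def eval(buffer: Array[Long]): Long = buffer(0)
    }

The declarative form is what the Tungsten path depends on: because the logic is available as expressions rather than opaque method bodies, it can be fed into projection code generation (as AggregationIterator does with newMutableProjection above), which is why TungstenAggregationIterator in this patch throws an IllegalStateException for any function that is not a DeclarativeAggregate.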