aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-10-07 15:38:46 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-10-07 15:38:46 -0700
commit6dbfd7ecf41297213f4ce8024d00c40808c5ac8f (patch)
tree555d81bf2a10626a182b021b377a7f45da2a3880 /sql
parentda936fbb74b852d5c98286ce92522dc3efd6ad6c (diff)
downloadspark-6dbfd7ecf41297213f4ce8024d00c40808c5ac8f.tar.gz
spark-6dbfd7ecf41297213f4ce8024d00c40808c5ac8f.tar.bz2
spark-6dbfd7ecf41297213f4ce8024d00c40808c5ac8f.zip
[SPARK-10982] [SQL] Rename ExpressionAggregate -> DeclarativeAggregate.
DeclarativeAggregate matches more closely with ImperativeAggregate we already have. Author: Reynold Xin <rxin@databricks.com> Closes #9013 from rxin/SPARK-10982.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala16
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala16
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala8
5 files changed, 28 insertions, 28 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
index 4ad2607a85..8aad0b7dee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
-case class Average(child: Expression) extends ExpressionAggregate {
+case class Average(child: Expression) extends DeclarativeAggregate {
override def children: Seq[Expression] = child :: Nil
@@ -88,7 +88,7 @@ case class Average(child: Expression) extends ExpressionAggregate {
}
}
-case class Count(child: Expression) extends ExpressionAggregate {
+case class Count(child: Expression) extends DeclarativeAggregate {
override def children: Seq[Expression] = child :: Nil
override def nullable: Boolean = false
@@ -118,7 +118,7 @@ case class Count(child: Expression) extends ExpressionAggregate {
override val evaluateExpression = Cast(currentCount, LongType)
}
-case class First(child: Expression) extends ExpressionAggregate {
+case class First(child: Expression) extends DeclarativeAggregate {
override def children: Seq[Expression] = child :: Nil
@@ -152,7 +152,7 @@ case class First(child: Expression) extends ExpressionAggregate {
override val evaluateExpression = first
}
-case class Last(child: Expression) extends ExpressionAggregate {
+case class Last(child: Expression) extends DeclarativeAggregate {
override def children: Seq[Expression] = child :: Nil
@@ -186,7 +186,7 @@ case class Last(child: Expression) extends ExpressionAggregate {
override val evaluateExpression = last
}
-case class Max(child: Expression) extends ExpressionAggregate {
+case class Max(child: Expression) extends DeclarativeAggregate {
override def children: Seq[Expression] = child :: Nil
@@ -220,7 +220,7 @@ case class Max(child: Expression) extends ExpressionAggregate {
override val evaluateExpression = max
}
-case class Min(child: Expression) extends ExpressionAggregate {
+case class Min(child: Expression) extends DeclarativeAggregate {
override def children: Seq[Expression] = child :: Nil
@@ -277,7 +277,7 @@ case class StddevSamp(child: Expression) extends StddevAgg(child) {
// Compute standard deviation based on online algorithm specified here:
// http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
-abstract class StddevAgg(child: Expression) extends ExpressionAggregate {
+abstract class StddevAgg(child: Expression) extends DeclarativeAggregate {
override def children: Seq[Expression] = child :: Nil
@@ -397,7 +397,7 @@ abstract class StddevAgg(child: Expression) extends ExpressionAggregate {
}
}
-case class Sum(child: Expression) extends ExpressionAggregate {
+case class Sum(child: Expression) extends DeclarativeAggregate {
override def children: Seq[Expression] = child :: Nil
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index 74e15ec90b..9ba3a9c980 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -97,7 +97,7 @@ private[sql] case class AggregateExpression2(
*
* - [[ImperativeAggregate]] is for aggregation functions that are specified in terms of
* initialize(), update(), and merge() functions that operate on Row-based aggregation buffers.
- * - [[ExpressionAggregate]] is for aggregation functions that are specified using
+ * - [[DeclarativeAggregate]] is for aggregation functions that are specified using
* Catalyst expressions.
*
* In both interfaces, aggregates must define the schema ([[aggBufferSchema]]) and attributes
@@ -244,7 +244,7 @@ abstract class ImperativeAggregate extends AggregateFunction2 {
* can then use these attributes when defining `updateExpressions`, `mergeExpressions`, and
* `evaluateExpressions`.
*/
-abstract class ExpressionAggregate
+abstract class DeclarativeAggregate
extends AggregateFunction2
with Serializable
with Unevaluable {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
index 04903022e5..5f7341e88c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
@@ -127,7 +127,7 @@ abstract class AggregationIterator(
var i = 0
while (i < allAggregateFunctions.length) {
allAggregateFunctions(i) match {
- case agg: ExpressionAggregate =>
+ case agg: DeclarativeAggregate =>
case _ => positions += i
}
i += 1
@@ -146,7 +146,7 @@ abstract class AggregationIterator(
// The projection used to initialize buffer values for all expression-based aggregates.
private[this] val expressionAggInitialProjection = {
val initExpressions = allAggregateFunctions.flatMap {
- case ae: ExpressionAggregate => ae.initialValues
+ case ae: DeclarativeAggregate => ae.initialValues
// For the positions corresponding to imperative aggregate functions, we'll use special
// no-op expressions which are ignored during projection code-generation.
case i: ImperativeAggregate => Seq.fill(i.aggBufferAttributes.length)(NoOp)
@@ -172,7 +172,7 @@ abstract class AggregationIterator(
// Partial-only
case (Some(Partial), None) =>
val updateExpressions = nonCompleteAggregateFunctions.flatMap {
- case ae: ExpressionAggregate => ae.updateExpressions
+ case ae: DeclarativeAggregate => ae.updateExpressions
case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
}
val expressionAggUpdateProjection =
@@ -204,7 +204,7 @@ abstract class AggregationIterator(
// groupingKeyAttributes ++
// allAggregateFunctions.flatMap(_.cloneBufferAttributes)
val mergeExpressions = nonCompleteAggregateFunctions.flatMap {
- case ae: ExpressionAggregate => ae.mergeExpressions
+ case ae: DeclarativeAggregate => ae.mergeExpressions
case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
}
// This projection is used to merge buffer values for all expression-based aggregates.
@@ -248,7 +248,7 @@ abstract class AggregationIterator(
nonCompleteAggregateFunctions.flatMap(_.inputAggBufferAttributes)
val mergeExpressions =
nonCompleteAggregateFunctions.flatMap {
- case ae: ExpressionAggregate => ae.mergeExpressions
+ case ae: DeclarativeAggregate => ae.mergeExpressions
case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
} ++ completeOffsetExpressions
val finalExpressionAggMergeProjection =
@@ -256,7 +256,7 @@ abstract class AggregationIterator(
val updateExpressions =
finalOffsetExpressions ++ completeAggregateFunctions.flatMap {
- case ae: ExpressionAggregate => ae.updateExpressions
+ case ae: DeclarativeAggregate => ae.updateExpressions
case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
}
val completeExpressionAggUpdateProjection =
@@ -291,7 +291,7 @@ abstract class AggregationIterator(
val updateExpressions =
completeAggregateFunctions.flatMap {
- case ae: ExpressionAggregate => ae.updateExpressions
+ case ae: DeclarativeAggregate => ae.updateExpressions
case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
}
val completeExpressionAggUpdateProjection =
@@ -353,7 +353,7 @@ abstract class AggregationIterator(
val bufferSchemata =
allAggregateFunctions.flatMap(_.aggBufferAttributes)
val evalExpressions = allAggregateFunctions.map {
- case ae: ExpressionAggregate => ae.evaluateExpression
+ case ae: DeclarativeAggregate => ae.evaluateExpression
case agg: AggregateFunction2 => NoOp
}
val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferSchemata)()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 6a84c0af0b..a6f4c1d92f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -131,15 +131,15 @@ class TungstenAggregationIterator(
// All aggregate functions. TungstenAggregationIterator only handles expression-based aggregate.
// If there is any functions that is an ImperativeAggregateFunction, we throw an
// IllegalStateException.
- private[this] val allAggregateFunctions: Array[ExpressionAggregate] = {
+ private[this] val allAggregateFunctions: Array[DeclarativeAggregate] = {
if (!allAggregateExpressions.forall(
- _.aggregateFunction.isInstanceOf[ExpressionAggregate])) {
+ _.aggregateFunction.isInstanceOf[DeclarativeAggregate])) {
throw new IllegalStateException(
"Only ExpressionAggregateFunctions should be passed in TungstenAggregationIterator.")
}
allAggregateExpressions
- .map(_.aggregateFunction.asInstanceOf[ExpressionAggregate])
+ .map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate])
.toArray
}
@@ -203,9 +203,9 @@ class TungstenAggregationIterator(
// Final-Complete
case (Some(Final), Some(Complete)) =>
- val nonCompleteAggregateFunctions: Array[ExpressionAggregate] =
+ val nonCompleteAggregateFunctions: Array[DeclarativeAggregate] =
allAggregateFunctions.take(nonCompleteAggregateExpressions.length)
- val completeAggregateFunctions: Array[ExpressionAggregate] =
+ val completeAggregateFunctions: Array[DeclarativeAggregate] =
allAggregateFunctions.takeRight(completeAggregateExpressions.length)
val completeOffsetExpressions =
@@ -235,7 +235,7 @@ class TungstenAggregationIterator(
// Complete-only
case (None, Some(Complete)) =>
- val completeAggregateFunctions: Array[ExpressionAggregate] =
+ val completeAggregateFunctions: Array[DeclarativeAggregate] =
allAggregateFunctions.takeRight(completeAggregateExpressions.length)
val updateExpressions =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
index e1d7e1bf02..e1c2d9475a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
@@ -97,7 +97,7 @@ object Utils {
// Check if we can use TungstenAggregate.
val usesTungstenAggregate =
child.sqlContext.conf.unsafeEnabled &&
- aggregateExpressions.forall(_.aggregateFunction.isInstanceOf[ExpressionAggregate]) &&
+ aggregateExpressions.forall(_.aggregateFunction.isInstanceOf[DeclarativeAggregate]) &&
supportsTungstenAggregate(
groupingExpressions,
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
@@ -157,7 +157,7 @@ object Utils {
// aggregateFunctionMap contains unique aggregate functions.
val aggregateFunction =
aggregateFunctionMap(agg.aggregateFunction, agg.isDistinct)._1
- aggregateFunction.asInstanceOf[ExpressionAggregate].evaluateExpression
+ aggregateFunction.asInstanceOf[DeclarativeAggregate].evaluateExpression
case expression =>
// We do not rely on the equality check at here since attributes may
// different cosmetically. Instead, we use semanticEquals.
@@ -215,7 +215,7 @@ object Utils {
val usesTungstenAggregate =
child.sqlContext.conf.unsafeEnabled &&
aggregateExpressions.forall(
- _.aggregateFunction.isInstanceOf[ExpressionAggregate]) &&
+ _.aggregateFunction.isInstanceOf[DeclarativeAggregate]) &&
supportsTungstenAggregate(
groupingExpressions,
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
@@ -359,7 +359,7 @@ object Utils {
// aggregate functions that have not been rewritten.
aggregateFunctionMap(function, isDistinct)._1
}
- aggregateFunction.asInstanceOf[ExpressionAggregate].evaluateExpression
+ aggregateFunction.asInstanceOf[DeclarativeAggregate].evaluateExpression
case expression =>
// We do not rely on the equality check at here since attributes may
// different cosmetically. Instead, we use semanticEquals.