path: root/sql/catalyst
authorYin Huai <yhuai@databricks.com>2015-08-06 15:04:44 -0700
committerReynold Xin <rxin@databricks.com>2015-08-06 15:04:44 -0700
commit3504bf3aa9f7b75c0985f04ce2944833d8c5b5bd (patch)
tree8968e5ec73c6d139a974e5874e0087d9eba9575e /sql/catalyst
parent346209097e88fe79015359e40b49c32cc0bdc439 (diff)
[SPARK-9630] [SQL] Clean up new aggregate operators (SPARK-9240 follow up)
This is the follow-up of https://github.com/apache/spark/pull/7813. It renames `HybridUnsafeAggregationIterator` to `TungstenAggregationIterator` and makes it work only with `UnsafeRow`. It also adds a `TungstenAggregate` operator that uses `TungstenAggregationIterator`, and makes `SortBasedAggregate` (renamed from `SortBasedAggregate`) work only with `SafeRow`.

Author: Yin Huai <yhuai@databricks.com>

Closes #7954 from yhuai/agg-followUp and squashes the following commits:

4d2f4fc [Yin Huai] Add comments and free map.
0d7ddb9 [Yin Huai] Add TungstenAggregationQueryWithControlledFallbackSuite to test the fallback process.
91d69c2 [Yin Huai] Rename UnsafeHybridAggregationIterator to TungstenAggregationIterator and make it only work with UnsafeRow.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala | 14
1 file changed, 12 insertions(+), 2 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
index 88fb516e64..a73024d6ad 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
@@ -31,8 +31,11 @@ case class Average(child: Expression) extends AlgebraicAggregate {
override def dataType: DataType = resultType
// Expected input data type.
- // TODO: Once we remove the old code path, we can use our analyzer to cast NullType
- // to the default data type of the NumericType.
+ // TODO: Right now, we replace old aggregate functions (based on AggregateExpression1) with
+ // their new versions at planning time (after the analysis phase). For now, NullType is added
+ // here so that the expression is resolved for cases like `select avg(null)`.
+ // Once we remove the old aggregate functions, we can use our analyzer to cast NullType to
+ // the default data type of the NumericType, and NullType will no longer be needed here.
override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(NumericType, NullType))
private val resultType = child.dataType match {
@@ -256,12 +259,19 @@ case class Sum(child: Expression) extends AlgebraicAggregate {
override def dataType: DataType = resultType
// Expected input data type.
+ // TODO: Right now, we replace old aggregate functions (based on AggregateExpression1) with
+ // their new versions at planning time (after the analysis phase). For now, NullType is added
+ // here so that the expression is resolved for cases like `select sum(null)`.
+ // Once we remove the old aggregate functions, we can use our analyzer to cast NullType to
+ // the default data type of the NumericType, and NullType will no longer be needed here.
override def inputTypes: Seq[AbstractDataType] =
Seq(TypeCollection(LongType, DoubleType, DecimalType, NullType))
private val resultType = child.dataType match {
case DecimalType.Fixed(precision, scale) =>
DecimalType.bounded(precision + 10, scale)
+ // TODO: Remove this case once NullType is removed from inputTypes.
+ case NullType => IntegerType
case _ => child.dataType
}
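The `resultType` pattern match added to `Sum` can be sketched in isolation. This is a minimal stand-in, not Spark's real code: the `DataType` ADT below and the precision cap of 38 are assumptions made for the sketch (Spark's `DecimalType.bounded` applies a similar system-wide maximum precision).

```scala
// Simplified stand-in for Spark's DataType hierarchy (assumption, not the real classes).
sealed trait DataType
case object LongType extends DataType
case object DoubleType extends DataType
case object IntegerType extends DataType
case object NullType extends DataType
case class DecimalType(precision: Int, scale: Int) extends DataType

// Mirrors the resultType match in this diff:
// - decimals widen by 10 digits of precision (capped, like DecimalType.bounded),
// - NullType maps to IntegerType (the temporary case the TODO says to remove
//   once NullType is dropped from inputTypes),
// - every other child type passes through unchanged.
def sumResultType(childType: DataType): DataType = childType match {
  case DecimalType(precision, scale) =>
    DecimalType(math.min(precision + 10, 38), scale)
  case NullType => IntegerType
  case other => other
}
```

For example, `sumResultType(DecimalType(10, 2))` widens to `DecimalType(20, 2)`, while a plain `DoubleType` child passes through unchanged; widening the decimal result type is what keeps a sum over many rows from overflowing the input's precision.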