diff options
author | Venkata Ramana Gollamudi <ramana.gollamudi@huawei.com> | 2015-03-21 13:24:24 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-03-21 13:24:24 -0700 |
commit | ee569a0c7171d149eee52877def902378eaf695e (patch) | |
tree | 7f33dac98678ed3aaa46979dbcdb2afe0aa23409 /sql | |
parent | 52dd4b2b277eb48bc89db9b21d25f5e836c1d348 (diff) | |
download | spark-ee569a0c7171d149eee52877def902378eaf695e.tar.gz spark-ee569a0c7171d149eee52877def902378eaf695e.tar.bz2 spark-ee569a0c7171d149eee52877def902378eaf695e.zip |
[SPARK-5680][SQL] Sum function on all null values, should return zero
SELECT sum('a'), avg('a'), variance('a'), std('a') FROM src;
Should give output as
0.0 NULL NULL NULL
This fixes hive udaf_number_format.q
Author: Venkata Ramana G <ramana.gollamudihuawei.com>
Author: Venkata Ramana Gollamudi <ramana.gollamudi@huawei.com>
Closes #4466 from gvramana/sum_fix and squashes the following commits:
42e14d1 [Venkata Ramana Gollamudi] Added comments
39415c0 [Venkata Ramana Gollamudi] Handled the partitioned Sum expression scenario
df66515 [Venkata Ramana Gollamudi] code style fix
4be2606 [Venkata Ramana Gollamudi] Add udaf_number_format to whitelist and golden answer
330fd64 [Venkata Ramana Gollamudi] fix sum function for all null data
Diffstat (limited to 'sql')
4 files changed, 67 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 735b7488fd..5297d1e312 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -346,13 +346,13 @@ case class Sum(child: Expression) extends PartialAggregate with trees.UnaryNode[ case DecimalType.Fixed(_, _) => val partialSum = Alias(Sum(Cast(child, DecimalType.Unlimited)), "PartialSum")() SplitEvaluation( - Cast(Sum(partialSum.toAttribute), dataType), + Cast(CombineSum(partialSum.toAttribute), dataType), partialSum :: Nil) case _ => val partialSum = Alias(Sum(child), "PartialSum")() SplitEvaluation( - Sum(partialSum.toAttribute), + CombineSum(partialSum.toAttribute), partialSum :: Nil) } } @@ -360,6 +360,30 @@ case class Sum(child: Expression) extends PartialAggregate with trees.UnaryNode[ override def newInstance() = new SumFunction(child, this) } +/** + * Sum should satisfy 3 cases: + * 1) sum of all null values = zero + * 2) sum for table column with no data = null + * 3) sum of column with null and not null values = sum of not null values + * Require separate CombineSum Expression and function as it has to distinguish "No data" case + * versus "data equals null" case, while aggregating results and at each partial expression.i.e., + * Combining PartitionLevel InputData + * <-- null + * Zero <-- Zero <-- null + * + * <-- null <-- no data + * null <-- null <-- no data + */ +case class CombineSum(child: Expression) extends AggregateExpression { + def this() = this(null) + + override def children = child :: Nil + override def nullable = true + override def dataType = child.dataType + override def toString = s"CombineSum($child)" + override def newInstance() = new CombineSumFunction(child, this) +} + case class SumDistinct(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { @@ -565,7 +589,8 @@ case class SumFunction(expr: Expression, base: AggregateExpression) extends Aggr private val sum = MutableLiteral(null, calcType) - private val addFunction = Coalesce(Seq(Add(Coalesce(Seq(sum, zero)), Cast(expr, calcType)), sum)) + private val addFunction = + Coalesce(Seq(Add(Coalesce(Seq(sum, zero)), Cast(expr, calcType)), sum, zero)) override def update(input: Row): Unit = { sum.update(addFunction, input) @@ -580,6 +605,43 @@ case class SumFunction(expr: Expression, base: AggregateExpression) extends Aggr } } +case class CombineSumFunction(expr: Expression, base: AggregateExpression) + extends AggregateFunction { + + def this() = this(null, null) // Required for serialization. + + private val calcType = + expr.dataType match { + case DecimalType.Fixed(_, _) => + DecimalType.Unlimited + case _ => + expr.dataType + } + + private val zero = Cast(Literal(0), calcType) + + private val sum = MutableLiteral(null, calcType) + + private val addFunction = + Coalesce(Seq(Add(Coalesce(Seq(sum, zero)), Cast(expr, calcType)), sum, zero)) + + override def update(input: Row): Unit = { + val result = expr.eval(input) + // partial sum result can be null only when no input rows present + if(result != null) { + sum.update(addFunction, input) + } + } + + override def eval(input: Row): Any = { + expr.dataType match { + case DecimalType.Fixed(_, _) => + Cast(sum, dataType).eval(null) + case _ => sum.eval(null) + } + } +} + case class SumDistinctFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 5180a7f09d..2ae9d018e1 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -800,6 +800,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "udaf_covar_pop", "udaf_covar_samp", "udaf_histogram_numeric", + "udaf_number_format", "udf2", "udf5", "udf6", diff --git a/sql/hive/src/test/resources/golden/udaf_number_format-0-eff4ef3c207d14d5121368f294697964 b/sql/hive/src/test/resources/golden/udaf_number_format-0-eff4ef3c207d14d5121368f294697964 new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/sql/hive/src/test/resources/golden/udaf_number_format-0-eff4ef3c207d14d5121368f294697964 diff --git a/sql/hive/src/test/resources/golden/udaf_number_format-1-4a03c4328565c60ca99689239f07fb16 b/sql/hive/src/test/resources/golden/udaf_number_format-1-4a03c4328565c60ca99689239f07fb16 new file mode 100644 index 0000000000..c6f275a0db --- /dev/null +++ b/sql/hive/src/test/resources/golden/udaf_number_format-1-4a03c4328565c60ca99689239f07fb16 @@ -0,0 +1 @@ +0.0 NULL NULL NULL |