diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-11-10 13:03:59 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-11-10 13:03:59 +0800 |
commit | 6021c95a3aa3858b0499782b23b08ef92c73245d (patch) | |
tree | ba777f7f27014d67f90a1a7afb965e1848ebc2bf | |
parent | 3f62e1b5d9e75dc07bac3aa4db3e8d0615cc3cc3 (diff) | |
download | spark-6021c95a3aa3858b0499782b23b08ef92c73245d.tar.gz spark-6021c95a3aa3858b0499782b23b08ef92c73245d.tar.bz2 spark-6021c95a3aa3858b0499782b23b08ef92c73245d.zip |
[SPARK-18147][SQL] do not fail for very complex aggregator result type
## What changes were proposed in this pull request?
~In `TypedAggregateExpression.evaluateExpression`, we may create `ReferenceToExpressions` with `CreateStruct`, and `CreateStruct` may generate too much code and split it into several methods. `ReferenceToExpressions` will replace `BoundReference` in `CreateStruct` with `LambdaVariable`, which can only be used as a local variable and doesn't work if we split the generated code.~
It's already fixed by #15693; this PR adds a regression test.
## How was this patch tested?
new test in `DatasetAggregatorSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes #15807 from cloud-fan/typed-agg.
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala | 21 |
1 files changed, 21 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala index b117fbd0bc..36b2651e5a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala @@ -134,6 +134,19 @@ object NullResultAgg extends Aggregator[AggData, AggData, AggData] { override def outputEncoder: Encoder[AggData] = Encoders.product[AggData] } +case class ComplexAggData(d1: AggData, d2: AggData) + +object VeryComplexResultAgg extends Aggregator[Row, String, ComplexAggData] { + override def zero: String = "" + override def reduce(buffer: String, input: Row): String = buffer + input.getString(1) + override def merge(b1: String, b2: String): String = b1 + b2 + override def finish(reduction: String): ComplexAggData = { + ComplexAggData(AggData(reduction.length, reduction), AggData(reduction.length, reduction)) + } + override def bufferEncoder: Encoder[String] = Encoders.STRING + override def outputEncoder: Encoder[ComplexAggData] = Encoders.product[ComplexAggData] +} + class DatasetAggregatorSuite extends QueryTest with SharedSQLContext { import testImplicits._ @@ -312,4 +325,12 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext { val ds3 = sql("SELECT 'Some String' AS b, 1279869254 AS a").as[AggData] assert(ds3.select(NameAgg.toColumn).schema.head.nullable === true) } + + test("SPARK-18147: very complex aggregator result type") { + val df = Seq(1 -> "a", 2 -> "b", 2 -> "c").toDF("i", "j") + + checkAnswer( + df.groupBy($"i").agg(VeryComplexResultAgg.toColumn), + Row(1, Row(Row(1, "a"), Row(1, "a"))) :: Row(2, Row(Row(2, "bc"), Row(2, "bc"))) :: Nil) + } } |