diff options
author | Koert Kuipers <koert@tresata.com> | 2016-07-04 12:14:14 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-07-04 12:14:14 +0800 |
commit | 8cdb81fa8264085b1bc04638b649b681ae871843 (patch) | |
tree | 0f9ff2f8df401a2382e9be0dedc2980438564aa6 /sql | |
parent | 88134e736829f5f93a82879c08cb191f175ff8af (diff) | |
download | spark-8cdb81fa8264085b1bc04638b649b681ae871843.tar.gz spark-8cdb81fa8264085b1bc04638b649b681ae871843.tar.bz2 spark-8cdb81fa8264085b1bc04638b649b681ae871843.zip |
[SPARK-15204][SQL] improve nullability inference for Aggregator
## What changes were proposed in this pull request?
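`TypedAggregateExpression` now derives `nullable` from the schema of the `outputEncoder` instead of always reporting `true`: the result is non-nullable only when the encoder is flat and its single output field is non-nullable.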
## How was this patch tested?
Added a test ("SPARK-15204 improve nullability inference for Aggregator") to `DatasetAggregatorSuite`.
Author: Koert Kuipers <koert@tresata.com>
Closes #13532 from koertkuipers/feat-aggregator-nullable.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala | 8 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala | 9 |
2 files changed, 13 insertions, 4 deletions
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
index 8bdfa48a30..2cdf4703a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
@@ -51,7 +51,8 @@ object TypedAggregateExpression {
       bufferDeserializer,
       outputEncoder.serializer,
       outputEncoder.deserializer.dataType,
-      outputType)
+      outputType,
+      !outputEncoder.flat || outputEncoder.schema.head.nullable)
   }
 }
 
@@ -65,9 +66,8 @@ case class TypedAggregateExpression(
     bufferDeserializer: Expression,
     outputSerializer: Seq[Expression],
     outputExternalType: DataType,
-    dataType: DataType) extends DeclarativeAggregate with NonSQLExpression {
-
-  override def nullable: Boolean = true
+    dataType: DataType,
+    nullable: Boolean) extends DeclarativeAggregate with NonSQLExpression {
 
   override def deterministic: Boolean = true
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
index 32fcf84b02..ddc4dcd239 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
@@ -305,4 +305,13 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
     val ds = Seq(1, 2, 3).toDS()
     checkDataset(ds.select(MapTypeBufferAgg.toColumn), 1)
   }
+
+  test("SPARK-15204 improve nullability inference for Aggregator") {
+    val ds1 = Seq(1, 3, 2, 5).toDS()
+    assert(ds1.select(typed.sum((i: Int) => i)).schema.head.nullable === false)
+    val ds2 = Seq(AggData(1, "a"), AggData(2, "a")).toDS()
+    assert(ds2.select(SeqAgg.toColumn).schema.head.nullable === true)
+    val ds3 = sql("SELECT 'Some String' AS b, 1279869254 AS a").as[AggData]
+    assert(ds3.select(NameAgg.toColumn).schema.head.nullable === true)
+  }
 }
```