aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Koert Kuipers <koert@tresata.com> 2016-07-04 12:14:14 +0800
committer Wenchen Fan <wenchen@databricks.com> 2016-07-04 12:14:14 +0800
commit 8cdb81fa8264085b1bc04638b649b681ae871843 (patch)
tree 0f9ff2f8df401a2382e9be0dedc2980438564aa6
parent 88134e736829f5f93a82879c08cb191f175ff8af (diff)
download spark-8cdb81fa8264085b1bc04638b649b681ae871843.tar.gz
spark-8cdb81fa8264085b1bc04638b649b681ae871843.tar.bz2
spark-8cdb81fa8264085b1bc04638b649b681ae871843.zip
[SPARK-15204][SQL] improve nullability inference for Aggregator
## What changes were proposed in this pull request?

TypedAggregateExpression sets nullable based on the schema of the outputEncoder.

## How was this patch tested?

Add test in DatasetAggregatorSuite.

Author: Koert Kuipers <koert@tresata.com>

Closes #13532 from koertkuipers/feat-aggregator-nullable.
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala 8
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala 9
2 files changed, 13 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
index 8bdfa48a30..2cdf4703a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
@@ -51,7 +51,8 @@ object TypedAggregateExpression {
bufferDeserializer,
outputEncoder.serializer,
outputEncoder.deserializer.dataType,
- outputType)
+ outputType,
+ !outputEncoder.flat || outputEncoder.schema.head.nullable)
}
}
@@ -65,9 +66,8 @@ case class TypedAggregateExpression(
bufferDeserializer: Expression,
outputSerializer: Seq[Expression],
outputExternalType: DataType,
- dataType: DataType) extends DeclarativeAggregate with NonSQLExpression {
-
- override def nullable: Boolean = true
+ dataType: DataType,
+ nullable: Boolean) extends DeclarativeAggregate with NonSQLExpression {
override def deterministic: Boolean = true
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
index 32fcf84b02..ddc4dcd239 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
@@ -305,4 +305,13 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
val ds = Seq(1, 2, 3).toDS()
checkDataset(ds.select(MapTypeBufferAgg.toColumn), 1)
}
+
+ test("SPARK-15204 improve nullability inference for Aggregator") {
+ val ds1 = Seq(1, 3, 2, 5).toDS()
+ assert(ds1.select(typed.sum((i: Int) => i)).schema.head.nullable === false)
+ val ds2 = Seq(AggData(1, "a"), AggData(2, "a")).toDS()
+ assert(ds2.select(SeqAgg.toColumn).schema.head.nullable === true)
+ val ds3 = sql("SELECT 'Some String' AS b, 1279869254 AS a").as[AggData]
+ assert(ds3.select(NameAgg.toColumn).schema.head.nullable === true)
+ }
}