diff options
author | Andrew Ray <ray.andrew@gmail.com> | 2016-05-02 22:47:32 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-05-02 22:47:32 -0700 |
commit | d8f528ceb61e3c2ac7ac97cd8147dafbb625932f (patch) | |
tree | 5e5e3987d3efc5740c4ed21eb53fc2eea478ce46 /sql | |
parent | bb9ab56b960153d374d7e8838f62a18e7e45481e (diff) | |
download | spark-d8f528ceb61e3c2ac7ac97cd8147dafbb625932f.tar.gz spark-d8f528ceb61e3c2ac7ac97cd8147dafbb625932f.tar.bz2 spark-d8f528ceb61e3c2ac7ac97cd8147dafbb625932f.zip |
[SPARK-13749][SQL][FOLLOW-UP] Faster pivot implementation for many distinct values with two phase aggregation
## What changes were proposed in this pull request?
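This is a follow-up PR for #11583. It changes three `lazy val`s in `PivotFirst` (`aggBufferAttributes`, `aggBufferSchema`, and `inputAggBufferAttributes`) into plain `val`s, and adds unit test coverage for pivoting on data types that `PivotFirst` does not support.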
## How was this patch tested?
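Existing unit tests, plus two new unit tests added to `DataFramePivotSuite` ("pivot with datatype not supported by PivotFirst" and "pivot with datatype not supported by PivotFirst 2").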
Author: Andrew Ray <ray.andrew@gmail.com>
Closes #12861 from aray/fast-pivot-follow-up.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala | 6 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala | 17 |
2 files changed, 20 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
index 9154e96e34..9ead571c53 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
@@ -141,12 +141,12 @@ case class PivotFirst(
     copy(mutableAggBufferOffset = newMutableAggBufferOffset)
 
 
-  override lazy val aggBufferAttributes: Seq[AttributeReference] =
+  override val aggBufferAttributes: Seq[AttributeReference] =
     pivotIndex.toList.sortBy(_._2).map(kv => AttributeReference(kv._1.toString, valueDataType)())
 
-  override lazy val aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
+  override val aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
 
-  override lazy val inputAggBufferAttributes: Seq[AttributeReference] =
+  override val inputAggBufferAttributes: Seq[AttributeReference] =
     aggBufferAttributes.map(_.newInstance())
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
index b17284aa94..c6d67519b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
@@ -180,4 +180,21 @@ class DataFramePivotSuite extends QueryTest with SharedSQLContext{
     )
   }
 
+  test("pivot with datatype not supported by PivotFirst") {
+    checkAnswer(
+      complexData.groupBy().pivot("b", Seq(true, false)).agg(max("a")),
+      Row(Seq(1, 1, 1), Seq(2, 2, 2)) :: Nil
+    )
+  }
+
+  test("pivot with datatype not supported by PivotFirst 2") {
+    checkAnswer(
+      courseSales.withColumn("e", expr("array(earnings, 7.0d)"))
+        .groupBy("year")
+        .pivot("course", Seq("dotNET", "Java"))
+        .agg(min($"e")),
+      Row(2012, Seq(5000.0, 7.0), Seq(20000.0, 7.0)) ::
+        Row(2013, Seq(48000.0, 7.0), Seq(30000.0, 7.0)) :: Nil
+    )
+  }
 }