aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorAndrew Ray <ray.andrew@gmail.com>2016-05-02 22:47:32 -0700
committerYin Huai <yhuai@databricks.com>2016-05-02 22:47:32 -0700
commitd8f528ceb61e3c2ac7ac97cd8147dafbb625932f (patch)
tree5e5e3987d3efc5740c4ed21eb53fc2eea478ce46 /sql
parentbb9ab56b960153d374d7e8838f62a18e7e45481e (diff)
downloadspark-d8f528ceb61e3c2ac7ac97cd8147dafbb625932f.tar.gz
spark-d8f528ceb61e3c2ac7ac97cd8147dafbb625932f.tar.bz2
spark-d8f528ceb61e3c2ac7ac97cd8147dafbb625932f.zip
[SPARK-13749][SQL][FOLLOW-UP] Faster pivot implementation for many distinct values with two phase aggregation
## What changes were proposed in this pull request? This is a follow up PR for #11583. It makes 3 lazy vals into just vals and adds unit test coverage. ## How was this patch tested? Existing unit tests and additional unit tests. Author: Andrew Ray <ray.andrew@gmail.com> Closes #12861 from aray/fast-pivot-follow-up.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala17
2 files changed, 20 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
index 9154e96e34..9ead571c53 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
@@ -141,12 +141,12 @@ case class PivotFirst(
copy(mutableAggBufferOffset = newMutableAggBufferOffset)
- override lazy val aggBufferAttributes: Seq[AttributeReference] =
+ override val aggBufferAttributes: Seq[AttributeReference] =
pivotIndex.toList.sortBy(_._2).map(kv => AttributeReference(kv._1.toString, valueDataType)())
- override lazy val aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
+ override val aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
- override lazy val inputAggBufferAttributes: Seq[AttributeReference] =
+ override val inputAggBufferAttributes: Seq[AttributeReference] =
aggBufferAttributes.map(_.newInstance())
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
index b17284aa94..c6d67519b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
@@ -180,4 +180,21 @@ class DataFramePivotSuite extends QueryTest with SharedSQLContext{
)
}
+ test("pivot with datatype not supported by PivotFirst") {
+ checkAnswer(
+ complexData.groupBy().pivot("b", Seq(true, false)).agg(max("a")),
+ Row(Seq(1, 1, 1), Seq(2, 2, 2)) :: Nil
+ )
+ }
+
+ test("pivot with datatype not supported by PivotFirst 2") {
+ checkAnswer(
+ courseSales.withColumn("e", expr("array(earnings, 7.0d)"))
+ .groupBy("year")
+ .pivot("course", Seq("dotNET", "Java"))
+ .agg(min($"e")),
+ Row(2012, Seq(5000.0, 7.0), Seq(20000.0, 7.0)) ::
+ Row(2013, Seq(48000.0, 7.0), Seq(30000.0, 7.0)) :: Nil
+ )
+ }
}