aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-02-02 11:50:14 -0800
committerDavies Liu <davies.liu@gmail.com>2016-02-02 11:50:14 -0800
commitbe5dd881f1eff248224a92d57cfd1309cb3acf38 (patch)
tree7fdf890c80dc6a7e63028b0829f1020ca0c65a54 /sql/core
parent7f6e3ec79b77400f558ceffa10b2af011962115f (diff)
downloadspark-be5dd881f1eff248224a92d57cfd1309cb3acf38.tar.gz
spark-be5dd881f1eff248224a92d57cfd1309cb3acf38.tar.bz2
spark-be5dd881f1eff248224a92d57cfd1309cb3acf38.zip
[SPARK-12913] [SQL] Improve performance of stat functions
As benchmarked and discussed here: https://github.com/apache/spark/pull/10786/files#r50038294, by benefiting from codegen, a declarative aggregate function can be much faster than an imperative one. Author: Davies Liu <davies@databricks.com> Closes #10960 from davies/stddev.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala1
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala55
3 files changed, 57 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index 26a7340f1a..84154a47de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -198,7 +198,8 @@ case class Window(
functions,
ordinal,
child.output,
- (expressions, schema) => newMutableProjection(expressions, schema))
+ (expressions, schema) =>
+ newMutableProjection(expressions, schema, subexpressionEliminationEnabled))
// Create the factory
val factory = key match {
@@ -210,7 +211,8 @@ case class Window(
ordinal,
functions,
child.output,
- (expressions, schema) => newMutableProjection(expressions, schema),
+ (expressions, schema) =>
+ newMutableProjection(expressions, schema, subexpressionEliminationEnabled),
offset)
// Growing Frame.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 57db7262fd..a8a81d6d65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -240,7 +240,6 @@ case class TungstenAggregate(
| ${bufVars(i).value} = ${ev.value};
""".stripMargin
}
-
s"""
| // do aggregate
| ${aggVals.map(_.code).mkString("\n")}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 2f09c8a114..1ccf0e3d06 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -59,6 +59,55 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
benchmark.run()
}
+ def testStatFunctions(values: Int): Unit = {
+
+ val benchmark = new Benchmark("stat functions", values)
+
+ benchmark.addCase("stddev w/o codegen") { iter =>
+ sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
+ sqlContext.range(values).groupBy().agg("id" -> "stddev").collect()
+ }
+
+ benchmark.addCase("stddev w codegen") { iter =>
+ sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+ sqlContext.range(values).groupBy().agg("id" -> "stddev").collect()
+ }
+
+ benchmark.addCase("kurtosis w/o codegen") { iter =>
+ sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
+ sqlContext.range(values).groupBy().agg("id" -> "kurtosis").collect()
+ }
+
+ benchmark.addCase("kurtosis w codegen") { iter =>
+ sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+ sqlContext.range(values).groupBy().agg("id" -> "kurtosis").collect()
+ }
+
+
+ /**
+ Using ImperativeAggregate (as implemented in Spark 1.6):
+
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ stddev: Avg Time(ms) Avg Rate(M/s) Relative Rate
+ -------------------------------------------------------------------------------
+ stddev w/o codegen 2019.04 10.39 1.00 X
+ stddev w codegen 2097.29 10.00 0.96 X
+ kurtosis w/o codegen 2108.99 9.94 0.96 X
+ kurtosis w codegen 2090.69 10.03 0.97 X
+
+ Using DeclarativeAggregate:
+
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ stddev: Avg Time(ms) Avg Rate(M/s) Relative Rate
+ -------------------------------------------------------------------------------
+ stddev w/o codegen 989.22 21.20 1.00 X
+ stddev w codegen 352.35 59.52 2.81 X
+ kurtosis w/o codegen 3636.91 5.77 0.27 X
+ kurtosis w codegen 369.25 56.79 2.68 X
+ */
+ benchmark.run()
+ }
+
def testAggregateWithKey(values: Int): Unit = {
val benchmark = new Benchmark("Aggregate with keys", values)
@@ -147,8 +196,10 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
benchmark.run()
}
- test("benchmark") {
- // testWholeStage(1024 * 1024 * 200)
+ // These benchmarks are skipped in a normal build
+ ignore("benchmark") {
+ // testWholeStage(200 << 20)
+ // testStatFunctions(20 << 20)
// testAggregateWithKey(20 << 20)
// testBytesToBytesMap(1024 * 1024 * 50)
}