diff options
author | Davies Liu <davies@databricks.com> | 2016-02-02 11:50:14 -0800 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-02-02 11:50:14 -0800 |
commit | be5dd881f1eff248224a92d57cfd1309cb3acf38 (patch) | |
tree | 7fdf890c80dc6a7e63028b0829f1020ca0c65a54 /sql/core | |
parent | 7f6e3ec79b77400f558ceffa10b2af011962115f (diff) | |
download | spark-be5dd881f1eff248224a92d57cfd1309cb3acf38.tar.gz spark-be5dd881f1eff248224a92d57cfd1309cb3acf38.tar.bz2 spark-be5dd881f1eff248224a92d57cfd1309cb3acf38.zip |
[SPARK-12913] [SQL] Improve performance of stat functions
As benchmarked and discussed here: https://github.com/apache/spark/pull/10786/files#r50038294, a declarative aggregate function benefits from codegen and can therefore be much faster than an imperative one.
Author: Davies Liu <davies@databricks.com>
Closes #10960 from davies/stddev.
Diffstat (limited to 'sql/core')
3 files changed, 57 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index 26a7340f1a..84154a47de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -198,7 +198,8 @@ case class Window( functions, ordinal, child.output, - (expressions, schema) => newMutableProjection(expressions, schema)) + (expressions, schema) => + newMutableProjection(expressions, schema, subexpressionEliminationEnabled)) // Create the factory val factory = key match { @@ -210,7 +211,8 @@ case class Window( ordinal, functions, child.output, - (expressions, schema) => newMutableProjection(expressions, schema), + (expressions, schema) => + newMutableProjection(expressions, schema, subexpressionEliminationEnabled), offset) // Growing Frame. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 57db7262fd..a8a81d6d65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -240,7 +240,6 @@ case class TungstenAggregate( | ${bufVars(i).value} = ${ev.value}; """.stripMargin } - s""" | // do aggregate | ${aggVals.map(_.code).mkString("\n")} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 2f09c8a114..1ccf0e3d06 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -59,6 +59,55 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { 
benchmark.run() } + def testStatFunctions(values: Int): Unit = { + + val benchmark = new Benchmark("stat functions", values) + + benchmark.addCase("stddev w/o codegen") { iter => + sqlContext.setConf("spark.sql.codegen.wholeStage", "false") + sqlContext.range(values).groupBy().agg("id" -> "stddev").collect() + } + + benchmark.addCase("stddev w codegen") { iter => + sqlContext.setConf("spark.sql.codegen.wholeStage", "true") + sqlContext.range(values).groupBy().agg("id" -> "stddev").collect() + } + + benchmark.addCase("kurtosis w/o codegen") { iter => + sqlContext.setConf("spark.sql.codegen.wholeStage", "false") + sqlContext.range(values).groupBy().agg("id" -> "kurtosis").collect() + } + + benchmark.addCase("kurtosis w codegen") { iter => + sqlContext.setConf("spark.sql.codegen.wholeStage", "true") + sqlContext.range(values).groupBy().agg("id" -> "kurtosis").collect() + } + + + /** + Using ImperativeAggregate (as implemented in Spark 1.6): + + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + stddev: Avg Time(ms) Avg Rate(M/s) Relative Rate + ------------------------------------------------------------------------------- + stddev w/o codegen 2019.04 10.39 1.00 X + stddev w codegen 2097.29 10.00 0.96 X + kurtosis w/o codegen 2108.99 9.94 0.96 X + kurtosis w codegen 2090.69 10.03 0.97 X + + Using DeclarativeAggregate: + + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + stddev: Avg Time(ms) Avg Rate(M/s) Relative Rate + ------------------------------------------------------------------------------- + stddev w/o codegen 989.22 21.20 1.00 X + stddev w codegen 352.35 59.52 2.81 X + kurtosis w/o codegen 3636.91 5.77 0.27 X + kurtosis w codegen 369.25 56.79 2.68 X + */ + benchmark.run() + } + def testAggregateWithKey(values: Int): Unit = { val benchmark = new Benchmark("Aggregate with keys", values) @@ -147,8 +196,10 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { benchmark.run() } - test("benchmark") { - // testWholeStage(1024 * 1024 * 200) + // These benchmark are skipped 
in normal build + ignore("benchmark") { + // testWholeStage(200 << 20) + // testStddev(20 << 20) // testAggregateWithKey(20 << 20) // testBytesToBytesMap(1024 * 1024 * 50) } |