author     Davies Liu <davies@databricks.com>   2016-02-03 17:07:27 -0800
committer  Davies Liu <davies.liu@gmail.com>    2016-02-03 17:07:27 -0800
commit     de0914522fc5b2658959f9e2272b4e3162b14978 (patch)
tree       f4adb6adf78ea97f1fdf8053d63b6da5211bb4e3
parent     915a75398ecbccdbf9a1e07333104c857ae1ce5e (diff)
[SPARK-13131] [SQL] Use best and average time in benchmark
Best time is more stable than average time. This change also adds a column for nanoseconds per row, which can be used to estimate the contribution of each component in a query. Showing best and average time together gives more information (we can see a rough measure of the variance). Rate, time per row, and relative speedup are all calculated using the best time.

The result looks like this:

```
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
rang/filter/sum:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
rang/filter/sum codegen=false          14332 / 16646         36.0          27.8       1.0X
rang/filter/sum codegen=true              845 /  940        620.0           1.6      17.0X
```

Author: Davies Liu <davies@databricks.com>

Closes #11018 from davies/gen_bench.
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Benchmark.scala                                   38
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala    154
2 files changed, 89 insertions, 103 deletions
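For orientation, here is a hedged, self-contained sketch of how the new metrics relate to each other (the real logic lives in `Benchmark.measure` in the diff below; the object and method names here are illustrative only):

```scala
// A minimal sketch (not the patch itself) of the metrics this change reports,
// assuming per-iteration run times in nanoseconds and `num` rows per iteration.
object MetricsSketch {
  def report(runTimesNs: Seq[Long], num: Long): Unit = {
    val bestNs = runTimesNs.min.toDouble
    val avgNs  = runTimesNs.sum.toDouble / runTimesNs.size
    // Rows per microsecond is the same as millions of rows per second.
    val rateMps = num / (bestNs / 1e3)
    println(f"best ${bestNs / 1e6}%5.0f ms, avg ${avgNs / 1e6}%5.0f ms, " +
      f"rate $rateMps%.1f M/s, per row ${1000 / rateMps}%.1f ns")
  }

  def main(args: Array[String]): Unit = {
    // Example: three runs over 10 million rows; the best run is 100 ms,
    // so rate = 100 M/s and per-row time = 10 ns.
    report(Seq(120000000L, 100000000L, 110000000L), 10000000L)
  }
}
```

As the commit message says, rate, per-row time, and the relative column are all derived from the best run, not the average.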
diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
index d484cec7ae..d1699f5c28 100644
--- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -18,6 +18,7 @@
package org.apache.spark.util
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.SystemUtils
@@ -59,17 +60,21 @@ private[spark] class Benchmark(
}
println
- val firstRate = results.head.avgRate
+ val firstBest = results.head.bestMs
+ val firstAvg = results.head.avgMs
// The results are going to be processor specific so it is useful to include that.
println(Benchmark.getProcessorName())
- printf("%-30s %16s %16s %14s\n", name + ":", "Avg Time(ms)", "Avg Rate(M/s)", "Relative Rate")
- println("-------------------------------------------------------------------------------")
- results.zip(benchmarks).foreach { r =>
- printf("%-30s %16s %16s %14s\n",
- r._2.name,
- "%10.2f" format r._1.avgMs,
- "%10.2f" format r._1.avgRate,
- "%6.2f X" format (r._1.avgRate / firstRate))
+ printf("%-35s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
+ "Per Row(ns)", "Relative")
+ println("-----------------------------------------------------------------------------------" +
+ "--------")
+ results.zip(benchmarks).foreach { case (result, benchmark) =>
+ printf("%-35s %16s %12s %13s %10s\n",
+ benchmark.name,
+ "%5.0f / %4.0f" format (result.bestMs, result.avgMs),
+ "%10.1f" format result.bestRate,
+ "%6.1f" format (1000 / result.bestRate),
+ "%3.1fX" format (firstBest / result.bestMs))
}
println
// scalastyle:on
@@ -78,7 +83,7 @@ private[spark] class Benchmark(
private[spark] object Benchmark {
case class Case(name: String, fn: Int => Unit)
- case class Result(avgMs: Double, avgRate: Double)
+ case class Result(avgMs: Double, bestRate: Double, bestMs: Double)
/**
* This should return a user helpful processor information. Getting at this depends on the OS.
@@ -99,22 +104,27 @@ private[spark] object Benchmark {
* the rate of the function.
*/
def measure(num: Long, iters: Int, outputPerIteration: Boolean)(f: Int => Unit): Result = {
- var totalTime = 0L
+ val runTimes = ArrayBuffer[Long]()
for (i <- 0 until iters + 1) {
val start = System.nanoTime()
f(i)
val end = System.nanoTime()
- if (i != 0) totalTime += end - start
+ val runTime = end - start
+ if (i > 0) {
+ runTimes += runTime
+ }
if (outputPerIteration) {
// scalastyle:off
- println(s"Iteration $i took ${(end - start) / 1000} microseconds")
+ println(s"Iteration $i took ${runTime / 1000} microseconds")
// scalastyle:on
}
}
- Result(totalTime.toDouble / 1000000 / iters, num * iters / (totalTime.toDouble / 1000))
+ val best = runTimes.min
+ val avg = runTimes.sum / iters
+ Result(avg / 1000000, num / (best / 1000), best / 1000000)
}
}
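For reference, a hedged usage sketch of the `Benchmark` class as patched above. Note that `measure` runs the function `iters + 1` times and discards the first (warm-up) iteration; also, `Benchmark` is `private[spark]`, so callers must live under `org.apache.spark`. The package, object name, and workload below are placeholders:

```scala
// Placeholder package: Benchmark is private[spark], so this sketch must sit
// somewhere under org.apache.spark to compile.
package org.apache.spark.sketches

import org.apache.spark.util.Benchmark

object BenchmarkUsageSketch {
  def main(args: Array[String]): Unit = {
    val numRows = 1000000L
    val benchmark = new Benchmark("example", numRows)
    benchmark.addCase("sum of longs") { iter =>
      // Placeholder workload; `iter` is the iteration index (0 is warm-up).
      var i = 0L
      var s = 0L
      while (i < numRows) { s += i; i += 1 }
    }
    // Prints the Best/Avg Time(ms), Rate(M/s), Per Row(ns), Relative table.
    benchmark.run()
  }
}
```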
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 15ba773531..33d4976403 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -34,54 +34,47 @@ import org.apache.spark.util.Benchmark
*/
class BenchmarkWholeStageCodegen extends SparkFunSuite {
lazy val conf = new SparkConf().setMaster("local[1]").setAppName("benchmark")
+ .set("spark.sql.shuffle.partitions", "1")
lazy val sc = SparkContext.getOrCreate(conf)
lazy val sqlContext = SQLContext.getOrCreate(sc)
- def testWholeStage(values: Int): Unit = {
- val benchmark = new Benchmark("rang/filter/aggregate", values)
+ def runBenchmark(name: String, values: Int)(f: => Unit): Unit = {
+ val benchmark = new Benchmark(name, values)
- benchmark.addCase("Without codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
- sqlContext.range(values).filter("(id & 1) = 1").count()
- }
-
- benchmark.addCase("With codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.range(values).filter("(id & 1) = 1").count()
+ Seq(false, true).foreach { enabled =>
+ benchmark.addCase(s"$name codegen=$enabled") { iter =>
+ sqlContext.setConf("spark.sql.codegen.wholeStage", enabled.toString)
+ f
+ }
}
- /*
- Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
- rang/filter/aggregate: Avg Time(ms) Avg Rate(M/s) Relative Rate
- -------------------------------------------------------------------------------
- Without codegen 7775.53 26.97 1.00 X
- With codegen 342.15 612.94 22.73 X
- */
benchmark.run()
}
- def testStatFunctions(values: Int): Unit = {
-
- val benchmark = new Benchmark("stat functions", values)
-
- benchmark.addCase("stddev w/o codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
- sqlContext.range(values).groupBy().agg("id" -> "stddev").collect()
+ // These benchmarks are skipped in a normal build
+ ignore("range/filter/sum") {
+ val N = 500 << 20
+ runBenchmark("rang/filter/sum", N) {
+ sqlContext.range(N).filter("(id & 1) = 1").groupBy().sum().collect()
}
+ /*
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ rang/filter/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ rang/filter/sum codegen=false 14332 / 16646 36.0 27.8 1.0X
+ rang/filter/sum codegen=true 845 / 940 620.0 1.6 17.0X
+ */
+ }
- benchmark.addCase("stddev w codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.range(values).groupBy().agg("id" -> "stddev").collect()
- }
+ ignore("stat functions") {
+ val N = 100 << 20
- benchmark.addCase("kurtosis w/o codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
- sqlContext.range(values).groupBy().agg("id" -> "kurtosis").collect()
+ runBenchmark("stddev", N) {
+ sqlContext.range(N).groupBy().agg("id" -> "stddev").collect()
}
- benchmark.addCase("kurtosis w codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.range(values).groupBy().agg("id" -> "kurtosis").collect()
+ runBenchmark("kurtosis", N) {
+ sqlContext.range(N).groupBy().agg("id" -> "kurtosis").collect()
}
@@ -99,64 +92,56 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
Using DeclarativeAggregate:
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
- stddev: Avg Time(ms) Avg Rate(M/s) Relative Rate
- -------------------------------------------------------------------------------
- stddev w/o codegen 989.22 21.20 1.00 X
- stddev w codegen 352.35 59.52 2.81 X
- kurtosis w/o codegen 3636.91 5.77 0.27 X
- kurtosis w codegen 369.25 56.79 2.68 X
+ stddev: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ stddev codegen=false 5630 / 5776 18.0 55.6 1.0X
+ stddev codegen=true 1259 / 1314 83.0 12.0 4.5X
+
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ kurtosis: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ kurtosis codegen=false 14847 / 15084 7.0 142.9 1.0X
+ kurtosis codegen=true 1652 / 2124 63.0 15.9 9.0X
*/
- benchmark.run()
}
- def testAggregateWithKey(values: Int): Unit = {
- val benchmark = new Benchmark("Aggregate with keys", values)
+ ignore("aggregate with keys") {
+ val N = 20 << 20
- benchmark.addCase("Aggregate w/o codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
- sqlContext.range(values).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
- }
- benchmark.addCase(s"Aggregate w codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.range(values).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
+ runBenchmark("Aggregate w keys", N) {
+ sqlContext.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
}
/*
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
- Aggregate with keys: Avg Time(ms) Avg Rate(M/s) Relative Rate
- -------------------------------------------------------------------------------
- Aggregate w/o codegen 4254.38 4.93 1.00 X
- Aggregate w codegen 2661.45 7.88 1.60 X
+ Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ Aggregate w keys codegen=false 2402 / 2551 8.0 125.0 1.0X
+ Aggregate w keys codegen=true 1620 / 1670 12.0 83.3 1.5X
*/
- benchmark.run()
}
- def testBroadcastHashJoin(values: Int): Unit = {
- val benchmark = new Benchmark("BroadcastHashJoin", values)
-
+ ignore("broadcast hash join") {
+ val N = 20 << 20
val dim = broadcast(sqlContext.range(1 << 16).selectExpr("id as k", "cast(id as string) as v"))
- benchmark.addCase("BroadcastHashJoin w/o codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
- sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()
- }
- benchmark.addCase(s"BroadcastHashJoin w codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()
+ runBenchmark("BroadcastHashJoin", N) {
+ sqlContext.range(N).join(dim, (col("id") % 60000) === col("k")).count()
}
/*
- Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
- BroadcastHashJoin: Avg Time(ms) Avg Rate(M/s) Relative Rate
- -------------------------------------------------------------------------------
- BroadcastHashJoin w/o codegen 3053.41 3.43 1.00 X
- BroadcastHashJoin w codegen 1028.40 10.20 2.97 X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ BroadcastHashJoin: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ BroadcastHashJoin codegen=false 4405 / 6147 4.0 250.0 1.0X
+ BroadcastHashJoin codegen=true 1857 / 1878 11.0 90.9 2.4X
*/
- benchmark.run()
}
- def testBytesToBytesMap(values: Int): Unit = {
- val benchmark = new Benchmark("BytesToBytesMap", values)
+ ignore("hash and BytesToBytesMap") {
+ val N = 50 << 20
+
+ val benchmark = new Benchmark("BytesToBytesMap", N)
benchmark.addCase("hash") { iter =>
var i = 0
@@ -167,7 +152,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
val value = new UnsafeRow(2)
value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
var s = 0
- while (i < values) {
+ while (i < N) {
key.setInt(0, i % 1000)
val h = Murmur3_x86_32.hashUnsafeWords(
key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, 0)
@@ -194,7 +179,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
val value = new UnsafeRow(2)
value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
var i = 0
- while (i < values) {
+ while (i < N) {
key.setInt(0, i % 65536)
val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
if (loc.isDefined) {
@@ -212,21 +197,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
/**
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
- Aggregate with keys: Avg Time(ms) Avg Rate(M/s) Relative Rate
- -------------------------------------------------------------------------------
- hash 662.06 79.19 1.00 X
- BytesToBytesMap (off Heap) 2209.42 23.73 0.30 X
- BytesToBytesMap (on Heap) 2957.68 17.73 0.22 X
+ BytesToBytesMap: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ hash 628 / 661 83.0 12.0 1.0X
+ BytesToBytesMap (off Heap) 3292 / 3408 15.0 66.7 0.2X
+ BytesToBytesMap (on Heap) 3349 / 4267 15.0 66.7 0.2X
*/
benchmark.run()
}
-
- // These benchmark are skipped in normal build
- ignore("benchmark") {
- // testWholeStage(200 << 20)
- // testStatFunctions(20 << 20)
- // testAggregateWithKey(20 << 20)
- // testBytesToBytesMap(50 << 20)
- // testBroadcastHashJoin(10 << 20)
- }
}
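A note on running these suites: every case is marked `ignore`, so a normal test pass skips it (per the comment in the diff). A hedged sketch for trying one locally follows; the sbt project name and `test-only` invocation are assumptions about this tree's build, not something stated in the patch:

```scala
// ScalaTest: flip the marker on the case you want to run, e.g.
//   ignore("range/filter/sum") { ... }   // skipped in a normal build
// becomes
//   test("range/filter/sum") { ... }     // included in the suite run
// Then run just this suite, for example:
//   build/sbt "sql/test-only *BenchmarkWholeStageCodegen"
```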