-rw-r--r--  core/src/main/scala/org/apache/spark/util/Benchmark.scala                                |  21
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala    |   2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala  | 137
3 files changed, 93 insertions, 67 deletions
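
In short: Benchmark gains per-case iteration counts (addCase and addTimerCase take an optional numIters, where 0 means "fall back to the constructor's defaultNumIters"), the report's name column widens from 35 to 40 characters, and BenchmarkWholeStageCodegen is reworked to use the new knob, with renamed cases, larger inputs, and refreshed sample results.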
diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
index 1fc0ad7a4d..0c685b1918 100644
--- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -38,7 +38,7 @@ import org.apache.commons.lang3.SystemUtils
private[spark] class Benchmark(
name: String,
valuesPerIteration: Long,
- iters: Int = 5,
+ defaultNumIters: Int = 5,
outputPerIteration: Boolean = false) {
val benchmarks = mutable.ArrayBuffer.empty[Benchmark.Case]
@@ -46,8 +46,8 @@ private[spark] class Benchmark(
* Adds a case to run when run() is called. The given function will be run for several
* iterations to collect timing statistics.
*/
- def addCase(name: String)(f: Int => Unit): Unit = {
- addTimerCase(name) { timer =>
+ def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = {
+ addTimerCase(name, numIters) { timer =>
timer.startTiming()
f(timer.iteration)
timer.stopTiming()
@@ -59,8 +59,8 @@ private[spark] class Benchmark(
* until timer.startTiming() is called within the given function. The corresponding
* timer.stopTiming() method must be called before the function returns.
*/
- def addTimerCase(name: String)(f: Benchmark.Timer => Unit): Unit = {
- benchmarks += Benchmark.Case(name, f)
+ def addTimerCase(name: String, numIters: Int = 0)(f: Benchmark.Timer => Unit): Unit = {
+ benchmarks += Benchmark.Case(name, f, if (numIters == 0) defaultNumIters else numIters)
}
/**
@@ -75,7 +75,7 @@ private[spark] class Benchmark(
val results = benchmarks.map { c =>
println(" Running case: " + c.name)
- Benchmark.measure(valuesPerIteration, iters, outputPerIteration)(c.fn)
+ Benchmark.measure(valuesPerIteration, c.numIters, outputPerIteration)(c.fn)
}
println
@@ -83,12 +83,11 @@ private[spark] class Benchmark(
// The results are going to be processor specific so it is useful to include that.
println(Benchmark.getJVMOSInfo())
println(Benchmark.getProcessorName())
- printf("%-35s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
+ printf("%-40s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
"Per Row(ns)", "Relative")
- println("-----------------------------------------------------------------------------------" +
- "--------")
+ println("-" * 96)
results.zip(benchmarks).foreach { case (result, benchmark) =>
- printf("%-35s %16s %12s %13s %10s\n",
+ printf("%-40s %16s %12s %13s %10s\n",
benchmark.name,
"%5.0f / %4.0f" format (result.bestMs, result.avgMs),
"%10.1f" format result.bestRate,
@@ -128,7 +127,7 @@ private[spark] object Benchmark {
}
}
- case class Case(name: String, fn: Timer => Unit)
+ case class Case(name: String, fn: Timer => Unit, numIters: Int)
case class Result(avgMs: Double, bestRate: Double, bestMs: Double)
/**
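
For context, a minimal sketch of how the reworked API is called (hypothetical workloads; Benchmark is private[spark], so this only compiles from code inside an org.apache.spark package):

    import org.apache.spark.util.Benchmark

    val n = 1L << 20
    val benchmark = new Benchmark("string conversion", valuesPerIteration = n, defaultNumIters = 5)

    // No numIters given: the 0 sentinel falls through to defaultNumIters, so 5 runs.
    benchmark.addCase("toString") { _ =>
      var i = 0L
      while (i < n) { i.toString; i += 1 }
    }

    // Explicit numIters: this slower case is measured over only 2 runs.
    benchmark.addCase("string interpolation", numIters = 2) { _ =>
      var i = 0L
      while (i < n) { s"value=$i"; i += 1 }
    }

    benchmark.run()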
diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
index 52428634e5..b03df1a94d 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
@@ -244,7 +244,7 @@ class RadixSortSuite extends SparkFunSuite with Logging {
RadixSort.sortKeyPrefixArray(buf2, size, 0, 7, false, false)
timer.stopTiming()
}
- benchmark.run
+ benchmark.run()
/**
Running benchmark: radix sort 25000000
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 841263d3da..7ca4b75f48 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -36,6 +36,8 @@ import org.apache.spark.util.Benchmark
* Benchmark to measure whole stage codegen performance.
* To run this:
* build/sbt "sql/test-only *BenchmarkWholeStageCodegen"
+ *
+ * Benchmarks in this file are skipped in normal builds.
*/
class BenchmarkWholeStageCodegen extends SparkFunSuite {
lazy val conf = new SparkConf().setMaster("local[1]").setAppName("benchmark")
@@ -44,31 +46,50 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
lazy val sc = SparkContext.getOrCreate(conf)
lazy val sqlContext = SQLContext.getOrCreate(sc)
- def runBenchmark(name: String, values: Long)(f: => Unit): Unit = {
- val benchmark = new Benchmark(name, values)
+ /** Runs function `f` with whole stage codegen on and off. */
+ def runBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
+ val benchmark = new Benchmark(name, cardinality)
- Seq(false, true).foreach { enabled =>
- benchmark.addCase(s"$name codegen=$enabled") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", enabled.toString)
- f
- }
+ benchmark.addCase(s"$name wholestage off", numIters = 2) { iter =>
+ sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
+ f
+ }
+
+ benchmark.addCase(s"$name wholestage on", numIters = 5) { iter =>
+ sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+ f
}
benchmark.run()
}
- // These benchmark are skipped in normal build
- ignore("range/filter/sum") {
- val N = 500L << 20
- runBenchmark("rang/filter/sum", N) {
+ ignore("aggregate without grouping") {
+ val N = 500L << 22
+ val benchmark = new Benchmark("agg without grouping", N)
+ runBenchmark("agg w/o group", N) {
+ sqlContext.range(N).selectExpr("sum(id)").collect()
+ }
+ /*
+ agg w/o group: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ agg w/o group wholestage off 30136 / 31885 69.6 14.4 1.0X
+ agg w/o group wholestage on 1851 / 1860 1132.9 0.9 16.3X
+ */
+ }
+
+ ignore("filter & aggregate without group") {
+ val N = 500L << 22
+ runBenchmark("range/filter/sum", N) {
sqlContext.range(N).filter("(id & 1) = 1").groupBy().sum().collect()
}
/*
- Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
- rang/filter/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- rang/filter/sum codegen=false 14332 / 16646 36.0 27.8 1.0X
- rang/filter/sum codegen=true 897 / 1022 584.6 1.7 16.4X
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
+ Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
+ range/filter/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ range/filter/sum codegen=false 30663 / 31216 68.4 14.6 1.0X
+ range/filter/sum codegen=true 2399 / 2409 874.1 1.1 12.8X
*/
}
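
Both cases above use N = 500L << 22 = 500 × 2^22 = 2,097,152,000 ≈ 2.1 billion rows, four times the earlier 500L << 20 ≈ 524 million. At the measured 1,132.9 M rows/s with whole-stage codegen on, one iteration of the grouping-free aggregate still completes in under two seconds, which is why the codegen-on case can afford 5 iterations while the ~70 M rows/s codegen-off baseline is capped at 2.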
@@ -86,28 +107,32 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
*/
}
- ignore("range/sample/sum") {
- val N = 500 << 20
- runBenchmark("range/sample/sum", N) {
- sqlContext.range(N).sample(true, 0.01).groupBy().sum().collect()
+ ignore("sample") {
+ val N = 500 << 18
+ runBenchmark("sample with replacement", N) {
+ sqlContext.range(N).sample(withReplacement = true, 0.01).groupBy().sum().collect()
}
/*
- Westmere E56xx/L56xx/X56xx (Nehalem-C)
- range/sample/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- range/sample/sum codegen=false 53888 / 56592 9.7 102.8 1.0X
- range/sample/sum codegen=true 41614 / 42607 12.6 79.4 1.3X
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
+ Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
+ sample with replacement: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ sample with replacement codegen=false 7073 / 7227 18.5 54.0 1.0X
+ sample with replacement codegen=true 5199 / 5203 25.2 39.7 1.4X
*/
- runBenchmark("range/sample/sum", N) {
- sqlContext.range(N).sample(false, 0.01).groupBy().sum().collect()
+ runBenchmark("sample without replacement", N) {
+ sqlContext.range(N).sample(withReplacement = false, 0.01).groupBy().sum().collect()
}
/*
- Westmere E56xx/L56xx/X56xx (Nehalem-C)
- range/sample/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- range/sample/sum codegen=false 12982 / 13384 40.4 24.8 1.0X
- range/sample/sum codegen=true 7074 / 7383 74.1 13.5 1.8X
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
+ Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
+ sample without replacement: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ sample without replacement codegen=false 1508 / 1529 86.9 11.5 1.0X
+ sample without replacement codegen=true 644 / 662 203.5 4.9 2.3X
*/
}
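
The gap between the two variants is expected: Spark's with-replacement path draws a Poisson-distributed sample count per row, while the without-replacement path is a single Bernoulli trial per row, so the latter does strictly less work per element. The switch to a named withReplacement argument also makes that distinction visible at the call site.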
@@ -151,23 +176,23 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
}
ignore("aggregate with linear keys") {
- val N = 20 << 20
+ val N = 20 << 22
val benchmark = new Benchmark("Aggregate w keys", N)
def f(): Unit = sqlContext.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
- benchmark.addCase(s"codegen = F") { iter =>
+ benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
f()
}
- benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+ benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
f()
}
- benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+ benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
f()
@@ -176,36 +201,37 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
benchmark.run()
/*
- Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
- Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- codegen = F 2067 / 2166 10.1 98.6 1.0X
- codegen = T hashmap = F 1149 / 1321 18.3 54.8 1.8X
- codegen = T hashmap = T 388 / 475 54.0 18.5 5.3X
+
+ Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ codegen = F 6619 / 6780 12.7 78.9 1.0X
+ codegen = T hashmap = F 3935 / 4059 21.3 46.9 1.7X
+ codegen = T hashmap = T 897 / 971 93.5 10.7 7.4X
*/
}
ignore("aggregate with randomized keys") {
- val N = 20 << 20
+ val N = 20 << 22
val benchmark = new Benchmark("Aggregate w keys", N)
sqlContext.range(N).selectExpr("id", "floor(rand() * 10000) as k").registerTempTable("test")
def f(): Unit = sqlContext.sql("select k, k, sum(id) from test group by k, k").collect()
- benchmark.addCase(s"codegen = F") { iter =>
+ benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
f()
}
- benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+ benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
f()
}
- benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+ benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
f()
@@ -214,13 +240,14 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
benchmark.run()
/*
- Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
- Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- codegen = F 2517 / 2608 8.3 120.0 1.0X
- codegen = T hashmap = F 1484 / 1560 14.1 70.8 1.7X
- codegen = T hashmap = T 794 / 908 26.4 37.9 3.2X
+
+ Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ codegen = F 7445 / 7517 11.3 88.7 1.0X
+ codegen = T hashmap = F 4672 / 4703 18.0 55.7 1.6X
+ codegen = T hashmap = T 1764 / 1958 47.6 21.0 4.2X
*/
}
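
All three keyed-aggregate benchmarks tier their iteration counts the same way: 2 runs for the slow codegen = F baseline, 3 for codegen = T hashmap = F, and 5 for the fast codegen = T hashmap = T case, concentrating measurement effort on the fast path, where run-to-run variance most affects the reported Relative column, while keeping total wall-clock time down.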
@@ -231,18 +258,18 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
def f(): Unit = sqlContext.range(N).selectExpr("id", "cast(id & 1023 as string) as k")
.groupBy("k").count().collect()
- benchmark.addCase(s"codegen = F") { iter =>
+ benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
f()
}
- benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+ benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
f()
}
- benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+ benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
f()