-rw-r--r--  core/src/main/scala/org/apache/spark/util/Benchmark.scala                               |  38
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala | 154
2 files changed, 89 insertions, 103 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
index d484cec7ae..d1699f5c28 100644
--- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -18,6 +18,7 @@
package org.apache.spark.util
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.SystemUtils
@@ -59,17 +60,21 @@ private[spark] class Benchmark(
}
println
- val firstRate = results.head.avgRate
+ val firstBest = results.head.bestMs
+ val firstAvg = results.head.avgMs
// The results are going to be processor specific so it is useful to include that.
println(Benchmark.getProcessorName())
- printf("%-30s %16s %16s %14s\n", name + ":", "Avg Time(ms)", "Avg Rate(M/s)", "Relative Rate")
- println("-------------------------------------------------------------------------------")
- results.zip(benchmarks).foreach { r =>
- printf("%-30s %16s %16s %14s\n",
- r._2.name,
- "%10.2f" format r._1.avgMs,
- "%10.2f" format r._1.avgRate,
- "%6.2f X" format (r._1.avgRate / firstRate))
+ printf("%-35s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
+ "Per Row(ns)", "Relative")
+ println("-----------------------------------------------------------------------------------" +
+ "--------")
+ results.zip(benchmarks).foreach { case (result, benchmark) =>
+ printf("%-35s %16s %12s %13s %10s\n",
+ benchmark.name,
+ "%5.0f / %4.0f" format (result.bestMs, result.avgMs),
+ "%10.1f" format result.bestRate,
+ "%6.1f" format (1000 / result.bestRate),
+ "%3.1fX" format (firstBest / result.bestMs))
}
println
// scalastyle:on
@@ -78,7 +83,7 @@ private[spark] class Benchmark(
private[spark] object Benchmark {
case class Case(name: String, fn: Int => Unit)
- case class Result(avgMs: Double, avgRate: Double)
+ case class Result(avgMs: Double, bestRate: Double, bestMs: Double)
/**
* This should return a user helpful processor information. Getting at this depends on the OS.
@@ -99,22 +104,27 @@ private[spark] object Benchmark {
* the rate of the function.
*/
def measure(num: Long, iters: Int, outputPerIteration: Boolean)(f: Int => Unit): Result = {
- var totalTime = 0L
+ val runTimes = ArrayBuffer[Long]()
for (i <- 0 until iters + 1) {
val start = System.nanoTime()
f(i)
val end = System.nanoTime()
- if (i != 0) totalTime += end - start
+ val runTime = end - start
+ if (i > 0) {
+ runTimes += runTime
+ }
if (outputPerIteration) {
// scalastyle:off
- println(s"Iteration $i took ${(end - start) / 1000} microseconds")
+ println(s"Iteration $i took ${runTime / 1000} microseconds")
// scalastyle:on
}
}
- Result(totalTime.toDouble / 1000000 / iters, num * iters / (totalTime.toDouble / 1000))
+ val best = runTimes.min
+ val avg = runTimes.sum / iters
+ Result(avg / 1000000, num / (best / 1000), best / 1000000)
}
}
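
For reference, the measurement scheme the Benchmark.scala hunk introduces, as a minimal standalone sketch: iteration 0 is a warm-up and is discarded, every later run time is recorded, and the report derives Rate(M/s) and Per Row(ns) from the best run (per-row ns is just 1000 divided by the rate in M/s). The arithmetic below copies the patch; the workload at the bottom is a made-up placeholder, not Spark code.

import scala.collection.mutable.ArrayBuffer

case class Result(avgMs: Double, bestRate: Double, bestMs: Double)

def measure(num: Long, iters: Int)(f: Int => Unit): Result = {
  val runTimes = ArrayBuffer[Long]()
  // iters + 1 runs; iteration 0 is a warm-up and is not recorded
  for (i <- 0 until iters + 1) {
    val start = System.nanoTime()
    f(i)
    val runTime = System.nanoTime() - start
    if (i > 0) runTimes += runTime
  }
  val best = runTimes.min          // fastest recorded run, in nanoseconds
  val avg = runTimes.sum / iters   // mean of the recorded runs, in nanoseconds
  // num / (best / 1000) is rows per microsecond of the best run, i.e. M rows/s;
  // the integer divisions truncate to whole microseconds, as in the patch
  Result(avg / 1000000, num / (best / 1000), best / 1000000)
}

// Placeholder usage: Per Row(ns) in the report is 1000 / Rate(M/s).
val r = measure(10 * 1000 * 1000L, 5) { _ =>
  var i = 0L; var s = 0L
  while (i < 10 * 1000 * 1000L) { s += i; i += 1 }
}
println(f"${r.bestMs}%.0f / ${r.avgMs}%.0f ms, ${r.bestRate}%.1f M/s, ${1000 / r.bestRate}%.1f ns per row")
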
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 15ba773531..33d4976403 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -34,54 +34,47 @@ import org.apache.spark.util.Benchmark
*/
class BenchmarkWholeStageCodegen extends SparkFunSuite {
lazy val conf = new SparkConf().setMaster("local[1]").setAppName("benchmark")
+ .set("spark.sql.shuffle.partitions", "1")
lazy val sc = SparkContext.getOrCreate(conf)
lazy val sqlContext = SQLContext.getOrCreate(sc)
- def testWholeStage(values: Int): Unit = {
- val benchmark = new Benchmark("rang/filter/aggregate", values)
+ def runBenchmark(name: String, values: Int)(f: => Unit): Unit = {
+ val benchmark = new Benchmark(name, values)
- benchmark.addCase("Without codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
- sqlContext.range(values).filter("(id & 1) = 1").count()
- }
-
- benchmark.addCase("With codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.range(values).filter("(id & 1) = 1").count()
+ Seq(false, true).foreach { enabled =>
+ benchmark.addCase(s"$name codegen=$enabled") { iter =>
+ sqlContext.setConf("spark.sql.codegen.wholeStage", enabled.toString)
+ f
+ }
}
- /*
- Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
- rang/filter/aggregate: Avg Time(ms) Avg Rate(M/s) Relative Rate
- -------------------------------------------------------------------------------
- Without codegen 7775.53 26.97 1.00 X
- With codegen 342.15 612.94 22.73 X
- */
benchmark.run()
}
- def testStatFunctions(values: Int): Unit = {
-
- val benchmark = new Benchmark("stat functions", values)
-
- benchmark.addCase("stddev w/o codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
- sqlContext.range(values).groupBy().agg("id" -> "stddev").collect()
+ // These benchmarks are skipped in normal builds
+ ignore("range/filter/sum") {
+ val N = 500 << 20
+ runBenchmark("rang/filter/sum", N) {
+ sqlContext.range(N).filter("(id & 1) = 1").groupBy().sum().collect()
}
+ /*
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ range/filter/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ range/filter/sum codegen=false 14332 / 16646 36.0 27.8 1.0X
+ range/filter/sum codegen=true 845 / 940 620.0 1.6 17.0X
+ */
+ }
- benchmark.addCase("stddev w codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.range(values).groupBy().agg("id" -> "stddev").collect()
- }
+ ignore("stat functions") {
+ val N = 100 << 20
- benchmark.addCase("kurtosis w/o codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
- sqlContext.range(values).groupBy().agg("id" -> "kurtosis").collect()
+ runBenchmark("stddev", N) {
+ sqlContext.range(N).groupBy().agg("id" -> "stddev").collect()
}
- benchmark.addCase("kurtosis w codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.range(values).groupBy().agg("id" -> "kurtosis").collect()
+ runBenchmark("kurtosis", N) {
+ sqlContext.range(N).groupBy().agg("id" -> "kurtosis").collect()
}
@@ -99,64 +92,56 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
Using DeclarativeAggregate:
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
- stddev: Avg Time(ms) Avg Rate(M/s) Relative Rate
- -------------------------------------------------------------------------------
- stddev w/o codegen 989.22 21.20 1.00 X
- stddev w codegen 352.35 59.52 2.81 X
- kurtosis w/o codegen 3636.91 5.77 0.27 X
- kurtosis w codegen 369.25 56.79 2.68 X
+ stddev: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ stddev codegen=false 5630 / 5776 18.0 55.6 1.0X
+ stddev codegen=true 1259 / 1314 83.0 12.0 4.5X
+
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ kurtosis: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ kurtosis codegen=false 14847 / 15084 7.0 142.9 1.0X
+ kurtosis codegen=true 1652 / 2124 63.0 15.9 9.0X
*/
- benchmark.run()
}
- def testAggregateWithKey(values: Int): Unit = {
- val benchmark = new Benchmark("Aggregate with keys", values)
+ ignore("aggregate with keys") {
+ val N = 20 << 20
- benchmark.addCase("Aggregate w/o codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
- sqlContext.range(values).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
- }
- benchmark.addCase(s"Aggregate w codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.range(values).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
+ runBenchmark("Aggregate w keys", N) {
+ sqlContext.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
}
/*
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
- Aggregate with keys: Avg Time(ms) Avg Rate(M/s) Relative Rate
- -------------------------------------------------------------------------------
- Aggregate w/o codegen 4254.38 4.93 1.00 X
- Aggregate w codegen 2661.45 7.88 1.60 X
+ Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ Aggregate w keys codegen=false 2402 / 2551 8.0 125.0 1.0X
+ Aggregate w keys codegen=true 1620 / 1670 12.0 83.3 1.5X
*/
- benchmark.run()
}
- def testBroadcastHashJoin(values: Int): Unit = {
- val benchmark = new Benchmark("BroadcastHashJoin", values)
-
+ ignore("broadcast hash join") {
+ val N = 20 << 20
val dim = broadcast(sqlContext.range(1 << 16).selectExpr("id as k", "cast(id as string) as v"))
- benchmark.addCase("BroadcastHashJoin w/o codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
- sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()
- }
- benchmark.addCase(s"BroadcastHashJoin w codegen") { iter =>
- sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()
+ runBenchmark("BroadcastHashJoin", N) {
+ sqlContext.range(N).join(dim, (col("id") % 60000) === col("k")).count()
}
/*
- Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
- BroadcastHashJoin: Avg Time(ms) Avg Rate(M/s) Relative Rate
- -------------------------------------------------------------------------------
- BroadcastHashJoin w/o codegen 3053.41 3.43 1.00 X
- BroadcastHashJoin w codegen 1028.40 10.20 2.97 X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ BroadcastHashJoin: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ BroadcastHashJoin codegen=false 4405 / 6147 4.0 250.0 1.0X
+ BroadcastHashJoin codegen=true 1857 / 1878 11.0 90.9 2.4X
*/
- benchmark.run()
}
- def testBytesToBytesMap(values: Int): Unit = {
- val benchmark = new Benchmark("BytesToBytesMap", values)
+ ignore("hash and BytesToBytesMap") {
+ val N = 50 << 20
+
+ val benchmark = new Benchmark("BytesToBytesMap", N)
benchmark.addCase("hash") { iter =>
var i = 0
@@ -167,7 +152,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
val value = new UnsafeRow(2)
value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
var s = 0
- while (i < values) {
+ while (i < N) {
key.setInt(0, i % 1000)
val h = Murmur3_x86_32.hashUnsafeWords(
key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, 0)
@@ -194,7 +179,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
val value = new UnsafeRow(2)
value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
var i = 0
- while (i < values) {
+ while (i < N) {
key.setInt(0, i % 65536)
val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
if (loc.isDefined) {
@@ -212,21 +197,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
/**
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
- Aggregate with keys: Avg Time(ms) Avg Rate(M/s) Relative Rate
- -------------------------------------------------------------------------------
- hash 662.06 79.19 1.00 X
- BytesToBytesMap (off Heap) 2209.42 23.73 0.30 X
- BytesToBytesMap (on Heap) 2957.68 17.73 0.22 X
+ BytesToBytesMap: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ hash 628 / 661 83.0 12.0 1.0X
+ BytesToBytesMap (off Heap) 3292 / 3408 15.0 66.7 0.2X
+ BytesToBytesMap (on Heap) 3349 / 4267 15.0 66.7 0.2X
*/
benchmark.run()
}
-
- // These benchmark are skipped in normal build
- ignore("benchmark") {
- // testWholeStage(200 << 20)
- // testStatFunctions(20 << 20)
- // testAggregateWithKey(20 << 20)
- // testBytesToBytesMap(50 << 20)
- // testBroadcastHashJoin(10 << 20)
- }
}
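
The shape of the test-side refactoring, isolated: runBenchmark registers the same body once per value of spark.sql.codegen.wholeStage, so every printed table lines up codegen=false against codegen=true on identical work. A sketch under the assumption of stub Benchmark and Conf types standing in for the Spark classes:

import scala.collection.mutable.ArrayBuffer

// Stand-in for a key/value SQL configuration, not the Spark API.
trait Conf { def set(key: String, value: String): Unit }

// Stripped-down stand-in for org.apache.spark.util.Benchmark.
class Benchmark(name: String, valuesPerIteration: Long) {
  private val cases = ArrayBuffer.empty[(String, Int => Unit)]
  def addCase(caseName: String)(fn: Int => Unit): Unit = cases += ((caseName, fn))
  def run(): Unit = cases.foreach { case (caseName, fn) =>
    println(caseName) // timing and the Best/Avg report are elided
    fn(0)
  }
}

def runBenchmark(name: String, values: Int, conf: Conf)(f: => Unit): Unit = {
  val benchmark = new Benchmark(name, values)
  Seq(false, true).foreach { enabled =>
    benchmark.addCase(s"$name codegen=$enabled") { _ =>
      // flip the flag inside the case so each run sees its own setting
      conf.set("spark.sql.codegen.wholeStage", enabled.toString)
      f
    }
  }
  benchmark.run()
}
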