path: root/core/src
author	Eric Liang <ekl@databricks.com>	2016-06-08 16:21:41 -0700
committer	Herman van Hovell <hvanhovell@databricks.com>	2016-06-08 16:21:41 -0700
commit	4e8ac6edd5808ca8245b39d804c6d4f5ea9d0d36 (patch)
tree	463e372d0aa4d8bac31b0bd56a901d6a5fb5861e /core/src
parent	ca70ab27cc73f6ea7fce5d179ca8f13459c8ba95 (diff)
[SPARK-15735] Allow specifying min time to run in microbenchmarks
## What changes were proposed in this pull request?

This makes microbenchmarks run for at least 2 seconds by default, to allow some time for JIT compilation to kick in.

## How was this patch tested?

Tested manually with existing microbenchmarks. This change is backwards compatible: existing microbenchmarks that specified numIters per case still run exactly that number of iterations, while microbenchmarks that previously overrode defaultNumIters now override minNumIters instead.

cc hvanhovell

Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #13472 from ericl/spark-15735.
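To make the new knobs concrete, here is a minimal usage sketch. It is not part of the patch: the case bodies and values are hypothetical, and since `Benchmark` is `private[spark]`, the caller is assumed to sit inside the `org.apache.spark` namespace.

```scala
import scala.concurrent.duration._

import org.apache.spark.util.Benchmark

val benchmark = new Benchmark(
  "array sum",                    // benchmark name, shown in the results header
  valuesPerIteration = 1000000L,  // used to compute the Rate(M/s) column
  minNumIters = 2,                // new default: lower bound on measured iterations
  warmupTime = 2.seconds,         // dummy iterations run first for JIT warm-up
  minTime = 2.seconds)            // measured iterations continue until this is spent

// No numIters: runs for at least minNumIters and until minTime is used up.
benchmark.addCase("while loop") { _ =>
  var i = 0
  var sum = 0L
  while (i < 1000000) { sum += i; i += 1 }
}

// Explicit numIters: keeps the old behavior of exactly that many iterations.
benchmark.addCase("while loop, fixed iters", numIters = 5) { _ =>
  var i = 0
  var sum = 0L
  while (i < 1000000) { sum += i; i += 1 }
}

benchmark.run()
```

Results can additionally be captured to a file through the new `output` parameter, e.g. `output = Some(new FileOutputStream("bench.txt"))` (a hypothetical path); the patch tees such a stream together with `System.out` via commons-io's `TeeOutputStream`.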
Diffstat (limited to 'core/src')
-rw-r--r--	core/src/main/scala/org/apache/spark/util/Benchmark.scala | 109
1 file changed, 72 insertions(+), 37 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
index 0c685b1918..7def44bd2a 100644
--- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -17,10 +17,14 @@
package org.apache.spark.util
+import java.io.{OutputStream, PrintStream}
+
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
import scala.util.Try
+import org.apache.commons.io.output.TeeOutputStream
import org.apache.commons.lang3.SystemUtils
/**
@@ -33,18 +37,37 @@ import org.apache.commons.lang3.SystemUtils
*
* The benchmark function takes one argument that is the iteration that's being run.
*
- * If outputPerIteration is true, the timing for each run will be printed to stdout.
+ * @param name name of this benchmark.
+ * @param valuesPerIteration number of values used in the test case, used to compute rows/s.
+ * @param minNumIters the minimum number of iterations that will be run per case, not counting warm-up.
+ * @param warmupTime amount of time to spend running dummy case iterations for JIT warm-up.
+ * @param minTime further iterations will be run for each case until this time is used up.
+ * @param outputPerIteration if true, the timing for each run will be printed to stdout.
+ * @param output optional output stream to write benchmark results to.
*/
private[spark] class Benchmark(
name: String,
valuesPerIteration: Long,
- defaultNumIters: Int = 5,
- outputPerIteration: Boolean = false) {
+ minNumIters: Int = 2,
+ warmupTime: FiniteDuration = 2.seconds,
+ minTime: FiniteDuration = 2.seconds,
+ outputPerIteration: Boolean = false,
+ output: Option[OutputStream] = None) {
+ import Benchmark._
val benchmarks = mutable.ArrayBuffer.empty[Benchmark.Case]
+ val out = if (output.isDefined) {
+ new PrintStream(new TeeOutputStream(System.out, output.get))
+ } else {
+ System.out
+ }
+
/**
* Adds a case to run when run() is called. The given function will be run for several
* iterations to collect timing statistics.
+ *
+ * @param name the name of the benchmark case
+ * @param numIters if non-zero, forces exactly this many iterations to be run
*/
def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = {
addTimerCase(name, numIters) { timer =>
@@ -58,9 +81,12 @@ private[spark] class Benchmark(
* Adds a case with manual timing control. When the function is run, timing does not start
* until timer.startTiming() is called within the given function. The corresponding
* timer.stopTiming() method must be called before the function returns.
+ *
+ * @param name the name of the benchmark case
+ * @param numIters if non-zero, forces exactly this many iterations to be run
*/
def addTimerCase(name: String, numIters: Int = 0)(f: Benchmark.Timer => Unit): Unit = {
- benchmarks += Benchmark.Case(name, f, if (numIters == 0) defaultNumIters else numIters)
+ benchmarks += Benchmark.Case(name, f, numIters)
}
/**
@@ -75,28 +101,63 @@ private[spark] class Benchmark(
val results = benchmarks.map { c =>
println(" Running case: " + c.name)
- Benchmark.measure(valuesPerIteration, c.numIters, outputPerIteration)(c.fn)
+ measure(valuesPerIteration, c.numIters)(c.fn)
}
println
val firstBest = results.head.bestMs
// The results are going to be processor specific so it is useful to include that.
- println(Benchmark.getJVMOSInfo())
- println(Benchmark.getProcessorName())
- printf("%-40s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
+ out.println(Benchmark.getJVMOSInfo())
+ out.println(Benchmark.getProcessorName())
+ out.printf("%-40s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
"Per Row(ns)", "Relative")
- println("-" * 96)
+ out.println("-" * 96)
results.zip(benchmarks).foreach { case (result, benchmark) =>
- printf("%-40s %16s %12s %13s %10s\n",
+ out.printf("%-40s %16s %12s %13s %10s\n",
benchmark.name,
"%5.0f / %4.0f" format (result.bestMs, result.avgMs),
"%10.1f" format result.bestRate,
"%6.1f" format (1000 / result.bestRate),
"%3.1fX" format (firstBest / result.bestMs))
}
- println
+ out.println
// scalastyle:on
}
+
+ /**
+ * Runs a single function `f` for at least `minNumIters` iterations (or exactly `overrideNumIters`
+ * if non-zero), returning the average time the function took and its rate.
+ */
+ def measure(num: Long, overrideNumIters: Int)(f: Timer => Unit): Result = {
+ System.gc() // ensures garbage from previous cases doesn't impact this one
+ val warmupDeadline = warmupTime.fromNow
+ while (!warmupDeadline.isOverdue) {
+ f(new Benchmark.Timer(-1))
+ }
+ val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
+ val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos
+ val runTimes = ArrayBuffer[Long]()
+ var i = 0
+ while (i < minIters || runTimes.sum < minDuration) {
+ val timer = new Benchmark.Timer(i)
+ f(timer)
+ val runTime = timer.totalTime()
+ runTimes += runTime
+
+ if (outputPerIteration) {
+ // scalastyle:off
+ println(s"Iteration $i took ${runTime / 1000} microseconds")
+ // scalastyle:on
+ }
+ i += 1
+ }
+ // scalastyle:off
+ println(s" Stopped after $i iterations, ${runTimes.sum / 1000000} ms")
+ // scalastyle:on
+ val best = runTimes.min
+ val avg = runTimes.sum / runTimes.size
+ Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0)
+ }
}
private[spark] object Benchmark {
@@ -161,30 +222,4 @@ private[spark] object Benchmark {
val osVersion = System.getProperty("os.version")
s"${vmName} ${runtimeVersion} on ${osName} ${osVersion}"
}
-
- /**
- * Runs a single function `f` for iters, returning the average time the function took and
- * the rate of the function.
- */
- def measure(num: Long, iters: Int, outputPerIteration: Boolean)(f: Timer => Unit): Result = {
- val runTimes = ArrayBuffer[Long]()
- for (i <- 0 until iters + 1) {
- val timer = new Benchmark.Timer(i)
- f(timer)
- val runTime = timer.totalTime()
- if (i > 0) {
- runTimes += runTime
- }
-
- if (outputPerIteration) {
- // scalastyle:off
- println(s"Iteration $i took ${runTime / 1000} microseconds")
- // scalastyle:on
- }
- }
- val best = runTimes.min
- val avg = runTimes.sum / iters
- Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0)
- }
}
-
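
As a reading aid, the new stopping rule in measure() reduces to the following standalone sketch. This is not code from the commit; `runUntil` and `body` are illustrative names. A case keeps iterating until it has satisfied both bounds: the minimum number of iterations and the minimum accumulated measured time.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._

// `body` runs one iteration and returns its measured time, in nanoseconds.
def runUntil(minIters: Int, minTime: FiniteDuration)(body: => Long): Seq[Long] = {
  val runTimes = ArrayBuffer[Long]()
  var i = 0
  // Keep iterating while either bound is unmet: too few iterations, or the
  // accumulated measured time has not yet reached minTime.
  while (i < minIters || runTimes.sum < minTime.toNanos) {
    runTimes += body
    i += 1
  }
  runTimes.toSeq
}
```

A per-case numIters override maps onto this as `runUntil(numIters, 0.nanos)(...)`, which reproduces the old exact-count behavior; the default maps to `runUntil(minNumIters, minTime)(...)`.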