 core/src/main/scala/org/apache/spark/util/Benchmark.scala                                             | 120 +++++++++++++
 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala | 158 ++++++++++++++
 2 files changed, 278 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
new file mode 100644
index 0000000000..457a1a05a1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import scala.collection.mutable
+
+import org.apache.commons.lang3.SystemUtils
+
+/**
+ * Utility class to benchmark components. An example of how to use this is:
+ * {{{
+ *   val benchmark = new Benchmark("My Benchmark", valuesPerIteration)
+ *   benchmark.addCase("V1")(<function>)
+ *   benchmark.addCase("V2")(<function>)
+ *   benchmark.run()
+ * }}}
+ * This will output the average time and rate for each registered function.
+ *
+ * Each benchmark function takes one argument: the index of the iteration being run.
+ *
+ * If `outputPerIteration` is true, the timing for each iteration is printed to stdout.
+ */
+private[spark] class Benchmark(
+    name: String,
+    valuesPerIteration: Long,
+    iters: Int = 5,
+    outputPerIteration: Boolean = false) {
+ val benchmarks = mutable.ArrayBuffer.empty[Benchmark.Case]
+
+ def addCase(name: String)(f: Int => Unit): Unit = {
+ benchmarks += Benchmark.Case(name, f)
+ }
+
+  /**
+   * Runs the benchmark and outputs the results to stdout. The output should be copied
+   * into a comment alongside the benchmark. Although the results vary from machine to
+   * machine, they provide a useful baseline.
+   */
+ def run(): Unit = {
+ require(benchmarks.nonEmpty)
+ // scalastyle:off
+ println("Running benchmark: " + name)
+
+ val results = benchmarks.map { c =>
+ println(" Running case: " + c.name)
+ Benchmark.measure(valuesPerIteration, iters, outputPerIteration)(c.fn)
+ }
+    println()
+
+ val firstRate = results.head.avgRate
+ // The results are going to be processor specific so it is useful to include that.
+ println(Benchmark.getProcessorName())
+ printf("%-24s %16s %16s %14s\n", name + ":", "Avg Time(ms)", "Avg Rate(M/s)", "Relative Rate")
+ println("-------------------------------------------------------------------------")
+    results.zip(benchmarks).foreach { case (result, benchmark) =>
+      printf("%-24s %16s %16s %14s\n",
+        benchmark.name,
+        "%10.2f" format result.avgMs,
+        "%10.2f" format result.avgRate,
+        "%6.2f X" format (result.avgRate / firstRate))
+    }
+    println()
+ // scalastyle:on
+ }
+}
+
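+/*
+ * Illustrative output for the example in the class comment (the numbers here are
+ * made up; real results vary by machine), following the printf format in run():
+ *
+ * Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
+ * My Benchmark:                Avg Time(ms)    Avg Rate(M/s)  Relative Rate
+ * -------------------------------------------------------------------------
+ * V1                                 300.00            50.00         1.00 X
+ * V2                                 150.00           100.00         2.00 X
+ */
+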
+private[spark] object Benchmark {
+ case class Case(name: String, fn: Int => Unit)
+ case class Result(avgMs: Double, avgRate: Double)
+
+  /**
+   * Returns a human-readable description of the processor. Obtaining this is OS-specific.
+   * The result looks something like "Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz".
+   */
+ def getProcessorName(): String = {
+ if (SystemUtils.IS_OS_MAC_OSX) {
+ Utils.executeAndGetOutput(Seq("/usr/sbin/sysctl", "-n", "machdep.cpu.brand_string"))
+ } else if (SystemUtils.IS_OS_LINUX) {
+      // The arguments go straight to grep (no shell), so the pattern must not be quoted.
+      Utils.executeAndGetOutput(Seq("/usr/bin/grep", "-m", "1", "model name", "/proc/cpuinfo"))
+ } else {
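+      // On Windows this is exposed as an environment variable; on any other
+      // platform this branch returns null.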
+ System.getenv("PROCESSOR_IDENTIFIER")
+ }
+ }
+
+  /**
+   * Runs a single function `f` for `iters + 1` iterations. The first iteration is a
+   * warm-up and is excluded from the result, which contains the average time per
+   * iteration and the average processing rate.
+   */
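+  // A quick sanity check of the units (illustrative numbers): 5 timed iterations over
+  // 15 million values taking 1.5e9 ns in total give avgMs = 1.5e9 / 1e6 / 5 = 300 ms
+  // and avgRate = 15e6 * 5 / (1.5e9 / 1000) = 50, i.e. 50 million values per second.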
+ def measure(num: Long, iters: Int, outputPerIteration: Boolean)(f: Int => Unit): Result = {
+ var totalTime = 0L
+    for (i <- 0 to iters) {
+ val start = System.nanoTime()
+
+ f(i)
+
+ val end = System.nanoTime()
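+      // Iteration 0 is a warm-up and is excluded from the total.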
+ if (i != 0) totalTime += end - start
+
+ if (outputPerIteration) {
+ // scalastyle:off
+ println(s"Iteration $i took ${(end - start) / 1000} microseconds")
+ // scalastyle:on
+ }
+ }
+ Result(totalTime.toDouble / 1000000 / iters, num * iters / (totalTime.toDouble / 1000))
+ }
+}
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
new file mode 100644
index 0000000000..cab6abde6d
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{SQLConf, SQLContext}
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * Benchmark to measure parquet read performance.
+ * To run this:
+ * spark-submit --class <this class> --jars <spark sql test jar>
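+ *
+ * For example (the jar placeholder is kept deliberately generic):
+ * {{{
+ *   spark-submit \
+ *     --class org.apache.spark.sql.execution.datasources.parquet.ParquetReadBenchmark \
+ *     --jars <spark sql test jar> <spark sql test jar>
+ * }}}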
+ */
+object ParquetReadBenchmark {
+ val conf = new SparkConf()
+ conf.set("spark.sql.parquet.compression.codec", "snappy")
+ val sc = new SparkContext("local[1]", "test-sql-context", conf)
+ val sqlContext = new SQLContext(sc)
+
+ def withTempPath(f: File => Unit): Unit = {
+ val path = Utils.createTempDir()
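+    // Delete the newly created directory so `f` receives a path that does not exist yet.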
+ path.delete()
+ try f(path) finally Utils.deleteRecursively(path)
+ }
+
+ def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+ try f finally tableNames.foreach(sqlContext.dropTempTable)
+ }
+
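+  // Sets the given SQL confs for the duration of `f`, then restores each key to its
+  // previous value, or unsets it if it had no previous value.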
+ def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+ val (keys, values) = pairs.unzip
+ val currentValues = keys.map(key => Try(sqlContext.conf.getConfString(key)).toOption)
+ (keys, values).zipped.foreach(sqlContext.conf.setConfString)
+ try f finally {
+ keys.zip(currentValues).foreach {
+ case (key, Some(value)) => sqlContext.conf.setConfString(key, value)
+ case (key, None) => sqlContext.conf.unsetConf(key)
+ }
+ }
+ }
+
+ def intScanBenchmark(values: Int): Unit = {
+ withTempPath { dir =>
+ sqlContext.range(values).write.parquet(dir.getCanonicalPath)
+ withTempTable("tempTable") {
+ sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
+ val benchmark = new Benchmark("Single Int Column Scan", values)
+
+ benchmark.addCase("SQL Parquet Reader") { iter =>
+ sqlContext.sql("select sum(id) from tempTable").collect()
+ }
+
+ benchmark.addCase("SQL Parquet MR") { iter =>
+ withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+ sqlContext.sql("select sum(id) from tempTable").collect()
+ }
+ }
+
+ val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
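+        // Drive UnsafeRowParquetRecordReader over the written files directly,
+        // bypassing the SQL layer, to measure the raw record reader.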
+ benchmark.addCase("ParquetReader") { num =>
+ var sum = 0L
+ files.map(_.asInstanceOf[String]).foreach { p =>
+ val reader = new UnsafeRowParquetRecordReader
+ reader.initialize(p, ("id" :: Nil).asJava)
+
+ while (reader.nextKeyValue()) {
+ val record = reader.getCurrentValue
+ if (!record.isNullAt(0)) sum += record.getInt(0)
+ }
+ reader.close()
+          }
+        }
+
+ /*
+ Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
+ Single Int Column Scan: Avg Time(ms) Avg Rate(M/s) Relative Rate
+ -------------------------------------------------------------------------
+ SQL Parquet Reader 1910.0 13.72 1.00 X
+ SQL Parquet MR 2330.0 11.25 0.82 X
+ ParquetReader 1252.6 20.93 1.52 X
+ */
+ benchmark.run()
+ }
+ }
+ }
+
+ def intStringScanBenchmark(values: Int): Unit = {
+ withTempPath { dir =>
+ withTempTable("t1", "tempTable") {
+ sqlContext.range(values).registerTempTable("t1")
+ sqlContext.sql("select id as c1, cast(id as STRING) as c2 from t1")
+ .write.parquet(dir.getCanonicalPath)
+ sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
+
+ val benchmark = new Benchmark("Int and String Scan", values)
+
+ benchmark.addCase("SQL Parquet Reader") { iter =>
+          sqlContext.sql("select sum(c1), sum(length(c2)) from tempTable").collect()
+ }
+
+ benchmark.addCase("SQL Parquet MR") { iter =>
+ withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+            sqlContext.sql("select sum(c1), sum(length(c2)) from tempTable").collect()
+ }
+ }
+
+ val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
+ benchmark.addCase("ParquetReader") { num =>
+ var sum1 = 0L
+ var sum2 = 0L
+ files.map(_.asInstanceOf[String]).foreach { p =>
+ val reader = new UnsafeRowParquetRecordReader
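+            // A null column list makes the reader project all columns in the file.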
+ reader.initialize(p, null)
+ while (reader.nextKeyValue()) {
+ val record = reader.getCurrentValue
+ if (!record.isNullAt(0)) sum1 += record.getInt(0)
+ if (!record.isNullAt(1)) sum2 += record.getUTF8String(1).numBytes()
+ }
+ reader.close()
+ }
+ }
+
+ /*
+ Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
+ Int and String Scan: Avg Time(ms) Avg Rate(M/s) Relative Rate
+ -------------------------------------------------------------------------
+ SQL Parquet Reader 2245.6 7.00 1.00 X
+ SQL Parquet MR 2914.2 5.40 0.77 X
+ ParquetReader 1544.6 10.18 1.45 X
+ */
+ benchmark.run()
+ }
+ }
+ }
+
+ def main(args: Array[String]): Unit = {
+ intScanBenchmark(1024 * 1024 * 15)
+ intStringScanBenchmark(1024 * 1024 * 10)
+ }
+}