author    Wenchen Fan <wenchen@databricks.com>  2016-12-27 22:42:28 +0800
committer Wenchen Fan <wenchen@databricks.com>  2016-12-27 22:42:28 +0800
commit    a05cc425a0a7d18570b99883993a04ad175aa071 (patch)
tree      deb783fc9c45f18b651cfce815d46e6d2fc6facf /sql/core
parent    c2a2069dae2bc438b0d196c240c06eac8b4e6761 (diff)
[SPARK-18990][SQL] make DatasetBenchmark fairer for Dataset
## What changes were proposed in this pull request?

Currently `DatasetBenchmark` uses `case class Data(l: Long, s: String)` as the record type for both `RDD` and `Dataset`, which introduces serialization overhead only for `Dataset` and makes the comparison unfair. This PR uses `Long` as the record type, to be fairer to `Dataset`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16391 from cloud-fan/benchmark.
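For context, here is a minimal, self-contained sketch (not part of the patch) of the fairer setup the diff below introduces: both the RDD and the Dataset pipelines chain the same `Long => Long` function over plain `Long` records, so neither side pays an extra encoder/serialization cost for a `Data` case class. The `local[*]` session, row count, and chain length are illustrative assumptions, not values from the benchmark.

```scala
import org.apache.spark.sql.SparkSession

object LongRecordBenchmarkSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("LongRecordBenchmarkSketch")
      .getOrCreate()
    import spark.implicits._

    val numRows = 10L * 1000 * 1000   // illustrative row count
    val numChains = 10                // illustrative chain length
    val func = (l: Long) => l + 1     // same map function for RDD and Dataset

    // RDD side: plain Long records, no case class wrapping.
    var rddRes = spark.sparkContext.range(0, numRows)
    var i = 0
    while (i < numChains) { rddRes = rddRes.map(func); i += 1 }
    rddRes.foreach(_ => ())

    // Dataset side: the same Long records, so the comparison is apples-to-apples.
    var dsRes = spark.range(0, numRows).as[Long]
    i = 0
    while (i < numChains) { dsRes = dsRes.map(func); i += 1 }
    dsRes.queryExecution.toRdd.foreach(_ => ())

    spark.stop()
  }
}
```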
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala  75
1 file changed, 42 insertions(+), 33 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
index 66d94d6016..cd925e630e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql
-import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.expressions.scalalang.typed
import org.apache.spark.sql.functions._
@@ -34,11 +33,13 @@ object DatasetBenchmark {
def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
import spark.implicits._
- val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
+ val rdd = spark.sparkContext.range(0, numRows)
+ val ds = spark.range(0, numRows)
+ val df = ds.toDF("l")
+ val func = (l: Long) => l + 1
+
val benchmark = new Benchmark("back-to-back map", numRows)
- val func = (d: Data) => Data(d.l + 1, d.s)
- val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
benchmark.addCase("RDD") { iter =>
var res = rdd
var i = 0
@@ -53,14 +54,14 @@ object DatasetBenchmark {
var res = df
var i = 0
while (i < numChains) {
- res = res.select($"l" + 1 as "l", $"s")
+ res = res.select($"l" + 1 as "l")
i += 1
}
res.queryExecution.toRdd.foreach(_ => Unit)
}
benchmark.addCase("Dataset") { iter =>
- var res = df.as[Data]
+ var res = ds.as[Long]
var i = 0
while (i < numChains) {
res = res.map(func)
@@ -75,14 +76,14 @@ object DatasetBenchmark {
def backToBackFilter(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
import spark.implicits._
- val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
+ val rdd = spark.sparkContext.range(0, numRows)
+ val ds = spark.range(0, numRows)
+ val df = ds.toDF("l")
+ val func = (l: Long, i: Int) => l % (100L + i) == 0L
+ val funcs = 0.until(numChains).map { i => (l: Long) => func(l, i) }
+
val benchmark = new Benchmark("back-to-back filter", numRows)
- val func = (d: Data, i: Int) => d.l % (100L + i) == 0L
- val funcs = 0.until(numChains).map { i =>
- (d: Data) => func(d, i)
- }
- val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
benchmark.addCase("RDD") { iter =>
var res = rdd
var i = 0
@@ -104,7 +105,7 @@ object DatasetBenchmark {
}
benchmark.addCase("Dataset") { iter =>
- var res = df.as[Data]
+ var res = ds.as[Long]
var i = 0
while (i < numChains) {
res = res.filter(funcs(i))
@@ -133,24 +134,29 @@ object DatasetBenchmark {
def aggregate(spark: SparkSession, numRows: Long): Benchmark = {
import spark.implicits._
- val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
+ val rdd = spark.sparkContext.range(0, numRows)
+ val ds = spark.range(0, numRows)
+ val df = ds.toDF("l")
+
val benchmark = new Benchmark("aggregate", numRows)
- val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
benchmark.addCase("RDD sum") { iter =>
- rdd.aggregate(0L)(_ + _.l, _ + _)
+ rdd.map(l => (l % 10, l)).reduceByKey(_ + _).foreach(_ => Unit)
}
benchmark.addCase("DataFrame sum") { iter =>
- df.select(sum($"l")).queryExecution.toRdd.foreach(_ => Unit)
+ df.groupBy($"l" % 10).agg(sum($"l")).queryExecution.toRdd.foreach(_ => Unit)
}
benchmark.addCase("Dataset sum using Aggregator") { iter =>
- df.as[Data].select(typed.sumLong((d: Data) => d.l)).queryExecution.toRdd.foreach(_ => Unit)
+ val result = ds.as[Long].groupByKey(_ % 10).agg(typed.sumLong[Long](identity))
+ result.queryExecution.toRdd.foreach(_ => Unit)
}
+ val complexDs = df.select($"l", $"l".cast(StringType).as("s")).as[Data]
benchmark.addCase("Dataset complex Aggregator") { iter =>
- df.as[Data].select(ComplexAggregator.toColumn).queryExecution.toRdd.foreach(_ => Unit)
+ val result = complexDs.groupByKey(_.l % 10).agg(ComplexAggregator.toColumn)
+ result.queryExecution.toRdd.foreach(_ => Unit)
}
benchmark
@@ -170,36 +176,39 @@ object DatasetBenchmark {
val benchmark3 = aggregate(spark, numRows)
/*
- OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
- Intel Xeon E3-12xx v2 (Ivy Bridge)
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
+ Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
back-to-back map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
- RDD 3448 / 3646 29.0 34.5 1.0X
- DataFrame 2647 / 3116 37.8 26.5 1.3X
- Dataset 4781 / 5155 20.9 47.8 0.7X
+ RDD 3963 / 3976 25.2 39.6 1.0X
+ DataFrame 826 / 834 121.1 8.3 4.8X
+ Dataset 5178 / 5198 19.3 51.8 0.8X
*/
benchmark.run()
/*
- OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
- Intel Xeon E3-12xx v2 (Ivy Bridge)
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
+ Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
back-to-back filter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
- RDD 1346 / 1618 74.3 13.5 1.0X
- DataFrame 59 / 72 1695.4 0.6 22.8X
- Dataset 2777 / 2805 36.0 27.8 0.5X
+ RDD 533 / 587 187.6 5.3 1.0X
+ DataFrame 79 / 91 1269.0 0.8 6.8X
+ Dataset 550 / 559 181.7 5.5 1.0X
*/
benchmark2.run()
/*
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
aggregate: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
- RDD sum 1913 / 1942 52.3 19.1 1.0X
- DataFrame sum 46 / 61 2157.7 0.5 41.3X
- Dataset sum using Aggregator 4656 / 4758 21.5 46.6 0.4X
- Dataset complex Aggregator 6636 / 7039 15.1 66.4 0.3X
+ RDD sum 2297 / 2440 43.5 23.0 1.0X
+ DataFrame sum 630 / 637 158.7 6.3 3.6X
+ Dataset sum using Aggregator 3129 / 3247 32.0 31.3 0.7X
+ Dataset complex Aggregator 12109 / 12142 8.3 121.1 0.2X
*/
benchmark3.run()
}