author | Cheng Hao <hao.cheng@intel.com> | 2014-08-27 12:50:47 -0700
committer | Michael Armbrust <michael@databricks.com> | 2014-08-27 12:50:47 -0700
commit | 4238c17dc9e1f2f93cc9e6c768f92bd27bf1df66 (patch)
tree | e26dbad704aa2c96c2e58f8a2765ebe00eb3febe /bin/spark-class2.cmd
parent | 3b5eb7083d3e1955de288e4fd365dca6221f32fb (diff)
[SPARK-3197] [SQL] Reduce the Expression tree object creations for aggregation function (min/max)
The min/max aggregation functions in Catalyst currently create an expression tree for every single row. Expression tree creation is expensive, especially in a multithreaded environment, so min/max performance is very poor.
Here is a benchmark I ran on my local machine:
| Master | Previous Result (ms) | Current Result (ms) |
| ------------ | ------------- | ------------- |
| local | 3645 | 3416 |
| local[6] | 3602 | 1002 |
The benchmark source code:
```scala
case class Record(key: Int, value: Int)

object TestHive2 extends HiveContext(
  new SparkContext("local[6]", "TestSQLContext", new SparkConf()))

object DataPrepare extends App {
  import TestHive2._

  val rdd = sparkContext.parallelize(
    (1 to 10000000).map(i => Record(i % 3000, i)), 12)

  runSqlHive("SHOW TABLES")
  runSqlHive("DROP TABLE if exists a")
  runSqlHive("DROP TABLE if exists result")
  rdd.registerAsTable("records")

  runSqlHive("""CREATE TABLE a (key INT, value INT)
               |ROW FORMAT SERDE
               |'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
               |STORED AS RCFILE
             """.stripMargin)
  runSqlHive("""CREATE TABLE result (key INT, value INT)
               |ROW FORMAT SERDE
               |'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
               |STORED AS RCFILE
             """.stripMargin)
  hql("""from records
        |insert into table a
        |select key, value
      """.stripMargin)
}

object PerformanceTest extends App {
  import TestHive2._

  hql("SHOW TABLES")
  hql("set spark.sql.shuffle.partitions=12")

  val cmd = "select min(value), max(value) from a group by key"
  val results =
    ("Result1", benchmark(cmd)) ::
    ("Result2", benchmark(cmd)) ::
    ("Result3", benchmark(cmd)) :: Nil

  results.foreach { case (prompt, (elapsed, count)) =>
    println(s"$prompt: took $elapsed ms ($count records)")
  }

  def benchmark(cmd: String): (Long, Long) = {
    val begin = System.currentTimeMillis()
    val count = hql(cmd).count
    val end = System.currentTimeMillis()
    (end - begin, count)
  }
}
```
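A minimal, hypothetical sketch of the pattern behind the fix (the class names and shapes below are simplified stand-ins, not Catalyst's actual API): instead of allocating a fresh comparison `Expression` tree inside `update` for every input row, the tree is built once as a `val` per aggregate instance, and only a mutable literal is updated per row.

```scala
// Simplified stand-ins for Catalyst's expression types (hypothetical).
case class MutableLiteral(var value: Int)
case class GreaterThan(left: MutableLiteral, right: MutableLiteral) {
  def eval(): Boolean = left.value > right.value
}

// Before: a new GreaterThan tree is allocated on every call to update.
class MaxFunctionBefore {
  val currentMax = MutableLiteral(Int.MinValue)
  def update(input: Int): Unit = {
    // Per-row expression tree creation -- the hot-path cost this PR removes.
    if (GreaterThan(MutableLiteral(input), currentMax).eval())
      currentMax.value = input
  }
}

// After: the comparison tree is a val built once; update only mutates
// the candidate literal before re-evaluating the cached tree.
class MaxFunctionAfter {
  val currentMax = MutableLiteral(Int.MinValue)
  private val candidate = MutableLiteral(0)
  private val cmp = GreaterThan(candidate, currentMax) // built once
  def update(input: Int): Unit = {
    candidate.value = input
    if (cmp.eval()) currentMax.value = input
  }
}
```

Both versions compute the same max; the second just avoids one object-tree allocation per row, which matters when update is called millions of times across threads.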
Author: Cheng Hao <hao.cheng@intel.com>
Closes #2113 from chenghao-intel/aggregation_expression_optimization and squashes the following commits:
db40395 [Cheng Hao] remove the transient and add val for the expression property
d56167d [Cheng Hao] Reduce the Expressions creation