| author | Dongjoon Hyun <dongjoon@apache.org> | 2016-07-05 16:19:22 +0800 |
|---|---|---|
| committer | Cheng Lian <lian@databricks.com> | 2016-07-05 16:19:22 +0800 |
| commit | 7f7eb3934ea258f2b163a87da06766bf5c7d443d | |
| tree | 461376b83bd368ba4bb677ee604a3a359f1687ce /sql | |
| parent | 7742d9f1584150befeb2f3d76cdbd4ea1f37c914 | |
[SPARK-16360][SQL] Speed up SQL query performance by removing redundant `executePlan` call
## What changes were proposed in this pull request?
There have been several reports of query performance regressions in Spark 2.0 for large queries.

This PR speeds up SQL query processing by removing a redundant **consecutive `executePlan`** call made between `Dataset.ofRows` and `Dataset` instantiation. It reduces the overhead of query execution *plan generation*, not of actual query execution, so the improvement does not show up in the Spark Web UI. With the benchmark script below, elapsed time drops from **25.78 sec** to **12.36 sec**, as expected.
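For context, here is a simplified sketch (abbreviated from Spark's `Dataset.scala` around this change; signatures are shortened) of why the call is redundant: `ofRows` already builds a `QueryExecution` to run analysis, but then hands the raw `LogicalPlan` to the `Dataset` constructor, whose secondary constructor calls `executePlan` again.

```scala
// Simplified sketch of the pre-patch code path, not the full Spark source.

// Dataset's secondary constructor re-analyzes the logical plan:
def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) =
  this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder) // 2nd executePlan

def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
  val qe = sparkSession.sessionState.executePlan(logicalPlan) // 1st executePlan
  qe.assertAnalyzed()
  // Passing logicalPlan (rather than qe) routes through the constructor above,
  // so every ofRows call analyzes the same plan twice.
  new Dataset[Row](sparkSession, logicalPlan, RowEncoder(qe.analyzed.schema))
}
```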
**Sample Query**
```scala
// Build a pathological 4000-column query to stress plan generation.
val n = 4000
val values = (1 to n).map(_.toString).mkString(", ")
val columns = (1 to n).map("column" + _).mkString(", ")
val query =
  s"""
     |SELECT $columns
     |FROM VALUES ($values) T($columns)
     |WHERE 1=2 AND 1 IN ($columns)
     |GROUP BY $columns
     |ORDER BY $columns
     |""".stripMargin

// Time an arbitrary block and print the elapsed wall-clock seconds.
def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block
  println("Elapsed time: " + ((System.nanoTime() - t0) / 1e9) + "s")
  result
}
```
**Before**
```scala
scala> time(sql(query))
Elapsed time: 30.138142577s // First query has a little overhead of initialization.
res0: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]
scala> time(sql(query))
Elapsed time: 25.787751452s // Let's compare this one.
res1: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]
```
**After**
```scala
scala> time(sql(query))
Elapsed time: 17.500279659s // First query has a little overhead of initialization.
res0: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]
scala> time(sql(query))
Elapsed time: 12.364812255s // This shows the real difference. The speed up is about 2 times.
res1: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]
```
## How was this patch tested?
Manually, with the script above.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #14044 from dongjoon-hyun/SPARK-16360.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 |
1 file changed, 1 insertion, 1 deletion
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e64669a19c..ededf7f4fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -62,7 +62,7 @@ private[sql] object Dataset {
   def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
     val qe = sparkSession.sessionState.executePlan(logicalPlan)
     qe.assertAnalyzed()
-    new Dataset[Row](sparkSession, logicalPlan, RowEncoder(qe.analyzed.schema))
+    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
   }
 }
```
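The fix reuses the `QueryExecution` already constructed for `qe.assertAnalyzed()`, passing it straight to `Dataset`'s primary constructor. The secondary constructor that takes a `LogicalPlan`, and with it the second `sessionState.executePlan` call, is bypassed, so each `ofRows` call now analyzes the plan exactly once.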