author    Dongjoon Hyun <dongjoon@apache.org>  2016-07-05 16:19:22 +0800
committer Cheng Lian <lian@databricks.com>     2016-07-05 16:19:22 +0800
commit    7f7eb3934ea258f2b163a87da06766bf5c7d443d (patch)
tree      461376b83bd368ba4bb677ee604a3a359f1687ce
parent    7742d9f1584150befeb2f3d76cdbd4ea1f37c914 (diff)
[SPARK-16360][SQL] Improve SQL query performance by removing a redundant `executePlan` call
## What changes were proposed in this pull request?

There have been several reports of a Spark 2.0 query performance regression on large queries. This PR speeds up SQL query processing by removing a redundant, **consecutive `executePlan`** call made between the `Dataset.ofRows` function and `Dataset` instantiation. Specifically, the PR reduces the overhead of query execution *plan generation*, not of actual query execution, so the improvement does not show up in the Spark Web UI; use the query script below to observe it. Elapsed time drops from **25.78 sec** to **12.36 sec**, as expected.

**Sample Query**
```scala
val n = 4000
val values = (1 to n).map(_.toString).mkString(", ")
val columns = (1 to n).map("column" + _).mkString(", ")
val query =
  s"""
     |SELECT $columns
     |FROM VALUES ($values) T($columns)
     |WHERE 1=2 AND 1 IN ($columns)
     |GROUP BY $columns
     |ORDER BY $columns
     |""".stripMargin

def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block
  println("Elapsed time: " + ((System.nanoTime - t0) / 1e9) + "s")
  result
}
```

**Before**
```scala
scala> time(sql(query))
Elapsed time: 30.138142577s   // The first query pays some initialization overhead.
res0: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]

scala> time(sql(query))
Elapsed time: 25.787751452s   // Compare this one.
res1: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]
```

**After**
```scala
scala> time(sql(query))
Elapsed time: 17.500279659s   // The first query pays some initialization overhead.
res0: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]

scala> time(sql(query))
Elapsed time: 12.364812255s   // This shows the real difference: roughly a 2x speedup.
res1: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]
```

## How was this patch tested?

Manually, with the script above.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #14044 from dongjoon-hyun/SPARK-16360.
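For context on where the redundancy came from: `executePlan` builds a `QueryExecution`, whose analysis phase dominates plan-generation time for very wide plans like the 4000-column query above. A minimal sketch of that class, assuming Spark 2.0's `org.apache.spark.sql.execution.QueryExecution` and heavily abridged (illustrative, not the exact source):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Abridged sketch: the real QueryExecution also carries the optimization,
// physical-planning, and execution phases.
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {

  // Analysis is lazy but costly for wide plans. Constructing two
  // QueryExecutions for the same logical plan repeats this work.
  lazy val analyzed: LogicalPlan =
    sparkSession.sessionState.analyzer.execute(logical)

  // Forces analysis so that resolution errors surface eagerly.
  def assertAnalyzed(): Unit = analyzed
}
```

The old `ofRows` built one `QueryExecution` directly and then, via the `Dataset` constructor it called, implicitly built a second one for the same plan.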
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e64669a19c..ededf7f4fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -62,7 +62,7 @@ private[sql] object Dataset {
   def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
     val qe = sparkSession.sessionState.executePlan(logicalPlan)
     qe.assertAnalyzed()
-    new Dataset[Row](sparkSession, logicalPlan, RowEncoder(qe.analyzed.schema))
+    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
   }
 }
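The one-line fix works because `Dataset` already exposes a primary constructor taking a prepared `QueryExecution`, while the `LogicalPlan` overload wraps its argument in a fresh one. A simplified sketch of that relationship, with names following Spark 2.0's `Dataset.scala` but heavily abridged (treat as illustrative, not the exact source):

```scala
import org.apache.spark.sql.{Encoder, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution

class Dataset[T] private[sql](
    @transient val sparkSession: SparkSession,
    @transient val queryExecution: QueryExecution,
    encoder: Encoder[T]) extends Serializable {

  // Secondary constructor: wraps a bare plan in a *new* QueryExecution.
  // Calling it from ofRows, which had already built and analyzed a
  // QueryExecution for the same plan, duplicated the plan-generation work.
  def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) =
    this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder)
}
```

With the patch, `ofRows` hands its existing `qe` to the primary constructor, so the plan is built and analyzed exactly once per query.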