author     Davies Liu <davies@databricks.com>     2016-01-16 10:29:27 -0800
committer  Davies Liu <davies.liu@gmail.com>      2016-01-16 10:29:27 -0800
commit     3c0d2365d57fc49ac9bf0d7cc9bd2ef633fb5fb6
tree       2e05f1fbb9eec2c870081b00b9e60505db07e1b9 /sql/hive
parent     86972fa52152d2149b88ba75be048a6986006285
[SPARK-12796] [SQL] Whole stage codegen
This is the initial work for whole-stage codegen. It supports Projection/Filter/Range; we will continue working on it to support more physical operators.
A micro benchmark shows that a query with range, filter, and projection can be 3X faster than before.
It is turned on by default. For a tree that has at least two chained plans, a WholeStageCodegen will be inserted into it. For example, the following plan
```
Limit 10
+- Project [(id#5L + 1) AS (id + 1)#6L]
+- Filter ((id#5L & 1) = 1)
+- Range 0, 1, 4, 10, [id#5L]
```
will be translated into
```
Limit 10
+- WholeStageCodegen
+- Project [(id#1L + 1) AS (id + 1)#2L]
+- Filter ((id#1L & 1) = 1)
+- Range 0, 1, 4, 10, [id#1L]
```
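For reference, a query along these lines yields that shape (a sketch against the SQLContext API of this branch; the exact Range parameters, such as the number of slices, depend on the default parallelism):
```scala
val df = sqlContext.range(0, 10)   // Range 0, 1, <slices>, 10, [id]
  .filter("id & 1 = 1")            // Filter ((id & 1) = 1)
  .selectExpr("id + 1")            // Project [(id + 1) AS (id + 1)]
df.limit(10).explain()             // prints the plan with the WholeStageCodegen node
```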
Here is the call graph for generating Java source for A and B (A supports codegen, but B does not):
```
   WholeStageCodegen       Plan A               FakeInput        Plan B
 =========================================================================

 -> execute()
     |
  doExecute() --------->   produce()
                             |
                          doProduce()  -------> produce()
                                                   |
                                                doProduce() ---> execute()
                                                   |
                                                consume()
                          doConsume()  ------------|
                             |
  doConsume()  <-----  consume()
```
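To make the control flow concrete, here is a toy, self-contained model of that handshake. Plain strings stand in for CodegenContext/ExprCode, and the "generated Java" is pseudo-code; this is illustrative only, not the trait the patch actually adds:
```scala
abstract class ToyNode {
  var parent: ToyNode = _
  def produce(): String            // code that loops over this node's rows
  def consume(row: String): String // code that processes one produced row
}

// Leaf: owns the driving loop and pushes each row up to its parent.
class ToyRange(n: Int) extends ToyNode {
  def produce(): String =
    s"for (long id = 0; id < $n; id++) { ${parent.consume("id")} }"
  def consume(row: String): String = sys.error("a leaf never consumes")
}

// Intermediate: delegates the loop downward; wraps each row in its
// predicate on the way back up.
class ToyFilter(child: ToyNode) extends ToyNode {
  child.parent = this
  def produce(): String = child.produce()
  def consume(row: String): String =
    s"if (($row & 1) == 1) { ${parent.consume(row)} }"
}

// Root: plays the role of WholeStageCodegen and finally emits the row.
class ToyRoot(child: ToyNode) extends ToyNode {
  child.parent = this
  def produce(): String = child.produce()
  def consume(row: String): String = s"emit($row);"
}

object ToyDemo extends App {
  // Prints the fused loop:
  // for (long id = 0; id < 10; id++) { if ((id & 1) == 1) { emit(id); } }
  println(new ToyRoot(new ToyFilter(new ToyRange(10))).produce())
}
```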
A SparkPlan that supports codegen needs to implement doProduce() and doConsume():
```
def doProduce(ctx: CodegenContext): (RDD[InternalRow], String)
def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String
```
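For example, a Filter-style operator's doConsume can generate the predicate check and splice in the parent's handling of the surviving row (a sketch based on the signatures above; the real implementation in this patch differs in detail, e.g. it also binds and canonicalizes the predicate):
```scala
// Sketch only: generate Java for the predicate, wrap the parent's code in it.
override def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String = {
  val eval = condition.gen(ctx)   // `condition` is this Filter's predicate expression
  s"""
     |${eval.code}
     |if (!${eval.isNull} && ${eval.value}) {
     |  ${consume(ctx, input)}    // parent's doConsume handles the surviving row
     |}
   """.stripMargin
}
```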
Author: Davies Liu <davies@databricks.com>
Closes #10735 from davies/whole2.
Diffstat (limited to 'sql/hive')
7 files changed, 14 insertions, 14 deletions
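The sql/hive changes below are mechanical: tests that pattern-match physical operators now inspect queryExecution.sparkPlan (the plan before preparation rules run) instead of queryExecution.executedPlan, because the prepared plan may now compile matched operators into a WholeStageCodegen node where collect no longer sees them. A sketch of the distinction (hypothetical query; sparkPlan/executedPlan are the real QueryExecution accessors):
```scala
import org.apache.spark.sql.execution

val qe = sqlContext.sql("SELECT id + 1 FROM t WHERE id & 1 = 1").queryExecution
qe.sparkPlan.collect { case f: execution.Filter => f }    // Filter still visible here
qe.executedPlan.collect { case f: execution.Filter => f } // may be fused into WholeStageCodegen
```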
```
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 9b37dd1103..11863caffe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -30,12 +30,12 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
   import hiveContext._
 
   def rddIdOf(tableName: String): Int = {
-    val executedPlan = table(tableName).queryExecution.executedPlan
-    executedPlan.collect {
+    val plan = table(tableName).queryExecution.sparkPlan
+    plan.collect {
       case InMemoryColumnarTableScan(_, _, relation) =>
         relation.cachedColumnBuffers.id
       case _ =>
-        fail(s"Table $tableName is not cached\n" + executedPlan)
+        fail(s"Table $tableName is not cached\n" + plan)
     }.head
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index fd3339a66b..2e0a8698e6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -485,7 +485,7 @@ abstract class HiveComparisonTest
         val executions = queryList.map(new TestHive.QueryExecution(_))
         executions.foreach(_.toRdd)
         val tablesGenerated = queryList.zip(executions).flatMap {
-          case (q, e) => e.executedPlan.collect {
+          case (q, e) => e.sparkPlan.collect {
             case i: InsertIntoHiveTable if tablesRead contains i.table.tableName =>
               (q, e, i)
           }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
index 5bd323ea09..d2f91861ff 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
@@ -43,7 +43,7 @@ class HiveTypeCoercionSuite extends HiveComparisonTest {
 
   test("[SPARK-2210] boolean cast on boolean value should be removed") {
     val q = "select cast(cast(key=0 as boolean) as boolean) from src"
-    val project = TestHive.sql(q).queryExecution.executedPlan.collect {
+    val project = TestHive.sql(q).queryExecution.sparkPlan.collect {
       case e: Project => e
     }.head
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index 210d566745..b91248bfb3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -144,7 +144,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
       expectedScannedColumns: Seq[String],
       expectedPartValues: Seq[Seq[String]]): Unit = {
     test(s"$testCaseName - pruning test") {
-      val plan = new TestHive.QueryExecution(sql).executedPlan
+      val plan = new TestHive.QueryExecution(sql).sparkPlan
       val actualOutputColumns = plan.output.map(_.name)
       val (actualScannedColumns, actualPartValues) = plan.collect {
         case p @ HiveTableScan(columns, relation, _) =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index ed544c6380..c997453803 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -190,11 +190,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
     test(s"conversion is working") {
       assert(
-        sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+        sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
          case _: HiveTableScan => true
        }.isEmpty)
       assert(
-        sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+        sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
          case _: PhysicalRDD => true
        }.nonEmpty)
     }
@@ -305,7 +305,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       """.stripMargin)
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
-    df.queryExecution.executedPlan match {
+    df.queryExecution.sparkPlan match {
       case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation, _, _)) => // OK
       case o => fail("test_insert_parquet should be converted to a " +
         s"${classOf[ParquetRelation].getCanonicalName} and " +
@@ -335,7 +335,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       """.stripMargin)
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
-    df.queryExecution.executedPlan match {
+    df.queryExecution.sparkPlan match {
       case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation, _, _)) => // OK
       case o => fail("test_insert_parquet should be converted to a " +
         s"${classOf[ParquetRelation].getCanonicalName} and " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index e866493ee6..ba2a483bba 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -149,7 +149,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       sqlContext.range(2).select('id as 'a, 'id as 'b).write.partitionBy("b").parquet(path)
 
       val df = sqlContext.read.parquet(path).filter('a === 0).select('b)
-      val physicalPlan = df.queryExecution.executedPlan
+      val physicalPlan = df.queryExecution.sparkPlan
 
       assert(physicalPlan.collect { case p: execution.Project => p }.length === 1)
       assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 058c101eeb..9ab3e11609 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -156,9 +156,9 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
     test(s"pruning and filtering: df.select(${projections.mkString(", ")}).where($filter)") {
       val df = partitionedDF.where(filter).select(projections: _*)
       val queryExecution = df.queryExecution
-      val executedPlan = queryExecution.executedPlan
+      val sparkPlan = queryExecution.sparkPlan
 
-      val rawScan = executedPlan.collect {
+      val rawScan = sparkPlan.collect {
         case p: PhysicalRDD => p
       } match {
         case Seq(scan) => scan
@@ -177,7 +177,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
       assert(requiredColumns === SimpleTextRelation.requiredColumns)
 
       val nonPushedFilters = {
-        val boundFilters = executedPlan.collect {
+        val boundFilters = sparkPlan.collect {
           case f: execution.Filter => f
         } match {
           case Nil => Nil
```