path: root/sql/hive
author     Davies Liu <davies@databricks.com>    2016-01-16 10:29:27 -0800
committer  Davies Liu <davies.liu@gmail.com>     2016-01-16 10:29:27 -0800
commit     3c0d2365d57fc49ac9bf0d7cc9bd2ef633fb5fb6 (patch)
tree       2e05f1fbb9eec2c870081b00b9e60505db07e1b9 /sql/hive
parent     86972fa52152d2149b88ba75be048a6986006285 (diff)
[SPARK-12796] [SQL] Whole stage codegen
This is the initial work for whole stage codegen; it supports Projection/Filter/Range, and we will continue working on this to support more physical operators. A micro benchmark shows that a query with range, filter and projection could be 3X faster than before.

It's turned on by default. For a tree that has at least two chained plans, a WholeStageCodegen will be inserted into it. For example, the following plan

```
Limit 10
+- Project [(id#5L + 1) AS (id + 1)#6L]
   +- Filter ((id#5L & 1) = 1)
      +- Range 0, 1, 4, 10, [id#5L]
```

will be translated into

```
Limit 10
+- WholeStageCodegen
   +- Project [(id#1L + 1) AS (id + 1)#2L]
      +- Filter ((id#1L & 1) = 1)
         +- Range 0, 1, 4, 10, [id#1L]
```

Here is the call graph to generate Java source for A and B (A supports codegen, but B does not):

```
 * WholeStageCodegen       Plan A          FakeInput        Plan B
 * =========================================================================
 *
 * -> execute()
 *      |
 *   doExecute() -------->  produce()
 *                             |
 *                          doProduce() -------> produce()
 *                                                  |
 *                                               doProduce() ---> execute()
 *                                                  |
 *                                               consume()
 *                          doConsume() ------------|
 *      |
 *   doConsume() <----- consume()
```

A SparkPlan that supports codegen needs to implement doProduce() and doConsume():

```
def doProduce(ctx: CodegenContext): (RDD[InternalRow], String)
def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String
```

Author: Davies Liu <davies@databricks.com>

Closes #10735 from davies/whole2.
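The example plan above can be reproduced directly from the DataFrame API. Below is a minimal sketch, assuming a Spark 1.6-era shell with a `SQLContext` in scope as `sqlContext`; it uses a modulo filter as a stand-in for the bitwise-and in the plan, and the attribute IDs in the printed plan will differ:

```scala
// Build the Limit <- Project <- Filter <- Range query from the example above.
// `sqlContext` is assumed to be provided by spark-shell.
val df = sqlContext.range(0, 10, 1, 4)   // Range 0, 1, 4, 10, [id]
  .filter("id % 2 = 1")                  // Filter (stand-in for (id & 1) = 1)
  .selectExpr("id + 1")                  // Project [(id + 1)]
  .limit(10)                             // Limit 10

// With whole stage codegen on by default, the prepared plan should contain a
// WholeStageCodegen node wrapping Project -> Filter -> Range, while sparkPlan
// (the plan before preparation) still shows the bare operators.
println(df.queryExecution.executedPlan)
println(df.queryExecution.sparkPlan)
```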
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala                    | 6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala        | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala     | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala              | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala                       | 8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala     | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala  | 6
7 files changed, 14 insertions, 14 deletions
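Every hunk below makes the same substitution: test assertions that pattern-match on concrete physical operators move from `queryExecution.executedPlan` to `queryExecution.sparkPlan`, because the prepared plan can now wrap those operators in a WholeStageCodegen node. A minimal sketch of the test pattern, assuming a DataFrame `df` and the pre-2.0 operator names used in these suites:

```scala
import org.apache.spark.sql.execution

// sparkPlan is the physical plan before the preparation rules run, so the bare
// Project/Filter operators are still visible and can be collected directly.
val projects = df.queryExecution.sparkPlan.collect { case p: execution.Project => p }
assert(projects.length == 1)

// executedPlan is the prepared plan; after this change it may hold a
// WholeStageCodegen wrapper instead, so the same match can come up empty there.
val prepared = df.queryExecution.executedPlan.collect { case p: execution.Project => p }
```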
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 9b37dd1103..11863caffe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -30,12 +30,12 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
   import hiveContext._
   def rddIdOf(tableName: String): Int = {
-    val executedPlan = table(tableName).queryExecution.executedPlan
-    executedPlan.collect {
+    val plan = table(tableName).queryExecution.sparkPlan
+    plan.collect {
       case InMemoryColumnarTableScan(_, _, relation) =>
         relation.cachedColumnBuffers.id
       case _ =>
-        fail(s"Table $tableName is not cached\n" + executedPlan)
+        fail(s"Table $tableName is not cached\n" + plan)
     }.head
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index fd3339a66b..2e0a8698e6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -485,7 +485,7 @@ abstract class HiveComparisonTest
         val executions = queryList.map(new TestHive.QueryExecution(_))
         executions.foreach(_.toRdd)
         val tablesGenerated = queryList.zip(executions).flatMap {
-          case (q, e) => e.executedPlan.collect {
+          case (q, e) => e.sparkPlan.collect {
             case i: InsertIntoHiveTable if tablesRead contains i.table.tableName =>
               (q, e, i)
           }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
index 5bd323ea09..d2f91861ff 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
@@ -43,7 +43,7 @@ class HiveTypeCoercionSuite extends HiveComparisonTest {
   test("[SPARK-2210] boolean cast on boolean value should be removed") {
     val q = "select cast(cast(key=0 as boolean) as boolean) from src"
-    val project = TestHive.sql(q).queryExecution.executedPlan.collect {
+    val project = TestHive.sql(q).queryExecution.sparkPlan.collect {
       case e: Project => e
     }.head
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index 210d566745..b91248bfb3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -144,7 +144,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
       expectedScannedColumns: Seq[String],
       expectedPartValues: Seq[Seq[String]]): Unit = {
     test(s"$testCaseName - pruning test") {
-      val plan = new TestHive.QueryExecution(sql).executedPlan
+      val plan = new TestHive.QueryExecution(sql).sparkPlan
       val actualOutputColumns = plan.output.map(_.name)
       val (actualScannedColumns, actualPartValues) = plan.collect {
         case p @ HiveTableScan(columns, relation, _) =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index ed544c6380..c997453803 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -190,11 +190,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   test(s"conversion is working") {
     assert(
-      sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+      sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
         case _: HiveTableScan => true
       }.isEmpty)
     assert(
-      sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+      sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
         case _: PhysicalRDD => true
       }.nonEmpty)
   }
@@ -305,7 +305,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         """.stripMargin)
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
-      df.queryExecution.executedPlan match {
+      df.queryExecution.sparkPlan match {
         case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation, _, _)) => // OK
         case o => fail("test_insert_parquet should be converted to a " +
           s"${classOf[ParquetRelation].getCanonicalName} and " +
@@ -335,7 +335,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         """.stripMargin)
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
-      df.queryExecution.executedPlan match {
+      df.queryExecution.sparkPlan match {
         case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation, _, _)) => // OK
         case o => fail("test_insert_parquet should be converted to a " +
           s"${classOf[ParquetRelation].getCanonicalName} and " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index e866493ee6..ba2a483bba 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -149,7 +149,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       sqlContext.range(2).select('id as 'a, 'id as 'b).write.partitionBy("b").parquet(path)
       val df = sqlContext.read.parquet(path).filter('a === 0).select('b)
-      val physicalPlan = df.queryExecution.executedPlan
+      val physicalPlan = df.queryExecution.sparkPlan
       assert(physicalPlan.collect { case p: execution.Project => p }.length === 1)
       assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 058c101eeb..9ab3e11609 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -156,9 +156,9 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
     test(s"pruning and filtering: df.select(${projections.mkString(", ")}).where($filter)") {
       val df = partitionedDF.where(filter).select(projections: _*)
       val queryExecution = df.queryExecution
-      val executedPlan = queryExecution.executedPlan
+      val sparkPlan = queryExecution.sparkPlan
-      val rawScan = executedPlan.collect {
+      val rawScan = sparkPlan.collect {
         case p: PhysicalRDD => p
       } match {
         case Seq(scan) => scan
@@ -177,7 +177,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
       assert(requiredColumns === SimpleTextRelation.requiredColumns)
       val nonPushedFilters = {
-        val boundFilters = executedPlan.collect {
+        val boundFilters = sparkPlan.collect {
           case f: execution.Filter => f
         } match {
           case Nil => Nil