author    Zee Chen <zeechen@us.ibm.com>    2015-11-16 14:21:28 -0800
committer Michael Armbrust <michael@databricks.com>    2015-11-16 14:21:28 -0800
commit    985b38dd2fa5d8f1e23f1c420ce6262e7e3ed181
tree      d6badf05c5b54271d18783c226a8b4bd3c7b7a65
parent    b1a9662623951079e80bd7498e064c4cae4977e9
[SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable

Propagate pushed filters to PhysicalRDD in DataSourceStrategy.apply.

Author: Zee Chen <zeechen@us.ibm.com>

Closes #9679 from zeocio/spark-11390.
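To illustrate the effect, here is a minimal usage sketch modeled on the PlannerSuite test added below; the parquet path and the `key` column are hypothetical placeholders, and the printed plan line is approximate rather than verbatim output:

    // Read a parquet table and filter on a column eligible for pushdown.
    // The path "/tmp/testPushed" and column "key" are assumed for illustration.
    val df = sqlContext.read.parquet("/tmp/testPushed")
    df.filter("key = 15").explain()
    // Before this patch the scan node printed only the relation, so plans with
    // and without filter pushdown looked identical. With this patch the pushed
    // filters are appended to the node name, e.g. the PhysicalRDD line contains:
    //   ... ParquetRelation[/tmp/testPushed] PushedFilter: [EqualTo(key,15)] ...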
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala                    |  6
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala |  6
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala                   | 14
3 files changed, 22 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 8b41d3d3d8..62620ec642 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -106,7 +106,9 @@ private[sql] object PhysicalRDD {
   def createFromDataSource(
       output: Seq[Attribute],
       rdd: RDD[InternalRow],
-      relation: BaseRelation): PhysicalRDD = {
-    PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation])
+      relation: BaseRelation,
+      extraInformation: String = ""): PhysicalRDD = {
+    PhysicalRDD(output, rdd, relation.toString + extraInformation,
+      relation.isInstanceOf[HadoopFsRelation])
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 9bbbfa7c77..544d5eccec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -315,6 +315,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
+    val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ")
+
     if (projects.map(_.toAttribute) == projects &&
         projectSet.size == projects.size &&
         filterSet.subsetOf(projectSet)) {
@@ -332,7 +334,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation)
+        relation.relation, pushedFiltersString)
       filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
     } else {
// Don't request columns that are only referenced by pushed filters.
@@ -342,7 +344,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation)
+        relation.relation, pushedFiltersString)
       execution.Project(
         projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index be53ec3e27..dfec139985 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -160,6 +160,20 @@ class PlannerSuite extends SharedSQLContext {
     }
   }
 
+  test("SPARK-11390 explain should print PushedFilters of PhysicalRDD") {
+    withTempPath { file =>
+      val path = file.getCanonicalPath
+      testData.write.parquet(path)
+      val df = sqlContext.read.parquet(path)
+      sqlContext.registerDataFrameAsTable(df, "testPushed")
+
+      withTempTable("testPushed") {
+        val exp = sql("select * from testPushed where key = 15").queryExecution.executedPlan
+        assert(exp.toString.contains("PushedFilter: [EqualTo(key,15)]"))
+      }
+    }
+  }
+
   test("efficient limit -> project -> sort") {
     {
       val query =