author    Yash Datta <Yash.Datta@guavus.com>    2015-04-13 14:43:07 -0700
committer Michael Armbrust <michael@databricks.com>    2015-04-13 14:43:07 -0700
commit    3a205bbd9e352668a020c3146391e1e4441467af (patch)
tree      58149a40dde7593e95ed5f900649a6c2c476babd /sql
parent    85ee0cabe87a27b6947c2d3e8525f04c77f80f6f (diff)
[SQL][SPARK-6742]: Don't push down predicates which reference partition column(s)
cc liancheng

Author: Yash Datta <Yash.Datta@guavus.com>

Closes #5390 from saucam/fpush and squashes the following commits:

3f026d6 [Yash Datta] SPARK-6742: Fix scalastyle
ce3d702 [Yash Datta] SPARK-6742: Add test case, fix scalastyle
8592acc [Yash Datta] SPARK-6742: Don't push down predicates which reference partition column(s)
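The change splits the predicate list so that any predicate referencing a partitioning column is kept out of the set handed to the Parquet scan, since partition columns live only in directory names and not inside the data files. Below is a minimal standalone Scala sketch of that split; PartitionFilterSketch, SimplePredicate and splitFilters are illustrative stand-ins, not Spark's Catalyst or ParquetFilters API.

// Minimal standalone sketch (illustrative names, not Spark's API) of the split
// this patch performs: a predicate is only pushed down to the Parquet scan if
// it references no partitioning column.
object PartitionFilterSketch {
  // Simplified stand-in for a Catalyst predicate: the SQL text and the set of
  // column names it references.
  final case class SimplePredicate(sql: String, references: Set[String])

  def splitFilters(
      filters: Seq[SimplePredicate],
      partitionColNames: Set[String]): (Seq[SimplePredicate], Seq[SimplePredicate]) =
    // Left: safe to hand to the Parquet reader. Right: must stay in the Filter
    // operator above the scan, because the partition column is absent from the files.
    filters.partition(_.references.intersect(partitionColNames).isEmpty)

  def main(args: Array[String]): Unit = {
    val filters = Seq(
      SimplePredicate("a = 1", Set("a")),
      SimplePredicate("part = 1", Set("part")),
      SimplePredicate("a = 1 OR part = 1", Set("a", "part")))
    val (toPush, keepAbove) = splitFilters(filters, partitionColNames = Set("part"))
    println(s"pushed down: ${toPush.map(_.sql)}")    // List(a = 1)
    println(s"kept above:  ${keepAbove.map(_.sql)}") // List(part = 1, a = 1 OR part = 1)
  }
}

Predicates that are kept out of the pushdown set are still evaluated, just in the Filter operator above the scan, so query results are unchanged.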
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala  | 11
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala | 24
2 files changed, 33 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5268b73340..f0d92ffffc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -215,6 +215,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           table: ParquetRelation, partition, child, overwrite, ifNotExists) =>
         InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
       case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
+        val partitionColNames = relation.partitioningAttributes.map(_.name).toSet
+        val filtersToPush = filters.filter { pred =>
+            val referencedColNames = pred.references.map(_.name).toSet
+            referencedColNames.intersect(partitionColNames).isEmpty
+          }
         val prunePushedDownFilters =
           if (sqlContext.conf.parquetFilterPushDown) {
             (predicates: Seq[Expression]) => {
@@ -226,6 +231,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// "A AND B" in the higher-level filter, not just "B".
predicates.map(p => p -> ParquetFilters.createFilter(p)).collect {
case (predicate, None) => predicate
+ // Filter needs to be applied above when it contains partitioning
+ // columns
+ case (predicate, _) if(!predicate.references.map(_.name).toSet
+ .intersect (partitionColNames).isEmpty) => predicate
}
}
} else {
@@ -238,7 +247,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           ParquetTableScan(
             _,
             relation,
-            if (sqlContext.conf.parquetFilterPushDown) filters else Nil)) :: Nil
+            if (sqlContext.conf.parquetFilterPushDown) filtersToPush else Nil)) :: Nil
 
       case _ => Nil
     }
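The second hunk above widens prunePushedDownFilters: besides predicates that ParquetFilters.createFilter cannot convert, any predicate mentioning a partitioning column is also re-applied in the Filter operator above the scan. A standalone sketch of that collect, again with simplified stand-in types rather than Spark's Expression and FilterPredicate classes:

// Standalone sketch of the pruning rule added above (simplified types, not
// Spark's Expression / ParquetFilters API): a predicate stays in the Filter
// operator above the scan if it could not be converted to a Parquet filter,
// or if it references a partitioning column.
object ResidualFilterSketch {
  final case class SimplePredicate(sql: String, references: Set[String])

  def residualPredicates(
      predicates: Seq[SimplePredicate],
      createFilter: SimplePredicate => Option[String], // stand-in for ParquetFilters.createFilter
      partitionColNames: Set[String]): Seq[SimplePredicate] =
    predicates.map(p => p -> createFilter(p)).collect {
      // Not expressible as a Parquet filter: evaluate it above the scan.
      case (predicate, None) => predicate
      // References a partition column: the column is absent from the data
      // files, so the pushed-down version cannot be trusted; keep it above too.
      case (predicate, _)
          if predicate.references.intersect(partitionColNames).nonEmpty => predicate
    }
}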
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 6a2c2a7c40..10d0ede4dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -22,7 +22,7 @@ import parquet.filter2.predicate.Operators._
import parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, Predicate, Row}
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.test.TestSQLContext
@@ -350,4 +350,26 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before
   override protected def afterAll(): Unit = {
     sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("SPARK-6742: don't push down predicates which reference partition columns") {
+    import sqlContext.implicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").saveAsParquetFile(path)
+
+        // If the "part = 1" filter gets pushed down, this query will throw an exception since
+        // "part" is not a valid column in the actual Parquet file
+        val df = DataFrame(sqlContext, org.apache.spark.sql.parquet.ParquetRelation(
+          path,
+          Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext,
+          Seq(AttributeReference("part", IntegerType, false)()) ))
+
+        checkAnswer(
+          df.filter("a = 1 or part = 1"),
+          (1 to 3).map(i => Row(1, i, i.toString)))
+      }
+    }
+  }
 }
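The new test writes plain (a, b) rows under a directory named part=1 and wraps them in a ParquetRelation whose only partitioning attribute is part, so the query "a = 1 or part = 1" can succeed only if the part predicate stays out of the pushed-down filters. A tiny REPL-style illustration (plain Scala, not Spark code) of why pushing it down would fail:

// The physical Parquet schema only has the data columns, while "part" lives
// solely in the directory name ".../part=1".
val fileSchema    = Set("a", "b")     // columns actually stored in the Parquet file
val predicateRefs = Set("a", "part")  // columns referenced by "a = 1 or part = 1"
val missing       = predicateRefs.diff(fileSchema)
assert(missing == Set("part"))        // the pushed filter would reference a column the file does not have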