| author | Yash Datta <Yash.Datta@guavus.com> | 2015-04-13 14:43:07 -0700 |
|---|---|---|
| committer | Michael Armbrust <michael@databricks.com> | 2015-04-13 14:43:07 -0700 |
| commit | 3a205bbd9e352668a020c3146391e1e4441467af (patch) | |
| tree | 58149a40dde7593e95ed5f900649a6c2c476babd /sql/core | |
| parent | 85ee0cabe87a27b6947c2d3e8525f04c77f80f6f (diff) | |
[SQL][SPARK-6742]: Don't push down predicates which reference partition column(s)
cc liancheng
Author: Yash Datta <Yash.Datta@guavus.com>
Closes #5390 from saucam/fpush and squashes the following commits:
3f026d6 [Yash Datta] SPARK-6742: Fix scalastyle
ce3d702 [Yash Datta] SPARK-6742: Add test case, fix scalastyle
8592acc [Yash Datta] SPARK-6742: Don't push down predicates which reference partition column(s)
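
The fix hinges on one rule: a predicate is safe to push into the Parquet reader only if it references no partition column, because partition columns (e.g. `part` in a path like `/table/part=1/`) exist in the directory layout, not inside the Parquet files themselves. Below is a minimal standalone sketch of that rule; `Pred` and `splitFilters` are illustrative stand-ins, not Spark's actual `Expression` API.

```scala
// Standalone sketch of the pushdown rule this patch enforces: a predicate
// may be handed to the Parquet reader only if it references no partition
// column. Pred and splitFilters are hypothetical simplifications.
object PartitionPushdownSketch {
  // Simplified stand-in for a Catalyst predicate and its column references.
  final case class Pred(sql: String, references: Set[String])

  // Split predicates into (safe to push down, must be evaluated above the scan).
  def splitFilters(
      filters: Seq[Pred],
      partitionCols: Set[String]): (Seq[Pred], Seq[Pred]) =
    filters.partition(_.references.intersect(partitionCols).isEmpty)

  def main(args: Array[String]): Unit = {
    val filters = Seq(
      Pred("a = 1", Set("a")),
      Pred("part = 1", Set("part")),
      Pred("a = 1 OR part = 1", Set("a", "part")))
    val (pushable, keepAbove) = splitFilters(filters, partitionCols = Set("part"))
    println(s"pushed to Parquet reader: ${pushable.map(_.sql)}") // List(a = 1)
    println(s"evaluated above the scan: ${keepAbove.map(_.sql)}") // the other two
  }
}
```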
Diffstat (limited to 'sql/core')
| -rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 11 |
|---|---|---|
| -rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala | 24 |

2 files changed, 33 insertions(+), 2 deletions(-)
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5268b73340..f0d92ffffc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -215,6 +215,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           table: ParquetRelation, partition, child, overwrite, ifNotExists) =>
         InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
       case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
+        val partitionColNames = relation.partitioningAttributes.map(_.name).toSet
+        val filtersToPush = filters.filter { pred =>
+          val referencedColNames = pred.references.map(_.name).toSet
+          referencedColNames.intersect(partitionColNames).isEmpty
+        }
         val prunePushedDownFilters =
           if (sqlContext.conf.parquetFilterPushDown) {
             (predicates: Seq[Expression]) => {
@@ -226,6 +231,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
               // "A AND B" in the higher-level filter, not just "B".
               predicates.map(p => p -> ParquetFilters.createFilter(p)).collect {
                 case (predicate, None) => predicate
+                // Filter needs to be applied above when it contains partitioning
+                // columns
+                case (predicate, _) if !predicate.references.map(_.name).toSet
+                  .intersect(partitionColNames).isEmpty => predicate
               }
             }
           } else {
@@ -238,7 +247,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         ParquetTableScan(
           _,
           relation,
-          if (sqlContext.conf.parquetFilterPushDown) filters else Nil)) :: Nil
+          if (sqlContext.conf.parquetFilterPushDown) filtersToPush else Nil)) :: Nil
 
       case _ => Nil
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 6a2c2a7c40..10d0ede4dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -22,7 +22,7 @@ import parquet.filter2.predicate.Operators._
 import parquet.filter2.predicate.{FilterPredicate, Operators}
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, Predicate, Row}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.test.TestSQLContext
@@ -350,4 +350,26 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before
   override protected def afterAll(): Unit = {
     sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("SPARK-6742: don't push down predicates which reference partition columns") {
+    import sqlContext.implicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").saveAsParquetFile(path)
+
+        // If the "part = 1" filter gets pushed down, this query will throw an exception since
+        // "part" is not a valid column in the actual Parquet file
+        val df = DataFrame(sqlContext, org.apache.spark.sql.parquet.ParquetRelation(
+          path,
+          Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext,
+          Seq(AttributeReference("part", IntegerType, false)())))
+
+        checkAnswer(
+          df.filter("a = 1 or part = 1"),
+          (1 to 3).map(i => Row(1, i, i.toString)))
+      }
+    }
+  }
 }
```
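
The test exercises the subtle case: a disjunction like `a = 1 or part = 1` mixes a data column with a partition column, so it cannot be split apart, since pushing only the `a = 1` half into the reader would discard rows that the full predicate accepts. The standalone sketch below illustrates that reasoning; the `Row` class and values mirror the test data, but none of this is Spark code.

```scala
// Hypothetical sketch of why a mixed disjunction must be evaluated whole,
// above the scan. Rows come from the directory "part=1", so part = 1 holds
// for all of them, and the full predicate accepts every row.
object DisjunctionSketch {
  final case class Row(part: Int, a: Int, b: String)

  def main(args: Array[String]): Unit = {
    // The partition directory "part=1" contributes part = 1 to every row.
    val rows = (1 to 3).map(i => Row(part = 1, a = i, b = i.toString))

    // Unsound: push only the "a = 1" half of the OR into the reader.
    val wronglyPushed = rows.filter(_.a == 1)

    // Sound: evaluate the whole predicate after the scan.
    val correct = rows.filter(r => r.a == 1 || r.part == 1)

    println(wronglyPushed.size) // 1 -- two matching rows were lost
    println(correct.size)       // 3 -- what the test's checkAnswer expects
  }
}
```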