author    Cheng Lian <lian@databricks.com>    2015-03-26 13:11:37 -0700
committer Michael Armbrust <michael@databricks.com>    2015-03-26 13:11:37 -0700
commit    71a0d40ebd37c80d8020e184366778b57c762285 (patch)
tree      bb91e2ec5ef9627937a90600a5a544abb6224c2d /sql
parent    784fcd532784fcfd9bf0a1db71c9f71c469ee716 (diff)
[SPARK-6554] [SQL] Don't push down predicates which reference partition column(s)
There are two cases for the new Parquet data source:

1. Partition columns exist in the Parquet data files. We don't need to push down these predicates, since partition pruning already handles them.
2. Partition columns don't exist in the Parquet data files. We can't push down these predicates, since Parquet considers them invalid columns.

Author: Cheng Lian <lian@databricks.com>

Closes #5210 from liancheng/spark-6554 and squashes the following commits:

4f7ec03 [Cheng Lian] Adds comments
e134ced [Cheng Lian] Don't push down predicates which reference partition column(s)
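To make the rule concrete, here is a minimal, self-contained sketch of the check this patch introduces. The `Pred` type and `pushable` helper are illustrative stand-ins, not Spark's Catalyst classes: a predicate is eligible for push-down only when none of the columns it references is a partition column.

    object PartitionPredicateSketch {
      // Stand-in for a Catalyst predicate: its SQL text plus the set of
      // column names it references.
      case class Pred(sql: String, references: Set[String])

      // Keep only predicates that reference no partition columns; the rest
      // are handled by partition pruning (or would be invalid in Parquet).
      def pushable(predicates: Seq[Pred], partitionCols: Set[String]): Seq[Pred] =
        predicates.filter(p => p.references.intersect(partitionCols).isEmpty)

      def main(args: Array[String]): Unit = {
        val preds = Seq(
          Pred("a > 1", Set("a")),           // data columns only: pushable
          Pred("part = 1", Set("part")),     // partition column: pruned instead
          Pred("a = part", Set("a", "part")) // mixed references: also excluded
        )
        // Prints only Pred(a > 1, Set(a))
        println(pushable(preds, partitionCols = Set("part")))
      }
    }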
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala        | 17
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala | 17
2 files changed, 29 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 3516cfe680..0d68810ec6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -435,11 +435,18 @@ private[sql] case class ParquetRelation2(
// Push down filters when possible. Notice that not all filters can be converted to Parquet
// filter predicate. Here we try to convert each individual predicate and only collect those
// convertible ones.
- predicates
- .flatMap(ParquetFilters.createFilter)
- .reduceOption(FilterApi.and)
- .filter(_ => sqlContext.conf.parquetFilterPushDown)
- .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
+ if (sqlContext.conf.parquetFilterPushDown) {
+ predicates
+ // Don't push down predicates which reference partition columns
+ .filter { pred =>
+ val partitionColNames = partitionColumns.map(_.name).toSet
+ val referencedColNames = pred.references.map(_.name).toSet
+ referencedColNames.intersect(partitionColNames).isEmpty
+ }
+ .flatMap(ParquetFilters.createFilter)
+ .reduceOption(FilterApi.and)
+ .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
+ }
if (isPartitioned) {
logInfo {
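For context on the unchanged lines above: `reduceOption(FilterApi.and)` folds all convertible predicates into a single conjunctive Parquet filter, and when nothing converts it yields `None`, so `setFilterPredicate` is never called. A small sketch of that folding behavior, using strings as stand-ins for real Parquet `FilterPredicate` values:

    object AndFoldSketch {
      // Stand-in for FilterApi.and: conjoin two predicates (strings here,
      // not actual Parquet FilterPredicate instances).
      def and(l: String, r: String): String = s"and($l, $r)"

      def main(args: Array[String]): Unit = {
        val converted = List("gt(a, 1)", "eq(b, \"x\")")
        // Several convertible predicates fold into one conjunction...
        println(converted.reduceOption(and))          // Some(and(gt(a, 1), eq(b, "x")))
        // ...and zero convertible predicates yield None, so no filter is set.
        println(List.empty[String].reduceOption(and)) // None
      }
    }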
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 4d32e84fc1..6a2c2a7c40 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -321,6 +321,23 @@ class ParquetDataSourceOnFilterSuite extends ParquetFilterSuiteBase with BeforeA
override protected def afterAll(): Unit = {
sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}
+
+ test("SPARK-6554: don't push down predicates which reference partition columns") {
+ import sqlContext.implicits._
+
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+ withTempPath { dir =>
+ val path = s"${dir.getCanonicalPath}/part=1"
+ (1 to 3).map(i => (i, i.toString)).toDF("a", "b").saveAsParquetFile(path)
+
+ // If the "part = 1" filter gets pushed down, this query will throw an exception since
+ // "part" is not a valid column in the actual Parquet file
+ checkAnswer(
+ sqlContext.parquetFile(path).filter("part = 1"),
+ (1 to 3).map(i => Row(i, i.toString, 1)))
+ }
+ }
+ }
}
class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with BeforeAndAfterAll {