aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
author    Wenchen Fan <wenchen@databricks.com>    2015-10-29 16:36:52 -0700
committer Yin Huai <yhuai@databricks.com>    2015-10-29 16:36:52 -0700
commit   96cf87f66d47245b19e719cb83947042b21546fa (patch)
tree     97c13354879fe7a5f197beac1637a7312b0ccbeb /sql/core
parent   4f5e60c647d7d6827438721b7fabbc3a57b81023 (diff)
download spark-96cf87f66d47245b19e719cb83947042b21546fa.tar.gz
spark-96cf87f66d47245b19e719cb83947042b21546fa.tar.bz2
spark-96cf87f66d47245b19e719cb83947042b21546fa.zip
[SPARK-11301] [SQL] fix case sensitivity for filter on partitioned columns
Author: Wenchen Fan <wenchen@databricks.com>

Closes #9271 from cloud-fan/filter.
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 10
2 files changed, 15 insertions, 7 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index ffb4645b89..af6626c897 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -63,16 +63,14 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _))
if t.partitionSpec.partitionColumns.nonEmpty =>
// We divide the filter expressions into 3 parts
- val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
+ val partitionColumns = AttributeSet(
+ t.partitionColumns.map(c => l.output.find(_.name == c.name).get))
- // TODO this is case-sensitive
- // Only prunning the partition keys
- val partitionFilters =
- filters.filter(_.references.map(_.name).toSet.subsetOf(partitionColumnNames))
+ // Only pruning the partition keys
+ val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns))
// Only pushes down predicates that do not reference partition keys.
- val pushedFilters =
- filters.filter(_.references.map(_.name).toSet.intersect(partitionColumnNames).isEmpty)
+ val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
// Predicates with both partition keys and attributes
val combineFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 59565a6b13..c9d6e19d2c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -987,4 +987,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src")
assert(df.select($"src.i".cast(StringType)).columns.head === "i")
}
+
+ test("SPARK-11301: fix case sensitivity for filter on partitioned columns") {
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+ withTempPath { path =>
+ Seq(2012 -> "a").toDF("year", "val").write.partitionBy("year").parquet(path.getAbsolutePath)
+ val df = sqlContext.read.parquet(path.getAbsolutePath)
+ checkAnswer(df.filter($"yEAr" > 2000).select($"val"), Row("a"))
+ }
+ }
+ }
}