author     Wenchen Fan <wenchen@databricks.com>    2017-01-19 20:09:48 -0800
committer  gatorsmile <gatorsmile@gmail.com>       2017-01-19 20:09:48 -0800
commit     0bf605c2c67ca361cd4aa3a3b4492bef4aef76b9 (patch)
tree       2f7bd70db8c44a5c02b769beeafb789686a0da13
parent     148a84b37082697c7f61c6a621010abe4b12f2eb (diff)
[SPARK-19292][SQL] filter with partition columns should be case-insensitive on Hive tables
## What changes were proposed in this pull request?

When we query a table with a filter on partitioned columns, we push the partition filter down to the metastore to fetch the matching partitions directly. `HiveExternalCatalog.listPartitionsByFilter` assumes the column names in the partition filter are already normalized, so it does not handle case sensitivity itself. However, `HiveTableScanExec` did not follow this assumption: it passed the partition-pruning predicates through with the user's original casing. This PR normalizes the attribute names in those predicates against the table schema before they reach the metastore.

## How was this patch tested?

A new regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16647 from cloud-fan/bug.
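To make the normalization concrete, here is a minimal Catalyst sketch of the idea (a sketch only, assuming spark-catalyst on the classpath; the attribute and the literal are illustrative, not taken from the patch):

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

// Canonical partition column, spelled as the table schema spells it.
val j = AttributeReference("j", IntegerType)()

// Predicate as the user wrote it, with a differently-cased attribute.
// withName keeps the exprId, so the two spellings stay semantically equal.
val pred: Expression = EqualTo(j.withName("J"), Literal(10))

// Rewrite every attribute to the schema's spelling before the filter is
// pushed down. semanticEquals matches by exprId, not by name.
val normalized = pred transform {
  case a: AttributeReference =>
    a.withName(Seq(j).find(_.semanticEquals(a)).get.name)
}
// normalized is now (j = 10), which the metastore can match.
```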
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala |  2 +-
 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala         | 12 +++++++++++-
 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala             | 13 +++++++++++++
 3 files changed, 25 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 6d0671d4cb..26e1380eca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -62,7 +62,7 @@ object FileSourceStrategy extends Strategy with Logging {
val filterSet = ExpressionSet(filters)
// The attribute name of predicate could be different than the one in schema in case of
- // case insensitive, we should change them to match the one in schema, so we donot need to
+ // case insensitive, we should change them to match the one in schema, so we do not need to
// worry about case sensitivity anymore.
val normalizedFilters = filters.map { e =>
e transform {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 7ee5fc543c..def6ef3691 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -146,9 +146,19 @@ case class HiveTableScanExec(
hadoopReader.makeRDDForTable(relation.hiveQlTable)
}
} else {
+ // The attribute name of predicate could be different than the one in schema in case of
+ // case insensitive, we should change them to match the one in schema, so we do not need to
+ // worry about case sensitivity anymore.
+ val normalizedFilters = partitionPruningPred.map { e =>
+ e transform {
+ case a: AttributeReference =>
+ a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+ }
+ }
+
Utils.withDummyCallSite(sqlContext.sparkContext) {
hadoopReader.makeRDDForPartitionedTable(
- prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+ prunePartitions(relation.getHiveQlPartitions(normalizedFilters)))
}
}
val numOutputRows = longMetric("numOutputRows")
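The normalization has to happen before `getHiveQlPartitions`, because the predicate's attribute names end up in the metastore partition filter verbatim. A sketch of the failure mode this hunk closes, with a hypothetical `toFilterString` standing in for the real predicate-to-metastore conversion in the Hive client layer:

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

// Hypothetical stand-in for the real conversion: it copies attribute
// names verbatim, which is why they must match the schema's spelling.
def toFilterString(e: Expression): String = e match {
  case EqualTo(a: AttributeReference, Literal(v, _)) => s"${a.name} = $v"
  case other => sys.error(s"unsupported: $other")
}

val j = AttributeReference("j", IntegerType)()
toFilterString(EqualTo(j.withName("J"), Literal(10))) // "J = 10": matches no partition
toFilterString(EqualTo(j, Literal(10)))               // "j = 10": matches partition j=10
```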
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 104b5250b6..1a28c4c84a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2014,4 +2014,17 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
)
}
}
+
+ test("SPARK-19292: filter with partition columns should be case-insensitive on Hive tables") {
+ withTable("tbl") {
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+ sql("CREATE TABLE tbl(i int, j int) USING hive PARTITIONED BY (j)")
+ sql("INSERT INTO tbl PARTITION(j=10) SELECT 1")
+ checkAnswer(spark.table("tbl"), Row(1, 10))
+
+ checkAnswer(sql("SELECT i, j FROM tbl WHERE J=10"), Row(1, 10))
+ checkAnswer(spark.table("tbl").filter($"J" === 10), Row(1, 10))
+ }
+ }
+ }
}
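For reference, the scenario the regression test covers, as it would look in a spark-shell session (assuming a Hive-enabled `SparkSession` named `spark`, with `spark.sql.caseSensitive` at its default of `false`):

```scala
spark.sql("CREATE TABLE tbl(i int, j int) USING hive PARTITIONED BY (j)")
spark.sql("INSERT INTO tbl PARTITION(j=10) SELECT 1")

// Before this fix, the capitalized J reached the metastore unnormalized
// and could not match the partition column j; with the fix, both queries
// return the row (1, 10).
spark.sql("SELECT i, j FROM tbl WHERE J = 10").show()
spark.table("tbl").filter($"J" === 10).show()
```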