author     Davies Liu <davies@databricks.com>    2016-04-13 17:17:19 -0700
committer  Davies Liu <davies.liu@gmail.com>     2016-04-13 17:17:19 -0700
commit     62b7f306fbf77de7f6cbb36181ebebdb4a55acc5 (patch)
tree       0753ac43c4d4d5e6dda3d0c4fd35fcfe4cebb4f9 /sql/core/src
parent     fc3cd2f5090b3ba1cfde0fca3b3ce632d0b2f9c4 (diff)
[SPARK-14607] [SPARK-14484] [SQL] fix case-insensitive predicates in FileSourceStrategy
## What changes were proposed in this pull request?

When pruning partitions or pushing down predicates, case sensitivity was not respected. To make this work in case-insensitive mode, this PR updates the AttributeReference inside each predicate to use the name from the schema.

## How was this patch tested?

Added regression tests for case-insensitive predicates.

Author: Davies Liu <davies@databricks.com>

Closes #12371 from davies/case_insensi.
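For context, here is a minimal sketch of the normalization this patch introduces. The helper name and setup are illustrative, not part of the commit; only the transform mirrors the diff below. `semanticEquals` matches attributes by expression id, so the lookup succeeds even when the parser produced a differently-cased name (e.g. "P1" for the schema's "p1"):

    import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}

    // Hypothetical helper mirroring the change in FileSourceStrategy: rewrite every
    // AttributeReference in the pushed-down filters to carry the exact name that the
    // relation's output (i.e. the schema) uses for that attribute.
    def normalizeFilters(filters: Seq[Expression], output: Seq[Attribute]): Seq[Expression] =
      filters.map { e =>
        e transform {
          case a: AttributeReference =>
            // Matching by expression id, not by name, tolerates case differences.
            a.withName(output.find(_.semanticEquals(a)).get.name)
        }
      }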
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala      | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                            |  5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala | 28
3 files changed, 41 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index bcddf72851..80a9156ddc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -64,18 +64,28 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
// - filters that need to be evaluated again after the scan
val filterSet = ExpressionSet(filters)
+ // In case-insensitive mode, the attribute names in a predicate may differ from the
+ // names in the schema. Rewrite them here to match the schema, so the rest of the
+ // planning does not need to worry about case sensitivity.
+ val normalizedFilters = filters.map { e =>
+ e transform {
+ case a: AttributeReference =>
+ a.withName(l.output.find(_.semanticEquals(a)).get.name)
+ }
+ }
+
val partitionColumns =
l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver)
val partitionSet = AttributeSet(partitionColumns)
val partitionKeyFilters =
- ExpressionSet(filters.filter(_.references.subsetOf(partitionSet)))
+ ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
val dataColumns =
l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)
// Partition keys are not available in the statistics of the files.
- val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
+ val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
// Predicates with both partition keys and attributes need to be evaluated after the scan.
val afterScanFilters = filterSet -- partitionKeyFilters
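To see how the split above plays out, here is a self-contained toy model of the classification (plain Scala with made-up types, not Catalyst) for a table partitioned by p1 with data column c1:

    // Toy stand-in for a Catalyst predicate: just its SQL text and referenced columns.
    case class Filter(sql: String, references: Set[String])

    val partitionSet = Set("p1")
    val filters = Seq(
      Filter("p1 = 1", Set("p1")),             // references only partition columns
      Filter("c1 = 1", Set("c1")),             // references only data columns
      Filter("p1 + c1 = 2", Set("p1", "c1")))  // mixes both

    // Used to prune directories before any files are listed.
    val partitionKeyFilters = filters.filter(_.references.subsetOf(partitionSet))
    // Pushed down to the file format; partition keys are absent from file statistics.
    val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
    // Everything that is not a pure partition-key filter is re-evaluated after the scan.
    val afterScanFilters = filters.filterNot(partitionKeyFilters.contains)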
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index bea243a3be..4b9bf8daae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -593,10 +593,7 @@ class HDFSFileCatalog(
}
if (partitionPruningPredicates.nonEmpty) {
- val predicate =
- partitionPruningPredicates
- .reduceOption(expressions.And)
- .getOrElse(Literal(true))
+ val predicate = partitionPruningPredicates.reduce(expressions.And)
val boundPredicate = InterpretedPredicate.create(predicate.transform {
case a: AttributeReference =>
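The simplification above is safe because this code sits inside an `if (partitionPruningPredicates.nonEmpty)` guard, so the `Literal(true)` fallback that `reduceOption` provided was dead code. A small hedged illustration in plain Scala, with stand-in types for `expressions.And`:

    // Stand-in types for Catalyst predicates; not the real Spark classes.
    sealed trait Pred
    case class Leaf(sql: String) extends Pred
    case class And(left: Pred, right: Pred) extends Pred

    val preds: Seq[Pred] = Seq(Leaf("p1 = 1"), Leaf("p1 < 5"))
    if (preds.nonEmpty) {
      // reduce only throws on an empty sequence, which the guard rules out.
      val combined = preds.reduce(And)  // And(Leaf("p1 = 1"), Leaf("p1 < 5"))
    }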
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 90d7f53884..0b74f07540 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -196,6 +196,34 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
checkDataFilters(Set(IsNotNull("c1"), EqualTo("c1", 1)))
}
+ test("partitioned table - case insensitive") {
+ withSQLConf("spark.sql.caseSensitive" -> "false") {
+ val table =
+ createTable(
+ files = Seq(
+ "p1=1/file1" -> 10,
+ "p1=2/file2" -> 10))
+
+ // Only one file should be read.
+ checkScan(table.where("P1 = 1")) { partitions =>
+ assert(partitions.size == 1, "when checking partitions")
+ assert(partitions.head.files.size == 1, "when checking files in partition 1")
+ }
+ // We don't need to reevaluate filters that are only on partitions.
+ checkDataFilters(Set.empty)
+
+ // Only one file should be read.
+ checkScan(table.where("P1 = 1 AND C1 = 1 AND (P1 + C1) = 1")) { partitions =>
+ assert(partitions.size == 1, "when checking partitions")
+ assert(partitions.head.files.size == 1, "when checking files in partition 1")
+ assert(partitions.head.files.head.partitionValues.getInt(0) == 1,
+ "when checking partition values")
+ }
+ // Only the filters that do not reference the partition column should be pushed down.
+ checkDataFilters(Set(IsNotNull("c1"), EqualTo("c1", 1)))
+ }
+ }
+
test("partitioned table - after scan filters") {
val table =
createTable(