aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorCheng Hao <hao.cheng@intel.com>2015-10-14 16:29:32 -0700
committerCheng Lian <lian@databricks.com>2015-10-14 16:29:32 -0700
commit1baaf2b9bd7c949a8f95cd14fc1be2a56e1139b3 (patch)
tree686955f577440f49e07f6b5a9f3dad497c269702 /sql/core
parent2b5e31c7e97811ef7b4da47609973b7f51444346 (diff)
downloadspark-1baaf2b9bd7c949a8f95cd14fc1be2a56e1139b3.tar.gz
spark-1baaf2b9bd7c949a8f95cd14fc1be2a56e1139b3.tar.bz2
spark-1baaf2b9bd7c949a8f95cd14fc1be2a56e1139b3.zip
[SPARK-10829] [SQL] Filter combine partition key and attribute doesn't work in DataSource scan
```scala withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { withTempPath { dir => val path = s"${dir.getCanonicalPath}/part=1" (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path) // If the "part = 1" filter gets pushed down, this query will throw an exception since // "part" is not a valid column in the actual Parquet file checkAnswer( sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"), (2 to 3).map(i => Row(i, i.toString, 1))) } } ``` We expect the result to be: ``` 2,1 3,1 ``` But got ``` 1,1 2,1 3,1 ``` Author: Cheng Hao <hao.cheng@intel.com> Closes #8916 from chenghao-intel/partition_filter.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala34
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala17
2 files changed, 39 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 918db8e7d0..33181fa6c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -62,7 +62,22 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Scanning partitioned HadoopFsRelation
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _))
if t.partitionSpec.partitionColumns.nonEmpty =>
- val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
+ // We divide the filter expressions into 3 parts
+ val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
+
+ // TODO this is case-sensitive
+ // Only prunning the partition keys
+ val partitionFilters =
+ filters.filter(_.references.map(_.name).toSet.subsetOf(partitionColumnNames))
+
+ // Only pushes down predicates that do not reference partition keys.
+ val pushedFilters =
+ filters.filter(_.references.map(_.name).toSet.intersect(partitionColumnNames).isEmpty)
+
+ // Predicates with both partition keys and attributes
+ val combineFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
+
+ val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray
logInfo {
val total = t.partitionSpec.partitions.length
@@ -71,21 +86,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
}
- // Only pushes down predicates that do not reference partition columns.
- val pushedFilters = {
- val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
- filters.filter { f =>
- val referencedColumnNames = f.references.map(_.name).toSet
- referencedColumnNames.intersect(partitionColumnNames).isEmpty
- }
- }
-
- buildPartitionedTableScan(
+ val scan = buildPartitionedTableScan(
l,
projects,
pushedFilters,
t.partitionSpec.partitionColumns,
- selectedPartitions) :: Nil
+ selectedPartitions)
+
+ combineFilters
+ .reduceLeftOption(expressions.And)
+ .map(execution.Filter(_, scan)).getOrElse(scan) :: Nil
// Scanning non-partitioned HadoopFsRelation
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 45ad3fde55..7a23f57f40 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -297,4 +297,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
}
+
+ test("SPARK-10829: Filter combine partition key and attribute doesn't work in DataSource scan") {
+ import testImplicits._
+
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+ withTempPath { dir =>
+ val path = s"${dir.getCanonicalPath}/part=1"
+ (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
+
+ // If the "part = 1" filter gets pushed down, this query will throw an exception since
+ // "part" is not a valid column in the actual Parquet file
+ checkAnswer(
+ sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"),
+ (2 to 3).map(i => Row(i, i.toString, 1)))
+ }
+ }
+ }
}