commit 1baaf2b9bd7c949a8f95cd14fc1be2a56e1139b3
Author:    Cheng Hao <hao.cheng@intel.com>     2015-10-14 16:29:32 -0700
Committer: Cheng Lian <lian@databricks.com>    2015-10-14 16:29:32 -0700
Tree:      686955f577440f49e07f6b5a9f3dad497c269702
Parent:    2b5e31c7e97811ef7b4da47609973b7f51444346
[SPARK-10829] [SQL] Filter combine partition key and attribute doesn't work in DataSource scan
```scala
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
  withTempPath { dir =>
    val path = s"${dir.getCanonicalPath}/part=1"
    (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)

    // If the "part = 1" filter gets pushed down, this query will throw an exception since
    // "part" is not a valid column in the actual Parquet file
    checkAnswer(
      sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"),
      (2 to 3).map(i => Row(i, i.toString, 1)))
  }
}
```
We expect the result to be (showing columns `a` and `part`):
```
2,1
3,1
```
But we got:
```
1,1
2,1
3,1
```
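The wrong answer comes from the second conjunct: `part = 0 or a > 1` references both a partition key and a data attribute, so it was neither pushed down to Parquet (it mentions `part`) nor evaluated anywhere else, and row `(1, "1", 1)` slipped through. The patch therefore splits the filters into three groups. Here is a minimal, self-contained sketch of that classification, using a hypothetical `Pred` case class in place of a Catalyst expression (only the set of referenced column names matters):

```scala
// Hypothetical stand-in for a Catalyst predicate: the classification only
// looks at which column names the predicate references.
case class Pred(sql: String, references: Set[String])

val partitionColumnNames = Set("part")

// The two conjuncts of "a > 0 and (part = 0 or a > 1)" after splitting on AND.
val filters = Seq(
  Pred("a > 0", Set("a")),
  Pred("part = 0 or a > 1", Set("part", "a")))

// References only partition keys: usable for partition pruning.
val partitionFilters =
  filters.filter(_.references.subsetOf(partitionColumnNames))

// References no partition keys: safe to push down into the Parquet scan.
val pushedFilters =
  filters.filter(_.references.intersect(partitionColumnNames).isEmpty)

// Mixes partition keys and data attributes: must be evaluated after the
// scan, because "part" is not a column in the Parquet files themselves.
val combineFilters =
  filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet

println(partitionFilters) // List()
println(pushedFilters)    // List(Pred(a > 0,Set(a)))
println(combineFilters)   // Set(Pred(part = 0 or a > 1,Set(part, a)))
```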
Author: Cheng Hao <hao.cheng@intel.com>
Closes #8916 from chenghao-intel/partition_filter.
Diffstat (limited to 'sql/core')
 2 files changed, 39 insertions(+), 12 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 918db8e7d0..33181fa6c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -62,7 +62,22 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // Scanning partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _))
         if t.partitionSpec.partitionColumns.nonEmpty =>
-      val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
+      // We divide the filter expressions into 3 parts
+      val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
+
+      // TODO this is case-sensitive
+      // Only pruning the partition keys
+      val partitionFilters =
+        filters.filter(_.references.map(_.name).toSet.subsetOf(partitionColumnNames))
+
+      // Only pushes down predicates that do not reference partition keys.
+      val pushedFilters =
+        filters.filter(_.references.map(_.name).toSet.intersect(partitionColumnNames).isEmpty)
+
+      // Predicates with both partition keys and attributes
+      val combineFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
+
+      val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray

       logInfo {
         val total = t.partitionSpec.partitions.length
@@ -71,21 +86,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
       }

-      // Only pushes down predicates that do not reference partition columns.
-      val pushedFilters = {
-        val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
-        filters.filter { f =>
-          val referencedColumnNames = f.references.map(_.name).toSet
-          referencedColumnNames.intersect(partitionColumnNames).isEmpty
-        }
-      }
-
-      buildPartitionedTableScan(
+      val scan = buildPartitionedTableScan(
         l,
         projects,
         pushedFilters,
         t.partitionSpec.partitionColumns,
-        selectedPartitions) :: Nil
+        selectedPartitions)
+
+      combineFilters
+        .reduceLeftOption(expressions.And)
+        .map(execution.Filter(_, scan)).getOrElse(scan) :: Nil

     // Scanning non-partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 45ad3fde55..7a23f57f40 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -297,4 +297,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-10829: Filter combine partition key and attribute doesn't work in DataSource scan") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
+
+        // If the "part = 1" filter gets pushed down, this query will throw an exception since
+        // "part" is not a valid column in the actual Parquet file
+        checkAnswer(
+          sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"),
+          (2 to 3).map(i => Row(i, i.toString, 1)))
+      }
+    }
+  }
 }
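The `reduceLeftOption(expressions.And)` at the end of the first hunk is what re-applies those leftover combined predicates. A minimal sketch of the same wiring, assuming simplified stand-in plan nodes (`Scan`, `Filter`) and SQL strings in place of Spark's actual `execution.Filter` and Catalyst expressions:

```scala
// Hypothetical simplified plan nodes; Spark's real execution.Filter and the
// partitioned table scan are more involved, but the wiring has the same shape.
sealed trait Plan
case class Scan(pushedFilters: Seq[String]) extends Plan
case class Filter(condition: String, child: Plan) extends Plan

// Conditions modeled as SQL strings; And is plain string composition here.
def and(left: String, right: String): String = s"($left) AND ($right)"

val combineFilters = Seq("part = 0 or a > 1")
val scan: Plan = Scan(pushedFilters = Seq("a > 0"))

// Mirrors the patch: with no combined predicates the scan is returned as-is;
// otherwise they are ANDed into one condition and evaluated in a Filter node
// above the scan, where the discovered "part" column is available.
val plan: Plan = combineFilters
  .reduceLeftOption(and)
  .map(Filter(_, scan))
  .getOrElse(scan)

println(plan) // Filter(part = 0 or a > 1,Scan(List(a > 0)))
```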