-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala   28
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala   41
2 files changed, 64 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 8a15a51d82..3741a9cb32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -77,7 +77,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
// Predicates with both partition keys and attributes
- val combineFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
+ val partitionAndNormalColumnFilters =
+ filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray
@@ -88,16 +89,33 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
}
+ // Add the attributes from "partitionAndNormalColumnAttrs" to the projection if it is not empty
+ val partitionAndNormalColumnAttrs = AttributeSet(partitionAndNormalColumnFilters)
+ val partitionAndNormalColumnProjs = if (partitionAndNormalColumnAttrs.isEmpty) {
+ projects
+ } else {
+ (partitionAndNormalColumnAttrs ++ projects).toSeq
+ }
+
val scan = buildPartitionedTableScan(
l,
- projects,
+ partitionAndNormalColumnProjs,
pushedFilters,
t.partitionSpec.partitionColumns,
selectedPartitions)
- combineFilters
- .reduceLeftOption(expressions.And)
- .map(execution.Filter(_, scan)).getOrElse(scan) :: Nil
+ // Add a Project to restore the original projection:
+ // "partitionAndNormalColumnProjs" may differ from the original "projects",
+ // both in which attributes it contains and in their ordering
+
+ partitionAndNormalColumnFilters.reduceLeftOption(expressions.And).map(cf =>
+ if (projects.isEmpty || projects == partitionAndNormalColumnProjs) {
+ // if the original projection is empty or unchanged, no additional Project is needed
+ execution.Filter(cf, scan)
+ } else {
+ execution.Project(projects, execution.Filter(cf, scan))
+ }
+ ).getOrElse(scan) :: Nil
// Scanning non-partitioned HadoopFsRelation
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>
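
Note (illustration only): the toy sketch below mirrors the branch added above. The types (`Predicate`, `Scan`, `Filter`, `Project`) and the helper `planMixedFilterScan` are hypothetical stand-ins, not Spark internals. Predicates that reference both partition and data columns are kept out of the pushed-down set, the scan's projection is widened with the attributes those predicates need, and a Filter (plus a Project restoring the requested columns when the widened projection differs) is placed on top.

```scala
// Toy sketch of the planning logic above (hypothetical types, not Spark internals).
// "mixedFilters" plays the role of partitionAndNormalColumnFilters and
// "scanProjects" the role of partitionAndNormalColumnProjs.
object PlanSketch {
  case class Predicate(sql: String, references: Set[String])

  sealed trait Plan
  case class Scan(output: Seq[String]) extends Plan                 // columns the scan produces
  case class Filter(condition: String, child: Plan) extends Plan    // predicate evaluated above the scan
  case class Project(output: Seq[String], child: Plan) extends Plan // restores the requested columns

  def planMixedFilterScan(projects: Seq[String], mixedFilters: Seq[Predicate]): Plan = {
    // Widen the scan so it also outputs every column the un-pushed predicates reference.
    val neededAttrs = mixedFilters.flatMap(_.references).distinct
    val scanProjects =
      if (neededAttrs.isEmpty) projects else (neededAttrs ++ projects).distinct
    val scan = Scan(scanProjects)
    mixedFilters.map(_.sql).reduceLeftOption((a, b) => s"($a) AND ($b)") match {
      case None => scan
      // Empty or unchanged projection: the Filter alone is enough.
      case Some(cond) if projects.isEmpty || projects == scanProjects => Filter(cond, scan)
      // Otherwise put the original projection back on top of the Filter.
      case Some(cond) => Project(projects, Filter(cond, scan))
    }
  }

  def main(args: Array[String]): Unit = {
    // SELECT c FROM t WHERE a > 1 OR b < 2, where "a" is a partition column, plans as
    // Project(List(c), Filter(a > 1 OR b < 2, Scan(List(a, b, c))))
    println(planMixedFilterScan(Seq("c"), Seq(Predicate("a > 1 OR b < 2", Set("a", "b")))))
  }
}
```
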
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index a0abfe458d..f42f173b2a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -325,6 +325,47 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
+ test("SPARK-12231: test the filter and empty project in partitioned DataSource scan") {
+ import testImplicits._
+
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+ withTempPath { dir =>
+ val path = s"${dir.getCanonicalPath}"
+ (1 to 3).map(i => (i, i + 1, i + 2, i + 3)).toDF("a", "b", "c", "d").
+ write.partitionBy("a").parquet(path)
+
+ // The filter "a > 1 or b < 2" mixes the partition column "a" with the data column "b",
+ // so it is not pushed down; with an empty projection this query used to throw an
+ // exception because the Filter referenced attributes the scan did not produce.
+ val df1 = sqlContext.read.parquet(dir.getCanonicalPath)
+
+ assert(df1.filter("a > 1 or b < 2").count() == 2)
+ }
+ }
+ }
+
+ test("SPARK-12231: test the new projection in partitioned DataSource scan") {
+ import testImplicits._
+
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+ withTempPath { dir =>
+ val path = s"${dir.getCanonicalPath}"
+ (1 to 3).map(i => (i, i + 1, i + 2, i + 3)).toDF("a", "b", "c", "d").
+ write.partitionBy("a").parquet(path)
+
+ // Test the case where a new Project node is generated,
+ // i.e. when projects != partitionAndNormalColumnProjs.
+
+ val df1 = sqlContext.read.parquet(dir.getCanonicalPath)
+
+ checkAnswer(
+ df1.filter("a > 1 or b > 2").orderBy("a").selectExpr("a", "b", "c", "d"),
+ (2 to 3).map(i => Row(i, i + 1, i + 2, i + 3)))
+ }
+ }
+ }
+
+
test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
import testImplicits._
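
As a quick usage sketch (not part of the patch): the standalone snippet below, assuming a Spark 1.6-style SQLContext and an illustrative output path, reproduces the two queries the new tests cover.

```scala
// Standalone reproduction of the scenario exercised by the tests above.
// Spark 1.6-era API assumed; the output path and object name are illustrative.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object MixedPartitionFilterDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SPARK-12231-demo").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val path = "/tmp/spark-12231-demo" // illustrative path
    (1 to 3).map(i => (i, i + 1, i + 2, i + 3)).toDF("a", "b", "c", "d")
      .write.mode("overwrite").partitionBy("a").parquet(path)

    val df = sqlContext.read.parquet(path)

    // "a" is a partition column and "b" is a data column, so the disjunction cannot be
    // pushed down; before this patch the first query failed while planning the Filter.
    println(df.filter("a > 1 or b < 2").count()) // 2
    df.filter("a > 1 or b > 2").orderBy("a").select("a", "b", "c", "d").show()

    sc.stop()
  }
}
```
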