diff options
Diffstat (limited to 'sql/core')
2 files changed, 64 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 8a15a51d82..3741a9cb32 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -77,7 +77,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty) // Predicates with both partition keys and attributes - val combineFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet + val partitionAndNormalColumnFilters = + filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray @@ -88,16 +89,33 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { s"Selected $selected partitions out of $total, pruned $percentPruned% partitions." } + // need to add projections from "partitionAndNormalColumnAttrs" in if it is not empty + val partitionAndNormalColumnAttrs = AttributeSet(partitionAndNormalColumnFilters) + val partitionAndNormalColumnProjs = if (partitionAndNormalColumnAttrs.isEmpty) { + projects + } else { + (partitionAndNormalColumnAttrs ++ projects).toSeq + } + val scan = buildPartitionedTableScan( l, - projects, + partitionAndNormalColumnProjs, pushedFilters, t.partitionSpec.partitionColumns, selectedPartitions) - combineFilters - .reduceLeftOption(expressions.And) - .map(execution.Filter(_, scan)).getOrElse(scan) :: Nil + // Add a Projection to guarantee the original projection: + // this is because "partitionAndNormalColumnAttrs" may be different + // from the original "projects", in elements or their ordering + + partitionAndNormalColumnFilters.reduceLeftOption(expressions.And).map(cf => + if (projects.isEmpty || projects == partitionAndNormalColumnProjs) { + // if the original projection is empty, no need for the additional Project either + execution.Filter(cf, scan) + } else { + execution.Project(projects, execution.Filter(cf, scan)) + } + ).getOrElse(scan) :: Nil // Scanning non-partitioned HadoopFsRelation case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index a0abfe458d..f42f173b2a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -325,6 +325,47 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } + test("SPARK-12231: test the filter and empty project in partitioned DataSource scan") { + import testImplicits._ + + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}" + (1 to 3).map(i => (i, i + 1, i + 2, i + 3)).toDF("a", "b", "c", "d"). + write.partitionBy("a").parquet(path) + + // The filter "a > 1 or b < 2" will not get pushed down, and the projection is empty, + // this query will throw an exception since the project from combinedFilter expect + // two projection while the + val df1 = sqlContext.read.parquet(dir.getCanonicalPath) + + assert(df1.filter("a > 1 or b < 2").count() == 2) + } + } + } + + test("SPARK-12231: test the new projection in partitioned DataSource scan") { + import testImplicits._ + + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}" + (1 to 3).map(i => (i, i + 1, i + 2, i + 3)).toDF("a", "b", "c", "d"). + write.partitionBy("a").parquet(path) + + // test the generate new projection case + // when projects != partitionAndNormalColumnProjs + + val df1 = sqlContext.read.parquet(dir.getCanonicalPath) + + checkAnswer( + df1.filter("a > 1 or b > 2").orderBy("a").selectExpr("a", "b", "c", "d"), + (2 to 3).map(i => Row(i, i + 1, i + 2, i + 3))) + } + } + } + + test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") { import testImplicits._ |