From 5afe6af0b192ce7e908634992e8752537b1c4ed1 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 24 May 2014 20:42:01 -0700 Subject: [SPARK-1913][SQL] Bug fix: column pruning error in Parquet support JIRA issue: [SPARK-1913](https://issues.apache.org/jira/browse/SPARK-1913) When scanning Parquet tables, attributes referenced only in predicates that are pushed down are not passed to the `ParquetTableScan` operator and causes exception. Author: Cheng Lian Closes #863 from liancheng/spark-1913 and squashes the following commits: f976b73 [Cheng Lian] Addessed the readability issue commented by @rxin f5b257d [Cheng Lian] Added back comments deleted by mistake ae60ab3 [Cheng Lian] [SPARK-1913] Attributes referenced only in predicates pushed down should remain in ParquetTableScan operator --- .../main/scala/org/apache/spark/sql/SQLContext.scala | 6 +++++- .../apache/spark/sql/execution/SparkStrategies.scala | 20 +++++++++++--------- .../apache/spark/sql/parquet/ParquetQuerySuite.scala | 6 +++++- 3 files changed, 21 insertions(+), 11 deletions(-) (limited to 'sql/core') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index bfebfa0c28..043be58edc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -206,17 +206,21 @@ class SQLContext(@transient val sparkContext: SparkContext) * final desired output requires complex expressions to be evaluated or when columns can be * further eliminated out after filtering has been done. * + * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized + * away by the filter pushdown optimization. + * * The required attributes for both filtering and expression evaluation are passed to the * provided `scanBuilder` function so that it can avoid unnecessary column materialization. */ def pruneFilterProject( projectList: Seq[NamedExpression], filterPredicates: Seq[Expression], + prunePushedDownFilters: Seq[Expression] => Seq[Expression], scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = { val projectSet = projectList.flatMap(_.references).toSet val filterSet = filterPredicates.flatMap(_.references).toSet - val filterCondition = filterPredicates.reduceLeftOption(And) + val filterCondition = prunePushedDownFilters(filterPredicates).reduceLeftOption(And) // Right now we still use a projection even if the only evaluation is applying an alias // to a column. Since this is a no-op, it could be avoided. However, using this diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 394a59700d..cfa8bdae58 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -141,14 +141,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) => InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => { - val remainingFilters = + val prunePushedDownFilters = if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) { - filters.filter { - // Note: filters cannot be pushed down to Parquet if they contain more complex - // expressions than simple "Attribute cmp Literal" comparisons. Here we remove - // all filters that have been pushed down. Note that a predicate such as - // "(A AND B) OR C" can result in "A OR C" being pushed down. - filter => + (filters: Seq[Expression]) => { + filters.filter { filter => + // Note: filters cannot be pushed down to Parquet if they contain more complex + // expressions than simple "Attribute cmp Literal" comparisons. Here we remove + // all filters that have been pushed down. Note that a predicate such as + // "(A AND B) OR C" can result in "A OR C" being pushed down. val recordFilter = ParquetFilters.createFilter(filter) if (!recordFilter.isDefined) { // First case: the pushdown did not result in any record filter. @@ -159,13 +159,15 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // still want to keep "A AND B" in the higher-level filter, not just "B". !ParquetFilters.findExpression(recordFilter.get, filter).isDefined } + } } } else { - filters + identity[Seq[Expression]] _ } pruneFilterProject( projectList, - remainingFilters, + filters, + prunePushedDownFilters, ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 65f4c17aee..f9731e82e4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -358,5 +358,9 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll { assert(stringResult(0).getString(2) == "100", "stringvalue incorrect") assert(stringResult(0).getInt(1) === 100) } -} + test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") { + val query = sql(s"SELECT mystring FROM testfiltersource WHERE myint < 10") + assert(query.collect().size === 10) + } +} -- cgit v1.2.3