author    Cheng Lian <lian.cs.zju@gmail.com>    2014-05-24 20:42:01 -0700
committer Reynold Xin <rxin@apache.org>    2014-05-24 20:42:01 -0700
commit    5afe6af0b192ce7e908634992e8752537b1c4ed1 (patch)
tree      74064440c9b3cdfe917fe5687d79bce8c2677cd7 /sql/core
parent    4e4831b8facc186cda6ef31040ccdeab48acbbb7 (diff)
[SPARK-1913][SQL] Bug fix: column pruning error in Parquet support
JIRA issue: [SPARK-1913](https://issues.apache.org/jira/browse/SPARK-1913)

When scanning Parquet tables, attributes referenced only in predicates that are pushed down are not passed to the `ParquetTableScan` operator, which causes an exception.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #863 from liancheng/spark-1913 and squashes the following commits:

f976b73 [Cheng Lian] Addressed the readability issue commented by @rxin
f5b257d [Cheng Lian] Added back comments deleted by mistake
ae60ab3 [Cheng Lian] [SPARK-1913] Attributes referenced only in predicates pushed down should remain in ParquetTableScan operator
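For concreteness, the failure needs nothing more than a query whose WHERE clause references a column that is absent from the SELECT list. A minimal reproduction sketch in Scala, assuming a Parquet table registered as `testfiltersource` with columns `myint` and `mystring` (the names used by the test added in this patch):

    // `myint` is referenced only by the predicate, which the optimizer pushes
    // down to the Parquet reader. Before this fix, column pruning then dropped
    // `myint` from ParquetTableScan's output, and evaluating the residual
    // filter against rows lacking that column raised an exception.
    val query = sql("SELECT mystring FROM testfiltersource WHERE myint < 10")
    query.collect()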
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala               |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 20
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala |  6
3 files changed, 21 insertions(+), 11 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index bfebfa0c28..043be58edc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -206,17 +206,21 @@ class SQLContext(@transient val sparkContext: SparkContext)
      * final desired output requires complex expressions to be evaluated or when columns can be
      * further eliminated out after filtering has been done.
      *
+     * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized
+     * away by the filter pushdown optimization.
+     *
      * The required attributes for both filtering and expression evaluation are passed to the
      * provided `scanBuilder` function so that it can avoid unnecessary column materialization.
      */
     def pruneFilterProject(
         projectList: Seq[NamedExpression],
         filterPredicates: Seq[Expression],
+        prunePushedDownFilters: Seq[Expression] => Seq[Expression],
         scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {

       val projectSet = projectList.flatMap(_.references).toSet
       val filterSet = filterPredicates.flatMap(_.references).toSet
-      val filterCondition = filterPredicates.reduceLeftOption(And)
+      val filterCondition = prunePushedDownFilters(filterPredicates).reduceLeftOption(And)

       // Right now we still use a projection even if the only evaluation is applying an alias
       // to a column. Since this is a no-op, it could be avoided. However, using this
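The crux of the fix is visible in this hunk: referenced attributes are still collected from the original `filterPredicates`, while the residual filter condition is built only from whatever `prunePushedDownFilters` leaves behind. A minimal, self-contained sketch, using simplified stand-ins for Catalyst's `Expression` and `And` rather than the real API, illustrates the asymmetry:

    // Simplified stand-ins for Catalyst expressions (illustration only).
    sealed trait Expr { def references: Set[String] }
    case class LessThan(column: String, value: Int) extends Expr {
      def references: Set[String] = Set(column)
    }
    case class And(left: Expr, right: Expr) extends Expr {
      def references: Set[String] = left.references ++ right.references
    }

    val filters: Seq[Expr] = Seq(LessThan("myint", 10))
    // Suppose the Parquet layer can push this predicate down entirely:
    val prunePushedDownFilters: Seq[Expr] => Seq[Expr] = _ => Seq.empty

    // Attribute collection uses the ORIGINAL filters, so `myint` is still
    // requested from the scan ...
    val filterSet = filters.flatMap(_.references).toSet               // Set("myint")
    // ... while the residual condition uses the PRUNED list, so no redundant
    // Filter operator is planned above the scan.
    val filterCondition = prunePushedDownFilters(filters).reduceLeftOption(And)  // None

Before this patch, `pruneFilterProject` received an already-pruned filter list, so both computations saw the same sequence and a column referenced only by pushed-down predicates vanished from the scan's output.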
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 394a59700d..cfa8bdae58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -141,14 +141,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
         InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
       case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => {
-        val remainingFilters =
+        val prunePushedDownFilters =
           if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
-            filters.filter {
-              // Note: filters cannot be pushed down to Parquet if they contain more complex
-              // expressions than simple "Attribute cmp Literal" comparisons. Here we remove
-              // all filters that have been pushed down. Note that a predicate such as
-              // "(A AND B) OR C" can result in "A OR C" being pushed down.
-              filter =>
+            (filters: Seq[Expression]) => {
+              filters.filter { filter =>
+                // Note: filters cannot be pushed down to Parquet if they contain more complex
+                // expressions than simple "Attribute cmp Literal" comparisons. Here we remove
+                // all filters that have been pushed down. Note that a predicate such as
+                // "(A AND B) OR C" can result in "A OR C" being pushed down.
                 val recordFilter = ParquetFilters.createFilter(filter)
                 if (!recordFilter.isDefined) {
                   // First case: the pushdown did not result in any record filter.
@@ -159,13 +159,15 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
                   // still want to keep "A AND B" in the higher-level filter, not just "B".
                   !ParquetFilters.findExpression(recordFilter.get, filter).isDefined
                 }
+              }
             }
           } else {
-            filters
+            identity[Seq[Expression]] _
           }
         pruneFilterProject(
           projectList,
-          remainingFilters,
+          filters,
+          prunePushedDownFilters,
           ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil
       }
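The keep-or-drop rule inside the closure is subtle because a pushdown can succeed only partially: for "(A AND B) OR C" the Parquet layer may push down the weaker "A OR C", which prunes some records but does not imply the original predicate, so "(A AND B) OR C" must stay in the higher-level Filter. A generic sketch of the rule, with `createFilter` and `findExpression` as hypothetical stand-ins for the `ParquetFilters` helpers:

    // Returns true when `filter` must remain in the higher-level Filter
    // operator, i.e. when it was not pushed down in its entirety.
    def keepInHigherLevelFilter[E](
        filter: E,
        createFilter: E => Option[E],        // pushdown attempt; may cover only part
        findExpression: (E, E) => Option[E]  // does the record filter cover `filter`?
      ): Boolean =
      createFilter(filter) match {
        // Nothing could be pushed down at all: keep the filter.
        case None => true
        // Something was pushed down; keep the filter unless the record filter
        // covers the whole expression (partial pushdowns such as "A OR C" do not).
        case Some(recordFilter) => findExpression(recordFilter, filter).isEmpty
      }

When pushdown is disabled, the pruner degenerates to `identity[Seq[Expression]] _`: nothing is pushed down, so every predicate stays above the scan.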
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 65f4c17aee..f9731e82e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -358,5 +358,9 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
     assert(stringResult(0).getString(2) == "100", "stringvalue incorrect")
     assert(stringResult(0).getInt(1) === 100)
   }

-}
+  test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") {
+    val query = sql(s"SELECT mystring FROM testfiltersource WHERE myint < 10")
+    assert(query.collect().size === 10)
+  }
+}