From 14cf753704ea60f358cb870b018cbcf73654f198 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Thu, 12 Nov 2015 16:47:00 +0800
Subject: [SPARK-11661][SQL] Still pushdown filters returned by unhandledFilters.

https://issues.apache.org/jira/browse/SPARK-11661

Author: Yin Huai

Closes #9634 from yhuai/unhandledFilters.
---
 .../execution/datasources/DataSourceStrategy.scala | 15 ++++++---
 .../org/apache/spark/sql/sources/interfaces.scala  |  8 +++--
 .../datasources/parquet/ParquetFilterSuite.scala   | 25 ++++++++++++++
 .../spark/sql/sources/FilteredScanSuite.scala      | 39 ++++++++++++++--------
 .../sources/SimpleTextHadoopFsRelationSuite.scala  |  8 ++---
 5 files changed, 71 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 7265d6a4de..d7c01b6e6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -453,8 +453,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
    *
    * @return A pair of `Seq[Expression]` and `Seq[Filter]`. The first element contains all Catalyst
    *         predicate [[Expression]]s that are either not convertible or cannot be handled by
-   *         `relation`. The second element contains all converted data source [[Filter]]s that can
-   *         be handled by `relation`.
+   *         `relation`. The second element contains all converted data source [[Filter]]s that
+   *         will be pushed down to the data source.
    */
   protected[sql] def selectFilters(
       relation: BaseRelation,
@@ -476,7 +476,9 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // Catalyst predicate expressions that cannot be translated to data source filters.
     val unrecognizedPredicates = predicates.filterNot(translatedMap.contains)
 
-    // Data source filters that cannot be handled by `relation`
+    // Data source filters that cannot be handled by `relation`. An unhandled filter is one
+    // that the data source may not be able to apply to every row of the underlying
+    // dataset.
     val unhandledFilters = relation.unhandledFilters(translatedMap.values.toArray).toSet
 
     val (unhandled, handled) = translated.partition {
@@ -491,6 +493,11 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // Translated data source filters that can be handled by `relation`
     val (_, handledFilters) = handled.unzip
 
-    (unrecognizedPredicates ++ unhandledPredicates, handledFilters)
+    // `translated` contains all filters that have been converted to the public Filter interface.
+    // We should always push them down to the data source, whether or not the data source can
+    // apply each filter to every row.
+    val (_, translatedFilters) = translated.unzip
+
+    (unrecognizedPredicates ++ unhandledPredicates, translatedFilters)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 48de693a99..2be6cd4533 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -235,9 +235,11 @@ abstract class BaseRelation {
   def needConversion: Boolean = true
 
   /**
-   * Given an array of [[Filter]]s, returns an array of [[Filter]]s that this data source relation
-   * cannot handle. Spark SQL will apply all returned [[Filter]]s against rows returned by this
-   * data source relation.
+   * Returns the list of [[Filter]]s that this data source may not be able to handle.
+   * These returned [[Filter]]s will be evaluated by Spark SQL after data is output by a scan.
+   * By default, this function will return all filters, as it is always safe to
+   * evaluate a [[Filter]] twice. However, specific implementations can override it to
+   * avoid double filtering when they are capable of processing a filter internally.
    *
    * @since 1.6.0
    */
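As an illustration of the contract documented above, here is a sketch (not part of
this patch; the class name and data are hypothetical) of a source that can fully
apply equality filters on its single column. Every other filter is reported
unhandled, so Spark SQL re-evaluates it after the scan, while the source still
receives all translated filters in buildScan as a best-effort hint:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, PrunedFilteredScan}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    class EqualityOnlyRelation(override val sqlContext: SQLContext, data: Seq[Int])
      extends BaseRelation with PrunedFilteredScan {

      override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)

      // Only EqualTo on column "a" is guaranteed to be applied to every row by this
      // source; all other filters must be evaluated again by Spark SQL.
      override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
        filters.filterNot {
          case EqualTo("a", _) => true
          case _ => false
        }

      // With this change, all translated filters arrive here, including the unhandled
      // ones; this sketch applies the equality filters exactly and ignores the rest.
      override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
        val wanted = filters.collect { case EqualTo("a", v: Int) => v }.toSet
        // The schema has a single column, so this sketch skips column pruning.
        val kept = if (wanted.isEmpty) data else data.filter(wanted.contains)
        sqlContext.sparkContext.parallelize(kept.map(Row(_)))
      }
    }
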
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 579dabf733..2ac87ad6cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -336,4 +336,29 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-11661 Still pushdown filters returned by unhandledFilters") {
+    import testImplicits._
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
+        val df = sqlContext.read.parquet(path).filter("a = 2")
+
+        // This is the source RDD without Spark-side filtering.
+        val childRDD =
+          df
+            .queryExecution
+            .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
+            .child
+            .execute()
+
+        // The result should be a single row.
+        // When a filter is pushed down to Parquet, Parquet can apply it to every row,
+        // so we can check the number of rows returned by the Parquet scan
+        // to make sure our filter pushdown works.
+        assert(childRDD.count == 1)
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 2cad964e55..398b8a1a66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -254,7 +254,11 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
   testPushDown("SELECT * FROM oneToTenFiltered WHERE a IN (1,3,5)", 3, Set("a", "b", "c"))
 
   testPushDown("SELECT * FROM oneToTenFiltered WHERE a = 20", 0, Set("a", "b", "c"))
-  testPushDown("SELECT * FROM oneToTenFiltered WHERE b = 1", 10, Set("a", "b", "c"))
+  testPushDown(
+    "SELECT * FROM oneToTenFiltered WHERE b = 1",
+    10,
+    Set("a", "b", "c"),
+    Set(EqualTo("b", 1)))
 
   testPushDown("SELECT * FROM oneToTenFiltered WHERE a < 5 AND a > 1", 3, Set("a", "b", "c"))
   testPushDown("SELECT * FROM oneToTenFiltered WHERE a < 3 OR a > 8", 4, Set("a", "b", "c"))
@@ -283,12 +287,23 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
       | WHERE a + b > 9
       |   AND b < 16
       |   AND c IN ('bbbbbBBBBB', 'cccccCCCCC', 'dddddDDDDD', 'foo')
-    """.stripMargin.split("\n").map(_.trim).mkString(" "), 3, Set("a", "b"))
+    """.stripMargin.split("\n").map(_.trim).mkString(" "),
+    3,
+    Set("a", "b"),
+    Set(LessThan("b", 16)))
 
   def testPushDown(
-    sqlString: String,
-    expectedCount: Int,
-    requiredColumnNames: Set[String]): Unit = {
+      sqlString: String,
+      expectedCount: Int,
+      requiredColumnNames: Set[String]): Unit = {
+    testPushDown(sqlString, expectedCount, requiredColumnNames, Set.empty[Filter])
+  }
+
+  def testPushDown(
+      sqlString: String,
+      expectedCount: Int,
+      requiredColumnNames: Set[String],
+      expectedUnhandledFilters: Set[Filter]): Unit = {
     test(s"PushDown Returns $expectedCount: $sqlString") {
       val queryExecution = sql(sqlString).queryExecution
       val rawPlan = queryExecution.executedPlan.collect {
@@ -300,15 +315,13 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
       val rawCount = rawPlan.execute().count()
       assert(ColumnsRequired.set === requiredColumnNames)
 
-      assert {
-        val table = caseInsensitiveContext.table("oneToTenFiltered")
-        val relation = table.queryExecution.logical.collectFirst {
-          case LogicalRelation(r, _) => r
-        }.get
+      val table = caseInsensitiveContext.table("oneToTenFiltered")
+      val relation = table.queryExecution.logical.collectFirst {
+        case LogicalRelation(r, _) => r
+      }.get
 
-        // `relation` should be able to handle all pushed filters
-        relation.unhandledFilters(FiltersPushed.list.toArray).isEmpty
-      }
+      assert(
+        relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters)
 
       if (rawCount != expectedCount) {
         fail(
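To make the resulting split concrete, here is a small self-contained sketch
(illustrative only; the object name and sample filters are made up) modeled on the
oneToTenFiltered relation above, which can fully apply only filters on column "a":

    import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}

    object FilterSplitSketch extends App {
      // Mirrors a source whose unhandledFilters keeps every filter that does not
      // reference column "a".
      def unhandledFilters(filters: Array[Filter]): Array[Filter] =
        filters.filterNot {
          case EqualTo("a", _) => true
          case GreaterThan("a", _) => true
          case _ => false
        }

      val translated: Array[Filter] = Array(EqualTo("b", 1), GreaterThan("a", 7))
      val unhandled = unhandledFilters(translated).toSet

      // After SPARK-11661: ALL translated filters are pushed into buildScan...
      println(s"pushed to buildScan:   ${translated.toSeq}")
      // ...but the unhandled ones are also kept in Spark's own Filter operator,
      // which is why "WHERE b = 1" above still scans 10 raw rows.
      println(s"re-evaluated by Spark: $unhandled")
    }
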
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 9251a69f31..81af684ba0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -248,7 +248,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
       projections = Seq('c, 'p),
       filter = 'a < 3 && 'p > 0,
       requiredColumns = Seq("c", "a"),
-      pushedFilters = Nil,
+      pushedFilters = Seq(LessThan("a", 3)),
       inconvertibleFilters = Nil,
       unhandledFilters = Seq('a < 3),
       partitioningFilters = Seq('p > 0)
@@ -327,7 +327,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
       projections = Seq('b, 'p),
       filter = 'c > "val_7" && 'b < 18 && 'p > 0,
       requiredColumns = Seq("b"),
-      pushedFilters = Seq(GreaterThan("c", "val_7")),
+      pushedFilters = Seq(GreaterThan("c", "val_7"), LessThan("b", 18)),
       inconvertibleFilters = Nil,
       unhandledFilters = Seq('b < 18),
       partitioningFilters = Seq('p > 0)
@@ -344,7 +344,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
       projections = Seq('b, 'p),
       filter = 'a % 2 === 0 && 'c > "val_7" && 'b < 18 && 'p > 0,
       requiredColumns = Seq("b", "a"),
-      pushedFilters = Seq(GreaterThan("c", "val_7")),
+      pushedFilters = Seq(GreaterThan("c", "val_7"), LessThan("b", 18)),
      inconvertibleFilters = Seq('a % 2 === 0),
       unhandledFilters = Seq('b < 18),
       partitioningFilters = Seq('p > 0)
@@ -361,7 +361,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
       projections = Seq('b, 'p),
       filter = 'a > 7 && 'a < 9,
       requiredColumns = Seq("b", "a"),
-      pushedFilters = Seq(GreaterThan("a", 7)),
+      pushedFilters = Seq(GreaterThan("a", 7), LessThan("a", 9)),
       inconvertibleFilters = Nil,
       unhandledFilters = Seq('a < 9),
       partitioningFilters = Nil
--
cgit v1.2.3