author	Yin Huai <yhuai@databricks.com>	2015-11-12 16:47:00 +0800
committer	Cheng Lian <lian@databricks.com>	2015-11-12 16:47:00 +0800
commit	14cf753704ea60f358cb870b018cbcf73654f198 (patch)
tree	3f6fd89fe52f762d1f9d47a9b77b3bb4f5d617be
parent	e2957bc085d39d59c09e4b33c26a05f0263200a3 (diff)
[SPARK-11661][SQL] Still pushdown filters returned by unhandledFilters.
https://issues.apache.org/jira/browse/SPARK-11661

Author: Yin Huai <yhuai@databricks.com>

Closes #9634 from yhuai/unhandledFilters.
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala	15
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala	8
-rw-r--r--	sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala	25
-rw-r--r--	sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala	39
-rw-r--r--	sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala	8
5 files changed, 71 insertions, 24 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 7265d6a4de..d7c01b6e6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -453,8 +453,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
*
* @return A pair of `Seq[Expression]` and `Seq[Filter]`. The first element contains all Catalyst
* predicate [[Expression]]s that are either not convertible or cannot be handled by
- * `relation`. The second element contains all converted data source [[Filter]]s that can
- * be handled by `relation`.
+ * `relation`. The second element contains all converted data source [[Filter]]s that
+ * will be pushed down to the data source.
*/
protected[sql] def selectFilters(
relation: BaseRelation,
@@ -476,7 +476,9 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Catalyst predicate expressions that cannot be translated to data source filters.
val unrecognizedPredicates = predicates.filterNot(translatedMap.contains)
- // Data source filters that cannot be handled by `relation`
+ // Data source filters that cannot be handled by `relation`. An unhandled filter means
+ // that the data source may not be able to apply this filter to every row
+ // of the underlying dataset.
val unhandledFilters = relation.unhandledFilters(translatedMap.values.toArray).toSet
val (unhandled, handled) = translated.partition {
@@ -491,6 +493,11 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Translated data source filters that can be handled by `relation`
val (_, handledFilters) = handled.unzip
- (unrecognizedPredicates ++ unhandledPredicates, handledFilters)
+ // `translated` contains all filters that have been converted to the public Filter interface.
+ // We should always push them to the data source, regardless of whether the data source
+ // can apply each filter to every row.
+ val (_, translatedFilters) = translated.unzip
+
+ (unrecognizedPredicates ++ unhandledPredicates, translatedFilters)
}
}
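
For reference, a simplified Scala sketch of the selection logic after this change (not the actual DataSourceStrategy code; translateFilter below is a hypothetical stand-in for the real Catalyst-to-Filter translation): every predicate that translates to a public Filter is pushed down, while predicates whose translated Filter is reported as unhandled are also kept for Spark-side evaluation.

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.sources.{BaseRelation, Filter}

object SelectFiltersSketch {
  // Simplified sketch of the post-SPARK-11661 behavior; `translateFilter` is a
  // hypothetical stand-in for the real Catalyst-to-Filter translation.
  def selectFilters(
      relation: BaseRelation,
      predicates: Seq[Expression],
      translateFilter: Expression => Option[Filter]): (Seq[Expression], Seq[Filter]) = {
    // Catalyst predicates mapped to their data source Filter, when convertible.
    val translatedMap: Map[Expression, Filter] =
      predicates.flatMap(p => translateFilter(p).map(p -> _)).toMap

    // Predicates with no Filter translation must be evaluated by Spark.
    val unrecognizedPredicates = predicates.filterNot(translatedMap.contains)

    // Filters the relation may not apply to every row are also evaluated by Spark.
    val unhandledFilters = relation.unhandledFilters(translatedMap.values.toArray).toSet
    val unhandledPredicates = translatedMap.collect {
      case (predicate, filter) if unhandledFilters.contains(filter) => predicate
    }.toSeq

    // All translated Filters are pushed down, handled or not: re-evaluating a
    // Filter on the Spark side is safe, and the source may still use it to skip data.
    (unrecognizedPredicates ++ unhandledPredicates, translatedMap.values.toSeq)
  }
}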
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 48de693a99..2be6cd4533 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -235,9 +235,11 @@ abstract class BaseRelation {
def needConversion: Boolean = true
/**
- * Given an array of [[Filter]]s, returns an array of [[Filter]]s that this data source relation
- * cannot handle. Spark SQL will apply all returned [[Filter]]s against rows returned by this
- * data source relation.
+ * Returns the list of [[Filter]]s that this data source may not be able to handle.
+ * These returned [[Filter]]s will be evaluated by Spark SQL after data is returned by a scan.
+ * By default, this function returns all filters, as it is always safe to
+ * evaluate a [[Filter]] twice. However, specific implementations can override this function to
+ * avoid double filtering when they are capable of applying a filter internally.
*
* @since 1.6.0
*/
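
To illustrate the contract above, here is a minimal sketch of a relation that fully handles equality filters on a column "a" and reports everything else as unhandled, so Spark SQL re-evaluates those filters after the scan. The class name, column name, and data are illustrative assumptions, not part of this patch; after this change, buildScan still receives every convertible filter, including the ones reported as unhandled.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical example relation, not part of this patch.
class ExampleFilteredRelation(@transient val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)

  // Only EqualTo on column "a" is applied to every row inside this source;
  // every other filter may let extra rows through, so it is reported as
  // unhandled and Spark SQL will evaluate it again after the scan.
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot {
      case EqualTo("a", _) => true
      case _ => false
    }

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // `filters` contains every convertible filter, including the ones reported
    // as unhandled above; this source only applies the EqualTo("a", _) ones.
    val wanted = filters.collect { case EqualTo("a", value: Int) => value }.toSet
    val rows = (1 to 10).map(i => Row(i)).filter { r =>
      wanted.isEmpty || wanted.contains(r.getInt(0))
    }
    sqlContext.sparkContext.parallelize(rows)
  }
}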
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 579dabf733..2ac87ad6cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -336,4 +336,29 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
}
+
+ test("SPARK-11661 Still pushdown filters returned by unhandledFilters") {
+ import testImplicits._
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+ withTempPath { dir =>
+ val path = s"${dir.getCanonicalPath}/part=1"
+ (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
+ val df = sqlContext.read.parquet(path).filter("a = 2")
+
+ // This is the source RDD without Spark-side filtering.
+ val childRDD =
+ df
+ .queryExecution
+ .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
+ .child
+ .execute()
+
+ // The result should be a single row.
+ // When a filter is pushed down to Parquet, Parquet can apply it to every row.
+ // So, we can check the number of rows returned by Parquet
+ // to make sure our filter pushdown works.
+ assert(childRDD.count == 1)
+ }
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 2cad964e55..398b8a1a66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -254,7 +254,11 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
testPushDown("SELECT * FROM oneToTenFiltered WHERE a IN (1,3,5)", 3, Set("a", "b", "c"))
testPushDown("SELECT * FROM oneToTenFiltered WHERE a = 20", 0, Set("a", "b", "c"))
- testPushDown("SELECT * FROM oneToTenFiltered WHERE b = 1", 10, Set("a", "b", "c"))
+ testPushDown(
+ "SELECT * FROM oneToTenFiltered WHERE b = 1",
+ 10,
+ Set("a", "b", "c"),
+ Set(EqualTo("b", 1)))
testPushDown("SELECT * FROM oneToTenFiltered WHERE a < 5 AND a > 1", 3, Set("a", "b", "c"))
testPushDown("SELECT * FROM oneToTenFiltered WHERE a < 3 OR a > 8", 4, Set("a", "b", "c"))
@@ -283,12 +287,23 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
| WHERE a + b > 9
| AND b < 16
| AND c IN ('bbbbbBBBBB', 'cccccCCCCC', 'dddddDDDDD', 'foo')
- """.stripMargin.split("\n").map(_.trim).mkString(" "), 3, Set("a", "b"))
+ """.stripMargin.split("\n").map(_.trim).mkString(" "),
+ 3,
+ Set("a", "b"),
+ Set(LessThan("b", 16)))
def testPushDown(
- sqlString: String,
- expectedCount: Int,
- requiredColumnNames: Set[String]): Unit = {
+ sqlString: String,
+ expectedCount: Int,
+ requiredColumnNames: Set[String]): Unit = {
+ testPushDown(sqlString, expectedCount, requiredColumnNames, Set.empty[Filter])
+ }
+
+ def testPushDown(
+ sqlString: String,
+ expectedCount: Int,
+ requiredColumnNames: Set[String],
+ expectedUnhandledFilters: Set[Filter]): Unit = {
test(s"PushDown Returns $expectedCount: $sqlString") {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
@@ -300,15 +315,13 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
val rawCount = rawPlan.execute().count()
assert(ColumnsRequired.set === requiredColumnNames)
- assert {
- val table = caseInsensitiveContext.table("oneToTenFiltered")
- val relation = table.queryExecution.logical.collectFirst {
- case LogicalRelation(r, _) => r
- }.get
+ val table = caseInsensitiveContext.table("oneToTenFiltered")
+ val relation = table.queryExecution.logical.collectFirst {
+ case LogicalRelation(r, _) => r
+ }.get
- // `relation` should be able to handle all pushed filters
- relation.unhandledFilters(FiltersPushed.list.toArray).isEmpty
- }
+ assert(
+ relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters)
if (rawCount != expectedCount) {
fail(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 9251a69f31..81af684ba0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -248,7 +248,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
projections = Seq('c, 'p),
filter = 'a < 3 && 'p > 0,
requiredColumns = Seq("c", "a"),
- pushedFilters = Nil,
+ pushedFilters = Seq(LessThan("a", 3)),
inconvertibleFilters = Nil,
unhandledFilters = Seq('a < 3),
partitioningFilters = Seq('p > 0)
@@ -327,7 +327,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
projections = Seq('b, 'p),
filter = 'c > "val_7" && 'b < 18 && 'p > 0,
requiredColumns = Seq("b"),
- pushedFilters = Seq(GreaterThan("c", "val_7")),
+ pushedFilters = Seq(GreaterThan("c", "val_7"), LessThan("b", 18)),
inconvertibleFilters = Nil,
unhandledFilters = Seq('b < 18),
partitioningFilters = Seq('p > 0)
@@ -344,7 +344,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
projections = Seq('b, 'p),
filter = 'a % 2 === 0 && 'c > "val_7" && 'b < 18 && 'p > 0,
requiredColumns = Seq("b", "a"),
- pushedFilters = Seq(GreaterThan("c", "val_7")),
+ pushedFilters = Seq(GreaterThan("c", "val_7"), LessThan("b", 18)),
inconvertibleFilters = Seq('a % 2 === 0),
unhandledFilters = Seq('b < 18),
partitioningFilters = Seq('p > 0)
@@ -361,7 +361,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
projections = Seq('b, 'p),
filter = 'a > 7 && 'a < 9,
requiredColumns = Seq("b", "a"),
- pushedFilters = Seq(GreaterThan("a", 7)),
+ pushedFilters = Seq(GreaterThan("a", 7), LessThan("a", 9)),
inconvertibleFilters = Nil,
unhandledFilters = Seq('a < 9),
partitioningFilters = Nil