author     hyukjinkwon <gurwls223@gmail.com>   2015-12-09 15:15:30 +0800
committer  Cheng Lian <lian@databricks.com>    2015-12-09 15:15:30 +0800
commit     f6883bb7afa7d5df480e1c2b3db6cb77198550be (patch)
tree       3249b4391dd2eb7a546820b21117fb9961829098
parent     3934562d34bbe08d91c54b4bbee27870e93d7571 (diff)
[SPARK-11676][SQL] Parquet filter tests all pass if filters are not really pushed down
Currently, the Parquet predicate push-down tests all pass even if filters are not actually pushed down, or if push-down is disabled. In this PR, to check that filters are created, the test builds the predicate expression and then tries to create Parquet filters from it, just as Spark does. To check the results, it accesses the child RDD of the Spark-side `execution.Filter` operator directly, takes the rows that should already have been filtered by Parquet, and compares them to the expected values. Now, if filters are not pushed down or push-down is disabled, these tests fail. Author: hyukjinkwon <gurwls223@gmail.com> Closes #9659 from HyukjinKwon/SPARK-11676.
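In outline, the patch makes the suite verify push-down in two ways: it asserts that Parquet filter predicates are actually created from the analyzed predicate, and it compares results taken from the scan below the Spark-side `Filter` operator, so rows only disappear if Parquet itself filtered them. A condensed sketch of the result-checking part (illustrative, mirroring the diff below; `sqlContext` and `path` are assumed to be in scope as in the test suite):

    // Condensed sketch of the checks this patch adds (illustrative; the exact
    // code is in the diff to ParquetFilterSuite.scala below).
    import org.apache.spark.sql.{DataFrame, Row}

    // 1. Rebuild a DataFrame from the plan *below* the Spark-side Filter operator,
    //    so the rows that come back are exactly what the Parquet scan returned.
    def stripSparkFilter(df: DataFrame): DataFrame = {
      val schema = df.schema
      val childRDD = df.queryExecution
        .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
        .child
        .execute()
        .map(row => Row.fromSeq(row.toSeq(schema)))
      sqlContext.createDataFrame(childRDD, schema)
    }

    // 2. With push-down enabled, Parquet itself should drop the non-matching rows,
    //    so the scan under the Filter already yields a single row; if push-down is
    //    broken or disabled, this assertion fails.
    val df = sqlContext.read.parquet(path).filter("a = 2")
    assert(stripSparkFilter(df).count == 1)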
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala  69
1 file changed, 41 insertions(+), 28 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index cc5aae03d5..daf41bc292 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -50,27 +50,33 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
val output = predicate.collect { case a: Attribute => a }.distinct
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
- val query = df
- .select(output.map(e => Column(e)): _*)
- .where(Column(predicate))
-
- val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
- case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation, _)) => filters
- }.flatten.reduceLeftOption(_ && _)
- assert(maybeAnalyzedPredicate.isDefined)
-
- val selectedFilters = maybeAnalyzedPredicate.flatMap(DataSourceStrategy.translateFilter)
- assert(selectedFilters.nonEmpty)
-
- selectedFilters.foreach { pred =>
- val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
- assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
- maybeFilter.foreach { f =>
- // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
- assert(f.getClass === filterClass)
+ withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+ val query = df
+ .select(output.map(e => Column(e)): _*)
+ .where(Column(predicate))
+
+ var maybeRelation: Option[ParquetRelation] = None
+ val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+ case PhysicalOperation(_, filters, LogicalRelation(relation: ParquetRelation, _)) =>
+ maybeRelation = Some(relation)
+ filters
+ }.flatten.reduceLeftOption(_ && _)
+ assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
+
+ val (_, selectedFilters) =
+ DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
+ assert(selectedFilters.nonEmpty, "No filter is pushed down")
+
+ selectedFilters.foreach { pred =>
+ val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
+ assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
+ maybeFilter.foreach { f =>
+ // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
+ assert(f.getClass === filterClass)
+ }
}
+ checker(stripSparkFilter(query), expected)
}
- checker(query, expected)
}
}
@@ -104,6 +110,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
}
+ /**
+ * Strip Spark-side filtering in order to check if a datasource filters rows correctly.
+ */
+ protected def stripSparkFilter(df: DataFrame): DataFrame = {
+ val schema = df.schema
+ val childRDD = df
+ .queryExecution
+ .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
+ .child
+ .execute()
+ .map(row => Row.fromSeq(row.toSeq(schema)))
+
+ sqlContext.createDataFrame(childRDD, schema)
+ }
+
test("filter pushdown - boolean") {
withParquetDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
@@ -347,19 +368,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
val df = sqlContext.read.parquet(path).filter("a = 2")
- // This is the source RDD without Spark-side filtering.
- val childRDD =
- df
- .queryExecution
- .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
- .child
- .execute()
-
// The result should be single row.
// When a filter is pushed to Parquet, Parquet can apply it to every row.
// So, we can check the number of rows returned from the Parquet
// to make sure our filter pushdown work.
- assert(childRDD.count == 1)
+ assert(stripSparkFilter(df).count == 1)
}
}
}