diff options
author | Liang-Chi Hsieh <viirya@viirya.org> | 2015-12-23 14:08:29 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2015-12-23 14:08:29 +0800 |
commit | 50301c0a28b64c5348b0f2c2d828589c0833c70c (patch) | |
tree | c54b893c27a082682045f235a88581e381c9aa26 /sql | |
parent | 86761e10e145b6867cbe86b1e924ec237ba408af (diff) | |
download | spark-50301c0a28b64c5348b0f2c2d828589c0833c70c.tar.gz spark-50301c0a28b64c5348b0f2c2d828589c0833c70c.tar.bz2 spark-50301c0a28b64c5348b0f2c2d828589c0833c70c.zip |
[SPARK-11164][SQL] Add InSet pushdown filter back for Parquet
Previously, when the filter was `b in ('1', '2')`, it was not pushed down to Parquet. This patch maps `sources.In` to the Parquet InSet filter so that such predicates are pushed down. Thanks!
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes #10278 from gatorsmile/parquetFilterNot.
Diffstat (limited to 'sql')
3 files changed, 45 insertions, 8 deletions
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index cde346e99e..a0c71d83d7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -86,23 +86,27 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { checkCondition( ('a === 'b || 'b > 3) && ('a === 'b || 'a > 3) && ('a === 'b || 'a < 5), - ('a === 'b || 'b > 3 && 'a > 3 && 'a < 5)) + 'a === 'b || 'b > 3 && 'a > 3 && 'a < 5) } test("a && (!a || b)") { - checkCondition(('a && (!('a) || 'b )), ('a && 'b)) + checkCondition('a && (!'a || 'b ), 'a && 'b) - checkCondition(('a && ('b || !('a) )), ('a && 'b)) + checkCondition('a && ('b || !'a ), 'a && 'b) - checkCondition(((!('a) || 'b ) && 'a), ('b && 'a)) + checkCondition((!'a || 'b ) && 'a, 'b && 'a) - checkCondition((('b || !('a) ) && 'a), ('b && 'a)) + checkCondition(('b || !'a ) && 'a, 'b && 'a) } - test("!(a && b) , !(a || b)") { - checkCondition((!('a && 'b)), (!('a) || !('b))) + test("DeMorgan's law") { + checkCondition(!('a && 'b), !'a || !'b) - checkCondition(!('a || 'b), (!('a) && !('b))) + checkCondition(!('a || 'b), !'a && !'b) + + checkCondition(!(('a && 'b) || ('c && 'd)), (!'a || !'b) && (!'c || !'d)) + + checkCondition(!(('a || 'b) && ('c || 'd)), (!'a && !'b) || (!'c && !'d)) } private val caseInsensitiveAnalyzer = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 883013bf1b..ac9b65b66d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -256,6 +256,9 @@ private[sql] object ParquetFilters { case sources.GreaterThanOrEqual(name, value) => makeGtEq.lift(dataTypeOf(name)).map(_(name, value)) + case sources.In(name, valueSet) => + makeInSet.lift(dataTypeOf(name)).map(_(name, valueSet.toSet)) + case sources.And(lhs, rhs) => // At here, it is not safe to just convert one side if we do not understand the // other side. Here is an example used to explain the reason. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 045425f282..9197b8b563 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -381,4 +381,34 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } } + + test("SPARK-11164: test the parquet filter in") { + import testImplicits._ + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { + withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/table1" + (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path) + + // When a filter is pushed to Parquet, Parquet can apply it to every row. + // So, we can check the number of rows returned from the Parquet + // to make sure our filter pushdown work. 
+ val df = sqlContext.read.parquet(path).where("b in (0,2)") + assert(stripSparkFilter(df).count == 3) + + val df1 = sqlContext.read.parquet(path).where("not (b in (1))") + assert(stripSparkFilter(df1).count == 3) + + val df2 = sqlContext.read.parquet(path).where("not (b in (1,3) or a <= 2)") + assert(stripSparkFilter(df2).count == 2) + + val df3 = sqlContext.read.parquet(path).where("not (b in (1,3) and a <= 2)") + assert(stripSparkFilter(df3).count == 4) + + val df4 = sqlContext.read.parquet(path).where("not (a <= 2)") + assert(stripSparkFilter(df4).count == 3) + } + } + } + } } |