aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@viirya.org>2015-12-23 14:08:29 +0800
committerCheng Lian <lian@databricks.com>2015-12-23 14:08:29 +0800
commit50301c0a28b64c5348b0f2c2d828589c0833c70c (patch)
treec54b893c27a082682045f235a88581e381c9aa26
parent86761e10e145b6867cbe86b1e924ec237ba408af (diff)
downloadspark-50301c0a28b64c5348b0f2c2d828589c0833c70c.tar.gz
spark-50301c0a28b64c5348b0f2c2d828589c0833c70c.tar.bz2
spark-50301c0a28b64c5348b0f2c2d828589c0833c70c.zip
[SPARK-11164][SQL] Add InSet pushdown filter back for Parquet
When the filter is ```"b in ('1', '2')"```, the filter is not pushed down to Parquet. Thanks! Author: gatorsmile <gatorsmile@gmail.com> Author: xiaoli <lixiao1983@gmail.com> Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local> Closes #10278 from gatorsmile/parquetFilterNot.
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala20
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala30
3 files changed, 45 insertions, 8 deletions
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index cde346e99e..a0c71d83d7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -86,23 +86,27 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
checkCondition(
('a === 'b || 'b > 3) && ('a === 'b || 'a > 3) && ('a === 'b || 'a < 5),
- ('a === 'b || 'b > 3 && 'a > 3 && 'a < 5))
+ 'a === 'b || 'b > 3 && 'a > 3 && 'a < 5)
}
test("a && (!a || b)") {
- checkCondition(('a && (!('a) || 'b )), ('a && 'b))
+ checkCondition('a && (!'a || 'b ), 'a && 'b)
- checkCondition(('a && ('b || !('a) )), ('a && 'b))
+ checkCondition('a && ('b || !'a ), 'a && 'b)
- checkCondition(((!('a) || 'b ) && 'a), ('b && 'a))
+ checkCondition((!'a || 'b ) && 'a, 'b && 'a)
- checkCondition((('b || !('a) ) && 'a), ('b && 'a))
+ checkCondition(('b || !'a ) && 'a, 'b && 'a)
}
- test("!(a && b) , !(a || b)") {
- checkCondition((!('a && 'b)), (!('a) || !('b)))
+ test("DeMorgan's law") {
+ checkCondition(!('a && 'b), !'a || !'b)
- checkCondition(!('a || 'b), (!('a) && !('b)))
+ checkCondition(!('a || 'b), !'a && !'b)
+
+ checkCondition(!(('a && 'b) || ('c && 'd)), (!'a || !'b) && (!'c || !'d))
+
+ checkCondition(!(('a || 'b) && ('c || 'd)), (!'a && !'b) || (!'c && !'d))
}
private val caseInsensitiveAnalyzer =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 883013bf1b..ac9b65b66d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -256,6 +256,9 @@ private[sql] object ParquetFilters {
case sources.GreaterThanOrEqual(name, value) =>
makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
+ case sources.In(name, valueSet) =>
+ makeInSet.lift(dataTypeOf(name)).map(_(name, valueSet.toSet))
+
case sources.And(lhs, rhs) =>
// At here, it is not safe to just convert one side if we do not understand the
// other side. Here is an example used to explain the reason.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 045425f282..9197b8b563 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -381,4 +381,34 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
}
+
+ test("SPARK-11164: test the parquet filter in") {
+ import testImplicits._
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+ withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+ withTempPath { dir =>
+ val path = s"${dir.getCanonicalPath}/table1"
+ (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path)
+
+ // When a filter is pushed to Parquet, Parquet can apply it to every row.
+ // So, we can check the number of rows returned from the Parquet
+ // to make sure our filter pushdown work.
+ val df = sqlContext.read.parquet(path).where("b in (0,2)")
+ assert(stripSparkFilter(df).count == 3)
+
+ val df1 = sqlContext.read.parquet(path).where("not (b in (1))")
+ assert(stripSparkFilter(df1).count == 3)
+
+ val df2 = sqlContext.read.parquet(path).where("not (b in (1,3) or a <= 2)")
+ assert(stripSparkFilter(df2).count == 2)
+
+ val df3 = sqlContext.read.parquet(path).where("not (b in (1,3) and a <= 2)")
+ assert(stripSparkFilter(df3).count == 4)
+
+ val df4 = sqlContext.read.parquet(path).where("not (a <= 2)")
+ assert(stripSparkFilter(df4).count == 3)
+ }
+ }
+ }
+ }
}