author    Liang-Chi Hsieh <viirya@viirya.org>    2015-12-23 14:08:29 +0800
committer Cheng Lian <lian@databricks.com>       2015-12-23 14:08:29 +0800
commit 50301c0a28b64c5348b0f2c2d828589c0833c70c (patch)
tree   c54b893c27a082682045f235a88581e381c9aa26 /sql/core/src
parent 86761e10e145b6867cbe86b1e924ec237ba408af (diff)
[SPARK-11164][SQL] Add InSet pushdown filter back for Parquet
When the filter is `"b in ('1', '2')"`, the filter is not pushed down to Parquet; this patch adds the `InSet` pushdown filter back. Thanks!

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #10278 from gatorsmile/parquetFilterNot.
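For context, a predicate like `b in ('1', '2')` reaches the data source API as a `sources.In` filter, which the patch below teaches `ParquetFilters.createFilter` to translate. A minimal sketch (standalone illustration, not part of this commit) of the filter object involved:

```scala
// Minimal sketch, not part of this commit: the public data source filter
// that a predicate such as "b in ('1', '2')" is translated into.
import org.apache.spark.sql.sources

// Before this patch, createFilter had no case for sources.In, so the
// predicate was evaluated only on the Spark side.
val inFilter: sources.Filter = sources.In("b", Array[Any]("1", "2"))
```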
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala     |  3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala | 30
2 files changed, 33 insertions(+), 0 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 883013bf1b..ac9b65b66d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -256,6 +256,9 @@ private[sql] object ParquetFilters {
       case sources.GreaterThanOrEqual(name, value) =>
         makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
 
+      case sources.In(name, valueSet) =>
+        makeInSet.lift(dataTypeOf(name)).map(_(name, valueSet.toSet))
+
       case sources.And(lhs, rhs) =>
         // At here, it is not safe to just convert one side if we do not understand the
         // other side. Here is an example used to explain the reason.
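The new case delegates to `makeInSet`, which this hunk does not show. As a hedged sketch of the shape such a helper takes, using parquet-mr's user-defined predicate API (the `SetInFilter` name and the set of covered types are assumptions here, shown for `IntegerType` only):

```scala
// Sketch under assumptions: makeInSet itself is outside this hunk. The shape
// shown here, a parquet-mr UserDefinedPredicate wrapped by
// FilterApi.userDefined, is illustrative rather than the committed code.
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Statistics, UserDefinedPredicate}
import org.apache.parquet.filter2.predicate.FilterApi.intColumn
import org.apache.spark.sql.types.{DataType, IntegerType}

object InSetSketch {
  private case class SetInFilter[T <: Comparable[T]](valueSet: Set[T])
    extends UserDefinedPredicate[T] with Serializable {

    // Keep a record only when its column value is in the set.
    override def keep(value: T): Boolean = value != null && valueSet.contains(value)

    // Conservative: never drop a row group from min/max statistics alone.
    override def canDrop(statistics: Statistics[T]): Boolean = false
    override def inverseCanDrop(statistics: Statistics[T]): Boolean = false
  }

  // Partial so that .lift(dataTypeOf(name)) in the new case yields None for
  // types Parquet cannot filter, leaving those predicates to Spark.
  val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
    case IntegerType =>
      (n: String, v: Set[Any]) =>
        FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]]))
    // ... analogous cases for LongType, FloatType, DoubleType, StringType, etc.
  }
}
```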
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 045425f282..9197b8b563 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -381,4 +381,34 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-11164: test the parquet filter in") {
+    import testImplicits._
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+        withTempPath { dir =>
+          val path = s"${dir.getCanonicalPath}/table1"
+          (1 to 5).map(i => (i.toFloat, i % 3)).toDF("a", "b").write.parquet(path)
+
+          // When a filter is pushed to Parquet, Parquet can apply it to every row.
+          // So, we can check the number of rows returned from Parquet
+          // to make sure our filter pushdown works.
+          val df = sqlContext.read.parquet(path).where("b in (0,2)")
+          assert(stripSparkFilter(df).count == 3)
+
+          val df1 = sqlContext.read.parquet(path).where("not (b in (1))")
+          assert(stripSparkFilter(df1).count == 3)
+
+          val df2 = sqlContext.read.parquet(path).where("not (b in (1,3) or a <= 2)")
+          assert(stripSparkFilter(df2).count == 2)
+
+          val df3 = sqlContext.read.parquet(path).where("not (b in (1,3) and a <= 2)")
+          assert(stripSparkFilter(df3).count == 4)
+
+          val df4 = sqlContext.read.parquet(path).where("not (a <= 2)")
+          assert(stripSparkFilter(df4).count == 3)
+        }
+      }
+    }
+  }
 }
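Besides the `stripSparkFilter` row counts above, a quick external check is the physical plan. A usage sketch (the path is illustrative, and the exact `PushedFilters` text varies across Spark versions):

```scala
// Hypothetical usage: confirm the In predicate reached the Parquet scan.
val df = sqlContext.read.parquet("/tmp/table1").where("b in (0, 2)")
df.explain()
// Expect the scan node to list a pushed In predicate on column b, e.g.
// something like "PushedFilters: [In(b, [0,2])]" (rendering varies).
```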