author     Yin Huai <yhuai@databricks.com>    2015-12-18 10:52:14 -0800
committer  Yin Huai <yhuai@databricks.com>    2015-12-18 10:53:13 -0800
commit     41ee7c57abd9f52065fd7ffb71a8af229603371d (patch)
tree       edb0c957004d8f115eac523333ee45159c282611 /sql
parent     4af647c77ded6a0d3087ceafb2e30e01d97e7a06 (diff)
[SPARK-12218][SQL] Invalid splitting of nested AND expressions in Data Source filter API
JIRA: https://issues.apache.org/jira/browse/SPARK-12218

When creating filters for Parquet/ORC, we should not push nested AND expressions partially.

Author: Yin Huai <yhuai@databricks.com>

Closes #10362 from yhuai/SPARK-12218.
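To make the bug concrete, here is a minimal, self-contained Scala sketch (hypothetical demo code, not part of this patch) using the same data shape as the new tests below: when only the convertible half of a nested AND is pushed under a NOT, the row (2, "0") is wrongly dropped even though it satisfies the original predicate.

```scala
// Hypothetical demo, not Spark code: why pushing only one side of a nested AND
// under NOT is unsafe.
object PartialAndPushdownDemo extends App {
  val rows = (1 to 5).map(i => (i, (i % 2).toString))   // (1,"1"), (2,"0"), ..., (5,"1")

  // Original predicate: NOT(a = 2 AND b = '1')
  val original    = (r: (Int, String)) => !(r._1 == 2 && r._2 == "1")
  // Partially pushed filter: only the convertible side (a = 2) kept under NOT
  val partialPush = (r: (Int, String)) => !(r._1 == 2)

  println(rows.filter(original))     // keeps all 5 rows, including (2,"0")
  println(rows.filter(partialPush))  // wrongly drops (2,"0")
}
```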
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala    | 12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala | 19
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala                              | 22
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala                | 20
4 files changed, 60 insertions(+), 13 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 0771432937..883013bf1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -257,7 +257,17 @@ private[sql] object ParquetFilters {
makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
case sources.And(lhs, rhs) =>
- (createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and)
+ // It is not safe here to convert only one side of the AND if we do not
+ // understand the other side. Here is an example that explains why.
+ // Suppose we have NOT(a = 2 AND b in ('1')) and we do not know how to
+ // convert b in ('1'). If we convert only a = 2, we end up with the filter
+ // NOT(a = 2), which produces wrong results.
+ // Pushing one side of an AND down is only safe at the top level.
+ // See ParquetRelation's initializeLocalJobFunc method for an example.
+ for {
+ lhsFilter <- createFilter(schema, lhs)
+ rhsFilter <- createFilter(schema, rhs)
+ } yield FilterApi.and(lhsFilter, rhsFilter)
case sources.Or(lhs, rhs) =>
for {
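As a hedged illustration of the change above: the names combineOld/combineNew and the Option[String] stand-in for Parquet's FilterPredicate are assumptions for the sketch, not Spark APIs. The old reduceOption-based combination returns a filter whenever either side converts, while the new for-comprehension returns one only when both sides convert.

```scala
// Sketch only: Option[String] stands in for Option[FilterPredicate];
// combineOld/combineNew are illustrative names, not Spark APIs.
object AndPushdownSemantics extends App {
  def combineOld(lhs: Option[String], rhs: Option[String]): Option[String] =
    (lhs ++ rhs).reduceOption((a, b) => s"and($a, $b)")   // old: keep whichever side converted

  def combineNew(lhs: Option[String], rhs: Option[String]): Option[String] =
    for { l <- lhs; r <- rhs } yield s"and($l, $r)"       // new: require both sides

  // lhs converted, rhs did not:
  println(combineOld(Some("eq(a, 2)"), None))  // Some(eq(a, 2)) -- unsafe under an enclosing NOT
  println(combineNew(Some("eq(a, 2)"), None))  // None           -- the whole AND is left unpushed
}
```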
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 6178e37d2a..045425f282 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -362,4 +362,23 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
}
+
+ test("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
+ import testImplicits._
+
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+ withTempPath { dir =>
+ val path = s"${dir.getCanonicalPath}/table1"
+ (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.parquet(path)
+
+ checkAnswer(
+ sqlContext.read.parquet(path).where("not (a = 2) or not(b in ('1'))"),
+ (1 to 5).map(i => Row(i, (i % 2).toString)))
+
+ checkAnswer(
+ sqlContext.read.parquet(path).where("not (a = 2 and b in ('1'))"),
+ (1 to 5).map(i => Row(i, (i % 2).toString)))
+ }
+ }
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 27193f54d3..ebfb1759b8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -74,22 +74,20 @@ private[orc] object OrcFilters extends Logging {
expression match {
case And(left, right) =>
- val tryLeft = buildSearchArgument(left, newBuilder)
- val tryRight = buildSearchArgument(right, newBuilder)
-
- val conjunction = for {
- _ <- tryLeft
- _ <- tryRight
+ // It is not safe here to convert only one side of the AND if we do not
+ // understand the other side. Here is an example that explains why.
+ // Suppose we have NOT(a = 2 AND b in ('1')) and we do not know how to
+ // convert b in ('1'). If we convert only a = 2, we end up with the filter
+ // NOT(a = 2), which produces wrong results.
+ // Pushing one side of an AND down is only safe at the top level.
+ // See ParquetRelation's initializeLocalJobFunc method for an example.
+ for {
+ _ <- buildSearchArgument(left, newBuilder)
+ _ <- buildSearchArgument(right, newBuilder)
lhs <- buildSearchArgument(left, builder.startAnd())
rhs <- buildSearchArgument(right, lhs)
} yield rhs.end()
- // For filter `left AND right`, we can still push down `left` even if `right` is not
- // convertible, and vice versa.
- conjunction
- .orElse(tryLeft.flatMap(_ => buildSearchArgument(left, builder)))
- .orElse(tryRight.flatMap(_ => buildSearchArgument(right, builder)))
-
case Or(left, right) =>
for {
_ <- buildSearchArgument(left, newBuilder)
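The comments in both files point to top-level AND splitting as the safe case. As a hedged sketch of that idea (splitConjuncts, pushableFilters, and convert are hypothetical helpers, not the actual ParquetRelation.initializeLocalJobFunc code): a top-level conjunction can be split into independent conjuncts, and any conjunct the data source cannot convert is simply left for Spark to re-evaluate after the scan, so the pushed filters only need to select a superset of the matching rows.

```scala
// Hypothetical sketch, not the actual Spark code path.
import org.apache.spark.sql.sources.{And, Filter}

object TopLevelConjunctSplitting {
  // Split a top-level conjunction into its individual conjuncts.
  def splitConjuncts(filter: Filter): Seq[Filter] = filter match {
    case And(left, right) => splitConjuncts(left) ++ splitConjuncts(right)
    case other            => Seq(other)
  }

  // Push only the conjuncts the source understands. Dropping the rest is safe
  // at the top level because Spark still evaluates every original predicate on
  // the rows returned by the scan, so a pushed filter only has to select a
  // superset of the matching rows.
  def pushableFilters(convert: Filter => Option[String], pred: Filter): Seq[String] =
    splitConjuncts(pred).flatMap(convert)
}
```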
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 92043d66c9..e8a61123d1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.orc
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.{Row, SQLConf}
import org.apache.spark.sql.sources.HadoopFsRelationTest
import org.apache.spark.sql.types._
@@ -60,4 +61,23 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
"dataSchema" -> dataSchemaWithPartition.json)).format(dataSourceName).load())
}
}
+
+ test("SPARK-12218: 'Not' is included in ORC filter pushdown") {
+ import testImplicits._
+
+ withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+ withTempPath { dir =>
+ val path = s"${dir.getCanonicalPath}/table1"
+ (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.orc(path)
+
+ checkAnswer(
+ sqlContext.read.orc(path).where("not (a = 2) or not(b in ('1'))"),
+ (1 to 5).map(i => Row(i, (i % 2).toString)))
+
+ checkAnswer(
+ sqlContext.read.orc(path).where("not (a = 2 and b in ('1'))"),
+ (1 to 5).map(i => Row(i, (i % 2).toString)))
+ }
+ }
+ }
}