From 9fde1ff5fc114b5edb755ed40944607419b62184 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Wed, 13 Aug 2014 17:40:59 -0700 Subject: [SPARK-2935][SQL]Fix parquet predicate push down bug Author: Michael Armbrust Closes #1863 from marmbrus/parquetPredicates and squashes the following commits: 10ad202 [Michael Armbrust] left <=> right f249158 [Michael Armbrust] quiet parquet tests. 802da5b [Michael Armbrust] Add test case. eab2eda [Michael Armbrust] Fix parquet predicate push down bug --- .../src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala | 5 +++-- sql/core/src/test/resources/log4j.properties | 3 +++ .../test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala | 5 ++++- 3 files changed, 10 insertions(+), 3 deletions(-) (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala index cc575bedd8..2298a9b933 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala @@ -201,8 +201,9 @@ object ParquetFilters { (leftFilter, rightFilter) match { case (None, Some(filter)) => Some(filter) case (Some(filter), None) => Some(filter) - case (_, _) => - Some(new AndFilter(leftFilter.get, rightFilter.get)) + case (Some(leftF), Some(rightF)) => + Some(new AndFilter(leftF, rightF)) + case _ => None } } case p @ EqualTo(left: Literal, right: NamedExpression) if !right.nullable => diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties index dffd15a618..c7e0ff1cf6 100644 --- a/sql/core/src/test/resources/log4j.properties +++ b/sql/core/src/test/resources/log4j.properties @@ -36,6 +36,9 @@ log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c{1}: %m%n log4j.appender.FA.Threshold = INFO # Some packages are noisy for no good reason. +log4j.additivity.parquet.hadoop.ParquetRecordReader=false +log4j.logger.parquet.hadoop.ParquetRecordReader=OFF + log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 9933575038..502f6702e3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -381,11 +381,14 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA val predicate5 = new GreaterThan(attribute1, attribute2) val badfilter = ParquetFilters.createFilter(predicate5) assert(badfilter.isDefined === false) + + val predicate6 = And(GreaterThan(attribute1, attribute2), GreaterThan(attribute1, attribute2)) + val badfilter2 = ParquetFilters.createFilter(predicate6) + assert(badfilter2.isDefined === false) } test("test filter by predicate pushdown") { for(myval <- Seq("myint", "mylong", "mydouble", "myfloat")) { - println(s"testing field $myval") val query1 = sql(s"SELECT * FROM testfiltersource WHERE $myval < 150 AND $myval >= 100") assert( query1.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan], -- cgit v1.2.3