about summary refs log tree commit diff
diff options
context:
space:
mode:
author Cheng Lian <lian@databricks.com> 2014-11-18 17:40:24 -0800
committer Michael Armbrust <michael@databricks.com> 2014-11-18 17:40:24 -0800
commit f9739b9c886b1c207753ebf7067c09a60eff1695 (patch)
tree bc3bb00fcabfb26d863e86b42b3e0c7a0af733d8
parent ae9b1f69061401cf47d5a2e3dec79b18a7ef6bad (diff)
download spark-f9739b9c886b1c207753ebf7067c09a60eff1695.tar.gz
spark-f9739b9c886b1c207753ebf7067c09a60eff1695.tar.bz2
spark-f9739b9c886b1c207753ebf7067c09a60eff1695.zip
[SPARK-4468][SQL] Backports #3334 to branch-1.1
Review on Reviewable: https://reviewable.io/reviews/apache/spark/3338 -- Author: Cheng Lian <lian@databricks.com> -- Closes #3338 from liancheng/spark-3334-for-1.1 and squashes the following commits: bd17512 [Cheng Lian] Backports #3334 to branch-1.1
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala 13
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala 107
2 files changed, 75 insertions, 45 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 7c83f1cad7..0365c34c80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -213,22 +213,27 @@ private[sql] object ParquetFilters {
Some(createEqualityFilter(right.name, left, p))
case p @ EqualTo(left: NamedExpression, right: Literal) if !left.nullable =>
Some(createEqualityFilter(left.name, right, p))
+
case p @ LessThan(left: Literal, right: NamedExpression) if !right.nullable =>
- Some(createLessThanFilter(right.name, left, p))
+ Some(createGreaterThanFilter(right.name, left, p))
case p @ LessThan(left: NamedExpression, right: Literal) if !left.nullable =>
Some(createLessThanFilter(left.name, right, p))
+
case p @ LessThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable =>
- Some(createLessThanOrEqualFilter(right.name, left, p))
+ Some(createGreaterThanOrEqualFilter(right.name, left, p))
case p @ LessThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
Some(createLessThanOrEqualFilter(left.name, right, p))
+
case p @ GreaterThan(left: Literal, right: NamedExpression) if !right.nullable =>
- Some(createGreaterThanFilter(right.name, left, p))
+ Some(createLessThanFilter(right.name, left, p))
case p @ GreaterThan(left: NamedExpression, right: Literal) if !left.nullable =>
Some(createGreaterThanFilter(left.name, right, p))
+
case p @ GreaterThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable =>
- Some(createGreaterThanOrEqualFilter(right.name, left, p))
+ Some(createLessThanOrEqualFilter(right.name, left, p))
case p @ GreaterThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
Some(createGreaterThanOrEqualFilter(left.name, right, p))
+
case _ => None
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index c6b790a4b6..10df1fac21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,20 +17,19 @@
package org.apache.spark.sql.parquet
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.mapreduce.Job
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
-
import parquet.hadoop.ParquetFileWriter
import parquet.hadoop.util.ContextUtil
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser}
import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.{BooleanType, IntegerType}
import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser}
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.util.Utils
@@ -453,43 +452,46 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
}
test("create RecordFilter for simple predicates") {
- val attribute1 = new AttributeReference("first", IntegerType, false)()
- val predicate1 = new EqualTo(attribute1, new Literal(1, IntegerType))
- val filter1 = ParquetFilters.createFilter(predicate1)
- assert(filter1.isDefined)
- assert(filter1.get.predicate == predicate1, "predicates do not match")
- assert(filter1.get.isInstanceOf[ComparisonFilter])
- val cmpFilter1 = filter1.get.asInstanceOf[ComparisonFilter]
- assert(cmpFilter1.columnName == "first", "column name incorrect")
-
- val predicate2 = new LessThan(attribute1, new Literal(4, IntegerType))
- val filter2 = ParquetFilters.createFilter(predicate2)
- assert(filter2.isDefined)
- assert(filter2.get.predicate == predicate2, "predicates do not match")
- assert(filter2.get.isInstanceOf[ComparisonFilter])
- val cmpFilter2 = filter2.get.asInstanceOf[ComparisonFilter]
- assert(cmpFilter2.columnName == "first", "column name incorrect")
-
- val predicate3 = new And(predicate1, predicate2)
- val filter3 = ParquetFilters.createFilter(predicate3)
- assert(filter3.isDefined)
- assert(filter3.get.predicate == predicate3, "predicates do not match")
- assert(filter3.get.isInstanceOf[AndFilter])
-
- val predicate4 = new Or(predicate1, predicate2)
- val filter4 = ParquetFilters.createFilter(predicate4)
- assert(filter4.isDefined)
- assert(filter4.get.predicate == predicate4, "predicates do not match")
- assert(filter4.get.isInstanceOf[OrFilter])
-
- val attribute2 = new AttributeReference("second", IntegerType, false)()
- val predicate5 = new GreaterThan(attribute1, attribute2)
- val badfilter = ParquetFilters.createFilter(predicate5)
- assert(badfilter.isDefined === false)
-
- val predicate6 = And(GreaterThan(attribute1, attribute2), GreaterThan(attribute1, attribute2))
- val badfilter2 = ParquetFilters.createFilter(predicate6)
- assert(badfilter2.isDefined === false)
+ def checkFilter(predicate: Predicate): Option[CatalystFilter] = {
+ ParquetFilters.createFilter(predicate).map { f =>
+ assertResult(predicate)(f.predicate)
+ f
+ }.orElse {
+ fail(s"filter $predicate not pushed down")
+ }
+ }
+
+ def checkComparisonFilter(predicate: Predicate, columnName: String): Unit = {
+ assertResult(columnName, "column name incorrect") {
+ checkFilter(predicate).map(_.asInstanceOf[ComparisonFilter].columnName).get
+ }
+ }
+
+ def checkInvalidFilter(predicate: Predicate): Unit = {
+ assert(ParquetFilters.createFilter(predicate).isEmpty)
+ }
+
+ val a = 'a.int.notNull
+ val b = 'b.int.notNull
+
+ checkComparisonFilter(a === 1, "a")
+ checkComparisonFilter(Literal(1) === a, "a")
+
+ checkComparisonFilter(a < 4, "a")
+ checkComparisonFilter(a > 4, "a")
+ checkComparisonFilter(a <= 4, "a")
+ checkComparisonFilter(a >= 4, "a")
+
+ checkComparisonFilter(Literal(4) > a, "a")
+ checkComparisonFilter(Literal(4) < a, "a")
+ checkComparisonFilter(Literal(4) >= a, "a")
+ checkComparisonFilter(Literal(4) <= a, "a")
+
+ checkFilter(a === 1 && a < 4)
+ checkFilter(a === 1 || a < 4)
+
+ checkInvalidFilter(a > b)
+ checkInvalidFilter((a > b) && (a > b))
}
test("test filter by predicate pushdown") {
@@ -516,6 +518,29 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
assert(result2(49)(1) === 199)
}
}
+ for(myval <- Seq("myint", "mylong", "mydouble", "myfloat")) {
+ val query1 = sql(s"SELECT * FROM testfiltersource WHERE 150 > $myval AND 100 <= $myval")
+ assert(
+ query1.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
+ "Top operator should be ParquetTableScan after pushdown")
+ val result1 = query1.collect()
+ assert(result1.size === 50)
+ assert(result1(0)(1) === 100)
+ assert(result1(49)(1) === 149)
+ val query2 = sql(s"SELECT * FROM testfiltersource WHERE 150 < $myval AND 200 >= $myval")
+ assert(
+ query2.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
+ "Top operator should be ParquetTableScan after pushdown")
+ val result2 = query2.collect()
+ assert(result2.size === 50)
+ if (myval == "myint" || myval == "mylong") {
+ assert(result2(0)(1) === 151)
+ assert(result2(49)(1) === 200)
+ } else {
+ assert(result2(0)(1) === 150)
+ assert(result2(49)(1) === 199)
+ }
+ }
for(myval <- Seq("myint", "mylong")) {
val query3 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190 OR $myval < 10")
assert(