diff options
author | hyukjinkwon <gurwls223@gmail.com> | 2015-08-20 08:13:25 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2015-08-20 08:13:25 +0800 |
commit | ba5f7e1842f2c5852b5309910c0d39926643da69 (patch) | |
tree | b88884aee0f09fe55a67508a60a875e8f62c9d3c | |
parent | 8e0a072f78b4902d5f7ccc6b15232ed202a117f9 (diff) | |
download | spark-ba5f7e1842f2c5852b5309910c0d39926643da69.tar.gz spark-ba5f7e1842f2c5852b5309910c0d39926643da69.tar.bz2 spark-ba5f7e1842f2c5852b5309910c0d39926643da69.zip |
[SPARK-10035] [SQL] Parquet filters does not process EqualNullSafe filter.
As I talked with Lian,
1. I added EquelNullSafe to ParquetFilters
- It uses the same equality comparison filter with EqualTo since the Parquet filter performs actually null-safe equality comparison.
2. Updated the test code (ParquetFilterSuite)
- Convert catalyst.Expression to sources.Filter
- Removed Cast since only Literal is picked up as a proper Filter in DataSourceStrategy
- Added EquelNullSafe comparison
3. Removed deprecated createFilter for catalyst.Expression
Author: hyukjinkwon <gurwls223@gmail.com>
Author: 권혁진 <gurwls223@gmail.com>
Closes #8275 from HyukjinKwon/master.
2 files changed, 37 insertions, 139 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 63915e0a28..c74c838863 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -22,8 +22,6 @@ import java.nio.ByteBuffer import com.google.common.io.BaseEncoding import org.apache.hadoop.conf.Configuration -import org.apache.parquet.filter2.compat.FilterCompat -import org.apache.parquet.filter2.compat.FilterCompat._ import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.filter2.predicate._ import org.apache.parquet.io.api.Binary @@ -39,12 +37,6 @@ import org.apache.spark.unsafe.types.UTF8String private[sql] object ParquetFilters { val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter" - def createRecordFilter(filterExpressions: Seq[Expression]): Option[Filter] = { - filterExpressions.flatMap { filter => - createFilter(filter) - }.reduceOption(FilterApi.and).map(FilterCompat.get) - } - case class SetInFilter[T <: Comparable[T]]( valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable { @@ -205,6 +197,16 @@ private[sql] object ParquetFilters { // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`, // which can be casted to `false` implicitly. Please refer to the `eval` method of these // operators and the `SimplifyFilters` rule for details. + + // Hyukjin: + // I added [[EqualNullSafe]] with [[org.apache.parquet.filter2.predicate.Operators.Eq]]. + // So, it performs equality comparison identically when given [[sources.Filter]] is [[EqualTo]]. + // The reason why I did this is, that the actual Parquet filter checks null-safe equality + // comparison. + // So I added this and maybe [[EqualTo]] should be changed. It still seems fine though, because + // physical planning does not set `NULL` to [[EqualTo]] but changes it to [[IsNull]] and etc. + // Probably I missed something and obviously this should be changed. + predicate match { case sources.IsNull(name) => makeEq.lift(dataTypeOf(name)).map(_(name, null)) @@ -216,6 +218,11 @@ private[sql] object ParquetFilters { case sources.Not(sources.EqualTo(name, value)) => makeNotEq.lift(dataTypeOf(name)).map(_(name, value)) + case sources.EqualNullSafe(name, value) => + makeEq.lift(dataTypeOf(name)).map(_(name, value)) + case sources.Not(sources.EqualNullSafe(name, value)) => + makeNotEq.lift(dataTypeOf(name)).map(_(name, value)) + case sources.LessThan(name, value) => makeLt.lift(dataTypeOf(name)).map(_(name, value)) case sources.LessThanOrEqual(name, value) => @@ -274,96 +281,6 @@ private[sql] object ParquetFilters { } /** - * Converts Catalyst predicate expressions to Parquet filter predicates. - * - * @todo This can be removed once we get rid of the old Parquet support. - */ - def createFilter(predicate: Expression): Option[FilterPredicate] = { - // NOTE: - // - // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`, - // which can be casted to `false` implicitly. Please refer to the `eval` method of these - // operators and the `SimplifyFilters` rule for details. - predicate match { - case IsNull(NamedExpression(name, dataType)) => - makeEq.lift(dataType).map(_(name, null)) - case IsNotNull(NamedExpression(name, dataType)) => - makeNotEq.lift(dataType).map(_(name, null)) - - case EqualTo(NamedExpression(name, _), NonNullLiteral(value, dataType)) => - makeEq.lift(dataType).map(_(name, value)) - case EqualTo(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) => - makeEq.lift(dataType).map(_(name, value)) - case EqualTo(NonNullLiteral(value, dataType), NamedExpression(name, _)) => - makeEq.lift(dataType).map(_(name, value)) - case EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) => - makeEq.lift(dataType).map(_(name, value)) - - case Not(EqualTo(NamedExpression(name, _), NonNullLiteral(value, dataType))) => - makeNotEq.lift(dataType).map(_(name, value)) - case Not(EqualTo(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _))) => - makeNotEq.lift(dataType).map(_(name, value)) - case Not(EqualTo(NonNullLiteral(value, dataType), NamedExpression(name, _))) => - makeNotEq.lift(dataType).map(_(name, value)) - case Not(EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType))) => - makeNotEq.lift(dataType).map(_(name, value)) - - case LessThan(NamedExpression(name, _), NonNullLiteral(value, dataType)) => - makeLt.lift(dataType).map(_(name, value)) - case LessThan(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) => - makeLt.lift(dataType).map(_(name, value)) - case LessThan(NonNullLiteral(value, dataType), NamedExpression(name, _)) => - makeGt.lift(dataType).map(_(name, value)) - case LessThan(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) => - makeGt.lift(dataType).map(_(name, value)) - - case LessThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) => - makeLtEq.lift(dataType).map(_(name, value)) - case LessThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) => - makeLtEq.lift(dataType).map(_(name, value)) - case LessThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) => - makeGtEq.lift(dataType).map(_(name, value)) - case LessThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) => - makeGtEq.lift(dataType).map(_(name, value)) - - case GreaterThan(NamedExpression(name, _), NonNullLiteral(value, dataType)) => - makeGt.lift(dataType).map(_(name, value)) - case GreaterThan(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) => - makeGt.lift(dataType).map(_(name, value)) - case GreaterThan(NonNullLiteral(value, dataType), NamedExpression(name, _)) => - makeLt.lift(dataType).map(_(name, value)) - case GreaterThan(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) => - makeLt.lift(dataType).map(_(name, value)) - - case GreaterThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) => - makeGtEq.lift(dataType).map(_(name, value)) - case GreaterThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) => - makeGtEq.lift(dataType).map(_(name, value)) - case GreaterThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) => - makeLtEq.lift(dataType).map(_(name, value)) - case GreaterThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) => - makeLtEq.lift(dataType).map(_(name, value)) - - case And(lhs, rhs) => - (createFilter(lhs) ++ createFilter(rhs)).reduceOption(FilterApi.and) - - case Or(lhs, rhs) => - for { - lhsFilter <- createFilter(lhs) - rhsFilter <- createFilter(rhs) - } yield FilterApi.or(lhsFilter, rhsFilter) - - case Not(pred) => - createFilter(pred).map(FilterApi.not) - - case InSet(NamedExpression(name, dataType), valueSet) => - makeInSet.lift(dataType).map(_(name, valueSet)) - - case _ => None - } - } - - /** * Note: Inside the Hadoop API we only have access to `Configuration`, not to * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey * the actual filter predicate. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 5b4e568bb9..f067112cfc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -24,9 +24,8 @@ import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation} import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types._ /** * A test suite that tests Parquet filter2 API based filter pushdown optimization. @@ -55,20 +54,22 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex .select(output.map(e => Column(e)): _*) .where(Column(predicate)) - val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { + val analyzedPredicate = query.queryExecution.optimizedPlan.collect { case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation)) => filters - }.flatten.reduceOption(_ && _) + }.flatten + assert(analyzedPredicate.nonEmpty) - assert(maybeAnalyzedPredicate.isDefined) - maybeAnalyzedPredicate.foreach { pred => - val maybeFilter = ParquetFilters.createFilter(pred) + val selectedFilters = DataSourceStrategy.selectFilters(analyzedPredicate) + assert(selectedFilters.nonEmpty) + + selectedFilters.foreach { pred => + val maybeFilter = ParquetFilters.createFilter(df.schema, pred) assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred") maybeFilter.foreach { f => // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`) assert(f.getClass === filterClass) } } - checker(query, expected) } } @@ -109,43 +110,18 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], Seq(Row(true), Row(false))) checkFilterPredicate('_1 === true, classOf[Eq[_]], true) + checkFilterPredicate('_1 <=> true, classOf[Eq[_]], true) checkFilterPredicate('_1 !== true, classOf[NotEq[_]], false) } } - test("filter pushdown - short") { - withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toShort)))) { implicit df => - checkFilterPredicate(Cast('_1, IntegerType) === 1, classOf[Eq[_]], 1) - checkFilterPredicate( - Cast('_1, IntegerType) !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_))) - - checkFilterPredicate(Cast('_1, IntegerType) < 2, classOf[Lt[_]], 1) - checkFilterPredicate(Cast('_1, IntegerType) > 3, classOf[Gt[_]], 4) - checkFilterPredicate(Cast('_1, IntegerType) <= 1, classOf[LtEq[_]], 1) - checkFilterPredicate(Cast('_1, IntegerType) >= 4, classOf[GtEq[_]], 4) - - checkFilterPredicate(Literal(1) === Cast('_1, IntegerType), classOf[Eq[_]], 1) - checkFilterPredicate(Literal(2) > Cast('_1, IntegerType), classOf[Lt[_]], 1) - checkFilterPredicate(Literal(3) < Cast('_1, IntegerType), classOf[Gt[_]], 4) - checkFilterPredicate(Literal(1) >= Cast('_1, IntegerType), classOf[LtEq[_]], 1) - checkFilterPredicate(Literal(4) <= Cast('_1, IntegerType), classOf[GtEq[_]], 4) - - checkFilterPredicate(!(Cast('_1, IntegerType) < 4), classOf[GtEq[_]], 4) - checkFilterPredicate( - Cast('_1, IntegerType) > 2 && Cast('_1, IntegerType) < 4, classOf[Operators.And], 3) - checkFilterPredicate( - Cast('_1, IntegerType) < 2 || Cast('_1, IntegerType) > 3, - classOf[Operators.Or], - Seq(Row(1), Row(4))) - } - } - test("filter pushdown - integer") { withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df => checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_))) checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1) + checkFilterPredicate('_1 <=> 1, classOf[Eq[_]], 1) checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_))) checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1) @@ -154,13 +130,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4) checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1) + checkFilterPredicate(Literal(1) <=> '_1, classOf[Eq[_]], 1) checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1) checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4) checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1) checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4) checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4) - checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3) checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4))) } } @@ -171,6 +147,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_))) checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1) + checkFilterPredicate('_1 <=> 1, classOf[Eq[_]], 1) checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_))) checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1) @@ -179,13 +156,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4) checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1) + checkFilterPredicate(Literal(1) <=> '_1, classOf[Eq[_]], 1) checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1) checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4) checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1) checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4) checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4) - checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3) checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4))) } } @@ -196,6 +173,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_))) checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1) + checkFilterPredicate('_1 <=> 1, classOf[Eq[_]], 1) checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_))) checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1) @@ -204,13 +182,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4) checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1) + checkFilterPredicate(Literal(1) <=> '_1, classOf[Eq[_]], 1) checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1) checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4) checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1) checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4) checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4) - checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3) checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4))) } } @@ -221,6 +199,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_))) checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1) + checkFilterPredicate('_1 <=> 1, classOf[Eq[_]], 1) checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_))) checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1) @@ -229,13 +208,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4) checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1) + checkFilterPredicate(Literal(1) <=> '_1, classOf[Eq[_]], 1) checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1) checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4) checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1) checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4) checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4) - checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3) checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4))) } } @@ -247,6 +226,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.toString))) checkFilterPredicate('_1 === "1", classOf[Eq[_]], "1") + checkFilterPredicate('_1 <=> "1", classOf[Eq[_]], "1") checkFilterPredicate( '_1 !== "1", classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.toString))) @@ -256,13 +236,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate('_1 >= "4", classOf[GtEq[_]], "4") checkFilterPredicate(Literal("1") === '_1, classOf[Eq[_]], "1") + checkFilterPredicate(Literal("1") <=> '_1, classOf[Eq[_]], "1") checkFilterPredicate(Literal("2") > '_1, classOf[Lt[_]], "1") checkFilterPredicate(Literal("3") < '_1, classOf[Gt[_]], "4") checkFilterPredicate(Literal("1") >= '_1, classOf[LtEq[_]], "1") checkFilterPredicate(Literal("4") <= '_1, classOf[GtEq[_]], "4") checkFilterPredicate(!('_1 < "4"), classOf[GtEq[_]], "4") - checkFilterPredicate('_1 > "2" && '_1 < "4", classOf[Operators.And], "3") checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or], Seq(Row("1"), Row("4"))) } } @@ -274,6 +254,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex withParquetDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df => checkBinaryFilterPredicate('_1 === 1.b, classOf[Eq[_]], 1.b) + checkBinaryFilterPredicate('_1 <=> 1.b, classOf[Eq[_]], 1.b) checkBinaryFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) checkBinaryFilterPredicate( @@ -288,13 +269,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkBinaryFilterPredicate('_1 >= 4.b, classOf[GtEq[_]], 4.b) checkBinaryFilterPredicate(Literal(1.b) === '_1, classOf[Eq[_]], 1.b) + checkBinaryFilterPredicate(Literal(1.b) <=> '_1, classOf[Eq[_]], 1.b) checkBinaryFilterPredicate(Literal(2.b) > '_1, classOf[Lt[_]], 1.b) checkBinaryFilterPredicate(Literal(3.b) < '_1, classOf[Gt[_]], 4.b) checkBinaryFilterPredicate(Literal(1.b) >= '_1, classOf[LtEq[_]], 1.b) checkBinaryFilterPredicate(Literal(4.b) <= '_1, classOf[GtEq[_]], 4.b) checkBinaryFilterPredicate(!('_1 < 4.b), classOf[GtEq[_]], 4.b) - checkBinaryFilterPredicate('_1 > 2.b && '_1 < 4.b, classOf[Operators.And], 3.b) checkBinaryFilterPredicate( '_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b))) } |