author     hyukjinkwon <gurwls223@gmail.com>    2015-08-20 08:13:25 +0800
committer  Cheng Lian <lian@databricks.com>     2015-08-28 16:18:02 +0800
commit     f0c4470d43e833d2382a8b98bf4aa21ae9451d00
tree       f2c9227872718b34036b9e6aed00777c7938c5d0
parent     9b7f8f29373972f115a5d9068b6432b6757f8ac7
[SPARK-10035] [SQL] Parquet filters do not process the EqualNullSafe filter.
As I discussed with Lian:

1. Added EqualNullSafe to ParquetFilters. It uses the same equality comparison filter as EqualTo, since the Parquet filter actually performs a null-safe equality comparison.
2. Updated the test code (ParquetFilterSuite):
   - Convert catalyst.Expression to sources.Filter.
   - Removed Cast, since only Literal is picked up as a proper Filter in DataSourceStrategy.
   - Added EqualNullSafe comparisons.
3. Removed the deprecated createFilter for catalyst.Expression.

Author: hyukjinkwon <gurwls223@gmail.com>
Author: 권혁진 <gurwls223@gmail.com>

Closes #8275 from HyukjinKwon/master.

(cherry picked from commit ba5f7e1842f2c5852b5309910c0d39926643da69)
Signed-off-by: Cheng Lian <lian@databricks.com>
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala    113
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala  63
2 files changed, 37 insertions(+), 139 deletions(-)
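For context before the diff itself, here is a minimal sketch of the user-visible effect, assuming an existing `sqlContext` and a hypothetical Parquet path. After this patch, Spark's null-safe equality operator is pushed down to Parquet as the same `Eq` predicate that plain equality produces; before it, the predicate fell through ParquetFilters.createFilter and was evaluated entirely on the Spark side.

    import org.apache.spark.sql.functions.lit

    // Hypothetical Parquet-backed DataFrame; the path is illustrative only.
    val df = sqlContext.read.parquet("/tmp/example")

    // Column's <=> operator builds a Catalyst EqualNullSafe predicate. With this patch,
    // the pushed Parquet filter is eq(_1, 1), identical to what `===` generates.
    df.filter(df("_1") <=> lit(1)).explain()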
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 83eaf8e79f..c6b3fe7900 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -22,8 +22,6 @@ import java.nio.ByteBuffer
import com.google.common.io.BaseEncoding
import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.filter2.compat.FilterCompat
-import org.apache.parquet.filter2.compat.FilterCompat._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.filter2.predicate._
import org.apache.parquet.io.api.Binary
@@ -38,12 +36,6 @@ import org.apache.spark.sql.types._
private[sql] object ParquetFilters {
val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
- def createRecordFilter(filterExpressions: Seq[Expression]): Option[Filter] = {
- filterExpressions.flatMap { filter =>
- createFilter(filter)
- }.reduceOption(FilterApi.and).map(FilterCompat.get)
- }
-
case class SetInFilter[T <: Comparable[T]](
valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable {
@@ -208,6 +200,16 @@ private[sql] object ParquetFilters {
// For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
// which can be casted to `false` implicitly. Please refer to the `eval` method of these
// operators and the `SimplifyFilters` rule for details.
+
+ // Hyukjin:
+ // I mapped [[EqualNullSafe]] to [[org.apache.parquet.filter2.predicate.Operators.Eq]], so it
+ // builds the same comparison as the one generated when the given [[sources.Filter]] is
+ // [[EqualTo]]. This is safe because the underlying Parquet filter already performs a
+ // null-safe equality comparison. Arguably [[EqualTo]] is the case that should change
+ // instead, but the current mapping still looks fine: physical planning never passes `NULL`
+ // to [[EqualTo]]; it rewrites such predicates to [[IsNull]] and the like. If I missed
+ // something, this mapping should be revisited.
+
predicate match {
case sources.IsNull(name) =>
makeEq.lift(dataTypeOf(name)).map(_(name, null))
@@ -219,6 +221,11 @@ private[sql] object ParquetFilters {
case sources.Not(sources.EqualTo(name, value)) =>
makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
+ case sources.EqualNullSafe(name, value) =>
+ makeEq.lift(dataTypeOf(name)).map(_(name, value))
+ case sources.Not(sources.EqualNullSafe(name, value)) =>
+ makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
+
case sources.LessThan(name, value) =>
makeLt.lift(dataTypeOf(name)).map(_(name, value))
case sources.LessThanOrEqual(name, value) =>
@@ -277,96 +284,6 @@ private[sql] object ParquetFilters {
}
/**
- * Converts Catalyst predicate expressions to Parquet filter predicates.
- *
- * @todo This can be removed once we get rid of the old Parquet support.
- */
- def createFilter(predicate: Expression): Option[FilterPredicate] = {
- // NOTE:
- //
- // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
- // which can be casted to `false` implicitly. Please refer to the `eval` method of these
- // operators and the `SimplifyFilters` rule for details.
- predicate match {
- case IsNull(NamedExpression(name, dataType)) =>
- makeEq.lift(dataType).map(_(name, null))
- case IsNotNull(NamedExpression(name, dataType)) =>
- makeNotEq.lift(dataType).map(_(name, null))
-
- case EqualTo(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
- makeEq.lift(dataType).map(_(name, value))
- case EqualTo(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
- makeEq.lift(dataType).map(_(name, value))
- case EqualTo(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
- makeEq.lift(dataType).map(_(name, value))
- case EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
- makeEq.lift(dataType).map(_(name, value))
-
- case Not(EqualTo(NamedExpression(name, _), NonNullLiteral(value, dataType))) =>
- makeNotEq.lift(dataType).map(_(name, value))
- case Not(EqualTo(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _))) =>
- makeNotEq.lift(dataType).map(_(name, value))
- case Not(EqualTo(NonNullLiteral(value, dataType), NamedExpression(name, _))) =>
- makeNotEq.lift(dataType).map(_(name, value))
- case Not(EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType))) =>
- makeNotEq.lift(dataType).map(_(name, value))
-
- case LessThan(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
- makeLt.lift(dataType).map(_(name, value))
- case LessThan(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
- makeLt.lift(dataType).map(_(name, value))
- case LessThan(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
- makeGt.lift(dataType).map(_(name, value))
- case LessThan(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
- makeGt.lift(dataType).map(_(name, value))
-
- case LessThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
- makeLtEq.lift(dataType).map(_(name, value))
- case LessThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
- makeLtEq.lift(dataType).map(_(name, value))
- case LessThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
- makeGtEq.lift(dataType).map(_(name, value))
- case LessThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
- makeGtEq.lift(dataType).map(_(name, value))
-
- case GreaterThan(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
- makeGt.lift(dataType).map(_(name, value))
- case GreaterThan(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
- makeGt.lift(dataType).map(_(name, value))
- case GreaterThan(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
- makeLt.lift(dataType).map(_(name, value))
- case GreaterThan(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
- makeLt.lift(dataType).map(_(name, value))
-
- case GreaterThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
- makeGtEq.lift(dataType).map(_(name, value))
- case GreaterThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
- makeGtEq.lift(dataType).map(_(name, value))
- case GreaterThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
- makeLtEq.lift(dataType).map(_(name, value))
- case GreaterThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
- makeLtEq.lift(dataType).map(_(name, value))
-
- case And(lhs, rhs) =>
- (createFilter(lhs) ++ createFilter(rhs)).reduceOption(FilterApi.and)
-
- case Or(lhs, rhs) =>
- for {
- lhsFilter <- createFilter(lhs)
- rhsFilter <- createFilter(rhs)
- } yield FilterApi.or(lhsFilter, rhsFilter)
-
- case Not(pred) =>
- createFilter(pred).map(FilterApi.not)
-
- case InSet(NamedExpression(name, dataType), valueSet) =>
- makeInSet.lift(dataType).map(_(name, valueSet))
-
- case _ => None
- }
- }
-
- /**
* Note: Inside the Hadoop API we only have access to `Configuration`, not to
* [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey
* the actual filter predicate.
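The mapping added above relies on a property of parquet-mr worth making explicit: FilterApi.eq accepts a null literal, in which case the predicate keeps exactly the rows whose value is null. That is why IsNull reuses makeEq with a null value, and why EqualNullSafe can reuse makeEq verbatim. A standalone sketch (the column name is illustrative):

    import org.apache.parquet.filter2.predicate.FilterApi
    import org.apache.parquet.filter2.predicate.Operators.{Eq, IntColumn}

    val column: IntColumn = FilterApi.intColumn("_1")

    // eq(column, null) matches exactly the null rows ...
    val isNull: Eq[Integer] = FilterApi.eq(column, null.asInstanceOf[Integer])
    // ... and eq(column, 1) matches exactly the rows equal to 1, never null rows.
    // These two cases together are precisely null-safe equality.
    val equalsOne: Eq[Integer] = FilterApi.eq(column, Int.box(1))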
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 5b4e568bb9..f067112cfc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -24,9 +24,8 @@ import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation}
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types._
/**
* A test suite that tests Parquet filter2 API based filter pushdown optimization.
@@ -55,20 +54,22 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
.select(output.map(e => Column(e)): _*)
.where(Column(predicate))
- val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+ val analyzedPredicate = query.queryExecution.optimizedPlan.collect {
case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation)) => filters
- }.flatten.reduceOption(_ && _)
+ }.flatten
+ assert(analyzedPredicate.nonEmpty)
- assert(maybeAnalyzedPredicate.isDefined)
- maybeAnalyzedPredicate.foreach { pred =>
- val maybeFilter = ParquetFilters.createFilter(pred)
+ val selectedFilters = DataSourceStrategy.selectFilters(analyzedPredicate)
+ assert(selectedFilters.nonEmpty)
+
+ selectedFilters.foreach { pred =>
+ val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
maybeFilter.foreach { f =>
// Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
assert(f.getClass === filterClass)
}
}
-
checker(query, expected)
}
}
@@ -109,43 +110,18 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], Seq(Row(true), Row(false)))
checkFilterPredicate('_1 === true, classOf[Eq[_]], true)
+ checkFilterPredicate('_1 <=> true, classOf[Eq[_]], true)
checkFilterPredicate('_1 !== true, classOf[NotEq[_]], false)
}
}
- test("filter pushdown - short") {
- withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toShort)))) { implicit df =>
- checkFilterPredicate(Cast('_1, IntegerType) === 1, classOf[Eq[_]], 1)
- checkFilterPredicate(
- Cast('_1, IntegerType) !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
-
- checkFilterPredicate(Cast('_1, IntegerType) < 2, classOf[Lt[_]], 1)
- checkFilterPredicate(Cast('_1, IntegerType) > 3, classOf[Gt[_]], 4)
- checkFilterPredicate(Cast('_1, IntegerType) <= 1, classOf[LtEq[_]], 1)
- checkFilterPredicate(Cast('_1, IntegerType) >= 4, classOf[GtEq[_]], 4)
-
- checkFilterPredicate(Literal(1) === Cast('_1, IntegerType), classOf[Eq[_]], 1)
- checkFilterPredicate(Literal(2) > Cast('_1, IntegerType), classOf[Lt[_]], 1)
- checkFilterPredicate(Literal(3) < Cast('_1, IntegerType), classOf[Gt[_]], 4)
- checkFilterPredicate(Literal(1) >= Cast('_1, IntegerType), classOf[LtEq[_]], 1)
- checkFilterPredicate(Literal(4) <= Cast('_1, IntegerType), classOf[GtEq[_]], 4)
-
- checkFilterPredicate(!(Cast('_1, IntegerType) < 4), classOf[GtEq[_]], 4)
- checkFilterPredicate(
- Cast('_1, IntegerType) > 2 && Cast('_1, IntegerType) < 4, classOf[Operators.And], 3)
- checkFilterPredicate(
- Cast('_1, IntegerType) < 2 || Cast('_1, IntegerType) > 3,
- classOf[Operators.Or],
- Seq(Row(1), Row(4)))
- }
- }
-
test("filter pushdown - integer") {
withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+ checkFilterPredicate('_1 <=> 1, classOf[Eq[_]], 1)
checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
@@ -154,13 +130,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+ checkFilterPredicate(Literal(1) <=> '_1, classOf[Eq[_]], 1)
checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
- checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
}
}
@@ -171,6 +147,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+ checkFilterPredicate('_1 <=> 1, classOf[Eq[_]], 1)
checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
@@ -179,13 +156,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+ checkFilterPredicate(Literal(1) <=> '_1, classOf[Eq[_]], 1)
checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
- checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
}
}
@@ -196,6 +173,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+ checkFilterPredicate('_1 <=> 1, classOf[Eq[_]], 1)
checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
@@ -204,13 +182,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+ checkFilterPredicate(Literal(1) <=> '_1, classOf[Eq[_]], 1)
checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
- checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
}
}
@@ -221,6 +199,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+ checkFilterPredicate('_1 <=> 1, classOf[Eq[_]], 1)
checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
@@ -229,13 +208,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+ checkFilterPredicate(Literal(1) <=> '_1, classOf[Eq[_]], 1)
checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
- checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
}
}
@@ -247,6 +226,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
'_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.toString)))
checkFilterPredicate('_1 === "1", classOf[Eq[_]], "1")
+ checkFilterPredicate('_1 <=> "1", classOf[Eq[_]], "1")
checkFilterPredicate(
'_1 !== "1", classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.toString)))
@@ -256,13 +236,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
checkFilterPredicate('_1 >= "4", classOf[GtEq[_]], "4")
checkFilterPredicate(Literal("1") === '_1, classOf[Eq[_]], "1")
+ checkFilterPredicate(Literal("1") <=> '_1, classOf[Eq[_]], "1")
checkFilterPredicate(Literal("2") > '_1, classOf[Lt[_]], "1")
checkFilterPredicate(Literal("3") < '_1, classOf[Gt[_]], "4")
checkFilterPredicate(Literal("1") >= '_1, classOf[LtEq[_]], "1")
checkFilterPredicate(Literal("4") <= '_1, classOf[GtEq[_]], "4")
checkFilterPredicate(!('_1 < "4"), classOf[GtEq[_]], "4")
- checkFilterPredicate('_1 > "2" && '_1 < "4", classOf[Operators.And], "3")
checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or], Seq(Row("1"), Row("4")))
}
}
@@ -274,6 +254,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
withParquetDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
checkBinaryFilterPredicate('_1 === 1.b, classOf[Eq[_]], 1.b)
+ checkBinaryFilterPredicate('_1 <=> 1.b, classOf[Eq[_]], 1.b)
checkBinaryFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkBinaryFilterPredicate(
@@ -288,13 +269,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
checkBinaryFilterPredicate('_1 >= 4.b, classOf[GtEq[_]], 4.b)
checkBinaryFilterPredicate(Literal(1.b) === '_1, classOf[Eq[_]], 1.b)
+ checkBinaryFilterPredicate(Literal(1.b) <=> '_1, classOf[Eq[_]], 1.b)
checkBinaryFilterPredicate(Literal(2.b) > '_1, classOf[Lt[_]], 1.b)
checkBinaryFilterPredicate(Literal(3.b) < '_1, classOf[Gt[_]], 4.b)
checkBinaryFilterPredicate(Literal(1.b) >= '_1, classOf[LtEq[_]], 1.b)
checkBinaryFilterPredicate(Literal(4.b) <= '_1, classOf[GtEq[_]], 4.b)
checkBinaryFilterPredicate(!('_1 < 4.b), classOf[GtEq[_]], 4.b)
- checkBinaryFilterPredicate('_1 > 2.b && '_1 < 4.b, classOf[Operators.And], 3.b)
checkBinaryFilterPredicate(
'_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b)))
}
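Finally, the conversion step the reworked test helper leans on, sketched in isolation. This assumes the package-private Spark internals of this era (so the snippet would have to live under org.apache.spark.sql): DataSourceStrategy.selectFilters reduces Catalyst expressions to data-source Filters, and the new ParquetFilters.createFilter(schema, filter) overload turns sources.EqualNullSafe into a Parquet Eq.

    import org.apache.spark.sql.sources
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualNullSafe, Literal}
    import org.apache.spark.sql.execution.datasources.DataSourceStrategy
    import org.apache.spark.sql.types.IntegerType

    // Catalyst's '_1 <=> 1 ...
    val expr = EqualNullSafe(AttributeReference("_1", IntegerType)(), Literal(1))

    // ... becomes sources.EqualNullSafe("_1", 1), which createFilter now accepts.
    val filters: Seq[sources.Filter] = DataSourceStrategy.selectFilters(Seq(expr))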