aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhyukjinkwon <gurwls223@gmail.com>2015-12-23 00:39:49 +0800
committerCheng Lian <lian@databricks.com>2015-12-23 00:39:49 +0800
commit364d244a50aab9169ec1abe7e327004e681f8a71 (patch)
treeca667334a7a9aa9f33aa98357a4043501ffa7b07
parent42bfde29836529251a4337ea8cfc539c9c8b04b8 (diff)
downloadspark-364d244a50aab9169ec1abe7e327004e681f8a71.tar.gz
spark-364d244a50aab9169ec1abe7e327004e681f8a71.tar.bz2
spark-364d244a50aab9169ec1abe7e327004e681f8a71.zip
[SPARK-11677][SQL][FOLLOW-UP] Add tests for checking the ORC filter creation against pushed down filters.
https://issues.apache.org/jira/browse/SPARK-11677 Although it checks correctly the filters by the number of results if ORC filter-push-down is enabled, the filters themselves are not being tested. So, this PR includes the test similarly with `ParquetFilterSuite`. Since the results are checked by `OrcQuerySuite`, this `OrcFilterSuite` only checks if the appropriate filters are created. One thing different with `ParquetFilterSuite` here is, it does not check the results because that is checked in `OrcQuerySuite`. Author: hyukjinkwon <gurwls223@gmail.com> Closes #10341 from HyukjinKwon/SPARK-11677-followup.
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala236
1 files changed, 236 insertions, 0 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
new file mode 100644
index 0000000000..7b61b635bd
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument, PredicateLeaf}
+
+import org.apache.spark.sql.{Column, DataFrame, QueryTest}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation}
+
+/**
+ * A test suite that tests ORC filter API based filter pushdown optimization.
+ */
+class OrcFilterSuite extends QueryTest with OrcTest {
+ private def checkFilterPredicate(
+ df: DataFrame,
+ predicate: Predicate,
+ checker: (SearchArgument) => Unit): Unit = {
+ val output = predicate.collect { case a: Attribute => a }.distinct
+ val query = df
+ .select(output.map(e => Column(e)): _*)
+ .where(Column(predicate))
+
+ var maybeRelation: Option[OrcRelation] = None
+ val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+ case PhysicalOperation(_, filters, LogicalRelation(orcRelation: OrcRelation, _)) =>
+ maybeRelation = Some(orcRelation)
+ filters
+ }.flatten.reduceLeftOption(_ && _)
+ assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
+
+ val (_, selectedFilters) =
+ DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
+ assert(selectedFilters.nonEmpty, "No filter is pushed down")
+
+ val maybeFilter = OrcFilters.createFilter(selectedFilters.toArray)
+ assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $selectedFilters")
+ checker(maybeFilter.get)
+ }
+
+ private def checkFilterPredicate
+ (predicate: Predicate, filterOperator: PredicateLeaf.Operator)
+ (implicit df: DataFrame): Unit = {
+ def checkComparisonOperator(filter: SearchArgument) = {
+ val operator = filter.getLeaves.asScala.head.getOperator
+ assert(operator === filterOperator)
+ }
+ checkFilterPredicate(df, predicate, checkComparisonOperator)
+ }
+
+ private def checkFilterPredicate
+ (predicate: Predicate, stringExpr: String)
+ (implicit df: DataFrame): Unit = {
+ def checkLogicalOperator(filter: SearchArgument) = {
+ assert(filter.toString == stringExpr)
+ }
+ checkFilterPredicate(df, predicate, checkLogicalOperator)
+ }
+
+ test("filter pushdown - boolean") {
+ withOrcDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
+ checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+ }
+ }
+
+ test("filter pushdown - integer") {
+ withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
+ checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal(1) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal(3) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(1) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+ }
+ }
+
+ test("filter pushdown - long") {
+ withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit df =>
+ checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal(1) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal(3) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(1) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+ }
+ }
+
+ test("filter pushdown - float") {
+ withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit df =>
+ checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal(1) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal(3) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(1) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+ }
+ }
+
+ test("filter pushdown - double") {
+ withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit df =>
+ checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal(1) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal(3) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(1) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+ }
+ }
+
+ test("filter pushdown - string") {
+ withOrcDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
+ checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate('_1 === "1", PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate('_1 <=> "1", PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate('_1 < "2", PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate('_1 > "3", PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate('_1 <= "1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate('_1 >= "4", PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal("1") === '_1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal("1") <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal("2") > '_1, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal("3") < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal("1") >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal("4") <= '_1, PredicateLeaf.Operator.LESS_THAN)
+ }
+ }
+
+ test("filter pushdown - binary") {
+ implicit class IntToBinary(int: Int) {
+ def b: Array[Byte] = int.toString.getBytes("UTF-8")
+ }
+
+ withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
+ checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+ }
+ }
+
+ test("filter pushdown - combinations with logical operators") {
+ withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
+ // Because `ExpressionTree` is not accessible at Hive 1.2.x, this should be checked
+ // in string form in order to check filter creation including logical operators
+ // such as `and`, `or` or `not`. So, this function uses `SearchArgument.toString()`
+ // to produce string expression and then compare it to given string expression below.
+ // This might have to be changed after Hive version is upgraded.
+ checkFilterPredicate(
+ '_1.isNotNull,
+ """leaf-0 = (IS_NULL _1)
+ |expr = (not leaf-0)""".stripMargin.trim
+ )
+ checkFilterPredicate(
+ '_1 !== 1,
+ """leaf-0 = (EQUALS _1 1)
+ |expr = (not leaf-0)""".stripMargin.trim
+ )
+ checkFilterPredicate(
+ !('_1 < 4),
+ """leaf-0 = (LESS_THAN _1 4)
+ |expr = (not leaf-0)""".stripMargin.trim
+ )
+ checkFilterPredicate(
+ '_1 < 2 || '_1 > 3,
+ """leaf-0 = (LESS_THAN _1 2)
+ |leaf-1 = (LESS_THAN_EQUALS _1 3)
+ |expr = (or leaf-0 (not leaf-1))""".stripMargin.trim
+ )
+ checkFilterPredicate(
+ '_1 < 2 && '_1 > 3,
+ """leaf-0 = (LESS_THAN _1 2)
+ |leaf-1 = (LESS_THAN_EQUALS _1 3)
+ |expr = (and leaf-0 (not leaf-1))""".stripMargin.trim
+ )
+ }
+ }
+}