-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala | 42
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala | 68
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala | 32
3 files changed, 112 insertions(+), 30 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 9197b8b563..a0abfe458d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -17,15 +17,17 @@
package org.apache.spark.sql.execution.datasources.parquet
-import org.apache.parquet.filter2.predicate.Operators._
+import org.apache.parquet.filter2.predicate.FilterApi._
+import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
-import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation}
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
/**
* A test suite that tests Parquet filter2 API based filter pushdown optimization.
@@ -382,6 +384,42 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
+ test("SPARK-12218 Converting conjunctions into Parquet filter predicates") {
+ val schema = StructType(Seq(
+ StructField("a", IntegerType, nullable = false),
+ StructField("b", StringType, nullable = true),
+ StructField("c", DoubleType, nullable = true)
+ ))
+
+ assertResult(Some(and(
+ lt(intColumn("a"), 10: Integer),
+ gt(doubleColumn("c"), 1.5: java.lang.Double)))
+ ) {
+ ParquetFilters.createFilter(
+ schema,
+ sources.And(
+ sources.LessThan("a", 10),
+ sources.GreaterThan("c", 1.5D)))
+ }
+
+ assertResult(None) {
+ ParquetFilters.createFilter(
+ schema,
+ sources.And(
+ sources.LessThan("a", 10),
+ sources.StringContains("b", "prefix")))
+ }
+
+ assertResult(None) {
+ ParquetFilters.createFilter(
+ schema,
+ sources.Not(
+ sources.And(
+ sources.GreaterThan("a", 1),
+ sources.StringContains("b", "prefix"))))
+ }
+ }
+
test("SPARK-11164: test the parquet filter in") {
import testImplicits._
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
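The new test above pins down the rule that a `sources.And` is converted only when both of its children are convertible, and that an inconvertible child also poisons any enclosing `Not`. The following standalone sketch restates that rule; it is illustrative only (the `ConjunctionSketch` object, `convertLeaf`, and `convert` are hypothetical names, not part of `ParquetFilters`), and only the Parquet `FilterApi` calls are real API:

    import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
    import org.apache.spark.sql.sources

    object ConjunctionSketch {
      // Illustrative leaf conversion: only the two column types used by the test
      // above are handled; anything else (e.g. StringContains) is unconvertible.
      def convertLeaf(f: sources.Filter): Option[FilterPredicate] = f match {
        case sources.LessThan(a, v: Integer) =>
          Some(FilterApi.lt(FilterApi.intColumn(a), v))
        case sources.GreaterThan(a, v: java.lang.Double) =>
          Some(FilterApi.gt(FilterApi.doubleColumn(a), v))
        case _ => None
      }

      // An `And` converts only when *both* children convert: pushing a partial
      // conjunction would be unsafe once the `And` is nested inside a `Not`,
      // which is exactly the case the third assertion above exercises.
      def convert(f: sources.Filter): Option[FilterPredicate] = f match {
        case sources.And(l, r) =>
          for (lp <- convert(l); rp <- convert(r)) yield FilterApi.and(lp, rp)
        case sources.Not(c) =>
          convert(c).map(p => FilterApi.not(p))
        case leaf => convertLeaf(leaf)
      }
    }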
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index ebfb1759b8..165210f9ff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -26,15 +26,47 @@ import org.apache.spark.Logging
import org.apache.spark.sql.sources._
/**
- * It may be optimized by push down partial filters. But we are conservative here.
- * Because if some filters fail to be parsed, the tree may be corrupted,
- * and cannot be used anymore.
+ * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down.
+ *
+ * Due to a limitation of the ORC `SearchArgument` builder, we ended up with a rather awkward double-
+ * checking pattern when converting `And`/`Or`/`Not` filters.
+ *
+ * An ORC `SearchArgument` must be built in one pass using a single builder. For example, you can't
+ * build `a = 1` and `b = 2` first, and then combine them into `a = 1 AND b = 2`. This is quite
+ * different from the cases in Spark SQL or Parquet, where complex filters can be easily built using
+ * existing simpler ones.
+ *
+ * The annoying part is that, `SearchArgument` builder methods like `startAnd()`, `startOr()`, and
+ * `startNot()` mutate internal state of the builder instance. This forces us to translate all
+ * convertible filters with a single builder instance. However, before actually converting a filter,
+ * we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible filter is
+ * found, we may already end up with a builder whose internal state is inconsistent.
+ *
+ * For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and then
+ * try to convert its children. Say we convert `left` child successfully, but find that `right`
+ * child is inconvertible. Alas, the `b.startAnd()` call can't be rolled back, and `b` is inconsistent
+ * now.
+ *
+ * The workaround employed here is that, for `And`/`Or`/`Not`, we first try to convert their
+ * children with brand new builders, and only do the actual conversion with the right builder
+ * instance when the children are proven to be convertible.
+ *
+ * P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only. Usage of
+ * builder methods mentioned above can only be found in test code, where all tested filters are
+ * known to be convertible.
*/
private[orc] object OrcFilters extends Logging {
def createFilter(filters: Array[Filter]): Option[SearchArgument] = {
+ // First, tries to convert each filter individually to see whether it's convertible, and then
+ // collects all convertible ones to build the final `SearchArgument`.
+ val convertibleFilters = for {
+ filter <- filters
+ _ <- buildSearchArgument(filter, SearchArgumentFactory.newBuilder())
+ } yield filter
+
for {
- // Combines all filters with `And`s to produce a single conjunction predicate
- conjunction <- filters.reduceOption(And)
+ // Combines all convertible filters using `And` to produce a single conjunction
+ conjunction <- convertibleFilters.reduceOption(And)
// Then tries to build a single ORC `SearchArgument` for the conjunction predicate
builder <- buildSearchArgument(conjunction, SearchArgumentFactory.newBuilder())
} yield builder.build()
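The `And`/`Or`/`Not` branches that the scaladoc above describes are unchanged by this patch, so they only partially appear as context below. As a hedged re-statement (not the verbatim Spark source), the double-checking pattern for `And` boils down to the following sketch, written against the `SearchArgument` builder API this file already uses; the `LessThan` leaf case is simplified to integer literals only:

    import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument, SearchArgumentFactory}
    import org.apache.spark.sql.sources.{And, Filter, LessThan}

    object DoubleCheckSketch {
      // Recursively translates a data source Filter into builder calls, returning
      // None as soon as something ORC cannot express is encountered.
      def buildSearchArgument(
          filter: Filter,
          builder: SearchArgument.Builder): Option[SearchArgument.Builder] = filter match {
        case And(left, right) =>
          for {
            // Probe both children with throwaway builders first. If either probe
            // fails, the real `builder` is never mutated and stays consistent.
            _ <- buildSearchArgument(left, SearchArgumentFactory.newBuilder())
            _ <- buildSearchArgument(right, SearchArgumentFactory.newBuilder())
            // Both children are convertible, so mutating `builder` is now safe.
            lhs <- buildSearchArgument(left, builder.startAnd())
            rhs <- buildSearchArgument(right, lhs)
          } yield rhs.end()

        case LessThan(attribute, value: Integer) =>
          // Leaf predicates must be wrapped in a "parent" predicate, hence startAnd().
          Some(builder.startAnd().lessThan(attribute, value).end())

        case _ => None
      }
    }

The `Or` and `Not` branches follow the same probe-then-commit shape.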
@@ -50,28 +82,6 @@ private[orc] object OrcFilters extends Logging {
case _ => false
}
- // lian: I probably missed something here, and had to end up with a pretty weird double-checking
- // pattern when converting `And`/`Or`/`Not` filters.
- //
- // The annoying part is that, `SearchArgument` builder methods like `startAnd()` `startOr()`,
- // and `startNot()` mutate internal state of the builder instance. This forces us to translate
- // all convertible filters with a single builder instance. However, before actually converting a
- // filter, we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible
- // filter is found, we may already end up with a builder whose internal state is inconsistent.
- //
- // For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and
- // then try to convert its children. Say we convert `left` child successfully, but find that
- // `right` child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is
- // inconsistent now.
- //
- // The workaround employed here is that, for `And`/`Or`/`Not`, we first try to convert their
- // children with brand new builders, and only do the actual conversion with the right builder
- // instance when the children are proven to be convertible.
- //
- // P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only.
- // Usage of builder methods mentioned above can only be found in test code, where all tested
- // filters are known to be convertible.
-
expression match {
case And(left, right) =>
// At here, it is not safe to just convert one side if we do not understand the
@@ -102,6 +112,10 @@ private[orc] object OrcFilters extends Logging {
negate <- buildSearchArgument(child, builder.startNot())
} yield negate.end()
+ // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
+ // call is mandatory. The ORC `SearchArgument` builder requires that all leaf predicates be
+ // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+
case EqualTo(attribute, value) if isSearchableLiteral(value) =>
Some(builder.startAnd().equals(attribute, value).end())
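To make the wrapping requirement concrete, here is a small standalone use of the builder outside of `OrcFilters`, again assuming the same `SearchArgument` builder API used in this file: even a single comparison has to be emitted between `startAnd()` and `end()`.

    import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory

    // One leaf predicate, wrapped in the parent startAnd()/end() pair the builder
    // requires; the resulting expression is still just a single leaf.
    val sarg = SearchArgumentFactory.newBuilder()
      .startAnd()
      .lessThan("a", 10: Integer)
      .end()
      .build()

    // Expected to print something like the test output below:
    //   leaf-0 = (LESS_THAN a 10)
    //   expr = leaf-0
    println(sarg)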
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 7a34cf731b..47e73b4006 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -21,8 +21,9 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.{QueryTest, Row}
case class OrcData(intField: Int, stringField: String)
@@ -174,4 +175,33 @@ class OrcSourceSuite extends OrcSuite {
|)
""".stripMargin)
}
+
+ test("SPARK-12218 Converting conjunctions into ORC SearchArguments") {
+ // The `LessThan` should be converted while the `StringContains` shouldn't
+ assertResult(
+ """leaf-0 = (LESS_THAN a 10)
+ |expr = leaf-0
+ """.stripMargin.trim
+ ) {
+ OrcFilters.createFilter(Array(
+ LessThan("a", 10),
+ StringContains("b", "prefix")
+ )).get.toString
+ }
+
+ // The `LessThan` should be converted while the whole inner `And` shouldn't
+ assertResult(
+ """leaf-0 = (LESS_THAN a 10)
+ |expr = leaf-0
+ """.stripMargin.trim
+ ) {
+ OrcFilters.createFilter(Array(
+ LessThan("a", 10),
+ Not(And(
+ GreaterThan("a", 1),
+ StringContains("b", "prefix")
+ ))
+ )).get.toString
+ }
+ }
}
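A note on the second assertion: inside `createFilter`, the probe pass drops the whole `Not(And(...))` because its `StringContains` child is inconvertible, so only `LessThan("a", 10)` survives and the resulting `SearchArgument` has a single leaf. The same call can be made outside the test harness as a quick sanity check; since `OrcFilters` is `private[orc]`, this sketch assumes the calling code lives in the `org.apache.spark.sql.hive.orc` package, as this suite does:

    import org.apache.spark.sql.sources._

    val pushed: Array[Filter] = Array(
      LessThan("a", 10),
      Not(And(GreaterThan("a", 1), StringContains("b", "prefix"))))

    // Only the first filter is convertible, so the SearchArgument describes just `a < 10`.
    OrcFilters.createFilter(pushed).foreach(sarg => println(sarg))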