aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2015-12-28 08:48:44 -0800
committerYin Huai <yhuai@databricks.com>2015-12-28 08:48:44 -0800
commit8e23d8db7f28a97e2f4394cdf9d4c4260abbd750 (patch)
treec34d6ddda07daa81ab2e37a59333ff92cf39a75f
parent8d4940092141c0909b673607f393cdd53f093ed6 (diff)
downloadspark-8e23d8db7f28a97e2f4394cdf9d4c4260abbd750.tar.gz
spark-8e23d8db7f28a97e2f4394cdf9d4c4260abbd750.tar.bz2
spark-8e23d8db7f28a97e2f4394cdf9d4c4260abbd750.zip
[SPARK-12218] Fixes ORC conjunction predicate push down
This PR is a follow-up of PR #10362. Two major changes: 1. The fix introduced in #10362 is OK for Parquet, but may disable ORC PPD in many cases PR #10362 stops converting an `AND` predicate if any branch is inconvertible. On the other hand, `OrcFilters` combines all filters into a single big conjunction first and then tries to convert it into ORC `SearchArgument`. This means, if any filter is inconvertible, no filters can be pushed down. This PR fixes this issue by finding out all convertible filters first before doing the actual conversion. The reason behind the current implementation is mostly due to the limitation of ORC `SearchArgument` builder, which is documented in this PR in detail. 1. Copied the `AND` predicate fix for ORC from #10362 to avoid merge conflict. Same as #10362, this PR targets master (2.0.0-SNAPSHOT), branch-1.6, and branch-1.5. Author: Cheng Lian <lian@databricks.com> Closes #10377 from liancheng/spark-12218.fix-orc-conjunction-ppd.
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala42
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala68
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala32
3 files changed, 112 insertions, 30 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 9197b8b563..a0abfe458d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -17,15 +17,17 @@
package org.apache.spark.sql.execution.datasources.parquet
-import org.apache.parquet.filter2.predicate.Operators._
+import org.apache.parquet.filter2.predicate.FilterApi._
+import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
-import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation}
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
/**
* A test suite that tests Parquet filter2 API based filter pushdown optimization.
@@ -382,6 +384,42 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
+ test("SPARK-12218 Converting conjunctions into Parquet filter predicates") {
+ val schema = StructType(Seq(
+ StructField("a", IntegerType, nullable = false),
+ StructField("b", StringType, nullable = true),
+ StructField("c", DoubleType, nullable = true)
+ ))
+
+ assertResult(Some(and(
+ lt(intColumn("a"), 10: Integer),
+ gt(doubleColumn("c"), 1.5: java.lang.Double)))
+ ) {
+ ParquetFilters.createFilter(
+ schema,
+ sources.And(
+ sources.LessThan("a", 10),
+ sources.GreaterThan("c", 1.5D)))
+ }
+
+ assertResult(None) {
+ ParquetFilters.createFilter(
+ schema,
+ sources.And(
+ sources.LessThan("a", 10),
+ sources.StringContains("b", "prefix")))
+ }
+
+ assertResult(None) {
+ ParquetFilters.createFilter(
+ schema,
+ sources.Not(
+ sources.And(
+ sources.GreaterThan("a", 1),
+ sources.StringContains("b", "prefix"))))
+ }
+ }
+
test("SPARK-11164: test the parquet filter in") {
import testImplicits._
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index ebfb1759b8..165210f9ff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -26,15 +26,47 @@ import org.apache.spark.Logging
import org.apache.spark.sql.sources._
/**
- * It may be optimized by push down partial filters. But we are conservative here.
- * Because if some filters fail to be parsed, the tree may be corrupted,
- * and cannot be used anymore.
+ * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down.
+ *
+ * Due to limitation of ORC `SearchArgument` builder, we had to end up with a pretty weird double-
+ * checking pattern when converting `And`/`Or`/`Not` filters.
+ *
+ * An ORC `SearchArgument` must be built in one pass using a single builder. For example, you can't
+ * build `a = 1` and `b = 2` first, and then combine them into `a = 1 AND b = 2`. This is quite
+ * different from the cases in Spark SQL or Parquet, where complex filters can be easily built using
+ * existing simpler ones.
+ *
+ * The annoying part is that, `SearchArgument` builder methods like `startAnd()`, `startOr()`, and
+ * `startNot()` mutate internal state of the builder instance. This forces us to translate all
+ * convertible filters with a single builder instance. However, before actually converting a filter,
+ * we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible filter is
+ * found, we may already end up with a builder whose internal state is inconsistent.
+ *
+ * For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and then
+ * try to convert its children. Say we convert `left` child successfully, but find that `right`
+ * child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is inconsistent
+ * now.
+ *
+ * The workaround employed here is that, for `And`/`Or`/`Not`, we first try to convert their
+ * children with brand new builders, and only do the actual conversion with the right builder
+ * instance when the children are proven to be convertible.
+ *
+ * P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only. Usage of
+ * builder methods mentioned above can only be found in test code, where all tested filters are
+ * known to be convertible.
*/
private[orc] object OrcFilters extends Logging {
def createFilter(filters: Array[Filter]): Option[SearchArgument] = {
+ // First, tries to convert each filter individually to see whether it's convertible, and then
+ // collect all convertible ones to build the final `SearchArgument`.
+ val convertibleFilters = for {
+ filter <- filters
+ _ <- buildSearchArgument(filter, SearchArgumentFactory.newBuilder())
+ } yield filter
+
for {
- // Combines all filters with `And`s to produce a single conjunction predicate
- conjunction <- filters.reduceOption(And)
+ // Combines all convertible filters using `And` to produce a single conjunction
+ conjunction <- convertibleFilters.reduceOption(And)
// Then tries to build a single ORC `SearchArgument` for the conjunction predicate
builder <- buildSearchArgument(conjunction, SearchArgumentFactory.newBuilder())
} yield builder.build()
@@ -50,28 +82,6 @@ private[orc] object OrcFilters extends Logging {
case _ => false
}
- // lian: I probably missed something here, and had to end up with a pretty weird double-checking
- // pattern when converting `And`/`Or`/`Not` filters.
- //
- // The annoying part is that, `SearchArgument` builder methods like `startAnd()` `startOr()`,
- // and `startNot()` mutate internal state of the builder instance. This forces us to translate
- // all convertible filters with a single builder instance. However, before actually converting a
- // filter, we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible
- // filter is found, we may already end up with a builder whose internal state is inconsistent.
- //
- // For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and
- // then try to convert its children. Say we convert `left` child successfully, but find that
- // `right` child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is
- // inconsistent now.
- //
- // The workaround employed here is that, for `And`/`Or`/`Not`, we first try to convert their
- // children with brand new builders, and only do the actual conversion with the right builder
- // instance when the children are proven to be convertible.
- //
- // P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only.
- // Usage of builder methods mentioned above can only be found in test code, where all tested
- // filters are known to be convertible.
-
expression match {
case And(left, right) =>
// At here, it is not safe to just convert one side if we do not understand the
@@ -102,6 +112,10 @@ private[orc] object OrcFilters extends Logging {
negate <- buildSearchArgument(child, builder.startNot())
} yield negate.end()
+ // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
+ // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
+ // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+
case EqualTo(attribute, value) if isSearchableLiteral(value) =>
Some(builder.startAnd().equals(attribute, value).end())
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 7a34cf731b..47e73b4006 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -21,8 +21,9 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.{QueryTest, Row}
case class OrcData(intField: Int, stringField: String)
@@ -174,4 +175,33 @@ class OrcSourceSuite extends OrcSuite {
|)
""".stripMargin)
}
+
+ test("SPARK-12218 Converting conjunctions into ORC SearchArguments") {
+ // The `LessThan` should be converted while the `StringContains` shouldn't
+ assertResult(
+ """leaf-0 = (LESS_THAN a 10)
+ |expr = leaf-0
+ """.stripMargin.trim
+ ) {
+ OrcFilters.createFilter(Array(
+ LessThan("a", 10),
+ StringContains("b", "prefix")
+ )).get.toString
+ }
+
+ // The `LessThan` should be converted while the whole inner `And` shouldn't
+ assertResult(
+ """leaf-0 = (LESS_THAN a 10)
+ |expr = leaf-0
+ """.stripMargin.trim
+ ) {
+ OrcFilters.createFilter(Array(
+ LessThan("a", 10),
+ Not(And(
+ GreaterThan("a", 1),
+ StringContains("b", "prefix")
+ ))
+ )).get.toString
+ }
+ }
}