From 7742d9f1584150befeb2f3d76cdbd4ea1f37c914 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 5 Jul 2016 13:59:13 +0800
Subject: [SPARK-15198][SQL] Support for pushing down filters for boolean
 types in ORC data source

## What changes were proposed in this pull request?

ORC supports all the types in [`PredicateLeaf.Type`](https://github.com/apache/hive/blob/e085b7e9bd059d91aaf013df0db4d71dca90ec6f/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/PredicateLeaf.java#L50-L56), which include boolean types, so this was tested first.

This PR adds support for pushing down filters for `BooleanType` in the ORC data source.

It also removes the `OrcTableScan` object, which is no longer used.

## How was this patch tested?

Unit tests in `OrcFilterSuite` and `OrcQuerySuite`.

Author: hyukjinkwon

Closes #12972 from HyukjinKwon/SPARK-15198.
---
 .../apache/spark/sql/hive/orc/OrcFileFormat.scala  | 10 ++++-----
 .../org/apache/spark/sql/hive/orc/OrcFilters.scala |  2 +-
 .../apache/spark/sql/hive/orc/OrcFilterSuite.scala | 25 ++++++++++++++++++----
 .../apache/spark/sql/hive/orc/OrcQuerySuite.scala  | 13 +++++++++++
 4 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 5de3507a67..1d3c4663c3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -111,7 +111,7 @@ private[sql] class OrcFileFormat
     if (sparkSession.sessionState.conf.orcFilterPushDown) {
       // Sets pushed predicates
       OrcFilters.createFilter(requiredSchema, filters.toArray).foreach { f =>
-        hadoopConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
+        hadoopConf.set(OrcRelation.SARG_PUSHDOWN, f.toKryo)
         hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
       }
     }
@@ -258,15 +258,13 @@ private[orc] class OrcOutputWriter(
   }
 }
 
-private[orc] object OrcTableScan {
-  // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
-  private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
-}
-
 private[orc] object OrcRelation extends HiveInspectors {
   // The references of Hive's classes will be minimized.
   val ORC_COMPRESSION = "orc.compress"
 
+  // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
+  private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
+
   // The extensions for ORC compression codecs
   val extensionsForCompressionCodecNames = Map(
     "NONE" -> "",

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index c463bc8394..6ab8244559 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -83,7 +83,7 @@ private[orc] object OrcFilters extends Logging {
     // Only the values in the Spark types below can be recognized by
     // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
     case ByteType | ShortType | FloatType | DoubleType => true
-    case IntegerType | LongType | StringType => true
+    case IntegerType | LongType | StringType | BooleanType => true
     case _ => false
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
index 8c027f9935..7a30e548cd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
@@ -208,6 +208,27 @@ class OrcFilterSuite extends QueryTest with OrcTest {
     }
   }
 
+  test("filter pushdown - boolean") {
+    withOrcDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === true, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> true, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < true, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > false, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= false, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= false, PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(false) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(false) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(false) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(true) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(true) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(true) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
   test("filter pushdown - combinations with logical operators") {
     withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
       // Because `ExpressionTree` is not accessible at Hive 1.2.x, this should be checked
@@ -264,10 +285,6 @@ class OrcFilterSuite extends QueryTest with OrcTest {
     withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
       checkNoFilterPredicate('_1 <=> 1.b)
     }
-    // BooleanType
-    withOrcDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
-      checkNoFilterPredicate('_1 === true)
-    }
     // TimestampType
     val stringTimestamp = "2015-08-20 15:57:00"
    withOrcDataFrame(Seq(Tuple1(Timestamp.valueOf(stringTimestamp)))) { implicit df =>

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 4a86987e29..af8115cf9d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -464,6 +464,19 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
 
+  test("SPARK-15198 Support for pushing down filters for boolean types") {
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      val data = (0 until 10).map(_ => (true, false))
+      withOrcFile(data) { file =>
+        val df = spark.read.orc(file).where("_2 == true")
+        val actual = stripSparkFilter(df).count()
+
+        // ORC filter should be applied and the total count should be 0.
+        assert(actual === 0)
+      }
+    }
+  }
+
   test("column nullability and comment - write and then read") {
     val schema = (new StructType)
       .add("cl1", IntegerType, nullable = false, comment = "test")
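As a usage note (not part of the patch): the sketch below illustrates the user-visible behavior this change enables. It is a hypothetical, self-contained example; the application name, file path, and column names are made up, and it assumes a Spark 2.0-era build with Hive support, since the ORC data source lives in the `sql/hive` module at this point.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example (not from the patch): with spark.sql.orc.filterPushdown
// enabled, an equality predicate on a BooleanType column is now converted by
// OrcFilters.createFilter into an ORC SearchArgument and serialized into the
// Hadoop conf under "sarg.pushdown" (OrcRelation.SARG_PUSHDOWN), instead of
// being evaluated only by Spark after the rows are read.
object BooleanPushdownExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("orc-boolean-pushdown")
      .master("local[*]")
      .enableHiveSupport() // the "orc" source requires Hive support at this point
      .config("spark.sql.orc.filterPushdown", "true")
      .getOrCreate()
    import spark.implicits._

    val path = "/tmp/orc-boolean-example" // illustrative path
    Seq((1, true), (2, false), (3, true)).toDF("id", "flag")
      .write.mode("overwrite").orc(path)

    val df = spark.read.orc(path).where($"flag" === true)
    // The physical plan's PushedFilters should now include the boolean
    // predicate, e.g. EqualTo(flag,true), rather than omitting it.
    df.explain(true)
    df.show()

    spark.stop()
  }
}
```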