aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhyukjinkwon <gurwls223@gmail.com>2016-09-28 00:50:12 +0800
committerCheng Lian <lian@databricks.com>2016-09-28 00:50:12 +0800
commit2cac3b2d4a4a4f3d0d45af4defc23bb0ba53484b (patch)
treec20781d0894ee036aaf25469e6c55bfd4b5cee07
parent5de1737b02710e36f6804d2ae243d1aeb30a0b32 (diff)
downloadspark-2cac3b2d4a4a4f3d0d45af4defc23bb0ba53484b.tar.gz
spark-2cac3b2d4a4a4f3d0d45af4defc23bb0ba53484b.tar.bz2
spark-2cac3b2d4a4a4f3d0d45af4defc23bb0ba53484b.zip
[SPARK-16516][SQL] Support for pushing down filters for decimal and timestamp types in ORC
## What changes were proposed in this pull request? It seems ORC supports all the types in ([`PredicateLeaf.Type`](https://github.com/apache/hive/blob/e085b7e9bd059d91aaf013df0db4d71dca90ec6f/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/PredicateLeaf.java#L50-L56)) which includes timestamp type and decimal type. In more details, the types listed in [`SearchArgumentImpl.boxLiteral()`](https://github.com/apache/hive/blob/branch-1.2/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L1068-L1093) can be used as a filter value. FYI, inital `case` caluse for supported types was introduced in https://github.com/apache/spark/commit/65d71bd9fbfe6fe1b741c80fed72d6ae3d22b028 and this was not changed overtime. At that time, Hive version was, 0.13 which supports only some types for filter-push down (See [SearchArgumentImpl.java#L945-L965](https://github.com/apache/hive/blob/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L945-L965) at 0.13). However, the version was upgraded into 1.2.x and now it supports more types (See [SearchArgumentImpl.java#L1068-L1093](https://github.com/apache/hive/blob/branch-1.2/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L1068-L1093) at 1.2.0) ## How was this patch tested? Unit tests in `OrcFilterSuite` and `OrcQuerySuite` Author: hyukjinkwon <gurwls223@gmail.com> Closes #14172 from HyukjinKwon/SPARK-16516.
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala1
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala62
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala35
3 files changed, 89 insertions, 9 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 6ab8244559..d9efd0cb45 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -84,6 +84,7 @@ private[orc] object OrcFilters extends Logging {
// the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
case ByteType | ShortType | FloatType | DoubleType => true
case IntegerType | LongType | StringType | BooleanType => true
+ case TimestampType | _: DecimalType => true
case _ => false
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
index 471192a369..222c24927a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
@@ -229,6 +229,59 @@ class OrcFilterSuite extends QueryTest with OrcTest {
}
}
+ test("filter pushdown - decimal") {
+ withOrcDataFrame((1 to 4).map(i => Tuple1.apply(BigDecimal.valueOf(i)))) { implicit df =>
+ checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate('_1 === BigDecimal.valueOf(1), PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate('_1 <=> BigDecimal.valueOf(1), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate('_1 < BigDecimal.valueOf(2), PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate('_1 > BigDecimal.valueOf(3), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate('_1 <= BigDecimal.valueOf(1), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate('_1 >= BigDecimal.valueOf(4), PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(
+ Literal(BigDecimal.valueOf(1)) === '_1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(
+ Literal(BigDecimal.valueOf(1)) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(
+ Literal(BigDecimal.valueOf(2)) > '_1, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(
+ Literal(BigDecimal.valueOf(3)) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(
+ Literal(BigDecimal.valueOf(1)) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(
+ Literal(BigDecimal.valueOf(4)) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+ }
+ }
+
+ test("filter pushdown - timestamp") {
+ val timeString = "2015-08-20 14:57:00"
+ val timestamps = (1 to 4).map { i =>
+ val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
+ new Timestamp(milliseconds)
+ }
+ withOrcDataFrame(timestamps.map(Tuple1(_))) { implicit df =>
+ checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate('_1 === timestamps(0), PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate('_1 <=> timestamps(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate('_1 < timestamps(1), PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate('_1 > timestamps(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate('_1 <= timestamps(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate('_1 >= timestamps(3), PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(timestamps(0)) === '_1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal(timestamps(0)) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(timestamps(1)) > '_1, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal(timestamps(2)) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(timestamps(0)) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(timestamps(3)) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+ }
+ }
+
test("filter pushdown - combinations with logical operators") {
withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
// Because `ExpressionTree` is not accessible at Hive 1.2.x, this should be checked
@@ -277,19 +330,10 @@ class OrcFilterSuite extends QueryTest with OrcTest {
withOrcDataFrame((1 to 4).map(i => Tuple1(Array(i)))) { implicit df =>
checkNoFilterPredicate('_1.isNull)
}
- // DecimalType
- withOrcDataFrame((1 to 4).map(i => Tuple1(BigDecimal.valueOf(i)))) { implicit df =>
- checkNoFilterPredicate('_1 <= BigDecimal.valueOf(4))
- }
// BinaryType
withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
checkNoFilterPredicate('_1 <=> 1.b)
}
- // TimestampType
- val stringTimestamp = "2015-08-20 15:57:00"
- withOrcDataFrame(Seq(Tuple1(Timestamp.valueOf(stringTimestamp)))) { implicit df =>
- checkNoFilterPredicate('_1 <=> Timestamp.valueOf(stringTimestamp))
- }
// DateType
val stringDate = "2015-01-01"
withOrcDataFrame(Seq(Tuple1(Date.valueOf(stringDate)))) { implicit df =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index b13878d578..b2ee49c441 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.orc
import java.nio.charset.StandardCharsets
+import java.sql.Timestamp
import org.scalatest.BeforeAndAfterAll
@@ -500,6 +501,40 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
}
}
+ test("Support for pushing down filters for decimal types") {
+ withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+ val data = (0 until 10).map(i => Tuple1(BigDecimal.valueOf(i)))
+ withTempPath { file =>
+ // It needs to repartition data so that we can have several ORC files
+ // in order to skip stripes in ORC.
+ createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath)
+ val df = spark.read.orc(file.getCanonicalPath).where("a == 2")
+ val actual = stripSparkFilter(df).count()
+
+ assert(actual < 10)
+ }
+ }
+ }
+
+ test("Support for pushing down filters for timestamp types") {
+ withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+ val timeString = "2015-08-20 14:57:00"
+ val data = (0 until 10).map { i =>
+ val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
+ Tuple1(new Timestamp(milliseconds))
+ }
+ withTempPath { file =>
+ // It needs to repartition data so that we can have several ORC files
+ // in order to skip stripes in ORC.
+ createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath)
+ val df = spark.read.orc(file.getCanonicalPath).where(s"a == '$timeString'")
+ val actual = stripSparkFilter(df).count()
+
+ assert(actual < 10)
+ }
+ }
+ }
+
test("column nullability and comment - write and then read") {
val schema = (new StructType)
.add("cl1", IntegerType, nullable = false, comment = "test")