aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <simonh@tw.ibm.com>2016-08-10 10:03:55 -0700
committerDavies Liu <davies.liu@gmail.com>2016-08-10 10:03:55 -0700
commit19af298bb6d264adcf02f6f84c8dc1542b408507 (patch)
treedf44c0693934540581543e080d31df0fdddb2b6f /sql/core/src/test/scala
parent11a6844bebbad1968bcdc295ab2de31c60dc0874 (diff)
downloadspark-19af298bb6d264adcf02f6f84c8dc1542b408507.tar.gz
spark-19af298bb6d264adcf02f6f84c8dc1542b408507.tar.bz2
spark-19af298bb6d264adcf02f6f84c8dc1542b408507.zip
[SPARK-15639] [SPARK-16321] [SQL] Push down filter at RowGroups level for parquet reader
## What changes were proposed in this pull request? The base class `SpecificParquetRecordReaderBase` used for vectorized parquet reader will try to get pushed-down filters from the given configuration. This pushed-down filters are used for RowGroups-level filtering. However, we don't set up the filters to push down into the configuration. In other words, the filters are not actually pushed down to do RowGroups-level filtering. This patch is to fix this and tries to set up the filters for pushing down to configuration for the reader. The benchmark that excludes the time of writing Parquet file: test("Benchmark for Parquet") { val N = 500 << 12 withParquetTable((0 until N).map(i => (101, i)), "t") { val benchmark = new Benchmark("Parquet reader", N) benchmark.addCase("reading Parquet file", 10) { iter => sql("SELECT _1 FROM t where t._1 < 100").collect() } benchmark.run() } } `withParquetTable` in default will run tests for vectorized reader non-vectorized readers. I only let it run vectorized reader. When we set the block size of parquet as 1024 to have multiple row groups. The benchmark is: Before this patch: The retrieved row groups: 8063 Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic Intel(R) Core(TM) i7-5557U CPU 3.10GHz Parquet reader: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ reading Parquet file 825 / 1233 2.5 402.6 1.0X After this patch: The retrieved row groups: 0 Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic Intel(R) Core(TM) i7-5557U CPU 3.10GHz Parquet reader: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ reading Parquet file 306 / 503 6.7 149.6 1.0X Next, I run the benchmark for non-pushdown case using the same benchmark code but with disabled pushdown configuration. This time the parquet block size is default value. Before this patch: Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic Intel(R) Core(TM) i7-5557U CPU 3.10GHz Parquet reader: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ reading Parquet file 136 / 238 15.0 66.5 1.0X After this patch: Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic Intel(R) Core(TM) i7-5557U CPU 3.10GHz Parquet reader: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ reading Parquet file 124 / 193 16.5 60.7 1.0X For non-pushdown case, from the results, I think this patch doesn't affect normal code path. I've manually output the `totalRowCount` in `SpecificParquetRecordReaderBase` to see if this patch actually filter the row-groups. When running the above benchmark: After this patch: `totalRowCount = 0` Before this patch: `totalRowCount = 1024000` ## How was this patch tested? Existing tests should be passed. Author: Liang-Chi Hsieh <simonh@tw.ibm.com> Closes #13701 from viirya/vectorized-reader-push-down-filter2.
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala165
1 files changed, 98 insertions, 67 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index d846b27ffe..4246b54c21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
+import org.apache.spark.util.{AccumulatorContext, LongAccumulator}
/**
* A test suite that tests Parquet filter2 API based filter pushdown optimization.
@@ -368,73 +369,75 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
import testImplicits._
-
- withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
- SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
- withTempPath { dir =>
- val pathOne = s"${dir.getCanonicalPath}/table1"
- (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
- val pathTwo = s"${dir.getCanonicalPath}/table2"
- (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
-
- // If the "c = 1" filter gets pushed down, this query will throw an exception which
- // Parquet emits. This is a Parquet issue (PARQUET-389).
- val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
- checkAnswer(
- df,
- Row(1, "1", null))
-
- // The fields "a" and "c" only exist in one Parquet file.
- assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
- assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-
- val pathThree = s"${dir.getCanonicalPath}/table3"
- df.write.parquet(pathThree)
-
- // We will remove the temporary metadata when writing Parquet file.
- val schema = spark.read.parquet(pathThree).schema
- assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
-
- val pathFour = s"${dir.getCanonicalPath}/table4"
- val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
- dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
-
- val pathFive = s"${dir.getCanonicalPath}/table5"
- val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
- dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
-
- // If the "s.c = 1" filter gets pushed down, this query will throw an exception which
- // Parquet emits.
- val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
- .selectExpr("s")
- checkAnswer(dfStruct3, Row(Row(null, 1)))
-
- // The fields "s.a" and "s.c" only exist in one Parquet file.
- val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
- assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
- assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-
- val pathSix = s"${dir.getCanonicalPath}/table6"
- dfStruct3.write.parquet(pathSix)
-
- // We will remove the temporary metadata when writing Parquet file.
- val forPathSix = spark.read.parquet(pathSix).schema
- assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
-
- // sanity test: make sure optional metadata field is not wrongly set.
- val pathSeven = s"${dir.getCanonicalPath}/table7"
- (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
- val pathEight = s"${dir.getCanonicalPath}/table8"
- (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
-
- val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
- checkAnswer(
- df2,
- Row(1, "1"))
-
- // The fields "a" and "b" exist in both two Parquet files. No metadata is set.
- assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
- assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
+ Seq("true", "false").map { vectorized =>
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+ SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+ SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+ withTempPath { dir =>
+ val pathOne = s"${dir.getCanonicalPath}/table1"
+ (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
+ val pathTwo = s"${dir.getCanonicalPath}/table2"
+ (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
+
+ // If the "c = 1" filter gets pushed down, this query will throw an exception which
+ // Parquet emits. This is a Parquet issue (PARQUET-389).
+ val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
+ checkAnswer(
+ df,
+ Row(1, "1", null))
+
+ // The fields "a" and "c" only exist in one Parquet file.
+ assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+ assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+
+ val pathThree = s"${dir.getCanonicalPath}/table3"
+ df.write.parquet(pathThree)
+
+ // We will remove the temporary metadata when writing Parquet file.
+ val schema = spark.read.parquet(pathThree).schema
+ assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+ val pathFour = s"${dir.getCanonicalPath}/table4"
+ val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
+ dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
+
+ val pathFive = s"${dir.getCanonicalPath}/table5"
+ val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
+ dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
+
+ // If the "s.c = 1" filter gets pushed down, this query will throw an exception which
+ // Parquet emits.
+ val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
+ .selectExpr("s")
+ checkAnswer(dfStruct3, Row(Row(null, 1)))
+
+ // The fields "s.a" and "s.c" only exist in one Parquet file.
+ val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
+ assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+ assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+
+ val pathSix = s"${dir.getCanonicalPath}/table6"
+ dfStruct3.write.parquet(pathSix)
+
+ // We will remove the temporary metadata when writing Parquet file.
+ val forPathSix = spark.read.parquet(pathSix).schema
+ assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+ // sanity test: make sure optional metadata field is not wrongly set.
+ val pathSeven = s"${dir.getCanonicalPath}/table7"
+ (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
+ val pathEight = s"${dir.getCanonicalPath}/table8"
+ (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
+
+ val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
+ checkAnswer(
+ df2,
+ Row(1, "1"))
+
+ // The fields "a" and "b" exist in both two Parquet files. No metadata is set.
+ assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
+ assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
+ }
}
}
}
@@ -527,4 +530,32 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
assert(df.filter("_1 IS NOT NULL").count() === 4)
}
}
+
+ test("Fiters should be pushed down for vectorized Parquet reader at row group level") {
+ import testImplicits._
+
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
+ SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+ withTempPath { dir =>
+ val path = s"${dir.getCanonicalPath}/table"
+ (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path)
+
+ Seq(("true", (x: Long) => x == 0), ("false", (x: Long) => x > 0)).map { case (push, func) =>
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> push) {
+ val accu = new LongAccumulator
+ accu.register(sparkContext, Some("numRowGroups"))
+
+ val df = spark.read.parquet(path).filter("a < 100")
+ df.foreachPartition(_.foreach(v => accu.add(0)))
+ df.collect
+
+ val numRowGroups = AccumulatorContext.lookForAccumulatorByName("numRowGroups")
+ assert(numRowGroups.isDefined)
+ assert(func(numRowGroups.get.asInstanceOf[LongAccumulator].value))
+ AccumulatorContext.remove(accu.id)
+ }
+ }
+ }
+ }
+ }
}