author     hyukjinkwon <gurwls223@gmail.com>    2016-07-06 12:42:16 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-07-06 12:42:16 -0700
commit     4f8ceed59367319300e4bfa5b957c387be81ffa3 (patch)
tree       8f16f31a9d6366e4d714d830b253908218bec21b /sql
parent     480357cc6d71c682fe703611c71c1e6a36e6ce9a (diff)
[SPARK-16371][SQL] Do not push down filters incorrectly when inner name and outer name are the same in Parquet
## What changes were proposed in this pull request?

Currently, if there is a schema as below:

```
root
 |-- _1: struct (nullable = true)
 |    |-- _1: integer (nullable = true)
```

and we execute the code below:

```scala
df.filter("_1 IS NOT NULL").count()
```

this pushes down a filter even though the filter is being applied to a `StructType` (if my understanding is correct, Spark does not push down filters for those). The reason is that `ParquetFilters.getFieldMap` produces the entries below:

```
(_1,StructType(StructField(_1,IntegerType,true)))
(_1,IntegerType)
```

which then collapse into a `Map`:

```
(_1,IntegerType)
```

Now, because of `....lift(dataTypeOf(name)).map(_(name, value))`, a filter is pushed down for `_1`, which Parquet thinks is `IntegerType` although it is actually `StructType`. As a result, Parquet's filter2 produces incorrect results; for example, the code below:

```scala
df.filter("_1 IS NOT NULL").count()
```

always produces 0. This PR prevents this by not looking up nested fields.

## How was this patch tested?

Unit test in `ParquetFilterSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14067 from HyukjinKwon/SPARK-16371.
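To make the collision concrete, here is a minimal standalone sketch (plain Scala, no Spark on the classpath) that mimics the old recursive flattening. The `DT`, `IntT`, and `StructT` names are illustrative stand-ins for Spark's `DataType` hierarchy, not the actual `ParquetFilters` internals:

```scala
sealed trait DT
case object IntT extends DT
case class StructT(fields: Seq[(String, DT)]) extends DT

// Old behavior: emit each root field AND recurse into nested structs.
def oldFieldMap(dt: DT): Array[(String, DT)] = dt match {
  case StructT(fields) =>
    fields.toArray ++ fields.flatMap { case (_, t) => oldFieldMap(t) }
  case _ => Array.empty[(String, DT)]
}

val root = StructT(Seq("_1" -> StructT(Seq("_1" -> IntT))))
// Yields Array((_1,StructT(...)), (_1,IntT)); `toMap` keeps the last entry
// per key, so "_1" now (wrongly) looks like an integer column:
println(oldFieldMap(root).toMap) // Map(_1 -> IntT)
```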
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala    |  5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala | 14
3 files changed, 19 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 98336203b0..76d7f5cbc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -438,7 +438,7 @@ private[sql] class ParquetOutputWriterFactory(
ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
// We want to clear this temporary metadata from saving into Parquet file.
- // This metadata is only useful for detecting optional columns when pushdowning filters.
+ // This metadata is only useful for detecting optional columns when pushing down filters.
val dataSchemaToWrite = StructType.removeMetadata(
StructType.metadataKeyForOptionalField,
dataSchema).asInstanceOf[StructType]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 7213a38b08..e0a113a1b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -185,10 +185,13 @@ private[sql] object ParquetFilters {
*/
private def getFieldMap(dataType: DataType): Array[(String, DataType)] = dataType match {
case StructType(fields) =>
+ // Here we don't flatten the fields in the nested schema but just look up through
+ // the root fields. Currently, accessing nested fields does not push down filters,
+ // and creating filters for them is not supported.
fields.filter { f =>
!f.metadata.contains(StructType.metadataKeyForOptionalField) ||
!f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
- }.map(f => f.name -> f.dataType) ++ fields.flatMap { f => getFieldMap(f.dataType) }
+ }.map(f => f.name -> f.dataType)
case _ => Array.empty[(String, DataType)]
}
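For illustration, the sketch below spells out what the two versions of `getFieldMap` return for the schema from the description, using Spark's public `StructType` API; the `before`/`after` values are written out by hand (since `ParquetFilters` is private) and are an assumption based on the diff above:

```scala
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("_1", StructType(Seq(
    StructField("_1", IntegerType))))))

// Before the fix: the root field plus the recursively flattened inner field
// share the key "_1", and the inner entry wins once collapsed into a Map.
val before = Map("_1" -> (IntegerType: DataType))
// => "_1" looks like an integer column, so IS NOT NULL gets pushed down.

// After the fix: only root fields are kept, so the struct type survives and
// no Parquet filter is created for "_1".
val after = Map("_1" -> schema("_1").dataType)
// => Map(_1 -> StructType(StructField(_1,IntegerType,true)))
```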
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 2a5666e70f..18a3128010 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -543,4 +543,18 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
}
+
+ test("Do not push down filters incorrectly when inner name and outer name are the same") {
+ withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
+ // Here the schema becomes as below:
+ //
+ // root
+ // |-- _1: struct (nullable = true)
+ // | |-- _1: integer (nullable = true)
+ //
+ // The inner column name `_1` and the outer column name `_1` are the same.
+ // Obviously this should not push down filters because the outer column is a struct.
+ assert(df.filter("_1 IS NOT NULL").count() === 4)
+ }
+ }
}
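Finally, a hedged end-to-end reproduction of the scenario the test covers, assuming a Spark 2.0-era `spark-shell` (the `spark` session is provided by the shell; the `/tmp/spark-16371` path is just an example):

```scala
import spark.implicits._

// Same data as the test: four rows of a struct column whose inner field is
// also named "_1".
val df = (1 to 4).map(i => Tuple1(Tuple1(i))).toDF()
df.write.parquet("/tmp/spark-16371")

val read = spark.read.parquet("/tmp/spark-16371")
// Before this patch, IS NOT NULL on the outer struct column was pushed down
// to Parquet as if "_1" were an integer column, and the count came back 0.
// With the patch, Spark evaluates the filter itself:
read.filter("_1 IS NOT NULL").count() // == 4
```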