 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala  |  2 +-
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala     |  5 ++++-
 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala | 14 ++++++++++++++
 3 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 98336203b0..76d7f5cbc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -438,7 +438,7 @@ private[sql] class ParquetOutputWriterFactory(
ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
// We want to clear this temporary metadata from saving into Parquet file.
- // This metadata is only useful for detecting optional columns when pushdowning filters.
+ // This metadata is only useful for detecting optional columns when pushing down filters.
val dataSchemaToWrite = StructType.removeMetadata(
StructType.metadataKeyForOptionalField,
dataSchema).asInstanceOf[StructType]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 7213a38b08..e0a113a1b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -185,10 +185,13 @@ private[sql] object ParquetFilters {
*/
private def getFieldMap(dataType: DataType): Array[(String, DataType)] = dataType match {
case StructType(fields) =>
+ // Here we don't flatten the fields in the nested schema but just look up the
+ // root fields. Currently, accessing nested fields does not push down filters,
+ // and creating filters for nested fields is not supported.
fields.filter { f =>
!f.metadata.contains(StructType.metadataKeyForOptionalField) ||
!f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
- }.map(f => f.name -> f.dataType) ++ fields.flatMap { f => getFieldMap(f.dataType) }
+ }.map(f => f.name -> f.dataType)
case _ => Array.empty[(String, DataType)]
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 2a5666e70f..18a3128010 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -543,4 +543,18 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
}
+
+ test("Do not push down filters incorrectly when inner name and outer name are the same") {
+ withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
+ // The resulting schema is as below:
+ //
+ // root
+ // |-- _1: struct (nullable = true)
+ // | |-- _1: integer (nullable = true)
+ //
+ // The inner column name `_1` and the outer column name `_1` are the same.
+ // Filters must not be pushed down here because the outer column is a struct.
+ assert(df.filter("_1 IS NOT NULL").count() === 4)
+ }
+ }
}
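
For context (not part of the patch above): a minimal standalone sketch of why the removed `++ fields.flatMap { f => getFieldMap(f.dataType) }` branch was unsafe for the test schema. It assumes only org.apache.spark.sql.types on the classpath; `FieldMapSketch`, `rootOnly`, and `flattened` are illustrative names, not Spark internals — `rootOnly` mirrors the fixed root-only lookup, `flattened` mirrors the old flattening behaviour.

import org.apache.spark.sql.types._

object FieldMapSketch {
  // Root-only lookup, mirroring the fixed getFieldMap behaviour: only
  // top-level fields are candidates for Parquet filter push-down.
  def rootOnly(schema: StructType): Map[String, DataType] =
    schema.fields.map(f => f.name -> f.dataType).toMap

  // Flattened lookup, mirroring the removed `++ fields.flatMap(...)` branch:
  // a nested field with the same name silently overwrites the root entry.
  def flattened(dataType: DataType): Map[String, DataType] = dataType match {
    case StructType(fields) =>
      fields.map(f => f.name -> f.dataType).toMap ++
        fields.flatMap(f => flattened(f.dataType))
    case _ => Map.empty[String, DataType]
  }

  def main(args: Array[String]): Unit = {
    // root
    //  |-- _1: struct (nullable = true)
    //  |    |-- _1: integer (nullable = true)
    val schema = StructType(Seq(
      StructField("_1", StructType(Seq(StructField("_1", IntegerType))))))

    // Root-only: `_1` maps to the struct type, so no Parquet filter is created.
    println(rootOnly(schema)("_1"))
    // Flattened: `_1` maps to IntegerType, so an integer comparison filter
    // would wrongly be built against the outer struct column.
    println(flattened(schema)("_1"))
  }
}

With the old flattened lookup, the outer column `_1` resolves to IntegerType and an integer filter could be pushed down against a struct column; mapping only root fields makes the lookup return the struct type, for which no Parquet filter is created.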