diff options
3 files changed, 19 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 98336203b0..76d7f5cbc3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -438,7 +438,7 @@ private[sql] class ParquetOutputWriterFactory( ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]) // We want to clear this temporary metadata from saving into Parquet file. - // This metadata is only useful for detecting optional columns when pushdowning filters. + // This metadata is only useful for detecting optional columns when pushing down filters. val dataSchemaToWrite = StructType.removeMetadata( StructType.metadataKeyForOptionalField, dataSchema).asInstanceOf[StructType] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 7213a38b08..e0a113a1b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -185,10 +185,13 @@ private[sql] object ParquetFilters { */ private def getFieldMap(dataType: DataType): Array[(String, DataType)] = dataType match { case StructType(fields) => + // Here we don't flatten the fields in the nested schema but just look up through + // root fields. Currently, accessing to nested fields does not push down filters + // and it does not support to create filters for them. fields.filter { f => !f.metadata.contains(StructType.metadataKeyForOptionalField) || !f.metadata.getBoolean(StructType.metadataKeyForOptionalField) - }.map(f => f.name -> f.dataType) ++ fields.flatMap { f => getFieldMap(f.dataType) } + }.map(f => f.name -> f.dataType) case _ => Array.empty[(String, DataType)] } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 2a5666e70f..18a3128010 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -543,4 +543,18 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } } + + test("Do not push down filters incorrectly when inner name and outer name are the same") { + withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df => + // Here the schema becomes as below: + // + // root + // |-- _1: struct (nullable = true) + // | |-- _1: integer (nullable = true) + // + // The inner column name, `_1` and outer column name `_1` are the same. + // Obviously this should not push down filters because the outer column is struct. + assert(df.filter("_1 IS NOT NULL").count() === 4) + } + } } |