aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala | 32
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala | 40
2 files changed, 66 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 0d68810ec6..53f765ee26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -765,12 +765,14 @@ private[sql] object ParquetRelation2 extends Logging {
|${parquetSchema.prettyJson}
""".stripMargin
- assert(metastoreSchema.size <= parquetSchema.size, schemaConflictMessage)
+ val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
+
+ assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
val ordinalMap = metastoreSchema.zipWithIndex.map {
case (field, index) => field.name.toLowerCase -> index
}.toMap
- val reorderedParquetSchema = parquetSchema.sortBy(f =>
+ val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
StructType(metastoreSchema.zip(reorderedParquetSchema).map {
@@ -782,6 +784,32 @@ private[sql] object ParquetRelation2 extends Logging {
})
}
+ /**
+  * Extends a Parquet file schema with the nullable Hive Metastore fields that it lacks.
+  *
+  * A table partition stored as a Parquet file may omit a nullable field even though that
+  * field is declared in the table schema obtained from the Hive Metastore. Field names are
+  * compared case-insensitively. Every nullable Metastore field without a counterpart in the
+  * Parquet schema is appended after the Parquet fields, in Metastore order. Missing fields
+  * that are not nullable are left out of the result.
+  *
+  * @param metastoreSchema table schema obtained from the Hive Metastore
+  * @param parquetSchema schema read from the Parquet file
+  * @return the Parquet fields followed by the missing nullable Metastore fields
+  */
+ private[parquet] def mergeMissingNullableFields(
+     metastoreSchema: StructType,
+     parquetSchema: StructType): StructType = {
+   // Index Metastore fields by lower-cased name so that the comparison ignores case.
+   val metastoreByName = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
+   val parquetNames = parquetSchema.map(_.name.toLowerCase)
+   val absentNames = metastoreSchema.map(_.name.toLowerCase).diff(parquetNames)
+   // Only nullable fields are merged in; non-nullable ones are deliberately dropped here.
+   val absentNullable = absentNames.map(metastoreByName).filter(_.nullable)
+   StructType(parquetSchema ++ absentNullable)
+ }
+
+
// TODO Data source implementations shouldn't touch Catalyst types (`Literal`).
// However, we are already using Catalyst expressions for partition pruning and predicate
// push-down here...
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index 8462f9bb2d..61f1cf347a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -226,22 +226,54 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
StructField("UPPERCase", IntegerType, nullable = true))))
}
- // Conflicting field count
+ // Metastore schema contains additional non-nullable fields.
assert(intercept[Throwable] {
ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false),
- StructField("lowerCase", BinaryType))),
+ StructField("lowerCase", BinaryType, nullable = false))),
StructType(Seq(
StructField("UPPERCase", IntegerType, nullable = true))))
}.getMessage.contains("detected conflicting schemas"))
- // Conflicting field names
+ // Conflicting non-nullable field names
intercept[Throwable] {
ParquetRelation2.mergeMetastoreParquetSchema(
- StructType(Seq(StructField("lower", StringType))),
+ StructType(Seq(StructField("lower", StringType, nullable = false))),
StructType(Seq(StructField("lowerCase", BinaryType))))
}
}
+
+ test("merge missing nullable fields from Metastore schema") {
+   // Parquet file schema shared by both scenarios; it has no third field.
+   val parquetFileSchema = StructType(Seq(
+     StructField("firstField", StringType, nullable = true),
+     StructField("secondField", StringType, nullable = true)))
+
+   // Metastore schema whose extra third field has the requested nullability.
+   def metastoreSchemaWith(thirdFieldNullable: Boolean): StructType =
+     StructType(Seq(
+       StructField("firstfield", StringType, nullable = true),
+       StructField("secondfield", StringType, nullable = true),
+       StructField("thirdfield", StringType, nullable = thirdFieldNullable)))
+
+   // Standard case: the additional Metastore field is nullable, so it is merged in after
+   // the Parquet fields, which keep the casing used in the Parquet file.
+   val expectedMerged = StructType(Seq(
+     StructField("firstField", StringType, nullable = true),
+     StructField("secondField", StringType, nullable = true),
+     StructField("thirdfield", StringType, nullable = true)))
+   assertResult(expectedMerged) {
+     ParquetRelation2.mergeMetastoreParquetSchema(
+       metastoreSchemaWith(thirdFieldNullable = true), parquetFileSchema)
+   }
+
+   // Failure case: an additional Metastore field that is not nullable must be rejected.
+   val failure = intercept[Throwable] {
+     ParquetRelation2.mergeMetastoreParquetSchema(
+       metastoreSchemaWith(thirdFieldNullable = false), parquetFileSchema)
+   }
+   assert(failure.getMessage.contains("detected conflicting schemas"))
+ }
}