author     Cheng Lian <lian@databricks.com>          2015-08-16 10:17:58 -0700
committer  Yin Huai <yhuai@databricks.com>           2015-08-16 10:17:58 -0700
commit     ae2370e72f93db8a28b262e8252c55fe1fc9873c (patch)
tree       b3bf8b6699430bfd4f0b2ecef0103d40bf1d3f76 /sql
parent     cf016075a006034c24c5b758edb279f3e151d25d (diff)
[SPARK-10005] [SQL] Fixes schema merging for nested structs
In case of schema merging, we only handled first-level fields when converting Parquet groups to `InternalRow`s; nested struct fields were not handled properly.
For example, the schema of a Parquet file to be read can be:
```
message individual {
  required group f1 {
    optional binary f11 (utf8);
  }
}
```
while the global schema is:
```
message global {
  required group f1 {
    optional binary f11 (utf8);
    optional int32 f12;
  }
}
```
This PR fixes the issue by padding the missing fields when creating the actual converters: every field of the global schema gets a converter, and the padded fields always produce null values.
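As a rough, simplified sketch of the padding idea (not the converter code in this PR), the snippet below lines up a single file's schema against the merged global schema using Spark SQL's `StructType`; the helper name `padMissingFields` is made up for this illustration.
```scala
import org.apache.spark.sql.types._

object PaddingSketch {
  // Hypothetical helper: pair every field of the merged (global) schema with a flag
  // saying whether the file being read actually contains it. Fields absent from the
  // file still get a converter slot, and those converters always produce null.
  def padMissingFields(
      fileSchema: StructType,
      globalSchema: StructType): Seq[(StructField, Boolean)] = {
    val present = fileSchema.fieldNames.toSet
    globalSchema.map(f => f -> present.contains(f.name))
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the example above: the file only has `f11`, the global schema also has `f12`.
    val fileSchema = new StructType().add("f11", StringType)
    val globalSchema = new StructType()
      .add("f11", StringType)
      .add("f12", IntegerType)

    padMissingFields(fileSchema, globalSchema).foreach { case (field, present) =>
      println(s"${field.name}: present = $present")
    }
    // Prints:
    //   f11: present = true
    //   f12: present = false  (padded, read as null)
  }
}
```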
Author: Cheng Lian <lian@databricks.com>
Closes #8228 from liancheng/spark-10005/nested-schema-merging.
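For reference, this is roughly how the behaviour surfaces to users, sketched in spark-shell style after the regression test added by this PR; the scratch path is arbitrary and an existing `sqlContext` is assumed.
```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import sqlContext.implicits._

// Hypothetical scratch directory for the demo.
val path = "/tmp/spark-10005-demo"

def append(df: DataFrame): Unit = df.write.mode(SaveMode.Append).parquet(path)

// Two Parquet files whose single struct column has a different number of nested fields.
append((1 to 2).map(i => Tuple1((i, i))).toDF())
append((1 to 2).map(i => Tuple1((i, i, i))).toDF())

// With this fix, rows coming from the first file are padded with null for the
// nested field that only exists in the second file's schema.
sqlContext.read.option("mergeSchema", "true").parquet(path).show()
```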
Diffstat (limited to 'sql')
4 files changed, 112 insertions, 22 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 4049795ed3..a4679bb2f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
 
 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
+  // Called after `init()` when initializing Parquet record reader.
   override def prepareForRead(
       conf: Configuration,
       keyValueMetaData: JMap[String, String],
@@ -51,19 +52,30 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
         // available if the target file is written by Spark SQL.
         .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
     }.map(StructType.fromString).getOrElse {
-      logDebug("Catalyst schema not available, falling back to Parquet schema")
+      logInfo("Catalyst schema not available, falling back to Parquet schema")
       toCatalyst.convert(parquetRequestedSchema)
     }
 
-    logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
+    logInfo {
+      s"""Going to read the following fields from the Parquet file:
+         |
+         |Parquet form:
+         |$parquetRequestedSchema
+         |
+         |Catalyst form:
+         |$catalystRequestedSchema
+       """.stripMargin
+    }
+
     new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
   }
 
+  // Called before `prepareForRead()` when initializing Parquet record reader.
   override def init(context: InitContext): ReadContext = {
     val conf = context.getConfiguration
 
     // If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
-    // schema of this file from its the metadata.
+    // schema of this file from its metadata.
     val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
 
     // Optional schema of requested columns, in the form of a string serialized from a Catalyst
@@ -141,7 +153,6 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
       maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
         maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
 
-    logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema")
     new ReadContext(parquetRequestedSchema, metadata)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index ab5a6ddd41..18c5b50020 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -25,9 +25,10 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
-import org.apache.parquet.schema.OriginalType.LIST
+import org.apache.parquet.schema.OriginalType.{LIST, INT_32, UTF8}
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
 import org.apache.parquet.schema.Type.Repetition
-import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -88,12 +89,54 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
 }
 
 /**
- * A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s.
- * Since any Parquet record is also a struct, this converter can also be used as root converter.
+ * A [[CatalystRowConverter]] is used to convert Parquet records into Catalyst [[InternalRow]]s.
+ * Since Catalyst `StructType` is also a Parquet record, this converter can be used as root
+ * converter. Take the following Parquet type as an example:
+ * {{{
+ *   message root {
+ *     required int32 f1;
+ *     optional group f2 {
+ *       required double f21;
+ *       optional binary f22 (utf8);
+ *     }
+ *   }
+ * }}}
+ * 5 converters will be created:
+ *
+ * - a root [[CatalystRowConverter]] for [[MessageType]] `root`, which contains:
+ *   - a [[CatalystPrimitiveConverter]] for required [[INT_32]] field `f1`, and
+ *   - a nested [[CatalystRowConverter]] for optional [[GroupType]] `f2`, which contains:
+ *     - a [[CatalystPrimitiveConverter]] for required [[DOUBLE]] field `f21`, and
+ *     - a [[CatalystStringConverter]] for optional [[UTF8]] string field `f22`
  *
  * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
  * any "parent" container.
  *
+ * @note Constructor argument [[parquetType]] refers to requested fields of the actual schema of the
+ *       Parquet file being read, while constructor argument [[catalystType]] refers to requested
+ *       fields of the global schema. The key difference is that, in case of schema merging,
+ *       [[parquetType]] can be a subset of [[catalystType]]. For example, it's possible to have
+ *       the following [[catalystType]]:
+ *       {{{
+ *       new StructType()
+ *         .add("f1", IntegerType, nullable = false)
+ *         .add("f2", StringType, nullable = true)
+ *         .add("f3", new StructType()
+ *           .add("f31", DoubleType, nullable = false)
+ *           .add("f32", IntegerType, nullable = true)
+ *           .add("f33", StringType, nullable = true), nullable = false)
+ *       }}}
+ *       and the following [[parquetType]] (`f2` and `f32` are missing):
+ *       {{{
+ *       message root {
+ *         required int32 f1;
+ *         required group f3 {
+ *           required double f31;
+ *           optional binary f33 (utf8);
+ *         }
+ *       }
+ *       }}}
+ *
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record type
  * @param updater An updater which propagates converted field values to the parent container
@@ -126,7 +169,24 @@ private[parquet] class CatalystRowConverter(
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
-    parquetType.getFields.zip(catalystType).zipWithIndex.map {
+    // In case of schema merging, `parquetType` can be a subset of `catalystType`. We need to pad
+    // those missing fields and create converters for them, although values of these fields are
+    // always null.
+    val paddedParquetFields = {
+      val parquetFields = parquetType.getFields
+      val parquetFieldNames = parquetFields.map(_.getName).toSet
+      val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name))
+
+      // We don't need to worry about feature flag arguments like `assumeBinaryIsString` when
+      // creating the schema converter here, since values of missing fields are always null.
+      val toParquet = new CatalystSchemaConverter()
+
+      (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f =>
+        catalystType.indexWhere(_.name == f.getName)
+      }
+    }
+
+    paddedParquetFields.zip(catalystType).zipWithIndex.map {
       case ((parquetFieldType, catalystField), ordinal) =>
         // Converted field value should be set to the `ordinal`-th cell of `currentRow`
         newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 275646e818..535f0684e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -72,18 +72,9 @@ private[parquet] class CatalystSchemaConverter(
       followParquetFormatSpec = conf.followParquetFormatSpec)
 
   def this(conf: Configuration) = this(
-    assumeBinaryIsString =
-      conf.getBoolean(
-        SQLConf.PARQUET_BINARY_AS_STRING.key,
-        SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get),
-    assumeInt96IsTimestamp =
-      conf.getBoolean(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get),
-    followParquetFormatSpec =
-      conf.getBoolean(
-        SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key,
-        SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get))
+    assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
+    assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
+    followParquetFormatSpec = conf.get(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key).toBoolean)
 
   /**
    * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index e2f2a8c744..b7b70c2bbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -201,4 +201,32 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       assert(Decimal("67123.45") === Decimal(decimal))
     }
   }
+
+  test("SPARK-10005 Schema merging for nested struct") {
+    val sqlContext = _sqlContext
+    import sqlContext.implicits._
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      def append(df: DataFrame): Unit = {
+        df.write.mode(SaveMode.Append).parquet(path)
+      }
+
+      // Note that both the following two DataFrames contain a single struct column with multiple
+      // nested fields.
+      append((1 to 2).map(i => Tuple1((i, i))).toDF())
+      append((1 to 2).map(i => Tuple1((i, i, i))).toDF())
+
+      withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
+        checkAnswer(
+          sqlContext.read.option("mergeSchema", "true").parquet(path),
+          Seq(
+            Row(Row(1, 1, null)),
+            Row(Row(2, 2, null)),
+            Row(Row(1, 1, 1)),
+            Row(Row(2, 2, 2))))
+      }
+    }
+  }
 }