author     Cheng Lian <lian@databricks.com>  2015-08-16 10:17:58 -0700
committer  Yin Huai <yhuai@databricks.com>   2015-08-16 10:17:58 -0700
commit     ae2370e72f93db8a28b262e8252c55fe1fc9873c (patch)
tree       b3bf8b6699430bfd4f0b2ecef0103d40bf1d3f76
parent     cf016075a006034c24c5b758edb279f3e151d25d (diff)
[SPARK-10005] [SQL] Fixes schema merging for nested structs
In the case of schema merging, we only handled first-level fields when converting Parquet groups to `InternalRow`s; nested struct fields were not handled properly. For example, the schema of a Parquet file to be read can be:

```
message individual {
  required group f1 {
    optional binary f11 (utf8);
  }
}
```

while the global schema is:

```
message global {
  required group f1 {
    optional binary f11 (utf8);
    optional int32 f12;
  }
}
```

This PR fixes the issue by padding missing fields when creating the actual converters.

Author: Cheng Lian <lian@databricks.com>

Closes #8228 from liancheng/spark-10005/nested-schema-merging.
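For context, a minimal reproduction sketch that mirrors the regression test added in this patch; it assumes a Spark 1.5-era `spark-shell` with `sqlContext` in scope, and the path is purely illustrative:

```scala
// Illustrative sketch only; assumes a `sqlContext` is in scope (e.g. spark-shell).
import org.apache.spark.sql.SaveMode
import sqlContext.implicits._

val path = "/tmp/spark-10005-repro"  // hypothetical scratch location

// Two appends whose single struct column has a different number of nested fields.
(1 to 2).map(i => Tuple1((i, i))).toDF().write.mode(SaveMode.Append).parquet(path)
(1 to 2).map(i => Tuple1((i, i, i))).toDF().write.mode(SaveMode.Append).parquet(path)

// With schema merging enabled, the nested field missing from the first file
// should be padded and read back as null once this fix is applied.
sqlContext.read.option("mergeSchema", "true").parquet(path).show()
```

Before this change, the nested field missing from the first file was not handled properly when schema merging was enabled; with the fix it is padded and read back as `null`.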
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala    19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala   70
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala 15
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala       30
4 files changed, 112 insertions(+), 22 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 4049795ed3..a4679bb2f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
+ // Called after `init()` when initializing Parquet record reader.
override def prepareForRead(
conf: Configuration,
keyValueMetaData: JMap[String, String],
@@ -51,19 +52,30 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
// available if the target file is written by Spark SQL.
.orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
}.map(StructType.fromString).getOrElse {
- logDebug("Catalyst schema not available, falling back to Parquet schema")
+ logInfo("Catalyst schema not available, falling back to Parquet schema")
toCatalyst.convert(parquetRequestedSchema)
}
- logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
+ logInfo {
+ s"""Going to read the following fields from the Parquet file:
+ |
+ |Parquet form:
+ |$parquetRequestedSchema
+ |
+ |Catalyst form:
+ |$catalystRequestedSchema
+ """.stripMargin
+ }
+
new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
}
+ // Called before `prepareForRead()` when initializing Parquet record reader.
override def init(context: InitContext): ReadContext = {
val conf = context.getConfiguration
// If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
- // schema of this file from its the metadata.
+ // schema of this file from its metadata.
val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
// Optional schema of requested columns, in the form of a string serialized from a Catalyst
@@ -141,7 +153,6 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
- logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema")
new ReadContext(parquetRequestedSchema, metadata)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index ab5a6ddd41..18c5b50020 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -25,9 +25,10 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
-import org.apache.parquet.schema.OriginalType.LIST
+import org.apache.parquet.schema.OriginalType.{LIST, INT_32, UTF8}
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
import org.apache.parquet.schema.Type.Repetition
-import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -88,12 +89,54 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
}
/**
- * A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s.
- * Since any Parquet record is also a struct, this converter can also be used as root converter.
+ * A [[CatalystRowConverter]] is used to convert Parquet records into Catalyst [[InternalRow]]s.
+ * Since Catalyst `StructType` is also a Parquet record, this converter can be used as root
+ * converter. Take the following Parquet type as an example:
+ * {{{
+ * message root {
+ * required int32 f1;
+ * optional group f2 {
+ * required double f21;
+ * optional binary f22 (utf8);
+ * }
+ * }
+ * }}}
+ * 5 converters will be created:
+ *
+ * - a root [[CatalystRowConverter]] for [[MessageType]] `root`, which contains:
+ * - a [[CatalystPrimitiveConverter]] for required [[INT_32]] field `f1`, and
+ * - a nested [[CatalystRowConverter]] for optional [[GroupType]] `f2`, which contains:
+ * - a [[CatalystPrimitiveConverter]] for required [[DOUBLE]] field `f21`, and
+ * - a [[CatalystStringConverter]] for optional [[UTF8]] string field `f22`
*
* When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
* any "parent" container.
*
+ * @note Constructor argument [[parquetType]] refers to requested fields of the actual schema of the
+ * Parquet file being read, while constructor argument [[catalystType]] refers to requested
+ * fields of the global schema. The key difference is that, in case of schema merging,
+ * [[parquetType]] can be a subset of [[catalystType]]. For example, it's possible to have
+ * the following [[catalystType]]:
+ * {{{
+ * new StructType()
+ * .add("f1", IntegerType, nullable = false)
+ * .add("f2", StringType, nullable = true)
+ * .add("f3", new StructType()
+ * .add("f31", DoubleType, nullable = false)
+ * .add("f32", IntegerType, nullable = true)
+ * .add("f33", StringType, nullable = true), nullable = false)
+ * }}}
+ * and the following [[parquetType]] (`f2` and `f32` are missing):
+ * {{{
+ * message root {
+ * required int32 f1;
+ * required group f3 {
+ * required double f31;
+ * optional binary f33 (utf8);
+ * }
+ * }
+ * }}}
+ *
* @param parquetType Parquet schema of Parquet records
* @param catalystType Spark SQL schema that corresponds to the Parquet record type
* @param updater An updater which propagates converted field values to the parent container
@@ -126,7 +169,24 @@ private[parquet] class CatalystRowConverter(
// Converters for each field.
private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
- parquetType.getFields.zip(catalystType).zipWithIndex.map {
+ // In case of schema merging, `parquetType` can be a subset of `catalystType`. We need to pad
+ // those missing fields and create converters for them, although values of these fields are
+ // always null.
+ val paddedParquetFields = {
+ val parquetFields = parquetType.getFields
+ val parquetFieldNames = parquetFields.map(_.getName).toSet
+ val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name))
+
+ // We don't need to worry about feature flag arguments like `assumeBinaryIsString` when
+ // creating the schema converter here, since values of missing fields are always null.
+ val toParquet = new CatalystSchemaConverter()
+
+ (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f =>
+ catalystType.indexWhere(_.name == f.getName)
+ }
+ }
+
+ paddedParquetFields.zip(catalystType).zipWithIndex.map {
case ((parquetFieldType, catalystField), ordinal) =>
// Converted field value should be set to the `ordinal`-th cell of `currentRow`
newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
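
To summarize the new padding step in isolation, here is a self-contained sketch (the `Field` case class and `padMissingFields` helper are hypothetical stand-ins, not the actual Spark or Parquet types): fields missing from the file schema are appended, and the whole list is re-sorted by each field's position in the global Catalyst schema.

```scala
// Standalone sketch of the padding/reordering idea; `Field` and `padMissingFields`
// are hypothetical stand-ins, not part of the Spark or Parquet APIs.
object PaddingSketch extends App {
  case class Field(name: String)

  def padMissingFields(fileFields: Seq[Field], globalFields: Seq[Field]): Seq[Field] = {
    val present = fileFields.map(_.name).toSet
    val missing = globalFields.filterNot(f => present.contains(f.name))
    // Append the fields missing from the file, then order everything by position in the
    // global (merged) schema so that per-field converters line up with the Catalyst schema.
    (fileFields ++ missing).sortBy(f => globalFields.indexWhere(_.name == f.name))
  }

  // Example: the file schema lacks `f2`; the padded result follows the global ordering.
  val global = Seq(Field("f1"), Field("f2"), Field("f3"))
  val file = Seq(Field("f1"), Field("f3"))
  assert(padMissingFields(file, global).map(_.name) == Seq("f1", "f2", "f3"))
}
```

In the actual converter code above, the missing Catalyst fields are additionally converted back to Parquet types via `CatalystSchemaConverter.convertField`, so that converters can still be created for them even though their values are always null.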
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 275646e818..535f0684e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -72,18 +72,9 @@ private[parquet] class CatalystSchemaConverter(
followParquetFormatSpec = conf.followParquetFormatSpec)
def this(conf: Configuration) = this(
- assumeBinaryIsString =
- conf.getBoolean(
- SQLConf.PARQUET_BINARY_AS_STRING.key,
- SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get),
- assumeInt96IsTimestamp =
- conf.getBoolean(
- SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
- SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get),
- followParquetFormatSpec =
- conf.getBoolean(
- SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key,
- SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get))
+ assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
+ assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
+ followParquetFormatSpec = conf.get(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key).toBoolean)
/**
* Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index e2f2a8c744..b7b70c2bbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -21,7 +21,7 @@ import java.io.File
import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -201,4 +201,32 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
assert(Decimal("67123.45") === Decimal(decimal))
}
}
+
+ test("SPARK-10005 Schema merging for nested struct") {
+ val sqlContext = _sqlContext
+ import sqlContext.implicits._
+
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ def append(df: DataFrame): Unit = {
+ df.write.mode(SaveMode.Append).parquet(path)
+ }
+
+ // Note that both the following two DataFrames contain a single struct column with multiple
+ // nested fields.
+ append((1 to 2).map(i => Tuple1((i, i))).toDF())
+ append((1 to 2).map(i => Tuple1((i, i, i))).toDF())
+
+ withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
+ checkAnswer(
+ sqlContext.read.option("mergeSchema", "true").parquet(path),
+ Seq(
+ Row(Row(1, 1, null)),
+ Row(Row(2, 2, null)),
+ Row(Row(1, 1, 1)),
+ Row(Row(2, 2, 2))))
+ }
+ }
+ }
}