Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala            | 235
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala           |  51
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala        |  14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala  |   1
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala   |  90
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala              |  77
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala             | 310
7 files changed, 653 insertions(+), 125 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 0a6bb44445..dc4ff06df6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -19,17 +19,18 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.util.{Map => JMap}
-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, mapAsScalaMapConverter}
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.Type.Repetition
+import org.apache.parquet.schema._
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
// Called after `init()` when initializing Parquet record reader.
@@ -81,70 +82,10 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
// `StructType` containing all requested columns.
val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
- // Below we construct a Parquet schema containing all requested columns. This schema tells
- // Parquet which columns to read.
- //
- // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema. Otherwise,
- // we have to fallback to the full file schema which contains all columns in the file.
- // Obviously this may waste IO bandwidth since it may read more columns than requested.
- //
- // Two things to note:
- //
- // 1. It's possible that some requested columns don't exist in the target Parquet file. For
- // example, in the case of schema merging, the globally merged schema may contain extra
- // columns gathered from other Parquet files. These columns will be simply filled with nulls
- // when actually reading the target Parquet file.
- //
- // 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema to
- // Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique due to
- // non-standard behaviors of some Parquet libraries/tools. For example, a Parquet file
- // containing a single integer array field `f1` may have the following legacy 2-level
- // structure:
- //
- // message root {
- // optional group f1 (LIST) {
- // required INT32 element;
- // }
- // }
- //
- // while `CatalystSchemaConverter` may generate a standard 3-level structure:
- //
- // message root {
- // optional group f1 (LIST) {
- // repeated group list {
- // required INT32 element;
- // }
- // }
- // }
- //
- // Apparently, we can't use the 2nd schema to read the target Parquet file as they have
- // different physical structures.
val parquetRequestedSchema =
maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
- val toParquet = new CatalystSchemaConverter(conf)
- val fileSchema = context.getFileSchema.asGroupType()
- val fileFieldNames = fileSchema.getFields.asScala.map(_.getName).toSet
-
- StructType
- // Deserializes the Catalyst schema of requested columns
- .fromString(schemaString)
- .map { field =>
- if (fileFieldNames.contains(field.name)) {
- // If the field exists in the target Parquet file, extracts the field type from the
- // full file schema and makes a single-field Parquet schema
- new MessageType("root", fileSchema.getType(field.name))
- } else {
- // Otherwise, just resorts to `CatalystSchemaConverter`
- toParquet.convert(StructType(Array(field)))
- }
- }
- // Merges all single-field Parquet schemas to form a complete schema for all requested
- // columns. Note that it's possible that no columns are requested at all (e.g., count
- // some partition column of a partitioned Parquet table). That's why `fold` is used here
- // and always fallback to an empty Parquet schema.
- .fold(new MessageType("root")) {
- _ union _
- }
+ val catalystRequestedSchema = StructType.fromString(schemaString)
+ CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
}
val metadata =
@@ -160,4 +101,168 @@ private[parquet] object CatalystReadSupport {
val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
+ /**
+ * Tailors `parquetSchema` according to `catalystSchema` by removing column paths that don't exist
+ * in `catalystSchema`, and adding those that only exist in `catalystSchema`.
+ */
+ def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
+ val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
+ Types.buildMessage().addFields(clippedParquetFields: _*).named("root")
+ }
+
+ private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
+ catalystType match {
+ case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
+ // Only clips array types with nested type as element type.
+ clipParquetListType(parquetType.asGroupType(), t.elementType)
+
+ case t: MapType if !isPrimitiveCatalystType(t.valueType) =>
+ // Only clips map types with nested type as value type.
+ clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
+
+ case t: StructType =>
+ clipParquetGroup(parquetType.asGroupType(), t)
+
+ case _ =>
+ parquetType
+ }
+ }
+
+ /**
+ * Whether a Catalyst [[DataType]] is primitive. Primitive [[DataType]] is not equivalent to
+ * [[AtomicType]]. For example, [[CalendarIntervalType]] is primitive, but it's not an
+ * [[AtomicType]].
+ */
+ private def isPrimitiveCatalystType(dataType: DataType): Boolean = {
+ dataType match {
+ case _: ArrayType | _: MapType | _: StructType => false
+ case _ => true
+ }
+ }
+
+ /**
+ * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[ArrayType]]. The element type
+ * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
+ * [[StructType]].
+ */
+ private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
+ // Precondition of this method: it should only be called for lists with nested element types.
+ assert(!isPrimitiveCatalystType(elementType))
+
+ // An unannotated repeated group should be interpreted as a required list of required elements,
+ // so the list element type is just the group itself. Clip it.
+ if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
+ clipParquetType(parquetList, elementType)
+ } else {
+ assert(
+ parquetList.getOriginalType == OriginalType.LIST,
+ "Invalid Parquet schema. " +
+ "Original type of annotated Parquet lists must be LIST: " +
+ parquetList.toString)
+
+ assert(
+ parquetList.getFieldCount == 1 && parquetList.getType(0).isRepetition(Repetition.REPEATED),
+ "Invalid Parquet schema. " +
+ "LIST-annotated group should only have exactly one repeated field: " +
+ parquetList)
+
+ // Precondition of this method: it should only be called for lists with nested element types.
+ assert(!parquetList.getType(0).isPrimitive)
+
+ val repeatedGroup = parquetList.getType(0).asGroupType()
+
+ // If the repeated field is a group with multiple fields, or it is a group with a single field
+ // whose name is either "array" or the LIST-annotated group's name with "_tuple" appended,
+ // then the repeated type itself is the element type and elements are required. Build a new
+ // LIST-annotated group with the clipped `repeatedGroup` as its element type and only field.
+ if (
+ repeatedGroup.getFieldCount > 1 ||
+ repeatedGroup.getName == "array" ||
+ repeatedGroup.getName == parquetList.getName + "_tuple"
+ ) {
+ Types
+ .buildGroup(parquetList.getRepetition)
+ .as(OriginalType.LIST)
+ .addField(clipParquetType(repeatedGroup, elementType))
+ .named(parquetList.getName)
+ } else {
+ // Otherwise, the repeated field's type is the element type with the repeated field's
+ // repetition.
+ Types
+ .buildGroup(parquetList.getRepetition)
+ .as(OriginalType.LIST)
+ .addField(
+ Types
+ .repeatedGroup()
+ .addField(clipParquetType(repeatedGroup.getType(0), elementType))
+ .named(repeatedGroup.getName))
+ .named(parquetList.getName)
+ }
+ }
+ }
+
+ /**
+ * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. The value type
+ * of the [[MapType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
+ * [[StructType]]. Note that the key type of any [[MapType]] is always a primitive type.
+ */
+ private def clipParquetMapType(
+ parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
+ // Precondition of this method: it should only be called for maps with nested value types.
+ assert(!isPrimitiveCatalystType(valueType))
+
+ val repeatedGroup = parquetMap.getType(0).asGroupType()
+ val parquetKeyType = repeatedGroup.getType(0)
+ val parquetValueType = repeatedGroup.getType(1)
+
+ val clippedRepeatedGroup =
+ Types
+ .repeatedGroup()
+ .as(repeatedGroup.getOriginalType)
+ .addField(parquetKeyType)
+ .addField(clipParquetType(parquetValueType, valueType))
+ .named(repeatedGroup.getName)
+
+ Types
+ .buildGroup(parquetMap.getRepetition)
+ .as(parquetMap.getOriginalType)
+ .addField(clippedRepeatedGroup)
+ .named(parquetMap.getName)
+ }
+
+ /**
+ * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
+ *
+ * @return A clipped [[GroupType]], which has at least one field.
+ * @note Parquet doesn't allow creating empty [[GroupType]] instances except for empty
+ *       [[MessageType]]. This is why the result has at least one field, while the top-level
+ *       [[MessageType]] may still be empty, since an empty requested schema is legal for
+ *       column pruning.
+ */
+ private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = {
+ val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType)
+ Types
+ .buildGroup(parquetRecord.getRepetition)
+ .as(parquetRecord.getOriginalType)
+ .addFields(clippedParquetFields: _*)
+ .named(parquetRecord.getName)
+ }
+
+ /**
+ * Clips the fields of a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
+ *
+ * @return A list of clipped [[GroupType]] fields, which can be empty.
+ */
+ private def clipParquetGroupFields(
+ parquetRecord: GroupType, structType: StructType): Seq[Type] = {
+ val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+ val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
+ structType.map { f =>
+ parquetFieldMap
+ .get(f.name)
+ .map(clipParquetType(_, f.dataType))
+ .getOrElse(toParquet.convertField(f))
+ }
+ }
}
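
For reference, a minimal sketch of what the new read path computes with the `clipParquetSchema` helper added above. This is not part of the patch; it is an illustration that assumes it runs inside the org.apache.spark.sql.execution.datasources.parquet package (where the private[parquet] CatalystReadSupport object is visible), mirroring how ParquetSchemaSuite exercises the method directly. The local names (fileSchema, requestedSchema, clipped) are only for the example.

  import org.apache.parquet.schema.MessageTypeParser
  import org.apache.spark.sql.types._

  // Physical schema of the Parquet file being read.
  val fileSchema = MessageTypeParser.parseMessageType(
    """message root {
      |  required group s {
      |    optional int64 a;
      |    optional int64 b;
      |  }
      |}
    """.stripMargin)

  // Requested Catalyst schema: only `s.a` is selected, and `s.c` doesn't exist in the file.
  val requestedSchema = new StructType()
    .add("s",
      new StructType()
        .add("a", LongType, nullable = true)
        .add("c", LongType, nullable = true),
      nullable = true)

  val clipped = CatalystReadSupport.clipParquetSchema(fileSchema, requestedSchema)
  // `clipped` keeps `s.a` from the file, drops `s.b`, and synthesizes `s.c` via
  // CatalystSchemaConverter, i.e. it is equivalent to:
  //
  //   message root {
  //     required group s {
  //       optional int64 a;
  //       optional int64 c;
  //     }
  //   }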
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index fe13dfbbed..f17e794b76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -113,31 +113,6 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
* When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
* any "parent" container.
*
- * @note Constructor argument [[parquetType]] refers to requested fields of the actual schema of the
- * Parquet file being read, while constructor argument [[catalystType]] refers to requested
- * fields of the global schema. The key difference is that, in case of schema merging,
- * [[parquetType]] can be a subset of [[catalystType]]. For example, it's possible to have
- * the following [[catalystType]]:
- * {{{
- * new StructType()
- * .add("f1", IntegerType, nullable = false)
- * .add("f2", StringType, nullable = true)
- * .add("f3", new StructType()
- * .add("f31", DoubleType, nullable = false)
- * .add("f32", IntegerType, nullable = true)
- * .add("f33", StringType, nullable = true), nullable = false)
- * }}}
- * and the following [[parquetType]] (`f2` and `f32` are missing):
- * {{{
- * message root {
- * required int32 f1;
- * required group f3 {
- * required double f31;
- * optional binary f33 (utf8);
- * }
- * }
- * }}}
- *
* @param parquetType Parquet schema of Parquet records
* @param catalystType Spark SQL schema that corresponds to the Parquet record type
* @param updater An updater which propagates converted field values to the parent container
@@ -179,31 +154,7 @@ private[parquet] class CatalystRowConverter(
// Converters for each field.
private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
- // In case of schema merging, `parquetType` can be a subset of `catalystType`. We need to pad
- // those missing fields and create converters for them, although values of these fields are
- // always null.
- val paddedParquetFields = {
- val parquetFields = parquetType.getFields.asScala
- val parquetFieldNames = parquetFields.map(_.getName).toSet
- val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name))
-
- // We don't need to worry about feature flag arguments like `assumeBinaryIsString` when
- // creating the schema converter here, since values of missing fields are always null.
- val toParquet = new CatalystSchemaConverter()
-
- (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f =>
- catalystType.indexWhere(_.name == f.getName)
- }
- }
-
- if (paddedParquetFields.length != catalystType.length) {
- throw new UnsupportedOperationException(
- "A Parquet file's schema has different number of fields with the table schema. " +
- "Please enable schema merging by setting \"mergeSchema\" to true when load " +
- "a Parquet dataset or set spark.sql.parquet.mergeSchema to true in SQLConf.")
- }
-
- paddedParquetFields.zip(catalystType).zipWithIndex.map {
+ parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
case ((parquetFieldType, catalystField), ordinal) =>
// Converted field value should be set to the `ordinal`-th cell of `currentRow`
newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
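
The padding and sorting logic above can be removed because CatalystReadSupport now clips the Parquet file schema against the requested Catalyst schema before the converter is built. Below is a minimal sketch (an illustration under that assumption, not code from the patch, and assuming the same package as above) of the invariant the positional zip relies on:

  import scala.collection.JavaConverters._
  import org.apache.parquet.schema.MessageTypeParser
  import org.apache.spark.sql.types._

  val catalystType = new StructType()
    .add("f1", IntegerType, nullable = true)
    .add("f2", StringType, nullable = true)

  // Simulate what the read path hands to CatalystRowConverter: the file schema
  // clipped against the requested Catalyst schema.
  val parquetType = CatalystReadSupport.clipParquetSchema(
    MessageTypeParser.parseMessageType(
      """message root {
        |  optional binary f2 (UTF8);
        |  optional int32 f1;
        |  optional int32 f3;
        |}
      """.stripMargin),
    catalystType)

  // After clipping, the i-th Parquet field corresponds to the i-th Catalyst field,
  // which is what `parquetType.getFields.asScala.zip(catalystType)` depends on.
  assert(parquetType.getFields.asScala.map(_.getName).toSeq == catalystType.map(_.name))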
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index be6c0545f5..a21ab1dbb2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -55,16 +55,10 @@ import org.apache.spark.sql.{AnalysisException, SQLConf}
* to old style non-standard behaviors.
*/
private[parquet] class CatalystSchemaConverter(
- private val assumeBinaryIsString: Boolean,
- private val assumeInt96IsTimestamp: Boolean,
- private val followParquetFormatSpec: Boolean) {
-
- // Only used when constructing converter for converting Spark SQL schema to Parquet schema, in
- // which case `assumeInt96IsTimestamp` and `assumeBinaryIsString` are irrelevant.
- def this() = this(
- assumeBinaryIsString = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
- assumeInt96IsTimestamp = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
- followParquetFormatSpec = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get)
+ assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
+ assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
+ followParquetFormatSpec: Boolean = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get
+) {
def this(conf: SQLConf) = this(
assumeBinaryIsString = conf.isParquetBinaryAsString,
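
With default arguments replacing the old zero-argument auxiliary constructor, call sites can name just the flag they need. A small sketch of the two usage styles this enables (an illustration only, assuming the same package; the SQLConf constructor shown in the hunk above is unchanged):

  import org.apache.spark.sql.types.{LongType, StructField}

  // Write-side conversion: only the format-spec flag matters, so it is named explicitly
  // (the same style clipParquetGroupFields uses in CatalystReadSupport above).
  val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
  val parquetField = toParquet.convertField(StructField("id", LongType, nullable = false))
  // parquetField is the Parquet type `required int64 id`

  // Read-side conversion keeps using the auxiliary SQLConf constructor:
  // val fromConf = new CatalystSchemaConverter(sqlContext.conf)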
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
index bd7cf8c10a..36b929ee1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources.parquet
+import java.io.File
import java.nio.ByteBuffer
import java.util.{List => JList, Map => JMap}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
new file mode 100644
index 0000000000..83b65fb419
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedSQLContext {
+ test("parquet files with different physical schemas but share the same logical schema") {
+ import ParquetCompatibilityTest._
+
+ // This test case writes two Parquet files, both representing the following Catalyst schema
+ //
+ // StructType(
+ // StructField(
+ // "f",
+ // ArrayType(IntegerType, containsNull = false),
+ // nullable = false))
+ //
+ // The first Parquet file uses a parquet-avro style 2-level LIST-annotated group, while the
+ // other one uses a parquet-protobuf style 1-level unannotated repeated primitive field.
+ withTempDir { dir =>
+ val avroStylePath = new File(dir, "avro-style").getCanonicalPath
+ val protobufStylePath = new File(dir, "protobuf-style").getCanonicalPath
+
+ val avroStyleSchema =
+ """message avro_style {
+ | required group f (LIST) {
+ | repeated int32 array;
+ | }
+ |}
+ """.stripMargin
+
+ writeDirect(avroStylePath, avroStyleSchema, { rc =>
+ rc.message {
+ rc.field("f", 0) {
+ rc.group {
+ rc.field("array", 0) {
+ rc.addInteger(0)
+ rc.addInteger(1)
+ }
+ }
+ }
+ }
+ })
+
+ logParquetSchema(avroStylePath)
+
+ val protobufStyleSchema =
+ """message protobuf_style {
+ | repeated int32 f;
+ |}
+ """.stripMargin
+
+ writeDirect(protobufStylePath, protobufStyleSchema, { rc =>
+ rc.message {
+ rc.field("f", 0) {
+ rc.addInteger(2)
+ rc.addInteger(3)
+ }
+ }
+ })
+
+ logParquetSchema(protobufStylePath)
+
+ checkAnswer(
+ sqlContext.read.parquet(dir.getCanonicalPath),
+ Seq(
+ Row(Seq(0, 1)),
+ Row(Seq(2, 3))))
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index b7b70c2bbb..a379523d67 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -229,4 +229,81 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
}
+
+ test("SPARK-10301 Clipping nested structs in requested schema") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val df = sqlContext
+ .range(1)
+ .selectExpr("NAMED_STRUCT('a', id, 'b', id) AS s")
+ .coalesce(1)
+
+ df.write.mode("append").parquet(path)
+
+ val userDefinedSchema = new StructType()
+ .add("s", new StructType().add("a", LongType, nullable = true), nullable = true)
+
+ checkAnswer(
+ sqlContext.read.schema(userDefinedSchema).parquet(path),
+ Row(Row(0)))
+ }
+
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ val df1 = sqlContext
+ .range(1)
+ .selectExpr("NAMED_STRUCT('a', id, 'b', id) AS s")
+ .coalesce(1)
+
+ val df2 = sqlContext
+ .range(1, 2)
+ .selectExpr("NAMED_STRUCT('b', id, 'c', id) AS s")
+ .coalesce(1)
+
+ df1.write.parquet(path)
+ df2.write.mode(SaveMode.Append).parquet(path)
+
+ val userDefinedSchema = new StructType()
+ .add("s",
+ new StructType()
+ .add("a", LongType, nullable = true)
+ .add("c", LongType, nullable = true),
+ nullable = true)
+
+ checkAnswer(
+ sqlContext.read.schema(userDefinedSchema).parquet(path),
+ Seq(
+ Row(Row(0, null)),
+ Row(Row(null, 1))))
+ }
+
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ val df = sqlContext
+ .range(1)
+ .selectExpr("NAMED_STRUCT('a', ARRAY(NAMED_STRUCT('b', id, 'c', id))) AS s")
+ .coalesce(1)
+
+ df.write.parquet(path)
+
+ val userDefinedSchema = new StructType()
+ .add("s",
+ new StructType()
+ .add(
+ "a",
+ ArrayType(
+ new StructType()
+ .add("b", LongType, nullable = true)
+ .add("d", StringType, nullable = true),
+ containsNull = true),
+ nullable = true),
+ nullable = true)
+
+ checkAnswer(
+ sqlContext.read.schema(userDefinedSchema).parquet(path),
+ Row(Row(Seq(Row(0, null)))))
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 9dcbc1a047..28c59a4abd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -22,6 +22,7 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.parquet.schema.MessageTypeParser
+import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -941,4 +942,313 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| optional fixed_len_byte_array(8) f1 (DECIMAL(18, 3));
|}
""".stripMargin)
+
+ private def testSchemaClipping(
+ testName: String,
+ parquetSchema: String,
+ catalystSchema: StructType,
+ expectedSchema: String): Unit = {
+ test(s"Clipping - $testName") {
+ val expected = MessageTypeParser.parseMessageType(expectedSchema)
+ val actual = CatalystReadSupport.clipParquetSchema(
+ MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
+
+ try {
+ expected.checkContains(actual)
+ actual.checkContains(expected)
+ } catch { case cause: Throwable =>
+ fail(
+ s"""Expected clipped schema:
+ |$expected
+ |Actual clipped schema:
+ |$actual
+ """.stripMargin,
+ cause)
+ }
+ }
+ }
+
+ testSchemaClipping(
+ "simple nested struct",
+
+ parquetSchema =
+ """message root {
+ | required group f0 {
+ | optional int32 f00;
+ | optional int32 f01;
+ | }
+ |}
+ """.stripMargin,
+
+ catalystSchema = {
+ val f0Type = new StructType().add("f00", IntegerType, nullable = true)
+ new StructType()
+ .add("f0", f0Type, nullable = false)
+ .add("f1", IntegerType, nullable = true)
+ },
+
+ expectedSchema =
+ """message root {
+ | required group f0 {
+ | optional int32 f00;
+ | }
+ | optional int32 f1;
+ |}
+ """.stripMargin)
+
+ testSchemaClipping(
+ "parquet-protobuf style array",
+
+ parquetSchema =
+ """message root {
+ | required group f0 {
+ | repeated binary f00 (UTF8);
+ | repeated group f01 {
+ | optional int32 f010;
+ | optional double f011;
+ | }
+ | }
+ |}
+ """.stripMargin,
+
+ catalystSchema = {
+ val f11Type = new StructType().add("f011", DoubleType, nullable = true)
+ val f01Type = ArrayType(StringType, containsNull = false)
+ val f0Type = new StructType()
+ .add("f00", f01Type, nullable = false)
+ .add("f01", f11Type, nullable = false)
+ val f1Type = ArrayType(IntegerType, containsNull = true)
+ new StructType()
+ .add("f0", f0Type, nullable = false)
+ .add("f1", f1Type, nullable = true)
+ },
+
+ expectedSchema =
+ """message root {
+ | required group f0 {
+ | repeated binary f00 (UTF8);
+ | repeated group f01 {
+ | optional double f011;
+ | }
+ | }
+ |
+ | optional group f1 (LIST) {
+ | repeated group list {
+ | optional int32 element;
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testSchemaClipping(
+ "parquet-thrift style array",
+
+ parquetSchema =
+ """message root {
+ | required group f0 {
+ | optional group f00 {
+ | repeated binary f00_tuple (UTF8);
+ | }
+ |
+ | optional group f01 (LIST) {
+ | repeated group f01_tuple {
+ | optional int32 f010;
+ | optional double f011;
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin,
+
+ catalystSchema = {
+ val f11ElementType = new StructType()
+ .add("f011", DoubleType, nullable = true)
+ .add("f012", LongType, nullable = true)
+
+ val f0Type = new StructType()
+ .add("f00", ArrayType(StringType, containsNull = false), nullable = false)
+ .add("f01", ArrayType(f11ElementType, containsNull = false), nullable = false)
+
+ new StructType().add("f0", f0Type, nullable = false)
+ },
+
+ expectedSchema =
+ """message root {
+ | required group f0 {
+ | optional group f00 {
+ | repeated binary f00_tuple (UTF8);
+ | }
+ |
+ | optional group f01 (LIST) {
+ | repeated group f01_tuple {
+ | optional double f011;
+ | optional int64 f012;
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testSchemaClipping(
+ "parquet-avro style array",
+
+ parquetSchema =
+ """message root {
+ | required group f0 {
+ | optional group f00 {
+ | repeated binary array (UTF8);
+ | }
+ |
+ | optional group f01 (LIST) {
+ | repeated group array {
+ | optional int32 f010;
+ | optional double f011;
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin,
+
+ catalystSchema = {
+ val f11ElementType = new StructType()
+ .add("f011", DoubleType, nullable = true)
+ .add("f012", LongType, nullable = true)
+
+ val f0Type = new StructType()
+ .add("f00", ArrayType(StringType, containsNull = false), nullable = false)
+ .add("f01", ArrayType(f11ElementType, containsNull = false), nullable = false)
+
+ new StructType().add("f0", f0Type, nullable = false)
+ },
+
+ expectedSchema =
+ """message root {
+ | required group f0 {
+ | optional group f00 {
+ | repeated binary array (UTF8);
+ | }
+ |
+ | optional group f01 (LIST) {
+ | repeated group array {
+ | optional double f011;
+ | optional int64 f012;
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testSchemaClipping(
+ "parquet-hive style array",
+
+ parquetSchema =
+ """message root {
+ | optional group f0 {
+ | optional group f00 (LIST) {
+ | repeated group bag {
+ | optional binary array_element;
+ | }
+ | }
+ |
+ | optional group f01 (LIST) {
+ | repeated group bag {
+ | optional group array_element {
+ | optional int32 f010;
+ | optional double f011;
+ | }
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin,
+
+ catalystSchema = {
+ val f01ElementType = new StructType()
+ .add("f011", DoubleType, nullable = true)
+ .add("f012", LongType, nullable = true)
+
+ val f0Type = new StructType()
+ .add("f00", ArrayType(StringType, containsNull = true), nullable = true)
+ .add("f01", ArrayType(f01ElementType, containsNull = true), nullable = true)
+
+ new StructType().add("f0", f0Type, nullable = true)
+ },
+
+ expectedSchema =
+ """message root {
+ | optional group f0 {
+ | optional group f00 (LIST) {
+ | repeated group bag {
+ | optional binary array_element;
+ | }
+ | }
+ |
+ | optional group f01 (LIST) {
+ | repeated group bag {
+ | optional group array_element {
+ | optional double f011;
+ | optional int64 f012;
+ | }
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testSchemaClipping(
+ "2-level list of required struct",
+
+ parquetSchema =
+ s"""message root {
+ | required group f0 {
+ | required group f00 (LIST) {
+ | repeated group element {
+ | required int32 f000;
+ | optional int64 f001;
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin,
+
+ catalystSchema = {
+ val f00ElementType =
+ new StructType()
+ .add("f001", LongType, nullable = true)
+ .add("f002", DoubleType, nullable = false)
+
+ val f00Type = ArrayType(f00ElementType, containsNull = false)
+ val f0Type = new StructType().add("f00", f00Type, nullable = false)
+
+ new StructType().add("f0", f0Type, nullable = false)
+ },
+
+ expectedSchema =
+ s"""message root {
+ | required group f0 {
+ | required group f00 (LIST) {
+ | repeated group element {
+ | optional int64 f001;
+ | required double f002;
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testSchemaClipping(
+ "empty requested schema",
+
+ parquetSchema =
+ """message root {
+ | required group f0 {
+ | required int32 f00;
+ | required int64 f01;
+ | }
+ |}
+ """.stripMargin,
+
+ catalystSchema = new StructType(),
+
+ expectedSchema = "message root {}")
}