author     Cheng Lian <lian@databricks.com>    2015-09-10 11:01:08 -0700
committer  Davies Liu <davies.liu@gmail.com>   2015-09-10 11:01:08 -0700
commit     49da38e5f728e05e8e929c4dcd37145ba060151d
tree       8cf792c6808a79f1713dc5be9055ad1ae9468b3d /sql
parent     f892d927d7246856dd3ea617b2942873359454bc
[SPARK-10301] [SPARK-10428] [SQL] Addresses comments of PR #8583 and #8509 for master
Author: Cheng Lian <lian@databricks.com>

Closes #8670 from liancheng/spark-10301/address-pr-comments.
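At a glance, the patch extends requested-schema clipping so that map key types are clipped alongside value types, documents why UDTs are excluded from clipping, and adds a field-count assertion to CatalystRowConverter. A minimal sketch of the user-facing behavior the new tests exercise (the path and the `sqlContext` handle are illustrative; Spark 1.5-era API):

    import org.apache.spark.sql.types._

    // Write a nested struct, then read it back with a user-defined schema that
    // only partially overlaps the file's physical schema.
    val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS s")
    df.write.parquet("/tmp/clipping-demo")

    val requestedSchema = new StructType()
      .add("s", new StructType()
        .add("a", LongType, nullable = true)   // present in the file
        .add("c", LongType, nullable = true),  // absent from the file
        nullable = true)

    // The file schema is clipped against the requested schema;
    // fields missing from the file come back as null.
    sqlContext.read.schema(requestedSchema).parquet("/tmp/clipping-demo").collect()
    // => Array(Row(Row(0L, null)))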
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala  |  20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala |  10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala    | 291
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala   | 246
4 files changed, 522 insertions, 45 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index dc4ff06df6..5a8166fac5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -117,14 +117,18 @@ private[parquet] object CatalystReadSupport {
// Only clips array types with nested type as element type.
clipParquetListType(parquetType.asGroupType(), t.elementType)
- case t: MapType if !isPrimitiveCatalystType(t.valueType) =>
- // Only clips map types with nested type as value type.
+ case t: MapType
+ if !isPrimitiveCatalystType(t.keyType) ||
+ !isPrimitiveCatalystType(t.valueType) =>
+ // Only clips map types with a nested key type or value type.
clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
case t: StructType =>
clipParquetGroup(parquetType.asGroupType(), t)
case _ =>
+ // UDTs and primitive types are not clipped. For UDTs, a clipped version might no longer
+ // map to the desired user-space type, so UDTs shouldn't participate in schema merging.
parquetType
}
}
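The dispatch above hinges on which Catalyst types count as "primitive". For reference, a sketch of that predicate as CatalystReadSupport defines it elsewhere in this file (nested types are exactly arrays, maps, and structs; UDTs fall through to the primitive case and are left unclipped):

    private def isPrimitiveCatalystType(dataType: DataType): Boolean =
      dataType match {
        // Arrays, maps, and structs are the only "nested" types for clipping.
        case _: ArrayType | _: MapType | _: StructType => false
        // Atomic types and UDTs are treated as primitive and pass through whole.
        case _ => true
      }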
@@ -204,14 +208,14 @@ private[parquet] object CatalystReadSupport {
}
/**
- * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. The value type
- * of the [[MapType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
- * [[StructType]]. Note that key type of any [[MapType]] is always a primitive type.
+ * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. Either the key
+ * type or the value type of the [[MapType]] must be a nested type, namely an [[ArrayType]],
+ * a [[MapType]], or a [[StructType]].
*/
private def clipParquetMapType(
parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
- // Precondition of this method, should only be called for maps with nested value types.
- assert(!isPrimitiveCatalystType(valueType))
+ // Precondition of this method: it should only be called for maps with nested key or value types.
+ assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
val repeatedGroup = parquetMap.getType(0).asGroupType()
val parquetKeyType = repeatedGroup.getType(0)
@@ -221,7 +225,7 @@ private[parquet] object CatalystReadSupport {
Types
.repeatedGroup()
.as(repeatedGroup.getOriginalType)
- .addField(parquetKeyType)
+ .addField(clipParquetType(parquetKeyType, keyType))
.addField(clipParquetType(parquetValueType, valueType))
.named(repeatedGroup.getName)
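Net effect of this file's changes: a requested MapType whose key is itself a struct now has that key clipped against the Parquet key field, just like the value. A hedged sketch mirroring the "standard map with complex key" case added to ParquetSchemaSuite below:

    import org.apache.spark.sql.types._

    // Requested schema: a map keyed by a struct. Before this patch only the
    // value side of the repeated key_value group was clipped.
    val keyType = new StructType()
      .add("value_f1", LongType, nullable = false)
      .add("value_f2", DoubleType, nullable = false)
    val requested = new StructType()
      .add("f0", MapType(keyType, IntegerType, valueContainsNull = false), nullable = false)

    // clipParquetMapType now recurses into both the key and the value: the key
    // group drops value_f0 and gains value_f2 (required, since nullable = false).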
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index f17e794b76..2ff2fda361 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -123,6 +123,16 @@ private[parquet] class CatalystRowConverter(
updater: ParentContainerUpdater)
extends CatalystGroupConverter(updater) with Logging {
+ assert(
+ parquetType.getFieldCount == catalystType.length,
+ s"""Field counts of the Parquet schema and the Catalyst schema don't match:
+ |
+ |Parquet schema:
+ |$parquetType
+ |Catalyst schema:
+ |${catalystType.prettyJson}
+ """.stripMargin)
+
logDebug(
s"""Building row converter for the following schema:
|
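The new assertion makes a clipping bug fail fast with both schemas printed. For the Catalyst side it uses `prettyJson`, which renders any DataType as an indented JSON tree; a quick illustration:

    import org.apache.spark.sql.types._

    val catalystType = new StructType()
      .add("a", LongType, nullable = true)
      .add("b", LongType, nullable = true)

    // Renders {"type": "struct", "fields": [...]} with one entry per field,
    // which is what ends up in the assertion message above.
    println(catalystType.prettyJson)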
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 9edbb52268..1c1cfa34ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,6 +22,9 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
+import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -228,54 +231,168 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
- test("SPARK-10301 Clipping nested structs in requested schema") {
+ test("SPARK-10301 requested schema clipping - same schema") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS s").coalesce(1)
+ df.write.parquet(path)
+
+ val userDefinedSchema =
+ new StructType()
+ .add(
+ "s",
+ new StructType()
+ .add("a", LongType, nullable = true)
+ .add("b", LongType, nullable = true),
+ nullable = true)
+
+ checkAnswer(
+ sqlContext.read.schema(userDefinedSchema).parquet(path),
+ Row(Row(0L, 1L)))
+ }
+ }
+
+ // This test case is ignored because of parquet-mr bug PARQUET-370
+ ignore("SPARK-10301 requested schema clipping - schemas with disjoint sets of fields") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS s").coalesce(1)
+ df.write.parquet(path)
+
+ val userDefinedSchema =
+ new StructType()
+ .add(
+ "s",
+ new StructType()
+ .add("c", LongType, nullable = true)
+ .add("d", LongType, nullable = true),
+ nullable = true)
+
+ checkAnswer(
+ sqlContext.read.schema(userDefinedSchema).parquet(path),
+ Row(Row(null, null)))
+ }
+ }
+
+ test("SPARK-10301 requested schema clipping - requested schema contains physical schema") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS s").coalesce(1)
+ df.write.parquet(path)
+
+ val userDefinedSchema =
+ new StructType()
+ .add(
+ "s",
+ new StructType()
+ .add("a", LongType, nullable = true)
+ .add("b", LongType, nullable = true)
+ .add("c", LongType, nullable = true)
+ .add("d", LongType, nullable = true),
+ nullable = true)
+
+ checkAnswer(
+ sqlContext.read.schema(userDefinedSchema).parquet(path),
+ Row(Row(0L, 1L, null, null)))
+ }
+
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'd', id + 3) AS s").coalesce(1)
+ df.write.parquet(path)
+
+ val userDefinedSchema =
+ new StructType()
+ .add(
+ "s",
+ new StructType()
+ .add("a", LongType, nullable = true)
+ .add("b", LongType, nullable = true)
+ .add("c", LongType, nullable = true)
+ .add("d", LongType, nullable = true),
+ nullable = true)
+
+ checkAnswer(
+ sqlContext.read.schema(userDefinedSchema).parquet(path),
+ Row(Row(0L, null, null, 3L)))
+ }
+ }
+
+ test("SPARK-10301 requested schema clipping - physical schema contains requested schema") {
withTempPath { dir =>
val path = dir.getCanonicalPath
val df = sqlContext
.range(1)
- .selectExpr("NAMED_STRUCT('a', id, 'b', id) AS s")
+ .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2, 'd', id + 3) AS s")
.coalesce(1)
- df.write.mode("append").parquet(path)
+ df.write.parquet(path)
- val userDefinedSchema = new StructType()
- .add("s", new StructType().add("a", LongType, nullable = true), nullable = true)
+ val userDefinedSchema =
+ new StructType()
+ .add(
+ "s",
+ new StructType()
+ .add("a", LongType, nullable = true)
+ .add("b", LongType, nullable = true),
+ nullable = true)
checkAnswer(
sqlContext.read.schema(userDefinedSchema).parquet(path),
- Row(Row(0)))
+ Row(Row(0L, 1L)))
}
withTempPath { dir =>
val path = dir.getCanonicalPath
-
- val df1 = sqlContext
+ val df = sqlContext
.range(1)
- .selectExpr("NAMED_STRUCT('a', id, 'b', id) AS s")
+ .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2, 'd', id + 3) AS s")
.coalesce(1)
- val df2 = sqlContext
- .range(1, 2)
- .selectExpr("NAMED_STRUCT('b', id, 'c', id) AS s")
+ df.write.parquet(path)
+
+ val userDefinedSchema =
+ new StructType()
+ .add(
+ "s",
+ new StructType()
+ .add("a", LongType, nullable = true)
+ .add("d", LongType, nullable = true),
+ nullable = true)
+
+ checkAnswer(
+ sqlContext.read.schema(userDefinedSchema).parquet(path),
+ Row(Row(0L, 3L)))
+ }
+ }
+
+ test("SPARK-10301 requested schema clipping - schemas overlap but don't contain each other") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val df = sqlContext
+ .range(1)
+ .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2) AS s")
.coalesce(1)
- df1.write.parquet(path)
- df2.write.mode(SaveMode.Append).parquet(path)
+ df.write.parquet(path)
- val userDefinedSchema = new StructType()
- .add("s",
- new StructType()
- .add("a", LongType, nullable = true)
- .add("c", LongType, nullable = true),
- nullable = true)
+ val userDefinedSchema =
+ new StructType()
+ .add(
+ "s",
+ new StructType()
+ .add("b", LongType, nullable = true)
+ .add("c", LongType, nullable = true)
+ .add("d", LongType, nullable = true),
+ nullable = true)
checkAnswer(
sqlContext.read.schema(userDefinedSchema).parquet(path),
- Seq(
- Row(Row(0, null)),
- Row(Row(null, 1))))
+ Row(Row(1L, 2L, null)))
}
+ }
+ test("SPARK-10301 requested schema clipping - deeply nested struct") {
withTempPath { dir =>
val path = dir.getCanonicalPath
@@ -304,4 +421,132 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
Row(Row(Seq(Row(0, null)))))
}
}
+
+ test("SPARK-10301 requested schema clipping - out of order") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ val df1 = sqlContext
+ .range(1)
+ .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2) AS s")
+ .coalesce(1)
+
+ val df2 = sqlContext
+ .range(1, 2)
+ .selectExpr("NAMED_STRUCT('c', id + 2, 'b', id + 1, 'd', id + 3) AS s")
+ .coalesce(1)
+
+ df1.write.parquet(path)
+ df2.write.mode(SaveMode.Append).parquet(path)
+
+ val userDefinedSchema = new StructType()
+ .add("s",
+ new StructType()
+ .add("a", LongType, nullable = true)
+ .add("b", LongType, nullable = true)
+ .add("d", LongType, nullable = true),
+ nullable = true)
+
+ checkAnswer(
+ sqlContext.read.schema(userDefinedSchema).parquet(path),
+ Seq(
+ Row(Row(0, 1, null)),
+ Row(Row(null, 2, 4))))
+ }
+ }
+
+ test("SPARK-10301 requested schema clipping - schema merging") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ val df1 = sqlContext
+ .range(1)
+ .selectExpr("NAMED_STRUCT('a', id, 'c', id + 2) AS s")
+ .coalesce(1)
+
+ val df2 = sqlContext
+ .range(1, 2)
+ .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2) AS s")
+ .coalesce(1)
+
+ df1.write.mode(SaveMode.Append).parquet(path)
+ df2.write.mode(SaveMode.Append).parquet(path)
+
+ checkAnswer(
+ sqlContext
+ .read
+ .option("mergeSchema", "true")
+ .parquet(path)
+ .selectExpr("s.a", "s.b", "s.c"),
+ Seq(
+ Row(0, null, 2),
+ Row(1, 2, 3)))
+ }
+ }
+
+ test("SPARK-10301 requested schema clipping - UDT") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ val df = sqlContext
+ .range(1)
+ .selectExpr(
+ """NAMED_STRUCT(
+ | 'f0', CAST(id AS STRING),
+ | 'f1', NAMED_STRUCT(
+ | 'a', CAST(id + 1 AS INT),
+ | 'b', CAST(id + 2 AS LONG),
+ | 'c', CAST(id + 3.5 AS DOUBLE)
+ | )
+ |) AS s
+ """.stripMargin)
+ .coalesce(1)
+
+ df.write.mode(SaveMode.Append).parquet(path)
+
+ val userDefinedSchema =
+ new StructType()
+ .add(
+ "s",
+ new StructType()
+ .add("f1", new NestedStructUDT, nullable = true),
+ nullable = true)
+
+ checkAnswer(
+ sqlContext.read.schema(userDefinedSchema).parquet(path),
+ Row(Row(NestedStruct(1, 2L, 3.5D))))
+ }
+ }
+}
+
+object TestingUDT {
+ @SQLUserDefinedType(udt = classOf[NestedStructUDT])
+ case class NestedStruct(a: Integer, b: Long, c: Double)
+
+ class NestedStructUDT extends UserDefinedType[NestedStruct] {
+ override def sqlType: DataType =
+ new StructType()
+ .add("a", IntegerType, nullable = true)
+ .add("b", LongType, nullable = false)
+ .add("c", DoubleType, nullable = false)
+
+ override def serialize(obj: Any): Any = {
+ val row = new SpecificMutableRow(sqlType.asInstanceOf[StructType].map(_.dataType))
+ obj match {
+ case n: NestedStruct =>
+ row.setInt(0, n.a)
+ row.setLong(1, n.b)
+ row.setDouble(2, n.c)
+ }
+ // Return the populated row; a bare match here would leave serialize()
+ // returning Unit and silently discard the row.
+ row
+ }
+
+ override def userClass: Class[NestedStruct] = classOf[NestedStruct]
+
+ override def deserialize(datum: Any): NestedStruct = {
+ datum match {
+ case row: InternalRow =>
+ NestedStruct(row.getInt(0), row.getLong(1), row.getDouble(2))
+ }
+ }
+ }
}
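For readers less familiar with UDTs: Spark persists a UDT column using its `sqlType` and converts at the boundary with `serialize`/`deserialize`, which is why clipping must leave UDT-backed groups intact. A minimal round-trip sketch using the TestingUDT definitions above (assuming `serialize` returns the populated row, per the fix noted in its comment):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.datasources.parquet.TestingUDT._

    val udt = new NestedStructUDT
    // User-space object -> Catalyst internal row (3 fields: int, long, double).
    val internal = udt.serialize(NestedStruct(1, 2L, 3.5)).asInstanceOf[InternalRow]
    // Catalyst internal row -> user-space object.
    assert(udt.deserialize(internal) == NestedStruct(1, 2L, 3.5))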
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 5331d7c035..5a8f772c32 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1012,12 +1012,17 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
""".stripMargin,
catalystSchema = {
- val f11Type = new StructType().add("f011", DoubleType, nullable = true)
- val f01Type = ArrayType(StringType, containsNull = false)
+ val f00Type = ArrayType(StringType, containsNull = false)
+ val f01Type = ArrayType(
+ new StructType()
+ .add("f011", DoubleType, nullable = true),
+ containsNull = false)
+
val f0Type = new StructType()
- .add("f00", f01Type, nullable = false)
- .add("f01", f11Type, nullable = false)
+ .add("f00", f00Type, nullable = false)
+ .add("f01", f01Type, nullable = false)
val f1Type = ArrayType(IntegerType, containsNull = true)
+
new StructType()
.add("f0", f0Type, nullable = false)
.add("f1", f1Type, nullable = true)
@@ -1046,7 +1051,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
parquetSchema =
"""message root {
| required group f0 {
- | optional group f00 {
+ | optional group f00 (LIST) {
| repeated binary f00_tuple (UTF8);
| }
|
@@ -1061,13 +1066,13 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
""".stripMargin,
catalystSchema = {
- val f11ElementType = new StructType()
+ val f01ElementType = new StructType()
.add("f011", DoubleType, nullable = true)
.add("f012", LongType, nullable = true)
val f0Type = new StructType()
- .add("f00", ArrayType(StringType, containsNull = false), nullable = false)
- .add("f01", ArrayType(f11ElementType, containsNull = false), nullable = false)
+ .add("f00", ArrayType(StringType, containsNull = false), nullable = true)
+ .add("f01", ArrayType(f01ElementType, containsNull = false), nullable = true)
new StructType().add("f0", f0Type, nullable = false)
},
@@ -1075,7 +1080,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
expectedSchema =
"""message root {
| required group f0 {
- | optional group f00 {
+ | optional group f00 (LIST) {
| repeated binary f00_tuple (UTF8);
| }
|
@@ -1095,7 +1100,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
parquetSchema =
"""message root {
| required group f0 {
- | optional group f00 {
+ | optional group f00 (LIST) {
| repeated binary array (UTF8);
| }
|
@@ -1110,13 +1115,13 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
""".stripMargin,
catalystSchema = {
- val f11ElementType = new StructType()
+ val f01ElementType = new StructType()
.add("f011", DoubleType, nullable = true)
.add("f012", LongType, nullable = true)
val f0Type = new StructType()
- .add("f00", ArrayType(StringType, containsNull = false), nullable = false)
- .add("f01", ArrayType(f11ElementType, containsNull = false), nullable = false)
+ .add("f00", ArrayType(StringType, containsNull = false), nullable = true)
+ .add("f01", ArrayType(f01ElementType, containsNull = false), nullable = true)
new StructType().add("f0", f0Type, nullable = false)
},
@@ -1124,7 +1129,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
expectedSchema =
"""message root {
| required group f0 {
- | optional group f00 {
+ | optional group f00 (LIST) {
| repeated binary array (UTF8);
| }
|
@@ -1237,6 +1242,63 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
""".stripMargin)
testSchemaClipping(
+ "standard array",
+
+ parquetSchema =
+ """message root {
+ | required group f0 {
+ | optional group f00 (LIST) {
+ | repeated group list {
+ | required binary element (UTF8);
+ | }
+ | }
+ |
+ | optional group f01 (LIST) {
+ | repeated group list {
+ | required group element {
+ | optional int32 f010;
+ | optional double f011;
+ | }
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin,
+
+ catalystSchema = {
+ val f01ElementType = new StructType()
+ .add("f011", DoubleType, nullable = true)
+ .add("f012", LongType, nullable = true)
+
+ val f0Type = new StructType()
+ .add("f00", ArrayType(StringType, containsNull = false), nullable = true)
+ .add("f01", ArrayType(f01ElementType, containsNull = false), nullable = true)
+
+ new StructType().add("f0", f0Type, nullable = false)
+ },
+
+ expectedSchema =
+ """message root {
+ | required group f0 {
+ | optional group f00 (LIST) {
+ | repeated group list {
+ | required binary element (UTF8);
+ | }
+ | }
+ |
+ | optional group f01 (LIST) {
+ | repeated group list {
+ | required group element {
+ | optional double f011;
+ | optional int64 f012;
+ | }
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testSchemaClipping(
"empty requested schema",
parquetSchema =
@@ -1251,4 +1313,160 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
catalystSchema = new StructType(),
expectedSchema = "message root {}")
+
+ testSchemaClipping(
+ "disjoint field sets",
+
+ parquetSchema =
+ """message root {
+ | required group f0 {
+ | required int32 f00;
+ | required int64 f01;
+ | }
+ |}
+ """.stripMargin,
+
+ catalystSchema =
+ new StructType()
+ .add(
+ "f0",
+ new StructType()
+ .add("f02", FloatType, nullable = true)
+ .add("f03", DoubleType, nullable = true),
+ nullable = true),
+
+ expectedSchema =
+ """message root {
+ | required group f0 {
+ | optional float f02;
+ | optional double f03;
+ | }
+ |}
+ """.stripMargin)
+
+ testSchemaClipping(
+ "parquet-avro style map",
+
+ parquetSchema =
+ """message root {
+ | required group f0 (MAP) {
+ | repeated group map (MAP_KEY_VALUE) {
+ | required int32 key;
+ | required group value {
+ | required int32 value_f0;
+ | required int64 value_f1;
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin,
+
+ catalystSchema = {
+ val valueType =
+ new StructType()
+ .add("value_f1", LongType, nullable = false)
+ .add("value_f2", DoubleType, nullable = false)
+
+ val f0Type = MapType(IntegerType, valueType, valueContainsNull = false)
+
+ new StructType().add("f0", f0Type, nullable = false)
+ },
+
+ expectedSchema =
+ """message root {
+ | required group f0 (MAP) {
+ | repeated group map (MAP_KEY_VALUE) {
+ | required int32 key;
+ | required group value {
+ | required int64 value_f1;
+ | required double value_f2;
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testSchemaClipping(
+ "standard map",
+
+ parquetSchema =
+ """message root {
+ | required group f0 (MAP) {
+ | repeated group key_value {
+ | required int32 key;
+ | required group value {
+ | required int32 value_f0;
+ | required int64 value_f1;
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin,
+
+ catalystSchema = {
+ val valueType =
+ new StructType()
+ .add("value_f1", LongType, nullable = false)
+ .add("value_f2", DoubleType, nullable = false)
+
+ val f0Type = MapType(IntegerType, valueType, valueContainsNull = false)
+
+ new StructType().add("f0", f0Type, nullable = false)
+ },
+
+ expectedSchema =
+ """message root {
+ | required group f0 (MAP) {
+ | repeated group key_value {
+ | required int32 key;
+ | required group value {
+ | required int64 value_f1;
+ | required double value_f2;
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testSchemaClipping(
+ "standard map with complex key",
+
+ parquetSchema =
+ """message root {
+ | required group f0 (MAP) {
+ | repeated group key_value {
+ | required group key {
+ | required int32 value_f0;
+ | required int64 value_f1;
+ | }
+ | required int32 value;
+ | }
+ | }
+ |}
+ """.stripMargin,
+
+ catalystSchema = {
+ val keyType =
+ new StructType()
+ .add("value_f1", LongType, nullable = false)
+ .add("value_f2", DoubleType, nullable = false)
+
+ val f0Type = MapType(keyType, IntegerType, valueContainsNull = false)
+
+ new StructType().add("f0", f0Type, nullable = false)
+ },
+
+ expectedSchema =
+ """message root {
+ | required group f0 (MAP) {
+ | repeated group key_value {
+ | required group key {
+ | required int64 value_f1;
+ | required double value_f2;
+ | }
+ | required int32 value;
+ | }
+ | }
+ |}
+ """.stripMargin)
}