author    Damian Guy <damian.guy@gmail.com>    2015-08-11 12:46:33 +0800
committer Cheng Lian <lian@databricks.com>    2015-08-11 12:46:33 +0800
commit    071bbad5db1096a548c886762b611a8484a52753 (patch)
tree      5ef7be83e9fa717f01a04d9ccfdb5dfb5d9938c1 /sql/core
parent    3c9802d9400bea802984456683b2736a450ee17e (diff)
download  spark-071bbad5db1096a548c886762b611a8484a52753.tar.gz
          spark-071bbad5db1096a548c886762b611a8484a52753.tar.bz2
          spark-071bbad5db1096a548c886762b611a8484a52753.zip
[SPARK-9340] [SQL] Fixes converting unannotated Parquet lists
This PR is inspired by #8063, authored by dguy. In particular, the test Parquet files added here are all taken from that PR.

**The committer who merges this PR should attribute it to "Damian Guy <damian.guy@gmail.com>".**

----

SPARK-6776 and SPARK-6777 followed `parquet-avro` in implementing the backwards-compatibility rules defined in the `parquet-format` spec. However, both Spark SQL and `parquet-avro` neglected the following statement in `parquet-format`:

> This does not affect repeated fields that are not annotated: A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor annotated by `LIST` or `MAP` should be interpreted as a required list of required elements where the element type is the type of the field.

One consequence is that Parquet files generated by `parquet-protobuf` containing unannotated repeated fields are not correctly converted to Catalyst arrays. This PR fixes the issue by:

1. Handling unannotated repeated fields in `CatalystSchemaConverter`.
2. Converting these special repeated fields to Catalyst arrays in `CatalystRowConverter`. Two dedicated converters, `RepeatedPrimitiveConverter` and `RepeatedGroupConverter`, are added. They delegate the actual conversion work to a child `elementConverter` and accumulate elements in an `ArrayBuffer`. Two extra methods, `start()` and `end()`, are added to `ParentContainerUpdater` so that they can initialize new `ArrayBuffer`s for unannotated repeated fields and propagate converted array values upstream.

Author: Cheng Lian <lian@databricks.com>

Closes #8070 from liancheng/spark-9340/unannotated-parquet-list and squashes the following commits:

ace6df7 [Cheng Lian] Moves ParquetProtobufCompatibilitySuite
f1c7bfd [Cheng Lian] Updates .rat-excludes
420ad2b [Cheng Lian] Fixes converting unannotated Parquet lists
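To make the new behavior concrete, here is a minimal sketch (not part of the commit) of reading one of the test files added in this PR. It assumes a Spark 1.5-era `SQLContext` named `sqlContext` and a local copy of the `old-repeated-int.parquet` test resource at a hypothetical path; the expected schema and result mirror the `ParquetSchemaSuite` and `ParquetProtobufCompatibilitySuite` cases in this diff:

```scala
// Sketch only; the file path is an assumption. The resource's schema is
//   message root { repeated int32 f1; }   // no LIST or MAP annotation
val df = sqlContext.read.parquet("/path/to/old-repeated-int.parquet")

// With this fix, the unannotated repeated field is read as a required
// list of required elements instead of failing schema conversion:
df.printSchema()
// root
//  |-- f1: array (nullable = false)
//  |    |-- element: integer (containsNull = false)

df.collect()  // equivalent to Row(Seq(1, 2, 3)), as asserted in the suite
```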
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala            | 151
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala         |   7
-rw-r--r--  sql/core/src/test/resources/nested-array-struct.parquet                                                          | bin 0 -> 775 bytes
-rw-r--r--  sql/core/src/test/resources/old-repeated-int.parquet                                                             | bin 0 -> 389 bytes
-rw-r--r--  sql/core/src/test/resources/old-repeated-message.parquet                                                         | bin 0 -> 600 bytes
-rw-r--r--  sql/core/src/test/resources/old-repeated.parquet                                                                 | bin 0 -> 432 bytes
-rw-r--r-- [-rwxr-xr-x]  sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet                                    | bin 10550 -> 10550 bytes
-rw-r--r--  sql/core/src/test/resources/proto-repeated-string.parquet                                                        | bin 0 -> 411 bytes
-rw-r--r--  sql/core/src/test/resources/proto-repeated-struct.parquet                                                        | bin 0 -> 608 bytes
-rw-r--r--  sql/core/src/test/resources/proto-struct-with-array-many.parquet                                                 | bin 0 -> 802 bytes
-rw-r--r--  sql/core/src/test/resources/proto-struct-with-array.parquet                                                      | bin 0 -> 1576 bytes
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala | 91
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala              |  30
13 files changed, 246 insertions, 33 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 3542dfbae1..ab5a6ddd41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -21,11 +21,11 @@ import java.math.{BigDecimal, BigInteger}
import java.nio.ByteOrder
import scala.collection.JavaConversions._
-import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
+import org.apache.parquet.schema.OriginalType.LIST
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
@@ -42,6 +42,12 @@ import org.apache.spark.unsafe.types.UTF8String
* values to an [[ArrayBuffer]].
*/
private[parquet] trait ParentContainerUpdater {
+ /** Called before a record field is converted */
+ def start(): Unit = ()
+
+ /** Called after a record field has been converted */
+ def end(): Unit = ()
+
def set(value: Any): Unit = ()
def setBoolean(value: Boolean): Unit = set(value)
def setByte(value: Byte): Unit = set(value)
@@ -55,6 +61,32 @@ private[parquet] trait ParentContainerUpdater {
/** A no-op updater used for root converter (who doesn't have a parent). */
private[parquet] object NoopUpdater extends ParentContainerUpdater
+private[parquet] trait HasParentContainerUpdater {
+ def updater: ParentContainerUpdater
+}
+
+/**
+ * A convenient converter class for Parquet group types with a [[HasParentContainerUpdater]].
+ */
+private[parquet] abstract class CatalystGroupConverter(val updater: ParentContainerUpdater)
+ extends GroupConverter with HasParentContainerUpdater
+
+/**
+ * Parquet converter for Parquet primitive types. Note that not all Spark SQL atomic types
+ * are handled by this converter. Parquet primitive types are only a subset of those of Spark
+ * SQL. For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
+ */
+private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUpdater)
+ extends PrimitiveConverter with HasParentContainerUpdater {
+
+ override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
+ override def addInt(value: Int): Unit = updater.setInt(value)
+ override def addLong(value: Long): Unit = updater.setLong(value)
+ override def addFloat(value: Float): Unit = updater.setFloat(value)
+ override def addDouble(value: Double): Unit = updater.setDouble(value)
+ override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
+}
+
/**
* A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s.
* Since any Parquet record is also a struct, this converter can also be used as root converter.
@@ -70,7 +102,7 @@ private[parquet] class CatalystRowConverter(
parquetType: GroupType,
catalystType: StructType,
updater: ParentContainerUpdater)
- extends GroupConverter {
+ extends CatalystGroupConverter(updater) {
/**
* Updater used together with field converters within a [[CatalystRowConverter]]. It propagates
@@ -89,13 +121,11 @@ private[parquet] class CatalystRowConverter(
/**
* Represents the converted row object once an entire Parquet record is converted.
- *
- * @todo Uses [[UnsafeRow]] for better performance.
*/
val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
// Converters for each field.
- private val fieldConverters: Array[Converter] = {
+ private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
parquetType.getFields.zip(catalystType).zipWithIndex.map {
case ((parquetFieldType, catalystField), ordinal) =>
// Converted field value should be set to the `ordinal`-th cell of `currentRow`
@@ -105,11 +135,19 @@ private[parquet] class CatalystRowConverter(
override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)
- override def end(): Unit = updater.set(currentRow)
+ override def end(): Unit = {
+ var i = 0
+ while (i < currentRow.numFields) {
+ fieldConverters(i).updater.end()
+ i += 1
+ }
+ updater.set(currentRow)
+ }
override def start(): Unit = {
var i = 0
while (i < currentRow.numFields) {
+ fieldConverters(i).updater.start()
currentRow.setNullAt(i)
i += 1
}
@@ -122,20 +160,20 @@ private[parquet] class CatalystRowConverter(
private def newConverter(
parquetType: Type,
catalystType: DataType,
- updater: ParentContainerUpdater): Converter = {
+ updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {
catalystType match {
case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
new CatalystPrimitiveConverter(updater)
case ByteType =>
- new PrimitiveConverter {
+ new CatalystPrimitiveConverter(updater) {
override def addInt(value: Int): Unit =
updater.setByte(value.asInstanceOf[ByteType#InternalType])
}
case ShortType =>
- new PrimitiveConverter {
+ new CatalystPrimitiveConverter(updater) {
override def addInt(value: Int): Unit =
updater.setShort(value.asInstanceOf[ShortType#InternalType])
}
@@ -148,7 +186,7 @@ private[parquet] class CatalystRowConverter(
case TimestampType =>
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
- new PrimitiveConverter {
+ new CatalystPrimitiveConverter(updater) {
// Converts nanosecond timestamps stored as INT96
override def addBinary(value: Binary): Unit = {
assert(
@@ -164,13 +202,23 @@ private[parquet] class CatalystRowConverter(
}
case DateType =>
- new PrimitiveConverter {
+ new CatalystPrimitiveConverter(updater) {
override def addInt(value: Int): Unit = {
// DateType is not specialized in `SpecificMutableRow`, have to box it here.
updater.set(value.asInstanceOf[DateType#InternalType])
}
}
+ // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
+ // annotated by `LIST` or `MAP` should be interpreted as a required list of required
+ // elements where the element type is the type of the field.
+ case t: ArrayType if parquetType.getOriginalType != LIST =>
+ if (parquetType.isPrimitive) {
+ new RepeatedPrimitiveConverter(parquetType, t.elementType, updater)
+ } else {
+ new RepeatedGroupConverter(parquetType, t.elementType, updater)
+ }
+
case t: ArrayType =>
new CatalystArrayConverter(parquetType.asGroupType(), t, updater)
@@ -196,26 +244,10 @@ private[parquet] class CatalystRowConverter(
}
/**
- * Parquet converter for Parquet primitive types. Note that not all Spark SQL atomic types
- * are handled by this converter. Parquet primitive types are only a subset of those of Spark
- * SQL. For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
- */
- private final class CatalystPrimitiveConverter(updater: ParentContainerUpdater)
- extends PrimitiveConverter {
-
- override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
- override def addInt(value: Int): Unit = updater.setInt(value)
- override def addLong(value: Long): Unit = updater.setLong(value)
- override def addFloat(value: Float): Unit = updater.setFloat(value)
- override def addDouble(value: Double): Unit = updater.setDouble(value)
- override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
- }
-
- /**
* Parquet converter for strings. A dictionary is used to minimize string decoding cost.
*/
private final class CatalystStringConverter(updater: ParentContainerUpdater)
- extends PrimitiveConverter {
+ extends CatalystPrimitiveConverter(updater) {
private var expandedDictionary: Array[UTF8String] = null
@@ -242,7 +274,7 @@ private[parquet] class CatalystRowConverter(
private final class CatalystDecimalConverter(
decimalType: DecimalType,
updater: ParentContainerUpdater)
- extends PrimitiveConverter {
+ extends CatalystPrimitiveConverter(updater) {
// Converts decimals stored as INT32
override def addInt(value: Int): Unit = {
@@ -306,7 +338,7 @@ private[parquet] class CatalystRowConverter(
parquetSchema: GroupType,
catalystSchema: ArrayType,
updater: ParentContainerUpdater)
- extends GroupConverter {
+ extends CatalystGroupConverter(updater) {
private var currentArray: ArrayBuffer[Any] = _
@@ -383,7 +415,7 @@ private[parquet] class CatalystRowConverter(
parquetType: GroupType,
catalystType: MapType,
updater: ParentContainerUpdater)
- extends GroupConverter {
+ extends CatalystGroupConverter(updater) {
private var currentKeys: ArrayBuffer[Any] = _
private var currentValues: ArrayBuffer[Any] = _
@@ -446,4 +478,61 @@ private[parquet] class CatalystRowConverter(
}
}
}
+
+ private trait RepeatedConverter {
+ private var currentArray: ArrayBuffer[Any] = _
+
+ protected def newArrayUpdater(updater: ParentContainerUpdater) = new ParentContainerUpdater {
+ override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
+ override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
+ override def set(value: Any): Unit = currentArray += value
+ }
+ }
+
+ /**
+ * A primitive converter for converting unannotated repeated primitive values to required arrays
+ * of required primitive values.
+ */
+ private final class RepeatedPrimitiveConverter(
+ parquetType: Type,
+ catalystType: DataType,
+ parentUpdater: ParentContainerUpdater)
+ extends PrimitiveConverter with RepeatedConverter with HasParentContainerUpdater {
+
+ val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
+
+ private val elementConverter: PrimitiveConverter =
+ newConverter(parquetType, catalystType, updater).asPrimitiveConverter()
+
+ override def addBoolean(value: Boolean): Unit = elementConverter.addBoolean(value)
+ override def addInt(value: Int): Unit = elementConverter.addInt(value)
+ override def addLong(value: Long): Unit = elementConverter.addLong(value)
+ override def addFloat(value: Float): Unit = elementConverter.addFloat(value)
+ override def addDouble(value: Double): Unit = elementConverter.addDouble(value)
+ override def addBinary(value: Binary): Unit = elementConverter.addBinary(value)
+
+ override def setDictionary(dict: Dictionary): Unit = elementConverter.setDictionary(dict)
+ override def hasDictionarySupport: Boolean = elementConverter.hasDictionarySupport
+ override def addValueFromDictionary(id: Int): Unit = elementConverter.addValueFromDictionary(id)
+ }
+
+ /**
+ * A group converter for converting unannotated repeated group values to required arrays of
+ * required struct values.
+ */
+ private final class RepeatedGroupConverter(
+ parquetType: Type,
+ catalystType: DataType,
+ parentUpdater: ParentContainerUpdater)
+ extends GroupConverter with HasParentContainerUpdater with RepeatedConverter {
+
+ val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
+
+ private val elementConverter: GroupConverter =
+ newConverter(parquetType, catalystType, updater).asGroupConverter()
+
+ override def getConverter(field: Int): Converter = elementConverter.getConverter(field)
+ override def end(): Unit = elementConverter.end()
+ override def start(): Unit = elementConverter.start()
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index a3fc74cf79..275646e818 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -100,8 +100,11 @@ private[parquet] class CatalystSchemaConverter(
StructField(field.getName, convertField(field), nullable = false)
case REPEATED =>
- throw new AnalysisException(
- s"REPEATED not supported outside LIST or MAP. Type: $field")
+ // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
+ // annotated by `LIST` or `MAP` should be interpreted as a required list of required
+ // elements where the element type is the type of the field.
+ val arrayType = ArrayType(convertField(field), containsNull = false)
+ StructField(field.getName, arrayType, nullable = false)
}
}
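For reference, a minimal sketch of the Catalyst schema the new REPEATED branch produces for `repeated int32 f1`. This mirrors the `ParquetSchemaSuite` expectation added below rather than invoking the private converter directly, and assumes only the Spark SQL types API on the classpath:

```scala
import org.apache.spark.sql.types._

// `message root { repeated int32 f1; }` is now converted to a required
// array of required ints rather than raising an AnalysisException:
val expected = StructType(Seq(
  StructField("f1", ArrayType(IntegerType, containsNull = false), nullable = false)))
```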
diff --git a/sql/core/src/test/resources/nested-array-struct.parquet b/sql/core/src/test/resources/nested-array-struct.parquet
new file mode 100644
index 0000000000..41a43fa35d
--- /dev/null
+++ b/sql/core/src/test/resources/nested-array-struct.parquet
Binary files differ
diff --git a/sql/core/src/test/resources/old-repeated-int.parquet b/sql/core/src/test/resources/old-repeated-int.parquet
new file mode 100644
index 0000000000..520922f73e
--- /dev/null
+++ b/sql/core/src/test/resources/old-repeated-int.parquet
Binary files differ
diff --git a/sql/core/src/test/resources/old-repeated-message.parquet b/sql/core/src/test/resources/old-repeated-message.parquet
new file mode 100644
index 0000000000..548db99162
--- /dev/null
+++ b/sql/core/src/test/resources/old-repeated-message.parquet
Binary files differ
diff --git a/sql/core/src/test/resources/old-repeated.parquet b/sql/core/src/test/resources/old-repeated.parquet
new file mode 100644
index 0000000000..213f1a9029
--- /dev/null
+++ b/sql/core/src/test/resources/old-repeated.parquet
Binary files differ
diff --git a/sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet b/sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet
index 837e4876ee..837e4876ee 100755..100644
--- a/sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet
+++ b/sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet
Binary files differ
diff --git a/sql/core/src/test/resources/proto-repeated-string.parquet b/sql/core/src/test/resources/proto-repeated-string.parquet
new file mode 100644
index 0000000000..8a7eea601d
--- /dev/null
+++ b/sql/core/src/test/resources/proto-repeated-string.parquet
Binary files differ
diff --git a/sql/core/src/test/resources/proto-repeated-struct.parquet b/sql/core/src/test/resources/proto-repeated-struct.parquet
new file mode 100644
index 0000000000..c29eee35c3
--- /dev/null
+++ b/sql/core/src/test/resources/proto-repeated-struct.parquet
Binary files differ
diff --git a/sql/core/src/test/resources/proto-struct-with-array-many.parquet b/sql/core/src/test/resources/proto-struct-with-array-many.parquet
new file mode 100644
index 0000000000..ff9809675f
--- /dev/null
+++ b/sql/core/src/test/resources/proto-struct-with-array-many.parquet
Binary files differ
diff --git a/sql/core/src/test/resources/proto-struct-with-array.parquet b/sql/core/src/test/resources/proto-struct-with-array.parquet
new file mode 100644
index 0000000000..325a8370ad
--- /dev/null
+++ b/sql/core/src/test/resources/proto-struct-with-array.parquet
Binary files differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala
new file mode 100644
index 0000000000..981334cf77
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+
+class ParquetProtobufCompatibilitySuite extends ParquetCompatibilityTest {
+ override def sqlContext: SQLContext = TestSQLContext
+
+ private def readParquetProtobufFile(name: String): DataFrame = {
+ val url = Thread.currentThread().getContextClassLoader.getResource(name)
+ sqlContext.read.parquet(url.toString)
+ }
+
+ test("unannotated array of primitive type") {
+ checkAnswer(readParquetProtobufFile("old-repeated-int.parquet"), Row(Seq(1, 2, 3)))
+ }
+
+ test("unannotated array of struct") {
+ checkAnswer(
+ readParquetProtobufFile("old-repeated-message.parquet"),
+ Row(
+ Seq(
+ Row("First inner", null, null),
+ Row(null, "Second inner", null),
+ Row(null, null, "Third inner"))))
+
+ checkAnswer(
+ readParquetProtobufFile("proto-repeated-struct.parquet"),
+ Row(
+ Seq(
+ Row("0 - 1", "0 - 2", "0 - 3"),
+ Row("1 - 1", "1 - 2", "1 - 3"))))
+
+ checkAnswer(
+ readParquetProtobufFile("proto-struct-with-array-many.parquet"),
+ Seq(
+ Row(
+ Seq(
+ Row("0 - 0 - 1", "0 - 0 - 2", "0 - 0 - 3"),
+ Row("0 - 1 - 1", "0 - 1 - 2", "0 - 1 - 3"))),
+ Row(
+ Seq(
+ Row("1 - 0 - 1", "1 - 0 - 2", "1 - 0 - 3"),
+ Row("1 - 1 - 1", "1 - 1 - 2", "1 - 1 - 3"))),
+ Row(
+ Seq(
+ Row("2 - 0 - 1", "2 - 0 - 2", "2 - 0 - 3"),
+ Row("2 - 1 - 1", "2 - 1 - 2", "2 - 1 - 3")))))
+ }
+
+ test("struct with unannotated array") {
+ checkAnswer(
+ readParquetProtobufFile("proto-struct-with-array.parquet"),
+ Row(10, 9, Seq.empty, null, Row(9), Seq(Row(9), Row(10))))
+ }
+
+ test("unannotated array of struct with unannotated array") {
+ checkAnswer(
+ readParquetProtobufFile("nested-array-struct.parquet"),
+ Seq(
+ Row(2, Seq(Row(1, Seq(Row(3))))),
+ Row(5, Seq(Row(4, Seq(Row(6))))),
+ Row(8, Seq(Row(7, Seq(Row(9)))))))
+ }
+
+ test("unannotated array of string") {
+ checkAnswer(
+ readParquetProtobufFile("proto-repeated-string.parquet"),
+ Seq(
+ Row(Seq("hello", "world")),
+ Row(Seq("good", "bye")),
+ Row(Seq("one", "two", "three"))))
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 8f06de7ce7..971f71e27b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -585,6 +585,36 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin)
+ testParquetToCatalyst(
+ "Backwards-compatibility: LIST with non-nullable element type 7 - " +
+ "parquet-protobuf primitive lists",
+ new StructType()
+ .add("f1", ArrayType(IntegerType, containsNull = false), nullable = false),
+ """message root {
+ | repeated int32 f1;
+ |}
+ """.stripMargin)
+
+ testParquetToCatalyst(
+ "Backwards-compatibility: LIST with non-nullable element type 8 - " +
+ "parquet-protobuf non-primitive lists",
+ {
+ val elementType =
+ new StructType()
+ .add("c1", StringType, nullable = true)
+ .add("c2", IntegerType, nullable = false)
+
+ new StructType()
+ .add("f1", ArrayType(elementType, containsNull = false), nullable = false)
+ },
+ """message root {
+ | repeated group f1 {
+ | optional binary c1 (UTF8);
+ | required int32 c2;
+ | }
+ |}
+ """.stripMargin)
+
// =======================================================
// Tests for converting Catalyst ArrayType to Parquet LIST
// =======================================================