author     Sadhan Sood <sadhan@tellapart.com>        2015-02-04 19:18:06 -0800
committer  Cheng Lian <lian@databricks.com>          2015-02-04 19:18:06 -0800
commit     dba98bf6987ec39380f1a5b0ca2772b694452231 (patch)
tree       2dfe9c5ed122e7d09c26e144be4c8d1269ef3f7e /sql
parent     1fbd124b1bd6159086d8e88b139ce0817af02322 (diff)
[SPARK-4520] [SQL] Fix the ArrayIndexOutOfBoundsException raised in SPARK-4520

The exception is thrown only for thrift-generated Parquet files. The array element schema name is assumed to be "array", as in parquet-avro, but thrift-generated Parquet files use array_name + "_tuple" instead. This leaves the array group type without the expected child, and the exception is thrown when the Parquet rows are materialized.

Author: Sadhan Sood <sadhan@tellapart.com>

Closes #4148 from sadhan/SPARK-4520 and squashes the following commits:

c5ccde8 [Sadhan Sood] [SPARK-4520] [SQL] This pr fixes the ArrayIndexOutOfBoundsException as raised in SPARK-4520.
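To make the mismatch concrete, here is a sketch of the two schema shapes for a single Seq[Int] field. The field name "values" is made up for illustration; the default element name comes from CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, and the thrift-style name appends the new "_tuple" suffix, as exercised in the test added below.

Default (parquet-avro style), element named "array":

    message root {
      optional group values (LIST) {
        repeated int32 array;
      }
    }

parquet-thrift style, element named after the field plus "_tuple":

    message root {
      optional group values (LIST) {
        repeated int32 values_tuple;
      }
    }

A reader expecting the first shape finds no child named "array" in the second, which is how the out-of-bounds access arises during row materialization.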
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala    |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala        | 35
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala  | 28
4 files changed, 60 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index d87ddfeabd..7d62f3728f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -66,6 +66,11 @@ private[sql] object CatalystConverter {
// Using a different value will result in Parquet silently dropping columns.
val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag"
val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
+ // SPARK-4520: Thrift generated parquet files have different array element
+ // schema names than avro. Thrift parquet uses array_schema_name + "_tuple"
+ // as opposed to "array" used by default. For more information, check
+ // TestThriftSchemaConverter.java in parquet.thrift.
+ val THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX = "_tuple"
val MAP_KEY_SCHEMA_NAME = "key"
val MAP_VALUE_SCHEMA_NAME = "value"
val MAP_SCHEMA_NAME = "map"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 3fb1cc4105..14c81ae4eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -99,7 +99,11 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
val requestedAttributes = RowReadSupport.getRequestedSchema(configuration)
if (requestedAttributes != null) {
- parquetSchema = ParquetTypesConverter.convertFromAttributes(requestedAttributes)
+ // If the parquet file is thrift derived, there is a good chance that
+ // it will have the thrift class in metadata.
+ val isThriftDerived = keyValueMetaData.keySet().contains("thrift.class")
+ parquetSchema = ParquetTypesConverter
+ .convertFromAttributes(requestedAttributes, isThriftDerived)
metadata.put(
RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
ParquetTypesConverter.convertToString(requestedAttributes))
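The detection above relies on footer metadata: parquet-thrift records the fully qualified Thrift class name under the "thrift.class" key when it writes a file. A minimal standalone sketch of the same check (the method name and the plain java.util.Map parameter are illustrative, not part of the patch):

    import java.util.{Map => JMap}

    // parquet-thrift stores the originating Thrift class in the file footer,
    // so the presence of this key is a strong signal the file is thrift-derived.
    def isThriftDerived(keyValueMetaData: JMap[String, String]): Boolean =
      keyValueMetaData.keySet().contains("thrift.class")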
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index f1d4ff2387..b646109b7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -285,13 +285,19 @@ private[parquet] object ParquetTypesConverter extends Logging {
ctype: DataType,
name: String,
nullable: Boolean = true,
- inArray: Boolean = false): ParquetType = {
+ inArray: Boolean = false,
+ toThriftSchemaNames: Boolean = false): ParquetType = {
val repetition =
if (inArray) {
Repetition.REPEATED
} else {
if (nullable) Repetition.OPTIONAL else Repetition.REQUIRED
}
+ val arraySchemaName = if (toThriftSchemaNames) {
+ name + CatalystConverter.THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX
+ } else {
+ CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME
+ }
val typeInfo = fromPrimitiveDataType(ctype)
typeInfo.map {
case ParquetTypeInfo(primitiveType, originalType, decimalMetadata, length) =>
@@ -306,22 +312,24 @@ private[parquet] object ParquetTypesConverter extends Logging {
}.getOrElse {
ctype match {
case udt: UserDefinedType[_] => {
- fromDataType(udt.sqlType, name, nullable, inArray)
+ fromDataType(udt.sqlType, name, nullable, inArray, toThriftSchemaNames)
}
case ArrayType(elementType, false) => {
val parquetElementType = fromDataType(
elementType,
- CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+ arraySchemaName,
nullable = false,
- inArray = true)
+ inArray = true,
+ toThriftSchemaNames)
ConversionPatterns.listType(repetition, name, parquetElementType)
}
case ArrayType(elementType, true) => {
val parquetElementType = fromDataType(
elementType,
- CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+ arraySchemaName,
nullable = true,
- inArray = false)
+ inArray = false,
+ toThriftSchemaNames)
ConversionPatterns.listType(
repetition,
name,
@@ -332,7 +340,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
}
case StructType(structFields) => {
val fields = structFields.map {
- field => fromDataType(field.dataType, field.name, field.nullable, inArray = false)
+ field => fromDataType(field.dataType, field.name, field.nullable,
+ inArray = false, toThriftSchemaNames)
}
new ParquetGroupType(repetition, name, fields.toSeq)
}
@@ -342,13 +351,15 @@ private[parquet] object ParquetTypesConverter extends Logging {
keyType,
CatalystConverter.MAP_KEY_SCHEMA_NAME,
nullable = false,
- inArray = false)
+ inArray = false,
+ toThriftSchemaNames)
val parquetValueType =
fromDataType(
valueType,
CatalystConverter.MAP_VALUE_SCHEMA_NAME,
nullable = valueContainsNull,
- inArray = false)
+ inArray = false,
+ toThriftSchemaNames)
ConversionPatterns.mapType(
repetition,
name,
@@ -374,10 +385,12 @@ private[parquet] object ParquetTypesConverter extends Logging {
field.getRepetition != Repetition.REQUIRED)())
}
- def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
+ def convertFromAttributes(attributes: Seq[Attribute],
+ toThriftSchemaNames: Boolean = false): MessageType = {
val fields = attributes.map(
attribute =>
- fromDataType(attribute.dataType, attribute.name, attribute.nullable))
+ fromDataType(attribute.dataType, attribute.name, attribute.nullable,
+ toThriftSchemaNames = toThriftSchemaNames))
new MessageType("root", fields)
}
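A usage sketch of the new flag, mirroring what the test below does. Record is a made-up case class, and the calls assume access to Spark SQL's private parquet internals (e.g. from a test in the same package):

    import org.apache.spark.sql.catalyst.ScalaReflection
    import org.apache.spark.sql.parquet.ParquetTypesConverter

    case class Record(values: Seq[Int])

    val attributes = ScalaReflection.attributesFor[Record]
    // Default behaviour: the array element is named "array".
    val defaultSchema = ParquetTypesConverter.convertFromAttributes(attributes)
    // Thrift-style behaviour: the array element is named "values_tuple".
    val thriftSchema =
      ParquetTypesConverter.convertFromAttributes(attributes, toThriftSchemaNames = true)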
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index 64274950b8..5f7f31d395 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -33,9 +33,10 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
* Checks whether the reflected Parquet message type for product type `T` conforms `messageType`.
*/
private def testSchema[T <: Product: ClassTag: TypeTag](
- testName: String, messageType: String): Unit = {
+ testName: String, messageType: String, isThriftDerived: Boolean = false): Unit = {
test(testName) {
- val actual = ParquetTypesConverter.convertFromAttributes(ScalaReflection.attributesFor[T])
+ val actual = ParquetTypesConverter.convertFromAttributes(ScalaReflection.attributesFor[T],
+ isThriftDerived)
val expected = MessageTypeParser.parseMessageType(messageType)
actual.checkContains(expected)
expected.checkContains(actual)
@@ -146,6 +147,29 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
|}
""".stripMargin)
+ // Test for SPARK-4520 -- ensure that thrift generated parquet schema is generated
+ // as expected from attributes
+ testSchema[(Array[Byte], Array[Byte], Array[Byte], Seq[Int], Map[Array[Byte], Seq[Int]])](
+ "thrift generated parquet schema",
+ """
+ |message root {
+ | optional binary _1 (UTF8);
+ | optional binary _2 (UTF8);
+ | optional binary _3 (UTF8);
+ | optional group _4 (LIST) {
+ | repeated int32 _4_tuple;
+ | }
+ | optional group _5 (MAP) {
+ | repeated group map (MAP_KEY_VALUE) {
+ | required binary key (UTF8);
+ | optional group value (LIST) {
+ | repeated int32 value_tuple;
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin, isThriftDerived = true)
+
test("DataType string parser compatibility") {
// This is the generated string from previous versions of the Spark SQL, using the following:
// val schema = StructType(List(