author     Cheng Lian <lian@databricks.com>    2016-07-20 16:49:46 -0700
committer  Yin Huai <yhuai@databricks.com>     2016-07-20 16:49:46 -0700
commit     e651900bd562cc29a3eb13e92a5147979e347f61
tree       c2bf0c7ddcc0d11b5310046c798e0d72baf49966 /sql
parent     e3cd5b3050711af69fc1dfc518b11bf1a86b6a4c
[SPARK-16344][SQL] Decoding Parquet array of struct with a single field named "element"
## What changes were proposed in this pull request?

For backward-compatibility reasons, the following Parquet schema is ambiguous:

```
optional group f (LIST) {
  repeated group list {
    optional group element {
      optional int32 element;
    }
  }
}
```

According to the parquet-format spec, when interpreted as a standard 3-level layout, this type is equivalent to the following SQL type:

```
ARRAY<STRUCT<element: INT>>
```

However, when interpreted as a legacy 2-level layout, it is equivalent to:

```
ARRAY<STRUCT<element: STRUCT<element: INT>>>
```

Historically, we employed two methods to disambiguate these cases:

- `ParquetSchemaConverter.isElementType()`

  Used to disambiguate the above cases while converting Parquet types to Spark types.

- `ParquetRowConverter.isElementType()`

  Used to disambiguate the above cases while instantiating row converters that convert Parquet records to Spark rows.

Unfortunately, these two methods make different decisions about the problematic Parquet type above, which caused SPARK-16344.

`ParquetRowConverter.isElementType()` is necessary for Spark 1.4 and earlier because, in those versions, Parquet requested schemata are converted directly from Spark schemata. The converted Parquet schemata may be incompatible with the actual schemata of the underlying physical files when those files were written by a system or library whose schema conversion scheme for Parquet LIST and MAP fields differs from Spark's.

Since Spark 1.5, Parquet requested schemata are always properly tailored from the schemata of the physical files to be read, so `ParquetRowConverter.isElementType()` is no longer necessary. This PR replaces the method with a simple yet accurate scheme: whenever an ambiguous Parquet type is hit, convert the type in question back to a Spark type using `ParquetSchemaConverter` and check whether it matches the corresponding Spark type.

## How was this patch tested?

New test cases added in `ParquetHiveCompatibilitySuite` and `ParquetQuerySuite`.

Author: Cheng Lian <lian@databricks.com>

Closes #14014 from liancheng/spark-16344-for-master-and-2.0.
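The core of the new scheme is small enough to sketch here. The snippet below is a simplified, illustrative rendering of the check the patch adds to `ParquetRowConverter` (see the `ParquetRowConverter.scala` hunk in the diff). `repeatedFieldIsElement` is a hypothetical helper name, and `ParquetSchemaConverter` and `DataType.equalsIgnoreCompatibleNullability` are Spark-internal APIs, so treat this as a sketch rather than user-facing code:

```scala
import org.apache.parquet.schema.Type
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.types.DataType

// Hypothetical helper illustrating the disambiguation introduced by this patch.
// Given the repeated field of a LIST-annotated group and the expected Catalyst
// element type, decide whether the repeated field *is* the element (legacy
// 2-level layout) or merely the syntactic group of the standard 3-level layout.
def repeatedFieldIsElement(
    schemaConverter: ParquetSchemaConverter,
    repeatedType: Type,
    catalystElementType: DataType): Boolean = {
  // Convert the ambiguous repeated Parquet field back to a Catalyst type...
  val guessedElementType = schemaConverter.convertField(repeatedType)
  // ...and compare it with the expected element type, ignoring nullability.
  // A match means the repeated field maps to the element itself; a mismatch
  // means the real element is the repeated group's only child field.
  DataType.equalsIgnoreCompatibleNullability(guessedElementType, catalystElementType)
}
```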
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala         |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala  |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala        | 79
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala     |  8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala          | 16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala                       |  8
6 files changed, 73 insertions, 47 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 46d786de57..0bee874016 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -96,7 +96,8 @@ private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with
new ParquetRecordMaterializer(
parquetRequestedSchema,
- ParquetReadSupport.expandUDT(catalystRequestedSchema))
+ ParquetReadSupport.expandUDT(catalystRequestedSchema),
+ new ParquetSchemaConverter(conf))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index 0818d802b0..d12e780528 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -28,12 +28,14 @@ import org.apache.spark.sql.types.StructType
*
* @param parquetSchema Parquet schema of the records to be read
* @param catalystSchema Catalyst schema of the rows to be constructed
+ * @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
*/
private[parquet] class ParquetRecordMaterializer(
- parquetSchema: MessageType, catalystSchema: StructType)
+ parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter)
extends RecordMaterializer[InternalRow] {
- private val rootConverter = new ParquetRowConverter(parquetSchema, catalystSchema, NoopUpdater)
+ private val rootConverter =
+ new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater)
override def getCurrentRecord: InternalRow = rootConverter.currentRecord
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 9dad59647e..9ffc2b5dd8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
-import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
+import org.apache.parquet.schema.{GroupType, MessageType, Type}
import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64}
@@ -113,12 +113,14 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
* When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
* any "parent" container.
*
+ * @param schemaConverter A utility converter used to convert Parquet types to Catalyst types.
* @param parquetType Parquet schema of Parquet records
* @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
* types should have been expanded.
* @param updater An updater which propagates converted field values to the parent container
*/
private[parquet] class ParquetRowConverter(
+ schemaConverter: ParquetSchemaConverter,
parquetType: GroupType,
catalystType: StructType,
updater: ParentContainerUpdater)
@@ -292,9 +294,10 @@ private[parquet] class ParquetRowConverter(
new ParquetMapConverter(parquetType.asGroupType(), t, updater)
case t: StructType =>
- new ParquetRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater {
- override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
- })
+ new ParquetRowConverter(
+ schemaConverter, parquetType.asGroupType(), t, new ParentContainerUpdater {
+ override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
+ })
case t =>
throw new RuntimeException(
@@ -442,13 +445,46 @@ private[parquet] class ParquetRowConverter(
private val elementConverter: Converter = {
val repeatedType = parquetSchema.getType(0)
val elementType = catalystSchema.elementType
- val parentName = parquetSchema.getName
- if (isElementType(repeatedType, elementType, parentName)) {
+ // At this stage, we're not sure whether the repeated field maps to the element type or is
+ // just the syntactic repeated group of the 3-level standard LIST layout. Take the following
+ // Parquet LIST-annotated group type as an example:
+ //
+ // optional group f (LIST) {
+ // repeated group list {
+ // optional group element {
+ // optional int32 element;
+ // }
+ // }
+ // }
+ //
+ // This type is ambiguous:
+ //
+ // 1. When interpreted as a standard 3-level layout, the `list` field is just the syntactic
+ // group, and the entire type should be translated to:
+ //
+ // ARRAY<STRUCT<element: INT>>
+ //
+ // 2. On the other hand, when interpreted as a non-standard 2-level layout, the `list` field
+ // represents the element type, and the entire type should be translated to:
+ //
+ // ARRAY<STRUCT<element: STRUCT<element: INT>>>
+ //
+ // Here we try to convert field `list` into a Catalyst type to see whether the converted type
+ // matches the Catalyst array element type. If it doesn't match, then it's case 1; otherwise,
+ // it's case 2.
+ val guessedElementType = schemaConverter.convertField(repeatedType)
+
+ if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)) {
+ // If the repeated field corresponds to the element type, creates a new converter using the
+ // type of the repeated field.
newConverter(repeatedType, elementType, new ParentContainerUpdater {
override def set(value: Any): Unit = currentArray += value
})
} else {
+ // If the repeated field corresponds to the syntactic group in the standard 3-level Parquet
+ // LIST layout, creates a new converter using the only child field of the repeated field.
+ assert(!repeatedType.isPrimitive && repeatedType.asGroupType().getFieldCount == 1)
new ElementConverter(repeatedType.asGroupType().getType(0), elementType)
}
}
@@ -462,37 +498,6 @@ private[parquet] class ParquetRowConverter(
// in row cells.
override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
- // scalastyle:off
- /**
- * Returns whether the given type is the element type of a list or is a syntactic group with
- * one field that is the element type. This is determined by checking whether the type can be
- * a syntactic group and by checking whether a potential syntactic group matches the expected
- * schema.
- * {{{
- * <list-repetition> group <name> (LIST) {
- * repeated group list { <-- repeatedType points here
- * <element-repetition> <element-type> element;
- * }
- * }
- * }}}
- * In short, here we handle Parquet list backwards-compatibility rules on the read path. This
- * method is based on `AvroIndexedRecordConverter.isElementType`.
- *
- * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
- */
- // scalastyle:on
- private def isElementType(
- parquetRepeatedType: Type, catalystElementType: DataType, parentName: String): Boolean = {
- (parquetRepeatedType, catalystElementType) match {
- case (t: PrimitiveType, _) => true
- case (t: GroupType, _) if t.getFieldCount > 1 => true
- case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" => true
- case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == parentName + "_tuple" => true
- case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true
- case _ => false
- }
- }
-
/** Array element converter */
private final class ElementConverter(parquetType: Type, catalystType: DataType)
extends GroupConverter {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index bcf535d455..c81a65f497 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -260,7 +260,7 @@ private[parquet] class ParquetSchemaConverter(
{
// For legacy 2-level list types with primitive element type, e.g.:
//
- // // List<Integer> (nullable list, non-null elements)
+ // // ARRAY<INT> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated int32 element;
// }
@@ -270,7 +270,7 @@ private[parquet] class ParquetSchemaConverter(
// For legacy 2-level list types whose element type is a group type with 2 or more fields,
// e.g.:
//
- // // List<Tuple<String, Integer>> (nullable list, non-null elements)
+ // // ARRAY<STRUCT<str: STRING, num: INT>> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated group element {
// required binary str (UTF8);
@@ -282,7 +282,7 @@ private[parquet] class ParquetSchemaConverter(
} || {
// For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), e.g.:
//
- // // List<OneTuple<String>> (nullable list, non-null elements)
+ // // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated group array {
// required binary str (UTF8);
@@ -293,7 +293,7 @@ private[parquet] class ParquetSchemaConverter(
} || {
// For Parquet data generated by parquet-thrift, e.g.:
//
- // // List<OneTuple<String>> (nullable list, non-null elements)
+ // // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated group my_list_tuple {
// required binary str (UTF8);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 7c394e0b0c..02b94452a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.execution.BatchedDataSourceScanExec
-import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
+import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -668,9 +668,23 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
}
+
+ test("SPARK-16344: array of struct with a single field named 'element'") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ Seq(Tuple1(Array(SingleElement(42)))).toDF("f").write.parquet(path)
+
+ checkAnswer(
+ sqlContext.read.parquet(path),
+ Row(Array(Row(42)))
+ )
+ }
+ }
}
object TestingUDT {
+ case class SingleElement(element: Long)
+
@SQLUserDefinedType(udt = classOf[NestedStructUDT])
case class NestedStruct(a: Integer, b: Long, c: Double)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index ac89bbbf8e..2b576469e9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.hive
import java.sql.Timestamp
-import org.apache.hadoop.hive.conf.HiveConf
-
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -137,4 +135,10 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
Row(Row(1, Seq("foo", "bar", null))),
"STRUCT<f0: INT, f1: ARRAY<STRING>>")
}
+
+ test("SPARK-16344: array of struct with a single field named 'array_element'") {
+ testParquetHiveCompatibility(
+ Row(Seq(Row(1))),
+ "ARRAY<STRUCT<array_element: INT>>")
+ }
}
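For context, the user-facing scenario exercised by the new `ParquetQuerySuite` test boils down to writing and reading back an array of structs whose single field is named `element`. Below is a minimal sketch, assuming a Spark 2.0-era shell session with a `sqlContext` in scope; the output path is illustrative and the case class mirrors the `TestingUDT.SingleElement` added by the patch:

```scala
import sqlContext.implicits._

// Mirrors TestingUDT.SingleElement from the new test; the name is illustrative.
case class SingleElement(element: Long)

// Hypothetical output location for this example.
val path = "/tmp/spark-16344-example"

// Writing produces a Parquet LIST whose inner struct has a single field named
// "element" -- exactly the ambiguous shape described in the commit message.
Seq(Tuple1(Array(SingleElement(42)))).toDF("f").write.parquet(path)

// Before this patch, the two isElementType() heuristics disagreed on this shape
// (SPARK-16344); with the fix, reading it back yields Row(Array(Row(42))).
sqlContext.read.parquet(path).show()
```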