author	Cheng Lian <lian@databricks.com>	2015-08-25 14:58:42 +0800
committer	Cheng Lian <lian@databricks.com>	2015-08-25 14:58:42 +0800
commit	bf03fe68d62f33dda70dff45c3bda1f57b032dfc (patch)
tree	e249b4797046b74baa0fa72faf7b1c841e223bbc /sql
parent	df7041d02d3fd44b08a859f5d77bf6fb726895f0 (diff)
[SPARK-10136] [SQL] A more robust fix for SPARK-10136
PR #8341 is a valid fix for SPARK-10136, but it didn't catch the real root cause. The real problem can be rather tricky to explain, and requires the audience to be fairly familiar with the parquet-format spec, especially the details of the `LIST` backwards-compatibility rules. Let me try to give an explanation here.

The structure of the problematic Parquet schema generated by parquet-avro is something like this:

```
message m {
  <repetition> group f (LIST) {         // Level 1
    repeated group array (LIST) {       // Level 2
      repeated <primitive-type> array;  // Level 3
    }
  }
}
```

(The schema generated by parquet-thrift is structurally similar; just replace the `array` at level 2 with `f_tuple`, and the one at level 3 with `f_tuple_tuple`.)

This structure consists of two nested legacy 2-level `LIST`-like structures:

1. The repeated group type at level 2 is the element type of the outer array defined at level 1.

   This group should map to a `CatalystArrayConverter.ElementConverter` when building converters.

2. The repeated primitive type at level 3 is the element type of the inner array defined at level 2.

   This type should also map to a `CatalystArrayConverter.ElementConverter`.

The root cause of SPARK-10136 is that the group at level 2 isn't properly recognized as the element type of level 1. Thus, according to the parquet-format spec, the repeated primitive at level 3 is left as a so-called "unannotated repeated primitive type", and is recognized as a required list of required primitive elements, so a `RepeatedPrimitiveConverter` instead of a `CatalystArrayConverter.ElementConverter` is created for it.

According to the parquet-format spec, an unannotated repeated type shouldn't appear inside a `LIST`- or `MAP`-annotated group. PR #8341 fixed this issue by allowing such unannotated repeated types to appear in `LIST`-annotated groups, which is a non-standard, hacky, but valid fix. (I didn't realize this when authoring #8341, though.)

As for the reason why level 2 isn't recognized as a list element type, it's because of the following `LIST` backwards-compatibility rule defined in the parquet-format spec:

> If the repeated field is a group with one field and is named either `array` or uses the `LIST`-annotated group's name with `_tuple` appended then the repeated type is the element type and elements are required.

(The `array` part is for parquet-avro compatibility, while the `_tuple` part is for parquet-thrift.)

This rule is implemented in [`CatalystSchemaConverter.isElementType`][1], but was neglected in [`CatalystRowConverter.isElementType`][2]. This PR delivers a more robust fix by adding this rule to the latter method.

Note that parquet-avro 1.7.0 also suffers from this issue. Details can be found at [PARQUET-364][3].

[1]: https://github.com/apache/spark/blob/85f9a61357994da5023b08b0a8a2eb09388ce7f8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala#L259-L305
[2]: https://github.com/apache/spark/blob/85f9a61357994da5023b08b0a8a2eb09388ce7f8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala#L456-L463
[3]: https://issues.apache.org/jira/browse/PARQUET-364

Author: Cheng Lian <lian@databricks.com>

Closes #8361 from liancheng/spark-10136/proper-version.
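To make the backwards-compatibility rule above concrete, here is a minimal, self-contained Scala sketch of the name-matching check. The schema model (`PType`, `Primitive`, `Group`) and the object name `ListCompatRule` are hypothetical stand-ins for illustration, not the real parquet-mr `Type` API; only the matching logic mirrors the rules quoted above.

```scala
// A minimal sketch of the LIST backwards-compatibility rule described above.
// PType, Primitive, and Group are a hypothetical, simplified schema model.
object ListCompatRule {
  sealed trait PType { def name: String }
  final case class Primitive(name: String) extends PType
  final case class Group(name: String, fields: PType*) extends PType

  // Is the repeated type nested in a LIST-annotated group itself the element
  // type (legacy 2-level encoding), or a 3-level wrapper around the element?
  def isElementType(repeated: PType, listName: String): Boolean = repeated match {
    case _: Primitive                              => true  // repeated primitive is the element
    case g: Group if g.fields.size > 1             => true  // multi-field group is the element
    case g: Group if g.name == "array"             => true  // parquet-avro legacy name
    case g: Group if g.name == listName + "_tuple" => true  // parquet-thrift legacy name
    case _                                         => false // standard 3-level wrapper
  }

  def main(args: Array[String]): Unit = {
    // The problematic schema: a level-2 group named "array" with one field.
    val level2 = Group("array", Primitive("array"))
    println(isElementType(level2, listName = "f")) // prints: true
  }
}
```

Under this rule, the level-2 group named `array` is recognized as the element type of the outer `LIST` `f`, so the repeated primitive at level 3 is no longer an unannotated repeated type and gets a proper element converter.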
Diffstat (limited to 'sql')
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala | 18
1 file changed, 8 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index d2c2db5176..cbf0704c4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -415,8 +415,9 @@ private[parquet] class CatalystRowConverter(
     private val elementConverter: Converter = {
       val repeatedType = parquetSchema.getType(0)
       val elementType = catalystSchema.elementType
+      val parentName = parquetSchema.getName
 
-      if (isElementType(repeatedType, elementType)) {
+      if (isElementType(repeatedType, elementType, parentName)) {
         newConverter(repeatedType, elementType, new ParentContainerUpdater {
           override def set(value: Any): Unit = currentArray += value
         })
@@ -453,10 +454,13 @@
      * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
      */
     // scalastyle:on
-    private def isElementType(parquetRepeatedType: Type, catalystElementType: DataType): Boolean = {
+    private def isElementType(
+        parquetRepeatedType: Type, catalystElementType: DataType, parentName: String): Boolean = {
       (parquetRepeatedType, catalystElementType) match {
         case (t: PrimitiveType, _) => true
         case (t: GroupType, _) if t.getFieldCount > 1 => true
+        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" => true
+        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == parentName + "_tuple" => true
         case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true
         case _ => false
       }
@@ -474,15 +478,9 @@
 
       override def getConverter(fieldIndex: Int): Converter = converter
 
-      override def end(): Unit = {
-        converter.updater.end()
-        currentArray += currentElement
-      }
+      override def end(): Unit = currentArray += currentElement
 
-      override def start(): Unit = {
-        converter.updater.start()
-        currentElement = null
-      }
+      override def start(): Unit = currentElement = null
     }
   }