aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2016-07-20 13:00:22 +0800
committerCheng Lian <lian@databricks.com>2016-07-20 13:00:22 +0800
commit75146be6ba5e9f559f5f15430310bb476ee0812c (patch)
tree727adf7fb3d8edeaffb41839678a09771d36b399 /sql
parentfc23263623d5dcd1167fa93c094fe41ace77c326 (diff)
downloadspark-75146be6ba5e9f559f5f15430310bb476ee0812c.tar.gz
spark-75146be6ba5e9f559f5f15430310bb476ee0812c.tar.bz2
spark-75146be6ba5e9f559f5f15430310bb476ee0812c.zip
[SPARK-16632][SQL] Respect Hive schema when merging parquet schema.
When Hive (or at least certain versions of Hive) creates parquet files containing tinyint or smallint columns, it stores them as int32, but doesn't annotate the parquet field as containing the corresponding int8 / int16 data. When Spark reads those files using the vectorized reader, it follows the parquet schema for these fields, but when actually reading the data it tries to use the type fetched from the metastore, and then fails because data has been loaded into the wrong fields in OnHeapColumnVector. So instead of blindly trusting the parquet schema, check whether the Catalyst-provided schema disagrees with it, and adjust the types so that the necessary metadata is present when loading the data into the ColumnVector instance. Tested with unit tests and with tests that create byte / short columns in Hive and try to read them from Spark. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #14272 from vanzin/SPARK-16632.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala18
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala39
2 files changed, 57 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index e6ef634421..46d786de57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -26,6 +26,8 @@ import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.parquet.schema.Type.Repetition
import org.apache.spark.internal.Logging
@@ -120,6 +122,12 @@ private[parquet] object ParquetReadSupport {
}
private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
+ val primName = if (parquetType.isPrimitive()) {
+ parquetType.asPrimitiveType().getPrimitiveTypeName()
+ } else {
+ null
+ }
+
catalystType match {
case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
// Only clips array types with nested type as element type.
@@ -134,6 +142,16 @@ private[parquet] object ParquetReadSupport {
case t: StructType =>
clipParquetGroup(parquetType.asGroupType(), t)
+ case _: ByteType if primName == INT32 =>
+ // SPARK-16632: Handle case where Hive stores bytes in a int32 field without specifying
+ // the original type.
+ Types.primitive(INT32, parquetType.getRepetition()).as(INT_8).named(parquetType.getName())
+
+ case _: ShortType if primName == INT32 =>
+ // SPARK-16632: Handle case where Hive stores shorts in a int32 field without specifying
+ // the original type.
+ Types.primitive(INT32, parquetType.getRepetition()).as(INT_16).named(parquetType.getName())
+
case _ =>
// UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able
// to be mapped to desired user-space types. So UDTs shouldn't participate schema merging.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 8a980a7eb5..31ebec096d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1581,4 +1581,43 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin)
+
+ testSchemaClipping(
+ "int32 parquet field with byte schema field",
+
+ parquetSchema =
+ """message root {
+ | optional int32 value;
+ |}
+ """.stripMargin,
+
+ catalystSchema =
+ new StructType()
+ .add("value", ByteType, nullable = true),
+
+ expectedSchema =
+ """message root {
+ | optional int32 value (INT_8);
+ |}
+ """.stripMargin)
+
+ testSchemaClipping(
+ "int32 parquet field with short schema field",
+
+ parquetSchema =
+ """message root {
+ | optional int32 value;
+ |}
+ """.stripMargin,
+
+ catalystSchema =
+ new StructType()
+ .add("value", ShortType, nullable = true),
+
+ expectedSchema =
+ """message root {
+ | optional int32 value (INT_16);
+ |}
+ """.stripMargin)
+
}