diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2016-07-20 13:00:22 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2016-07-20 13:00:22 +0800 |
commit | 75146be6ba5e9f559f5f15430310bb476ee0812c (patch) | |
tree | 727adf7fb3d8edeaffb41839678a09771d36b399 /sql/core/src/main | |
parent | fc23263623d5dcd1167fa93c094fe41ace77c326 (diff) | |
download | spark-75146be6ba5e9f559f5f15430310bb476ee0812c.tar.gz spark-75146be6ba5e9f559f5f15430310bb476ee0812c.tar.bz2 spark-75146be6ba5e9f559f5f15430310bb476ee0812c.zip |
[SPARK-16632][SQL] Respect Hive schema when merging parquet schema.
When Hive (or at least certain versions of Hive) creates parquet files
containing tinyint or smallint columns, it stores them as int32, but
doesn't annotate the parquet field as containing the corresponding
int8 / int16 data. When Spark reads those files using the vectorized
reader, it follows the parquet schema for these fields, but when
actually reading the data it tries to use the type fetched from
the metastore, and then fails because data has been loaded into the
wrong fields in OnHeapColumnVector.
So instead of blindly trusting the parquet schema, check whether the
Catalyst-provided schema disagrees with it, and adjust the types so
that the necessary metadata is present when loading the data into
the ColumnVector instance.
Tested with unit tests and with tests that create byte / short columns
in Hive and try to read them from Spark.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #14272 from vanzin/SPARK-16632.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala | 18 |
1 files changed, 18 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index e6ef634421..46d786de57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -26,6 +26,8 @@ import org.apache.parquet.hadoop.api.{InitContext, ReadSupport} import org.apache.parquet.hadoop.api.ReadSupport.ReadContext import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema._ +import org.apache.parquet.schema.OriginalType._ +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.parquet.schema.Type.Repetition import org.apache.spark.internal.Logging @@ -120,6 +122,12 @@ private[parquet] object ParquetReadSupport { } private def clipParquetType(parquetType: Type, catalystType: DataType): Type = { + val primName = if (parquetType.isPrimitive()) { + parquetType.asPrimitiveType().getPrimitiveTypeName() + } else { + null + } + catalystType match { case t: ArrayType if !isPrimitiveCatalystType(t.elementType) => // Only clips array types with nested type as element type. @@ -134,6 +142,16 @@ private[parquet] object ParquetReadSupport { case t: StructType => clipParquetGroup(parquetType.asGroupType(), t) + case _: ByteType if primName == INT32 => + // SPARK-16632: Handle case where Hive stores bytes in a int32 field without specifying + // the original type. + Types.primitive(INT32, parquetType.getRepetition()).as(INT_8).named(parquetType.getName()) + + case _: ShortType if primName == INT32 => + // SPARK-16632: Handle case where Hive stores shorts in a int32 field without specifying + // the original type. + Types.primitive(INT32, parquetType.getRepetition()).as(INT_16).named(parquetType.getName()) + case _ => // UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able // to be mapped to desired user-space types. So UDTs shouldn't participate schema merging. |