diff options
Diffstat (limited to 'sql/core/src')
2 files changed, 57 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index e6ef634421..46d786de57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -26,6 +26,8 @@ import org.apache.parquet.hadoop.api.{InitContext, ReadSupport} import org.apache.parquet.hadoop.api.ReadSupport.ReadContext import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema._ +import org.apache.parquet.schema.OriginalType._ +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.parquet.schema.Type.Repetition import org.apache.spark.internal.Logging @@ -120,6 +122,12 @@ private[parquet] object ParquetReadSupport { } private def clipParquetType(parquetType: Type, catalystType: DataType): Type = { + val primName = if (parquetType.isPrimitive()) { + parquetType.asPrimitiveType().getPrimitiveTypeName() + } else { + null + } + catalystType match { case t: ArrayType if !isPrimitiveCatalystType(t.elementType) => // Only clips array types with nested type as element type. @@ -134,6 +142,16 @@ private[parquet] object ParquetReadSupport { case t: StructType => clipParquetGroup(parquetType.asGroupType(), t) + case _: ByteType if primName == INT32 => + // SPARK-16632: Handle case where Hive stores bytes in a int32 field without specifying + // the original type. + Types.primitive(INT32, parquetType.getRepetition()).as(INT_8).named(parquetType.getName()) + + case _: ShortType if primName == INT32 => + // SPARK-16632: Handle case where Hive stores shorts in a int32 field without specifying + // the original type. + Types.primitive(INT32, parquetType.getRepetition()).as(INT_16).named(parquetType.getName()) + case _ => // UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able // to be mapped to desired user-space types. So UDTs shouldn't participate schema merging. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 8a980a7eb5..31ebec096d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -1581,4 +1581,43 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin) + + testSchemaClipping( + "int32 parquet field with byte schema field", + + parquetSchema = + """message root { + | optional int32 value; + |} + """.stripMargin, + + catalystSchema = + new StructType() + .add("value", ByteType, nullable = true), + + expectedSchema = + """message root { + | optional int32 value (INT_8); + |} + """.stripMargin) + + testSchemaClipping( + "int32 parquet field with short schema field", + + parquetSchema = + """message root { + | optional int32 value; + |} + """.stripMargin, + + catalystSchema = + new StructType() + .add("value", ShortType, nullable = true), + + expectedSchema = + """message root { + | optional int32 value (INT_16); + |} + """.stripMargin) + } |