aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala18
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala39
2 files changed, 57 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index e6ef634421..46d786de57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -26,6 +26,8 @@ import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.parquet.schema.Type.Repetition
import org.apache.spark.internal.Logging
@@ -120,6 +122,12 @@ private[parquet] object ParquetReadSupport {
}
private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
+ val primName = if (parquetType.isPrimitive()) {
+ parquetType.asPrimitiveType().getPrimitiveTypeName()
+ } else {
+ null
+ }
+
catalystType match {
case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
// Only clips array types with nested type as element type.
@@ -134,6 +142,16 @@ private[parquet] object ParquetReadSupport {
case t: StructType =>
clipParquetGroup(parquetType.asGroupType(), t)
+ case _: ByteType if primName == INT32 =>
+      // SPARK-16632: Handle the case where Hive stores bytes in an int32 field without specifying
+ // the original type.
+ Types.primitive(INT32, parquetType.getRepetition()).as(INT_8).named(parquetType.getName())
+
+ case _: ShortType if primName == INT32 =>
+      // SPARK-16632: Handle the case where Hive stores shorts in an int32 field without specifying
+ // the original type.
+ Types.primitive(INT32, parquetType.getRepetition()).as(INT_16).named(parquetType.getName())
+
case _ =>
// UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able
// to be mapped to desired user-space types. So UDTs shouldn't participate schema merging.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 8a980a7eb5..31ebec096d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1581,4 +1581,43 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin)
+
+ testSchemaClipping(
+ "int32 parquet field with byte schema field",
+
+ parquetSchema =
+ """message root {
+ | optional int32 value;
+ |}
+ """.stripMargin,
+
+ catalystSchema =
+ new StructType()
+ .add("value", ByteType, nullable = true),
+
+ expectedSchema =
+ """message root {
+ | optional int32 value (INT_8);
+ |}
+ """.stripMargin)
+
+ testSchemaClipping(
+ "int32 parquet field with short schema field",
+
+ parquetSchema =
+ """message root {
+ | optional int32 value;
+ |}
+ """.stripMargin,
+
+ catalystSchema =
+ new StructType()
+ .add("value", ShortType, nullable = true),
+
+ expectedSchema =
+ """message root {
+ | optional int32 value (INT_16);
+ |}
+ """.stripMargin)
+
}