From f5a9526fec284cccd0755d190c91e8d9999f7877 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 12 Nov 2015 12:29:50 -0800 Subject: [SPARK-10113][SQL] Explicit error message for unsigned Parquet logical types Parquet supports some unsigned datatypes. However, Since Spark does not support unsigned datatypes, it needs to emit an exception with a clear message rather then with the one saying illegal datatype. Author: hyukjinkwon Closes #9646 from HyukjinKwon/SPARK-10113. --- .../parquet/CatalystSchemaConverter.scala | 7 +++++++ .../datasources/parquet/ParquetIOSuite.scala | 24 ++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala index 7f3394c20e..f28a18e275 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala @@ -108,6 +108,9 @@ private[parquet] class CatalystSchemaConverter( def typeString = if (originalType == null) s"$typeName" else s"$typeName ($originalType)" + def typeNotSupported() = + throw new AnalysisException(s"Parquet type not supported: $typeString") + def typeNotImplemented() = throw new AnalysisException(s"Parquet type not yet supported: $typeString") @@ -142,6 +145,9 @@ private[parquet] class CatalystSchemaConverter( case INT_32 | null => IntegerType case DATE => DateType case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT32) + case UINT_8 => typeNotSupported() + case UINT_16 => typeNotSupported() + case UINT_32 => typeNotSupported() case TIME_MILLIS => typeNotImplemented() case _ => illegalType() } @@ -150,6 +156,7 @@ private[parquet] class CatalystSchemaConverter( originalType match { case INT_64 | null => LongType case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT64) + case UINT_64 => typeNotSupported() case TIMESTAMP_MILLIS => typeNotImplemented() case _ => illegalType() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 7274479989..82a42d68fe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -206,6 +206,30 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } + test("SPARK-10113 Support for unsigned Parquet logical types") { + val parquetSchema = MessageTypeParser.parseMessageType( + """message root { + | required int32 c(UINT_32); + |} + """.stripMargin) + + withTempPath { location => + val extraMetadata = Map.empty[String, String].asJava + val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark") + val path = new Path(location.getCanonicalPath) + val footer = List( + new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList())) + ).asJava + + ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer) + + val errorMessage = intercept[Throwable] { + sqlContext.read.parquet(path.toString).printSchema() + }.toString + assert(errorMessage.contains("Parquet type not supported")) + } + } + test("compression codec") { def compressionCodecFor(path: String, codecName: String): String = { val codecs = for { -- cgit v1.2.3