author     hyukjinkwon <gurwls223@gmail.com>          2015-11-12 12:29:50 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-11-12 12:29:50 -0800
commit     f5a9526fec284cccd0755d190c91e8d9999f7877 (patch)
tree       9adcf3815527e22010be4082fb28d35c2cc873f2
parent     4fe99c72c60646b1372bb2c089c6fc7c4fa11644 (diff)
[SPARK-10113][SQL] Explicit error message for unsigned Parquet logical types
Parquet supports some unsigned datatypes. However, since Spark does not support unsigned datatypes, it needs to emit an exception with a clear message rather than the generic one about an illegal datatype.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #9646 from HyukjinKwon/SPARK-10113.
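To illustrate the user-facing effect: reading a Parquet file whose schema declares an unsigned logical type now fails with an explicit AnalysisException naming the offending type, instead of a generic illegal-type error. A minimal sketch (the file path is a placeholder; the message text follows the typeString format in the patch below):

    import org.apache.spark.sql.AnalysisException

    try {
      // Hypothetical file containing a column declared as `required int32 c (UINT_32)`.
      sqlContext.read.parquet("/path/to/unsigned.parquet").printSchema()
    } catch {
      case e: AnalysisException =>
        // Expected to print something like: "Parquet type not supported: INT32 (UINT_32)"
        println(e.getMessage)
    }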
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala |  7 +++++++
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala          | 24 ++++++++++++++++++++
2 files changed, 31 insertions(+), 0 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 7f3394c20e..f28a18e275 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -108,6 +108,9 @@ private[parquet] class CatalystSchemaConverter(
     def typeString =
       if (originalType == null) s"$typeName" else s"$typeName ($originalType)"
 
+    def typeNotSupported() =
+      throw new AnalysisException(s"Parquet type not supported: $typeString")
+
     def typeNotImplemented() =
       throw new AnalysisException(s"Parquet type not yet supported: $typeString")
@@ -142,6 +145,9 @@ private[parquet] class CatalystSchemaConverter(
           case INT_32 | null => IntegerType
           case DATE => DateType
           case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT32)
+          case UINT_8 => typeNotSupported()
+          case UINT_16 => typeNotSupported()
+          case UINT_32 => typeNotSupported()
           case TIME_MILLIS => typeNotImplemented()
           case _ => illegalType()
         }
@@ -150,6 +156,7 @@ private[parquet] class CatalystSchemaConverter(
         originalType match {
           case INT_64 | null => LongType
           case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT64)
+          case UINT_64 => typeNotSupported()
           case TIMESTAMP_MILLIS => typeNotImplemented()
           case _ => illegalType()
         }
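As a sketch of how the converter now behaves, hypothetical snippet only: CatalystSchemaConverter is private[parquet], so this assumes code living in the same package, the converter's default constructor, its convert method, and ScalaTest's intercept as used in the suite below.

    import org.apache.parquet.schema.MessageTypeParser
    import org.apache.spark.sql.AnalysisException

    val converter = new CatalystSchemaConverter()
    val unsignedSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  required int64 c (UINT_64);
        |}
      """.stripMargin)

    // UINT_64 now hits the typeNotSupported() branch added above,
    // rather than falling through to illegalType().
    val e = intercept[AnalysisException](converter.convert(unsignedSchema))
    assert(e.getMessage.contains("Parquet type not supported"))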
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 7274479989..82a42d68fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -206,6 +206,30 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-10113 Support for unsigned Parquet logical types") {
+    val parquetSchema = MessageTypeParser.parseMessageType(
+      """message root {
+        |  required int32 c(UINT_32);
+        |}
+      """.stripMargin)
+
+    withTempPath { location =>
+      val extraMetadata = Map.empty[String, String].asJava
+      val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
+      val path = new Path(location.getCanonicalPath)
+      val footer = List(
+        new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))
+      ).asJava
+
+      ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer)
+
+      val errorMessage = intercept[Throwable] {
+        sqlContext.read.parquet(path.toString).printSchema()
+      }.toString
+      assert(errorMessage.contains("Parquet type not supported"))
+    }
+  }
+
   test("compression codec") {
     def compressionCodecFor(path: String, codecName: String): String = {
       val codecs = for {