-rw-r--r--  project/MimaExcludes.scala                                                         |   7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala           |   9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                         |  14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala | 565
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala     |   6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala            | 374
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala          |   6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala      | 736
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala    |   2
9 files changed, 1297 insertions, 422 deletions
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index f678c69a6d..6f86a505b3 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -69,7 +69,12 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.CatalystTimestampConverter"),
ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.parquet.CatalystTimestampConverter$")
+ "org.apache.spark.sql.parquet.CatalystTimestampConverter$"),
+ // SPARK-6777 Implements backwards compatibility rules in CatalystSchemaConverter
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.ParquetTypeInfo"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.ParquetTypeInfo$")
)
case v if v.startsWith("1.4") =>
Seq(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala
index 407dc27326..18cdfa7238 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala
@@ -20,13 +20,18 @@ package org.apache.spark.sql.types
import scala.reflect.runtime.universe.typeTag
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.ScalaReflectionLock
import org.apache.spark.sql.catalyst.expressions.Expression
/** Precision parameters for a Decimal */
-case class PrecisionInfo(precision: Int, scale: Int)
-
+case class PrecisionInfo(precision: Int, scale: Int) {
+ if (scale > precision) {
+ throw new AnalysisException(
+ s"Decimal scale ($scale) cannot be greater than precision ($precision).")
+ }
+}
/**
* :: DeveloperApi ::
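
To illustrate the new scale/precision validation added to PrecisionInfo, here is a small hypothetical sketch (illustration only, not part of the patch):

    import org.apache.spark.sql.types.PrecisionInfo

    PrecisionInfo(precision = 10, scale = 2)  // OK: scale <= precision
    PrecisionInfo(precision = 5, scale = 7)   // throws AnalysisException:
                                              // "Decimal scale (7) cannot be greater than precision (5)."
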
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 265352647f..9a10a23937 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -264,6 +264,14 @@ private[spark] object SQLConf {
defaultValue = Some(true),
doc = "<TODO>")
+ val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf(
+ key = "spark.sql.parquet.followParquetFormatSpec",
+ defaultValue = Some(false),
+ doc = "Wether to stick to Parquet format specification when converting Parquet schema to " +
+ "Spark SQL schema and vice versa. Sticks to the specification if set to true; falls back " +
+ "to compatible mode if set to false.",
+ isPublic = false)
+
val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
key = "spark.sql.parquet.output.committer.class",
defaultValue = Some(classOf[ParquetOutputCommitter].getName),
@@ -499,6 +507,12 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)
/**
+ * When set to true, sticks to Parquet format spec when converting Parquet schema to Spark SQL
+ * schema and vice versa. Otherwise, falls back to compatible mode.
+ */
+ private[spark] def followParquetFormatSpec: Boolean = getConf(PARQUET_FOLLOW_PARQUET_FORMAT_SPEC)
+
+ /**
* When set to true, partition pruning for in-memory columnar tables is enabled.
*/
private[spark] def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
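
As a rough sketch, the new internal flag could be toggled through the existing string-based SQLContext.setConf API (the flag is non-public and defaults to false, so the old-style behavior remains the default):

    // Hypothetical usage: ask schema conversion to follow the Parquet format spec.
    sqlContext.setConf("spark.sql.parquet.followParquetFormatSpec", "true")

    // Equivalent, via the SQLConf entry's key (internal API):
    sqlContext.setConf(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, "true")
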
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
new file mode 100644
index 0000000000..4fd3e93b70
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
+import org.apache.parquet.schema.Type.Repetition._
+import org.apache.parquet.schema._
+
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{AnalysisException, SQLConf}
+
+/**
+ * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]] and
+ * vice versa.
+ *
+ * Parquet format backwards-compatibility rules are respected when converting Parquet
+ * [[MessageType]] schemas.
+ *
+ * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+ *
+ * @constructor
+ * @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL
+ * [[StringType]] fields when converting a Parquet [[MessageType]] to a Spark SQL
+ * [[StructType]].
+ * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
+ * [[TimestampType]] fields when converting a Parquet [[MessageType]] to a Spark SQL
+ * [[StructType]]. Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which
+ * has optional nanosecond precision, but is different from `TIME_MILLIS` and `TIMESTAMP_MILLIS`
+ * described in the Parquet format spec.
+ * @param followParquetFormatSpec Whether to generate standard DECIMAL, LIST, and MAP structures when
+ * converting Spark SQL [[StructType]] to Parquet [[MessageType]]. Spark 1.4.x and prior
+ * versions only support decimals with a max precision of 18 digits, and use
+ * non-standard LIST and MAP structures. Note that the current Parquet format spec is
+ * backwards-compatible with these settings. If this argument is set to `false`, we fall back
+ * to the old-style non-standard behaviors.
+ */
+private[parquet] class CatalystSchemaConverter(
+ private val assumeBinaryIsString: Boolean,
+ private val assumeInt96IsTimestamp: Boolean,
+ private val followParquetFormatSpec: Boolean) {
+
+ // Only used when constructing converter for converting Spark SQL schema to Parquet schema, in
+ // which case `assumeInt96IsTimestamp` and `assumeBinaryIsString` are irrelevant.
+ def this() = this(
+ assumeBinaryIsString = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
+ assumeInt96IsTimestamp = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
+ followParquetFormatSpec = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get)
+
+ def this(conf: SQLConf) = this(
+ assumeBinaryIsString = conf.isParquetBinaryAsString,
+ assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
+ followParquetFormatSpec = conf.followParquetFormatSpec)
+
+ def this(conf: Configuration) = this(
+ assumeBinaryIsString =
+ conf.getBoolean(
+ SQLConf.PARQUET_BINARY_AS_STRING.key,
+ SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get),
+ assumeInt96IsTimestamp =
+ conf.getBoolean(
+ SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+ SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get),
+ followParquetFormatSpec =
+ conf.getBoolean(
+ SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key,
+ SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get))
+
+ /**
+ * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
+ */
+ def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType())
+
+ private def convert(parquetSchema: GroupType): StructType = {
+ val fields = parquetSchema.getFields.map { field =>
+ field.getRepetition match {
+ case OPTIONAL =>
+ StructField(field.getName, convertField(field), nullable = true)
+
+ case REQUIRED =>
+ StructField(field.getName, convertField(field), nullable = false)
+
+ case REPEATED =>
+ throw new AnalysisException(
+ s"REPEATED not supported outside LIST or MAP. Type: $field")
+ }
+ }
+
+ StructType(fields)
+ }
+
+ /**
+ * Converts a Parquet [[Type]] to a Spark SQL [[DataType]].
+ */
+ def convertField(parquetType: Type): DataType = parquetType match {
+ case t: PrimitiveType => convertPrimitiveField(t)
+ case t: GroupType => convertGroupField(t.asGroupType())
+ }
+
+ private def convertPrimitiveField(field: PrimitiveType): DataType = {
+ val typeName = field.getPrimitiveTypeName
+ val originalType = field.getOriginalType
+
+ def typeString =
+ if (originalType == null) s"$typeName" else s"$typeName ($originalType)"
+
+ def typeNotImplemented() =
+ throw new AnalysisException(s"Parquet type not yet supported: $typeString")
+
+ def illegalType() =
+ throw new AnalysisException(s"Illegal Parquet type: $typeString")
+
+ // When maxPrecision = -1, we skip precision range check, and always respect the precision
+ // specified in field.getDecimalMetadata. This is useful when interpreting decimal types stored
+ // as binaries with variable lengths.
+ def makeDecimalType(maxPrecision: Int = -1): DecimalType = {
+ val precision = field.getDecimalMetadata.getPrecision
+ val scale = field.getDecimalMetadata.getScale
+
+ CatalystSchemaConverter.analysisRequire(
+ maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
+ s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)")
+
+ DecimalType(precision, scale)
+ }
+
+ field.getPrimitiveTypeName match {
+ case BOOLEAN => BooleanType
+
+ case FLOAT => FloatType
+
+ case DOUBLE => DoubleType
+
+ case INT32 =>
+ field.getOriginalType match {
+ case INT_8 => ByteType
+ case INT_16 => ShortType
+ case INT_32 | null => IntegerType
+ case DATE => DateType
+ case DECIMAL => makeDecimalType(maxPrecisionForBytes(4))
+ case TIME_MILLIS => typeNotImplemented()
+ case _ => illegalType()
+ }
+
+ case INT64 =>
+ field.getOriginalType match {
+ case INT_64 | null => LongType
+ case DECIMAL => makeDecimalType(maxPrecisionForBytes(8))
+ case TIMESTAMP_MILLIS => typeNotImplemented()
+ case _ => illegalType()
+ }
+
+ case INT96 =>
+ CatalystSchemaConverter.analysisRequire(
+ assumeInt96IsTimestamp,
+ "INT96 is not supported unless it's interpreted as timestamp. " +
+ s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.")
+ TimestampType
+
+ case BINARY =>
+ field.getOriginalType match {
+ case UTF8 => StringType
+ case null if assumeBinaryIsString => StringType
+ case null => BinaryType
+ case DECIMAL => makeDecimalType()
+ case _ => illegalType()
+ }
+
+ case FIXED_LEN_BYTE_ARRAY =>
+ field.getOriginalType match {
+ case DECIMAL => makeDecimalType(maxPrecisionForBytes(field.getTypeLength))
+ case INTERVAL => typeNotImplemented()
+ case _ => illegalType()
+ }
+
+ case _ => illegalType()
+ }
+ }
+
+ private def convertGroupField(field: GroupType): DataType = {
+ Option(field.getOriginalType).fold(convert(field): DataType) {
+ // A Parquet list is represented as a 3-level structure:
+ //
+ // <list-repetition> group <name> (LIST) {
+ // repeated group list {
+ // <element-repetition> <element-type> element;
+ // }
+ // }
+ //
+ // However, according to the most recent Parquet format spec (not yet released as of this
+ // writing), some 2-level structures are also recognized for backwards-compatibility. Thus,
+ // we need to check whether the 2nd level or the 3rd level refers to the list element type.
+ //
+ // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
+ case LIST =>
+ CatalystSchemaConverter.analysisRequire(
+ field.getFieldCount == 1, s"Invalid list type $field")
+
+ val repeatedType = field.getType(0)
+ CatalystSchemaConverter.analysisRequire(
+ repeatedType.isRepetition(REPEATED), s"Invalid list type $field")
+
+ if (isElementType(repeatedType, field.getName)) {
+ ArrayType(convertField(repeatedType), containsNull = false)
+ } else {
+ val elementType = repeatedType.asGroupType().getType(0)
+ val optional = elementType.isRepetition(OPTIONAL)
+ ArrayType(convertField(elementType), containsNull = optional)
+ }
+
+ // scalastyle:off
+ // `MAP_KEY_VALUE` is for backwards-compatibility
+ // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
+ // scalastyle:on
+ case MAP | MAP_KEY_VALUE =>
+ CatalystSchemaConverter.analysisRequire(
+ field.getFieldCount == 1 && !field.getType(0).isPrimitive,
+ s"Invalid map type: $field")
+
+ val keyValueType = field.getType(0).asGroupType()
+ CatalystSchemaConverter.analysisRequire(
+ keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2,
+ s"Invalid map type: $field")
+
+ val keyType = keyValueType.getType(0)
+ CatalystSchemaConverter.analysisRequire(
+ keyType.isPrimitive,
+ s"Map key type is expected to be a primitive type, but found: $keyType")
+
+ val valueType = keyValueType.getType(1)
+ val valueOptional = valueType.isRepetition(OPTIONAL)
+ MapType(
+ convertField(keyType),
+ convertField(valueType),
+ valueContainsNull = valueOptional)
+
+ case _ =>
+ throw new AnalysisException(s"Unrecognized Parquet type: $field")
+ }
+ }
+
+ // scalastyle:off
+ // Here we implement Parquet LIST backwards-compatibility rules.
+ // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
+ // scalastyle:on
+ private def isElementType(repeatedType: Type, parentName: String) = {
+ {
+ // For legacy 2-level list types with primitive element type, e.g.:
+ //
+ // // List<Integer> (nullable list, non-null elements)
+ // optional group my_list (LIST) {
+ // repeated int32 element;
+ // }
+ //
+ repeatedType.isPrimitive
+ } || {
+ // For legacy 2-level list types whose element type is a group type with 2 or more fields,
+ // e.g.:
+ //
+ // // List<Tuple<String, Integer>> (nullable list, non-null elements)
+ // optional group my_list (LIST) {
+ // repeated group element {
+ // required binary str (UTF8);
+ // required int32 num;
+ // };
+ // }
+ //
+ repeatedType.asGroupType().getFieldCount > 1
+ } || {
+ // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), e.g.:
+ //
+ // // List<OneTuple<String>> (nullable list, non-null elements)
+ // optional group my_list (LIST) {
+ // repeated group array {
+ // required binary str (UTF8);
+ // };
+ // }
+ //
+ repeatedType.getName == "array"
+ } || {
+ // For Parquet data generated by parquet-thrift, e.g.:
+ //
+ // // List<OneTuple<String>> (nullable list, non-null elements)
+ // optional group my_list (LIST) {
+ // repeated group my_list_tuple {
+ // required binary str (UTF8);
+ // };
+ // }
+ //
+ repeatedType.getName == s"${parentName}_tuple"
+ }
+ }
+
+ /**
+ * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
+ */
+ def convert(catalystSchema: StructType): MessageType = {
+ Types.buildMessage().addFields(catalystSchema.map(convertField): _*).named("root")
+ }
+
+ /**
+ * Converts a Spark SQL [[StructField]] to a Parquet [[Type]].
+ */
+ def convertField(field: StructField): Type = {
+ convertField(field, if (field.nullable) OPTIONAL else REQUIRED)
+ }
+
+ private def convertField(field: StructField, repetition: Type.Repetition): Type = {
+ CatalystSchemaConverter.checkFieldName(field.name)
+
+ field.dataType match {
+ // ===================
+ // Simple atomic types
+ // ===================
+
+ case BooleanType =>
+ Types.primitive(BOOLEAN, repetition).named(field.name)
+
+ case ByteType =>
+ Types.primitive(INT32, repetition).as(INT_8).named(field.name)
+
+ case ShortType =>
+ Types.primitive(INT32, repetition).as(INT_16).named(field.name)
+
+ case IntegerType =>
+ Types.primitive(INT32, repetition).named(field.name)
+
+ case LongType =>
+ Types.primitive(INT64, repetition).named(field.name)
+
+ case FloatType =>
+ Types.primitive(FLOAT, repetition).named(field.name)
+
+ case DoubleType =>
+ Types.primitive(DOUBLE, repetition).named(field.name)
+
+ case StringType =>
+ Types.primitive(BINARY, repetition).as(UTF8).named(field.name)
+
+ case DateType =>
+ Types.primitive(INT32, repetition).as(DATE).named(field.name)
+
+ // NOTE: !! This timestamp type is not specified in Parquet format spec !!
+ // However, Impala and older versions of Spark SQL use INT96 to store timestamps with
+ // nanosecond precision (not TIME_MILLIS or TIMESTAMP_MILLIS described in the spec).
+ case TimestampType =>
+ Types.primitive(INT96, repetition).named(field.name)
+
+ case BinaryType =>
+ Types.primitive(BINARY, repetition).named(field.name)
+
+ // =====================================
+ // Decimals (for Spark version <= 1.4.x)
+ // =====================================
+
+ // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and
+ // always store decimals in fixed-length byte arrays.
+ case DecimalType.Fixed(precision, scale)
+ if precision <= maxPrecisionForBytes(8) && !followParquetFormatSpec =>
+ Types
+ .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+ .as(DECIMAL)
+ .precision(precision)
+ .scale(scale)
+ .length(minBytesForPrecision(precision))
+ .named(field.name)
+
+ case dec @ DecimalType() if !followParquetFormatSpec =>
+ throw new AnalysisException(
+ s"Data type $dec is not supported. " +
+ s"When ${SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key} is set to false," +
+ "decimal precision and scale must be specified, " +
+ "and precision must be less than or equal to 18.")
+
+ // =====================================
+ // Decimals (follow Parquet format spec)
+ // =====================================
+
+ // Uses INT32 for 1 <= precision <= 9
+ case DecimalType.Fixed(precision, scale)
+ if precision <= maxPrecisionForBytes(4) && followParquetFormatSpec =>
+ Types
+ .primitive(INT32, repetition)
+ .as(DECIMAL)
+ .precision(precision)
+ .scale(scale)
+ .named(field.name)
+
+ // Uses INT64 for 1 <= precision <= 18
+ case DecimalType.Fixed(precision, scale)
+ if precision <= maxPrecisionForBytes(8) && followParquetFormatSpec =>
+ Types
+ .primitive(INT64, repetition)
+ .as(DECIMAL)
+ .precision(precision)
+ .scale(scale)
+ .named(field.name)
+
+ // Uses FIXED_LEN_BYTE_ARRAY for all other precisions
+ case DecimalType.Fixed(precision, scale) if followParquetFormatSpec =>
+ Types
+ .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+ .as(DECIMAL)
+ .precision(precision)
+ .scale(scale)
+ .length(minBytesForPrecision(precision))
+ .named(field.name)
+
+ case dec @ DecimalType.Unlimited if followParquetFormatSpec =>
+ throw new AnalysisException(
+ s"Data type $dec is not supported. Decimal precision and scale must be specified.")
+
+ // ===================================================
+ // ArrayType and MapType (for Spark versions <= 1.4.x)
+ // ===================================================
+
+ // Spark 1.4.x and prior versions convert ArrayType with nullable elements into a 3-level
+ // LIST structure. This behavior mimics parquet-hive (1.6.0rc3). Note that this case is
+ // covered by the backwards-compatibility rules implemented in `isElementType()`.
+ case ArrayType(elementType, nullable @ true) if !followParquetFormatSpec =>
+ // <list-repetition> group <name> (LIST) {
+ // optional group bag {
+ // repeated <element-type> element;
+ // }
+ // }
+ ConversionPatterns.listType(
+ repetition,
+ field.name,
+ Types
+ .buildGroup(REPEATED)
+ .addField(convertField(StructField("element", elementType, nullable)))
+ .named(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME))
+
+ // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level
+ // LIST structure. This behavior mimics parquet-avro (1.6.0rc3). Note that this case is
+ // covered by the backwards-compatibility rules implemented in `isElementType()`.
+ case ArrayType(elementType, nullable @ false) if !followParquetFormatSpec =>
+ // <list-repetition> group <name> (LIST) {
+ // repeated <element-type> element;
+ // }
+ ConversionPatterns.listType(
+ repetition,
+ field.name,
+ convertField(StructField("element", elementType, nullable), REPEATED))
+
+ // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by
+ // MAP_KEY_VALUE. This is covered by `convertGroupField(field: GroupType): DataType`.
+ case MapType(keyType, valueType, valueContainsNull) if !followParquetFormatSpec =>
+ // <map-repetition> group <name> (MAP) {
+ // repeated group map (MAP_KEY_VALUE) {
+ // required <key-type> key;
+ // <value-repetition> <value-type> value;
+ // }
+ // }
+ ConversionPatterns.mapType(
+ repetition,
+ field.name,
+ convertField(StructField("key", keyType, nullable = false)),
+ convertField(StructField("value", valueType, valueContainsNull)))
+
+ // ==================================================
+ // ArrayType and MapType (follow Parquet format spec)
+ // ==================================================
+
+ case ArrayType(elementType, containsNull) if followParquetFormatSpec =>
+ // <list-repetition> group <name> (LIST) {
+ // repeated group list {
+ // <element-repetition> <element-type> element;
+ // }
+ // }
+ Types
+ .buildGroup(repetition).as(LIST)
+ .addField(
+ Types.repeatedGroup()
+ .addField(convertField(StructField("element", elementType, containsNull)))
+ .named("list"))
+ .named(field.name)
+
+ case MapType(keyType, valueType, valueContainsNull) =>
+ // <map-repetition> group <name> (MAP) {
+ // repeated group key_value {
+ // required <key-type> key;
+ // <value-repetition> <value-type> value;
+ // }
+ // }
+ Types
+ .buildGroup(repetition).as(MAP)
+ .addField(
+ Types
+ .repeatedGroup()
+ .addField(convertField(StructField("key", keyType, nullable = false)))
+ .addField(convertField(StructField("value", valueType, valueContainsNull)))
+ .named("key_value"))
+ .named(field.name)
+
+ // ===========
+ // Other types
+ // ===========
+
+ case StructType(fields) =>
+ fields.foldLeft(Types.buildGroup(repetition)) { (builder, field) =>
+ builder.addField(convertField(field))
+ }.named(field.name)
+
+ case udt: UserDefinedType[_] =>
+ convertField(field.copy(dataType = udt.sqlType))
+
+ case _ =>
+ throw new AnalysisException(s"Unsupported data type $field.dataType")
+ }
+ }
+
+ // Max precision of a decimal value stored in `numBytes` bytes
+ private def maxPrecisionForBytes(numBytes: Int): Int = {
+ Math.round( // convert double to long
+ Math.floor(Math.log10( // number of base-10 digits
+ Math.pow(2, 8 * numBytes - 1) - 1))) // max value stored in numBytes
+ .asInstanceOf[Int]
+ }
+
+ // Min byte counts needed to store decimals with precisions 0 through 38
+ private val minBytesForPrecision: Array[Int] = Array.tabulate(39) { precision =>
+ var numBytes = 1
+ while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) {
+ numBytes += 1
+ }
+ numBytes
+ }
+}
+
+
+private[parquet] object CatalystSchemaConverter {
+ def checkFieldName(name: String): Unit = {
+ // ,;{}()\n\t= and space are special characters in Parquet schema
+ analysisRequire(
+ !name.matches(".*[ ,;{}()\n\t=].*"),
+ s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
+ |Please use alias to rename it.
+ """.stripMargin.split("\n").mkString(" "))
+ }
+
+ def analysisRequire(f: => Boolean, message: String): Unit = {
+ if (!f) {
+ throw new AnalysisException(message)
+ }
+ }
+}
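
A rough usage sketch of the new converter, mirroring how the test suite below drives it (illustration only, not part of the patch; assumes parquet-mr's MessageTypeParser):

    import org.apache.parquet.schema.MessageTypeParser
    import org.apache.spark.sql.types._

    // Compatible mode: legacy 2-level LIST structures are recognized on read,
    // and old-style non-standard structures are produced on write.
    val converter = new CatalystSchemaConverter(
      assumeBinaryIsString = true,
      assumeInt96IsTimestamp = true,
      followParquetFormatSpec = false)

    // Parquet MessageType => Spark SQL StructType
    val parquetSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  optional group f1 (LIST) {
        |    repeated int32 element;
        |  }
        |}""".stripMargin)
    val catalystSchema = converter.convert(parquetSchema)
    // => StructType(StructField(f1, ArrayType(IntegerType, containsNull = false), nullable = true))

    // Spark SQL StructType => Parquet MessageType
    val messageType = converter.convert(StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = true))))

With followParquetFormatSpec = true, the same ArrayType would instead be written as the standard 3-level "repeated group list { <element> }" structure exercised by the tests below.
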
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index e65fa0030e..0d96a1e807 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -86,8 +86,7 @@ private[parquet] class RowReadSupport extends ReadSupport[InternalRow] with Logg
// TODO: Why it can be null?
if (schema == null) {
log.debug("falling back to Parquet read schema")
- schema = ParquetTypesConverter.convertToAttributes(
- parquetSchema, false, true)
+ schema = ParquetTypesConverter.convertToAttributes(parquetSchema, false, true)
}
log.debug(s"list of attributes that will be read: $schema")
new RowRecordMaterializer(parquetSchema, schema)
@@ -105,8 +104,7 @@ private[parquet] class RowReadSupport extends ReadSupport[InternalRow] with Logg
// If the parquet file is thrift derived, there is a good chance that
// it will have the thrift class in metadata.
val isThriftDerived = keyValueMetaData.keySet().contains("thrift.class")
- parquetSchema = ParquetTypesConverter
- .convertFromAttributes(requestedAttributes, isThriftDerived)
+ parquetSchema = ParquetTypesConverter.convertFromAttributes(requestedAttributes)
metadata.put(
RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
ParquetTypesConverter.convertToString(requestedAttributes))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index ba2a35b74e..4d5199a140 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -29,214 +29,19 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
-import org.apache.parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
-import org.apache.parquet.schema.Type.Repetition
-import org.apache.parquet.schema.{ConversionPatterns, DecimalMetadata, GroupType => ParquetGroupType, MessageType, OriginalType => ParquetOriginalType, PrimitiveType => ParquetPrimitiveType, Type => ParquetType, Types => ParquetTypes}
+import org.apache.parquet.schema.MessageType
import org.apache.spark.Logging
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.types._
-/** A class representing Parquet info fields we care about, for passing back to Parquet */
-private[parquet] case class ParquetTypeInfo(
- primitiveType: ParquetPrimitiveTypeName,
- originalType: Option[ParquetOriginalType] = None,
- decimalMetadata: Option[DecimalMetadata] = None,
- length: Option[Int] = None)
-
private[parquet] object ParquetTypesConverter extends Logging {
def isPrimitiveType(ctype: DataType): Boolean = ctype match {
case _: NumericType | BooleanType | StringType | BinaryType => true
case _: DataType => false
}
- def toPrimitiveDataType(
- parquetType: ParquetPrimitiveType,
- binaryAsString: Boolean,
- int96AsTimestamp: Boolean): DataType = {
- val originalType = parquetType.getOriginalType
- val decimalInfo = parquetType.getDecimalMetadata
- parquetType.getPrimitiveTypeName match {
- case ParquetPrimitiveTypeName.BINARY
- if (originalType == ParquetOriginalType.UTF8 || binaryAsString) => StringType
- case ParquetPrimitiveTypeName.BINARY => BinaryType
- case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
- case ParquetPrimitiveTypeName.DOUBLE => DoubleType
- case ParquetPrimitiveTypeName.FLOAT => FloatType
- case ParquetPrimitiveTypeName.INT32
- if originalType == ParquetOriginalType.DATE => DateType
- case ParquetPrimitiveTypeName.INT32 => IntegerType
- case ParquetPrimitiveTypeName.INT64 => LongType
- case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType
- case ParquetPrimitiveTypeName.INT96 =>
- // TODO: add BigInteger type? TODO(andre) use DecimalType instead????
- throw new AnalysisException("Potential loss of precision: cannot convert INT96")
- case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
- if (originalType == ParquetOriginalType.DECIMAL && decimalInfo.getPrecision <= 18) =>
- // TODO: for now, our reader only supports decimals that fit in a Long
- DecimalType(decimalInfo.getPrecision, decimalInfo.getScale)
- case _ => throw new AnalysisException(s"Unsupported parquet datatype $parquetType")
- }
- }
-
- /**
- * Converts a given Parquet `Type` into the corresponding
- * [[org.apache.spark.sql.types.DataType]].
- *
- * We apply the following conversion rules:
- * <ul>
- * <li> Primitive types are converter to the corresponding primitive type.</li>
- * <li> Group types that have a single field that is itself a group, which has repetition
- * level `REPEATED`, are treated as follows:<ul>
- * <li> If the nested group has name `values`, the surrounding group is converted
- * into an [[ArrayType]] with the corresponding field type (primitive or
- * complex) as element type.</li>
- * <li> If the nested group has name `map` and two fields (named `key` and `value`),
- * the surrounding group is converted into a [[MapType]]
- * with the corresponding key and value (value possibly complex) types.
- * Note that we currently assume map values are not nullable.</li>
- * <li> Other group types are converted into a [[StructType]] with the corresponding
- * field types.</li></ul></li>
- * </ul>
- * Note that fields are determined to be `nullable` if and only if their Parquet repetition
- * level is not `REQUIRED`.
- *
- * @param parquetType The type to convert.
- * @return The corresponding Catalyst type.
- */
- def toDataType(parquetType: ParquetType,
- isBinaryAsString: Boolean,
- isInt96AsTimestamp: Boolean): DataType = {
- def correspondsToMap(groupType: ParquetGroupType): Boolean = {
- if (groupType.getFieldCount != 1 || groupType.getFields.apply(0).isPrimitive) {
- false
- } else {
- // This mostly follows the convention in ``parquet.schema.ConversionPatterns``
- val keyValueGroup = groupType.getFields.apply(0).asGroupType()
- keyValueGroup.getRepetition == Repetition.REPEATED &&
- keyValueGroup.getName == CatalystConverter.MAP_SCHEMA_NAME &&
- keyValueGroup.getFieldCount == 2 &&
- keyValueGroup.getFields.apply(0).getName == CatalystConverter.MAP_KEY_SCHEMA_NAME &&
- keyValueGroup.getFields.apply(1).getName == CatalystConverter.MAP_VALUE_SCHEMA_NAME
- }
- }
-
- def correspondsToArray(groupType: ParquetGroupType): Boolean = {
- groupType.getFieldCount == 1 &&
- groupType.getFieldName(0) == CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME &&
- groupType.getFields.apply(0).getRepetition == Repetition.REPEATED
- }
-
- if (parquetType.isPrimitive) {
- toPrimitiveDataType(parquetType.asPrimitiveType, isBinaryAsString, isInt96AsTimestamp)
- } else {
- val groupType = parquetType.asGroupType()
- parquetType.getOriginalType match {
- // if the schema was constructed programmatically there may be hints how to convert
- // it inside the metadata via the OriginalType field
- case ParquetOriginalType.LIST => { // TODO: check enums!
- assert(groupType.getFieldCount == 1)
- val field = groupType.getFields.apply(0)
- if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) {
- val bag = field.asGroupType()
- assert(bag.getFieldCount == 1)
- ArrayType(
- toDataType(bag.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp),
- containsNull = true)
- } else {
- ArrayType(
- toDataType(field, isBinaryAsString, isInt96AsTimestamp), containsNull = false)
- }
- }
- case ParquetOriginalType.MAP => {
- assert(
- !groupType.getFields.apply(0).isPrimitive,
- "Parquet Map type malformatted: expected nested group for map!")
- val keyValueGroup = groupType.getFields.apply(0).asGroupType()
- assert(
- keyValueGroup.getFieldCount == 2,
- "Parquet Map type malformatted: nested group should have 2 (key, value) fields!")
- assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
-
- val keyType =
- toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp)
- val valueType =
- toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString, isInt96AsTimestamp)
- MapType(keyType, valueType,
- keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED)
- }
- case _ => {
- // Note: the order of these checks is important!
- if (correspondsToMap(groupType)) { // MapType
- val keyValueGroup = groupType.getFields.apply(0).asGroupType()
- assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
-
- val keyType =
- toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp)
- val valueType =
- toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString, isInt96AsTimestamp)
- MapType(keyType, valueType,
- keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED)
- } else if (correspondsToArray(groupType)) { // ArrayType
- val field = groupType.getFields.apply(0)
- if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) {
- val bag = field.asGroupType()
- assert(bag.getFieldCount == 1)
- ArrayType(
- toDataType(bag.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp),
- containsNull = true)
- } else {
- ArrayType(
- toDataType(field, isBinaryAsString, isInt96AsTimestamp), containsNull = false)
- }
- } else { // everything else: StructType
- val fields = groupType
- .getFields
- .map(ptype => new StructField(
- ptype.getName,
- toDataType(ptype, isBinaryAsString, isInt96AsTimestamp),
- ptype.getRepetition != Repetition.REQUIRED))
- StructType(fields)
- }
- }
- }
- }
- }
-
- /**
- * For a given Catalyst [[org.apache.spark.sql.types.DataType]] return
- * the name of the corresponding Parquet primitive type or None if the given type
- * is not primitive.
- *
- * @param ctype The type to convert
- * @return The name of the corresponding Parquet type properties
- */
- def fromPrimitiveDataType(ctype: DataType): Option[ParquetTypeInfo] = ctype match {
- case StringType => Some(ParquetTypeInfo(
- ParquetPrimitiveTypeName.BINARY, Some(ParquetOriginalType.UTF8)))
- case BinaryType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.BINARY))
- case BooleanType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.BOOLEAN))
- case DoubleType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.DOUBLE))
- case FloatType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.FLOAT))
- case IntegerType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
- // There is no type for Byte or Short so we promote them to INT32.
- case ShortType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
- case ByteType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
- case DateType => Some(ParquetTypeInfo(
- ParquetPrimitiveTypeName.INT32, Some(ParquetOriginalType.DATE)))
- case LongType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT64))
- case TimestampType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT96))
- case DecimalType.Fixed(precision, scale) if precision <= 18 =>
- // TODO: for now, our writer only supports decimals that fit in a Long
- Some(ParquetTypeInfo(ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
- Some(ParquetOriginalType.DECIMAL),
- Some(new DecimalMetadata(precision, scale)),
- Some(BYTES_FOR_PRECISION(precision))))
- case _ => None
- }
-
/**
* Compute the FIXED_LEN_BYTE_ARRAY length needed to represent a given DECIMAL precision.
*/
@@ -248,177 +53,29 @@ private[parquet] object ParquetTypesConverter extends Logging {
length
}
- /**
- * Converts a given Catalyst [[org.apache.spark.sql.types.DataType]] into
- * the corresponding Parquet `Type`.
- *
- * The conversion follows the rules below:
- * <ul>
- * <li> Primitive types are converted into Parquet's primitive types.</li>
- * <li> [[org.apache.spark.sql.types.StructType]]s are converted
- * into Parquet's `GroupType` with the corresponding field types.</li>
- * <li> [[org.apache.spark.sql.types.ArrayType]]s are converted
- * into a 2-level nested group, where the outer group has the inner
- * group as sole field. The inner group has name `values` and
- * repetition level `REPEATED` and has the element type of
- * the array as schema. We use Parquet's `ConversionPatterns` for this
- * purpose.</li>
- * <li> [[org.apache.spark.sql.types.MapType]]s are converted
- * into a nested (2-level) Parquet `GroupType` with two fields: a key
- * type and a value type. The nested group has repetition level
- * `REPEATED` and name `map`. We use Parquet's `ConversionPatterns`
- * for this purpose</li>
- * </ul>
- * Parquet's repetition level is generally set according to the following rule:
- * <ul>
- * <li> If the call to `fromDataType` is recursive inside an enclosing `ArrayType` or
- * `MapType`, then the repetition level is set to `REPEATED`.</li>
- * <li> Otherwise, if the attribute whose type is converted is `nullable`, the Parquet
- * type gets repetition level `OPTIONAL` and otherwise `REQUIRED`.</li>
- * </ul>
- *
- *@param ctype The type to convert
- * @param name The name of the [[org.apache.spark.sql.catalyst.expressions.Attribute]]
- * whose type is converted
- * @param nullable When true indicates that the attribute is nullable
- * @param inArray When true indicates that this is a nested attribute inside an array.
- * @return The corresponding Parquet type.
- */
- def fromDataType(
- ctype: DataType,
- name: String,
- nullable: Boolean = true,
- inArray: Boolean = false,
- toThriftSchemaNames: Boolean = false): ParquetType = {
- val repetition =
- if (inArray) {
- Repetition.REPEATED
- } else {
- if (nullable) Repetition.OPTIONAL else Repetition.REQUIRED
- }
- val arraySchemaName = if (toThriftSchemaNames) {
- name + CatalystConverter.THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX
- } else {
- CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME
- }
- val typeInfo = fromPrimitiveDataType(ctype)
- typeInfo.map {
- case ParquetTypeInfo(primitiveType, originalType, decimalMetadata, length) =>
- val builder = ParquetTypes.primitive(primitiveType, repetition).as(originalType.orNull)
- for (len <- length) {
- builder.length(len)
- }
- for (metadata <- decimalMetadata) {
- builder.precision(metadata.getPrecision).scale(metadata.getScale)
- }
- builder.named(name)
- }.getOrElse {
- ctype match {
- case udt: UserDefinedType[_] => {
- fromDataType(udt.sqlType, name, nullable, inArray, toThriftSchemaNames)
- }
- case ArrayType(elementType, false) => {
- val parquetElementType = fromDataType(
- elementType,
- arraySchemaName,
- nullable = false,
- inArray = true,
- toThriftSchemaNames)
- ConversionPatterns.listType(repetition, name, parquetElementType)
- }
- case ArrayType(elementType, true) => {
- val parquetElementType = fromDataType(
- elementType,
- arraySchemaName,
- nullable = true,
- inArray = false,
- toThriftSchemaNames)
- ConversionPatterns.listType(
- repetition,
- name,
- new ParquetGroupType(
- Repetition.REPEATED,
- CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME,
- parquetElementType))
- }
- case StructType(structFields) => {
- val fields = structFields.map {
- field => fromDataType(field.dataType, field.name, field.nullable,
- inArray = false, toThriftSchemaNames)
- }
- new ParquetGroupType(repetition, name, fields.toSeq)
- }
- case MapType(keyType, valueType, valueContainsNull) => {
- val parquetKeyType =
- fromDataType(
- keyType,
- CatalystConverter.MAP_KEY_SCHEMA_NAME,
- nullable = false,
- inArray = false,
- toThriftSchemaNames)
- val parquetValueType =
- fromDataType(
- valueType,
- CatalystConverter.MAP_VALUE_SCHEMA_NAME,
- nullable = valueContainsNull,
- inArray = false,
- toThriftSchemaNames)
- ConversionPatterns.mapType(
- repetition,
- name,
- parquetKeyType,
- parquetValueType)
- }
- case _ => throw new AnalysisException(s"Unsupported datatype $ctype")
- }
- }
- }
-
- def convertToAttributes(parquetSchema: ParquetType,
- isBinaryAsString: Boolean,
- isInt96AsTimestamp: Boolean): Seq[Attribute] = {
- parquetSchema
- .asGroupType()
- .getFields
- .map(
- field =>
- new AttributeReference(
- field.getName,
- toDataType(field, isBinaryAsString, isInt96AsTimestamp),
- field.getRepetition != Repetition.REQUIRED)())
+ def convertToAttributes(
+ parquetSchema: MessageType,
+ isBinaryAsString: Boolean,
+ isInt96AsTimestamp: Boolean): Seq[Attribute] = {
+ val converter = new CatalystSchemaConverter(
+ isBinaryAsString, isInt96AsTimestamp, followParquetFormatSpec = false)
+ converter.convert(parquetSchema).toAttributes
}
- def convertFromAttributes(attributes: Seq[Attribute],
- toThriftSchemaNames: Boolean = false): MessageType = {
- checkSpecialCharacters(attributes)
- val fields = attributes.map(
- attribute =>
- fromDataType(attribute.dataType, attribute.name, attribute.nullable,
- toThriftSchemaNames = toThriftSchemaNames))
- new MessageType("root", fields)
+ def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
+ val converter = new CatalystSchemaConverter()
+ converter.convert(StructType.fromAttributes(attributes))
}
def convertFromString(string: String): Seq[Attribute] = {
Try(DataType.fromJson(string)).getOrElse(DataType.fromCaseClassString(string)) match {
case s: StructType => s.toAttributes
- case other => throw new AnalysisException(s"Can convert $string to row")
- }
- }
-
- private def checkSpecialCharacters(schema: Seq[Attribute]) = {
- // ,;{}()\n\t= and space character are special characters in Parquet schema
- schema.map(_.name).foreach { name =>
- if (name.matches(".*[ ,;{}()\n\t=].*")) {
- throw new AnalysisException(
- s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
- |Please use alias to rename it.
- """.stripMargin.split("\n").mkString(" "))
- }
+ case other => sys.error(s"Can't convert $string to row")
}
}
def convertToString(schema: Seq[Attribute]): String = {
- checkSpecialCharacters(schema)
+ schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
StructType.fromAttributes(schema).json
}
@@ -450,8 +107,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
ParquetTypesConverter.convertToString(attributes))
// TODO: add extra data, e.g., table name, date, etc.?
- val parquetSchema: MessageType =
- ParquetTypesConverter.convertFromAttributes(attributes)
+ val parquetSchema: MessageType = ParquetTypesConverter.convertFromAttributes(attributes)
val metaData: FileMetaData = new FileMetaData(
parquetSchema,
extraMetadata,
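
For context, the slimmed-down ParquetTypesConverter above now delegates all schema conversion to CatalystSchemaConverter. A hypothetical round trip through its two remaining entry points (sketch only, assumed to run within the org.apache.spark.sql.parquet package):

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types._

    val attributes = Seq(
      AttributeReference("c1", IntegerType, nullable = false)(),
      AttributeReference("c2", StringType, nullable = true)())

    // Catalyst attributes => Parquet MessageType (old-style, non-spec-following output)
    val messageType = ParquetTypesConverter.convertFromAttributes(attributes)

    // Parquet MessageType => Catalyst attributes (compatible mode)
    val roundTripped = ParquetTypesConverter.convertToAttributes(
      messageType, isBinaryAsString = true, isInt96AsTimestamp = true)
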
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 47a7be1c6a..7b16eba00d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -99,7 +99,6 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}
test("fixed-length decimals") {
-
def makeDecimalRDD(decimal: DecimalType): DataFrame =
sqlContext.sparkContext
.parallelize(0 to 1000)
@@ -158,6 +157,11 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
checkParquetFile(data)
}
+ test("array and double") {
+ val data = (1 to 4).map(i => (i.toDouble, Seq(i.toDouble, (i + 1).toDouble)))
+ checkParquetFile(data)
+ }
+
test("struct") {
val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
withParquetDataFrame(data) { df =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index 171a656f0e..d0bfcde7e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -24,26 +24,109 @@ import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.types._
-class ParquetSchemaSuite extends SparkFunSuite with ParquetTest {
- lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext
+abstract class ParquetSchemaTest extends SparkFunSuite with ParquetTest {
+ val sqlContext = TestSQLContext
/**
* Checks whether the reflected Parquet message type for product type `T` conforms `messageType`.
*/
- private def testSchema[T <: Product: ClassTag: TypeTag](
- testName: String, messageType: String, isThriftDerived: Boolean = false): Unit = {
- test(testName) {
- val actual = ParquetTypesConverter.convertFromAttributes(
- ScalaReflection.attributesFor[T], isThriftDerived)
- val expected = MessageTypeParser.parseMessageType(messageType)
+ protected def testSchemaInference[T <: Product: ClassTag: TypeTag](
+ testName: String,
+ messageType: String,
+ binaryAsString: Boolean = true,
+ int96AsTimestamp: Boolean = true,
+ followParquetFormatSpec: Boolean = false,
+ isThriftDerived: Boolean = false): Unit = {
+ testSchema(
+ testName,
+ StructType.fromAttributes(ScalaReflection.attributesFor[T]),
+ messageType,
+ binaryAsString,
+ int96AsTimestamp,
+ followParquetFormatSpec,
+ isThriftDerived)
+ }
+
+ protected def testParquetToCatalyst(
+ testName: String,
+ sqlSchema: StructType,
+ parquetSchema: String,
+ binaryAsString: Boolean = true,
+ int96AsTimestamp: Boolean = true,
+ followParquetFormatSpec: Boolean = false,
+ isThriftDerived: Boolean = false): Unit = {
+ val converter = new CatalystSchemaConverter(
+ assumeBinaryIsString = binaryAsString,
+ assumeInt96IsTimestamp = int96AsTimestamp,
+ followParquetFormatSpec = followParquetFormatSpec)
+
+ test(s"sql <= parquet: $testName") {
+ val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema))
+ val expected = sqlSchema
+ assert(
+ actual === expected,
+ s"""Schema mismatch.
+ |Expected schema: ${expected.json}
+ |Actual schema: ${actual.json}
+ """.stripMargin)
+ }
+ }
+
+ protected def testCatalystToParquet(
+ testName: String,
+ sqlSchema: StructType,
+ parquetSchema: String,
+ binaryAsString: Boolean = true,
+ int96AsTimestamp: Boolean = true,
+ followParquetFormatSpec: Boolean = false,
+ isThriftDerived: Boolean = false): Unit = {
+ val converter = new CatalystSchemaConverter(
+ assumeBinaryIsString = binaryAsString,
+ assumeInt96IsTimestamp = int96AsTimestamp,
+ followParquetFormatSpec = followParquetFormatSpec)
+
+ test(s"sql => parquet: $testName") {
+ val actual = converter.convert(sqlSchema)
+ val expected = MessageTypeParser.parseMessageType(parquetSchema)
actual.checkContains(expected)
expected.checkContains(actual)
}
}
- testSchema[(Boolean, Int, Long, Float, Double, Array[Byte])](
+ protected def testSchema(
+ testName: String,
+ sqlSchema: StructType,
+ parquetSchema: String,
+ binaryAsString: Boolean = true,
+ int96AsTimestamp: Boolean = true,
+ followParquetFormatSpec: Boolean = false,
+ isThriftDerived: Boolean = false): Unit = {
+
+ testCatalystToParquet(
+ testName,
+ sqlSchema,
+ parquetSchema,
+ binaryAsString,
+ int96AsTimestamp,
+ followParquetFormatSpec,
+ isThriftDerived)
+
+ testParquetToCatalyst(
+ testName,
+ sqlSchema,
+ parquetSchema,
+ binaryAsString,
+ int96AsTimestamp,
+ followParquetFormatSpec,
+ isThriftDerived)
+ }
+}
+
+class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
+ testSchemaInference[(Boolean, Int, Long, Float, Double, Array[Byte])](
"basic types",
"""
|message root {
@@ -54,9 +137,10 @@ class ParquetSchemaSuite extends SparkFunSuite with ParquetTest {
| required double _5;
| optional binary _6;
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = false)
- testSchema[(Byte, Short, Int, Long, java.sql.Date)](
+ testSchemaInference[(Byte, Short, Int, Long, java.sql.Date)](
"logical integral types",
"""
|message root {
@@ -68,27 +152,79 @@ class ParquetSchemaSuite extends SparkFunSuite with ParquetTest {
|}
""".stripMargin)
- // Currently String is the only supported logical binary type.
- testSchema[Tuple1[String]](
- "binary logical types",
+ testSchemaInference[Tuple1[String]](
+ "string",
"""
|message root {
| optional binary _1 (UTF8);
|}
+ """.stripMargin,
+ binaryAsString = true)
+
+ testSchemaInference[Tuple1[Seq[Int]]](
+ "non-nullable array - non-standard",
+ """
+ |message root {
+ | optional group _1 (LIST) {
+ | repeated int32 element;
+ | }
+ |}
""".stripMargin)
- testSchema[Tuple1[Seq[Int]]](
- "array",
+ testSchemaInference[Tuple1[Seq[Int]]](
+ "non-nullable array - standard",
+ """
+ |message root {
+ | optional group _1 (LIST) {
+ | repeated group list {
+ | required int32 element;
+ | }
+ | }
+ |}
+ """.stripMargin,
+ followParquetFormatSpec = true)
+
+ testSchemaInference[Tuple1[Seq[Integer]]](
+ "nullable array - non-standard",
"""
|message root {
| optional group _1 (LIST) {
- | repeated int32 array;
+ | repeated group bag {
+ | optional int32 element;
+ | }
| }
|}
""".stripMargin)
- testSchema[Tuple1[Map[Int, String]]](
- "map",
+ testSchemaInference[Tuple1[Seq[Integer]]](
+ "nullable array - standard",
+ """
+ |message root {
+ | optional group _1 (LIST) {
+ | repeated group list {
+ | optional int32 element;
+ | }
+ | }
+ |}
+ """.stripMargin,
+ followParquetFormatSpec = true)
+
+ testSchemaInference[Tuple1[Map[Int, String]]](
+ "map - standard",
+ """
+ |message root {
+ | optional group _1 (MAP) {
+ | repeated group key_value {
+ | required int32 key;
+ | optional binary value (UTF8);
+ | }
+ | }
+ |}
+ """.stripMargin,
+ followParquetFormatSpec = true)
+
+ testSchemaInference[Tuple1[Map[Int, String]]](
+ "map - non-standard",
"""
|message root {
| optional group _1 (MAP) {
@@ -100,7 +236,7 @@ class ParquetSchemaSuite extends SparkFunSuite with ParquetTest {
|}
""".stripMargin)
- testSchema[Tuple1[Pair[Int, String]]](
+ testSchemaInference[Tuple1[Pair[Int, String]]](
"struct",
"""
|message root {
@@ -109,20 +245,21 @@ class ParquetSchemaSuite extends SparkFunSuite with ParquetTest {
| optional binary _2 (UTF8);
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ followParquetFormatSpec = true)
- testSchema[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]](
- "deeply nested type",
+ testSchemaInference[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]](
+ "deeply nested type - non-standard",
"""
|message root {
- | optional group _1 (MAP) {
- | repeated group map (MAP_KEY_VALUE) {
+ | optional group _1 (MAP_KEY_VALUE) {
+ | repeated group map {
| required int32 key;
| optional group value {
| optional binary _1 (UTF8);
| optional group _2 (LIST) {
| repeated group bag {
- | optional group array {
+ | optional group element {
| required int32 _1;
| required double _2;
| }
@@ -134,43 +271,76 @@ class ParquetSchemaSuite extends SparkFunSuite with ParquetTest {
|}
""".stripMargin)
- testSchema[(Option[Int], Map[Int, Option[Double]])](
- "optional types",
+ testSchemaInference[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]](
+ "deeply nested type - standard",
"""
|message root {
- | optional int32 _1;
- | optional group _2 (MAP) {
- | repeated group map (MAP_KEY_VALUE) {
+ | optional group _1 (MAP) {
+ | repeated group key_value {
| required int32 key;
- | optional double value;
+ | optional group value {
+ | optional binary _1 (UTF8);
+ | optional group _2 (LIST) {
+ | repeated group list {
+ | optional group element {
+ | required int32 _1;
+ | required double _2;
+ | }
+ | }
+ | }
+ | }
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ followParquetFormatSpec = true)
- // Test for SPARK-4520 -- ensure that thrift generated parquet schema is generated
- // as expected from attributes
- testSchema[(Array[Byte], Array[Byte], Array[Byte], Seq[Int], Map[Array[Byte], Seq[Int]])](
- "thrift generated parquet schema",
+ testSchemaInference[(Option[Int], Map[Int, Option[Double]])](
+ "optional types",
"""
|message root {
- | optional binary _1 (UTF8);
- | optional binary _2 (UTF8);
- | optional binary _3 (UTF8);
- | optional group _4 (LIST) {
- | repeated int32 _4_tuple;
- | }
- | optional group _5 (MAP) {
- | repeated group map (MAP_KEY_VALUE) {
- | required binary key (UTF8);
- | optional group value (LIST) {
- | repeated int32 value_tuple;
- | }
+ | optional int32 _1;
+ | optional group _2 (MAP) {
+ | repeated group key_value {
+ | required int32 key;
+ | optional double value;
| }
| }
|}
- """.stripMargin, isThriftDerived = true)
+ """.stripMargin,
+ followParquetFormatSpec = true)
+ // Parquet files generated by parquet-thrift are already handled by the schema converter, but
+ // let's leave this test here until both the read path and the write path are updated.
+ ignore("thrift generated parquet schema") {
+ // Test for SPARK-4520 -- ensure that thrift generated parquet schema is generated
+ // as expected from attributes
+ testSchemaInference[(
+ Array[Byte], Array[Byte], Array[Byte], Seq[Int], Map[Array[Byte], Seq[Int]])](
+ "thrift generated parquet schema",
+ """
+ |message root {
+ | optional binary _1 (UTF8);
+ | optional binary _2 (UTF8);
+ | optional binary _3 (UTF8);
+ | optional group _4 (LIST) {
+ | repeated int32 _4_tuple;
+ | }
+ | optional group _5 (MAP) {
+ | repeated group map (MAP_KEY_VALUE) {
+ | required binary key (UTF8);
+ | optional group value (LIST) {
+ | repeated int32 value_tuple;
+ | }
+ | }
+ | }
+ |}
+ """.stripMargin,
+ isThriftDerived = true)
+ }
+}
+
+class ParquetSchemaSuite extends ParquetSchemaTest {
test("DataType string parser compatibility") {
// This is the generated string from previous versions of the Spark SQL, using the following:
// val schema = StructType(List(
@@ -180,10 +350,7 @@ class ParquetSchemaSuite extends SparkFunSuite with ParquetTest {
"StructType(List(StructField(c1,IntegerType,false), StructField(c2,BinaryType,true)))"
// scalastyle:off
- val jsonString =
- """
- |{"type":"struct","fields":[{"name":"c1","type":"integer","nullable":false,"metadata":{}},{"name":"c2","type":"binary","nullable":true,"metadata":{}}]}
- """.stripMargin
+ val jsonString = """{"type":"struct","fields":[{"name":"c1","type":"integer","nullable":false,"metadata":{}},{"name":"c2","type":"binary","nullable":true,"metadata":{}}]}"""
// scalastyle:on
val fromCaseClassString = ParquetTypesConverter.convertFromString(caseClassString)
@@ -277,4 +444,465 @@ class ParquetSchemaSuite extends SparkFunSuite with ParquetTest {
StructField("secondField", StringType, nullable = true))))
}.getMessage.contains("detected conflicting schemas"))
}
+
+ // =======================================================
+ // Tests for converting Parquet LIST to Catalyst ArrayType
+ // =======================================================
+
+ testParquetToCatalyst(
+ "Backwards-compatibility: LIST with nullable element type - 1 - standard",
+ StructType(Seq(
+ StructField(
+ "f1",
+ ArrayType(IntegerType, containsNull = true),
+ nullable = true))),
+ """message root {
+ | optional group f1 (LIST) {
+ | repeated group list {
+ | optional int32 element;
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testParquetToCatalyst(
+ "Backwards-compatibility: LIST with nullable element type - 2",
+ StructType(Seq(
+ StructField(
+ "f1",
+ ArrayType(IntegerType, containsNull = true),
+ nullable = true))),
+ """message root {
+ | optional group f1 (LIST) {
+ | repeated group element {
+ | optional int32 num;
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testParquetToCatalyst(
+ "Backwards-compatibility: LIST with non-nullable element type - 1 - standard",
+ StructType(Seq(
+ StructField("f1", ArrayType(IntegerType, containsNull = false), nullable = true))),
+ """message root {
+ | optional group f1 (LIST) {
+ | repeated group list {
+ | required int32 element;
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testParquetToCatalyst(
+ "Backwards-compatibility: LIST with non-nullable element type - 2",
+ StructType(Seq(
+ StructField("f1", ArrayType(IntegerType, containsNull = false), nullable = true))),
+ """message root {
+ | optional group f1 (LIST) {
+ | repeated group element {
+ | required int32 num;
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testParquetToCatalyst(
+ "Backwards-compatibility: LIST with non-nullable element type - 3",
+ StructType(Seq(
+ StructField("f1", ArrayType(IntegerType, containsNull = false), nullable = true))),
+ """message root {
+ | optional group f1 (LIST) {
+ | repeated int32 element;
+ | }
+ |}
+ """.stripMargin)
+
+ testParquetToCatalyst(
+ "Backwards-compatibility: LIST with non-nullable element type - 4",
+ StructType(Seq(
+ StructField(
+ "f1",
+ ArrayType(
+ StructType(Seq(
+ StructField("str", StringType, nullable = false),
+ StructField("num", IntegerType, nullable = false))),
+ containsNull = false),
+ nullable = true))),
+ """message root {
+ | optional group f1 (LIST) {
+ | repeated group element {
+ | required binary str (UTF8);
+ | required int32 num;
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testParquetToCatalyst(
+ "Backwards-compatibility: LIST with non-nullable element type - 5 - parquet-avro style",
+ StructType(Seq(
+ StructField(
+ "f1",
+ ArrayType(
+ StructType(Seq(
+ StructField("str", StringType, nullable = false))),
+ containsNull = false),
+ nullable = true))),
+ """message root {
+ | optional group f1 (LIST) {
+ | repeated group array {
+ | required binary str (UTF8);
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testParquetToCatalyst(
+ "Backwards-compatibility: LIST with non-nullable element type - 6 - parquet-thrift style",
+ StructType(Seq(
+ StructField(
+ "f1",
+ ArrayType(
+ StructType(Seq(
+ StructField("str", StringType, nullable = false))),
+ containsNull = false),
+ nullable = true))),
+ """message root {
+ | optional group f1 (LIST) {
+ | repeated group f1_tuple {
+ | required binary str (UTF8);
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ // =======================================================
+ // Tests for converting Catalyst ArrayType to Parquet LIST
+ // =======================================================
+
+ testCatalystToParquet(
+ "Backwards-compatibility: LIST with nullable element type - 1 - standard",
+ StructType(Seq(
+ StructField(
+ "f1",
+ ArrayType(IntegerType, containsNull = true),
+ nullable = true))),
+ """message root {
+ | optional group f1 (LIST) {
+ | repeated group list {
+ | optional int32 element;
+ | }
+ | }
+ |}
+ """.stripMargin,
+ followParquetFormatSpec = true)
+
+ testCatalystToParquet(
+ "Backwards-compatibility: LIST with nullable element type - 2 - prior to 1.4.x",
+ StructType(Seq(
+ StructField(
+ "f1",
+ ArrayType(IntegerType, containsNull = true),
+ nullable = true))),
+ """message root {
+ | optional group f1 (LIST) {
+ | repeated group bag {
+ | optional int32 element;
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testCatalystToParquet(
+ "Backwards-compatibility: LIST with non-nullable element type - 1 - standard",
+ StructType(Seq(
+ StructField(
+ "f1",
+ ArrayType(IntegerType, containsNull = false),
+ nullable = true))),
+ """message root {
+ | optional group f1 (LIST) {
+ | repeated group list {
+ | required int32 element;
+ | }
+ | }
+ |}
+ """.stripMargin,
+ followParquetFormatSpec = true)
+
+ testCatalystToParquet(
+ "Backwards-compatibility: LIST with non-nullable element type - 2 - prior to 1.4.x",
+ StructType(Seq(
+ StructField(
+ "f1",
+ ArrayType(IntegerType, containsNull = false),
+ nullable = true))),
+ """message root {
+ | optional group f1 (LIST) {
+ | repeated int32 element;
+ | }
+ |}
+ """.stripMargin)
+
+ // ====================================================
+ // Tests for converting Parquet Map to Catalyst MapType
+ // ====================================================
+
+ testParquetToCatalyst(
+ "Backwards-compatibility: MAP with non-nullable value type - 1 - standard",
+ StructType(Seq(
+ StructField(
+ "f1",
+ MapType(IntegerType, StringType, valueContainsNull = false),
+ nullable = true))),
+ """message root {
+ | optional group f1 (MAP) {
+ | repeated group key_value {
+ | required int32 key;
+ | required binary value (UTF8);
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testParquetToCatalyst(
+ "Backwards-compatibility: MAP with non-nullable value type - 2",
+ StructType(Seq(
+ StructField(
+ "f1",
+ MapType(IntegerType, StringType, valueContainsNull = false),
+ nullable = true))),
+ """message root {
+ | optional group f1 (MAP_KEY_VALUE) {
+ | repeated group map {
+ | required int32 num;
+ | required binary str (UTF8);
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testParquetToCatalyst(
+ "Backwards-compatibility: MAP with non-nullable value type - 3 - prior to 1.4.x",
+ StructType(Seq(
+ StructField(
+ "f1",
+ MapType(IntegerType, StringType, valueContainsNull = false),
+ nullable = true))),
+ """message root {
+ | optional group f1 (MAP) {
+ | repeated group map (MAP_KEY_VALUE) {
+ | required int32 key;
+ | required binary value (UTF8);
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testParquetToCatalyst(
+ "Backwards-compatibility: MAP with nullable value type - 1 - standard",
+ StructType(Seq(
+ StructField(
+ "f1",
+ MapType(IntegerType, StringType, valueContainsNull = true),
+ nullable = true))),
+ """message root {
+ | optional group f1 (MAP) {
+ | repeated group key_value {
+ | required int32 key;
+ | optional binary value (UTF8);
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testParquetToCatalyst(
+ "Backwards-compatibility: MAP with nullable value type - 2",
+ StructType(Seq(
+ StructField(
+ "f1",
+ MapType(IntegerType, StringType, valueContainsNull = true),
+ nullable = true))),
+ """message root {
+ | optional group f1 (MAP_KEY_VALUE) {
+ | repeated group map {
+ | required int32 num;
+ | optional binary str (UTF8);
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testParquetToCatalyst(
+ "Backwards-compatibility: MAP with nullable value type - 3 - parquet-avro style",
+ StructType(Seq(
+ StructField(
+ "f1",
+ MapType(IntegerType, StringType, valueContainsNull = true),
+ nullable = true))),
+ """message root {
+ | optional group f1 (MAP) {
+ | repeated group map (MAP_KEY_VALUE) {
+ | required int32 key;
+ | optional binary value (UTF8);
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ // ====================================================
+ // Tests for converting Catalyst MapType to Parquet Map
+ // ====================================================
+
+ testCatalystToParquet(
+ "Backwards-compatibility: MAP with non-nullable value type - 1 - standard",
+ StructType(Seq(
+ StructField(
+ "f1",
+ MapType(IntegerType, StringType, valueContainsNull = false),
+ nullable = true))),
+ """message root {
+ | optional group f1 (MAP) {
+ | repeated group key_value {
+ | required int32 key;
+ | required binary value (UTF8);
+ | }
+ | }
+ |}
+ """.stripMargin,
+ followParquetFormatSpec = true)
+
+ testCatalystToParquet(
+ "Backwards-compatibility: MAP with non-nullable value type - 2 - prior to 1.4.x",
+ StructType(Seq(
+ StructField(
+ "f1",
+ MapType(IntegerType, StringType, valueContainsNull = false),
+ nullable = true))),
+ """message root {
+ | optional group f1 (MAP) {
+ | repeated group map (MAP_KEY_VALUE) {
+ | required int32 key;
+ | required binary value (UTF8);
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ testCatalystToParquet(
+ "Backwards-compatibility: MAP with nullable value type - 1 - standard",
+ StructType(Seq(
+ StructField(
+ "f1",
+ MapType(IntegerType, StringType, valueContainsNull = true),
+ nullable = true))),
+ """message root {
+ | optional group f1 (MAP) {
+ | repeated group key_value {
+ | required int32 key;
+ | optional binary value (UTF8);
+ | }
+ | }
+ |}
+ """.stripMargin,
+ followParquetFormatSpec = true)
+
+ testCatalystToParquet(
+ "Backwards-compatibility: MAP with nullable value type - 3 - prior to 1.4.x",
+ StructType(Seq(
+ StructField(
+ "f1",
+ MapType(IntegerType, StringType, valueContainsNull = true),
+ nullable = true))),
+ """message root {
+ | optional group f1 (MAP) {
+ | repeated group map (MAP_KEY_VALUE) {
+ | required int32 key;
+ | optional binary value (UTF8);
+ | }
+ | }
+ |}
+ """.stripMargin)
+
+ // =================================
+ // Tests for conversion for decimals
+ // =================================
+
+ testSchema(
+ "DECIMAL(1, 0) - standard",
+ StructType(Seq(StructField("f1", DecimalType(1, 0)))),
+ """message root {
+ | optional int32 f1 (DECIMAL(1, 0));
+ |}
+ """.stripMargin,
+ followParquetFormatSpec = true)
+
+ testSchema(
+ "DECIMAL(8, 3) - standard",
+ StructType(Seq(StructField("f1", DecimalType(8, 3)))),
+ """message root {
+ | optional int32 f1 (DECIMAL(8, 3));
+ |}
+ """.stripMargin,
+ followParquetFormatSpec = true)
+
+ testSchema(
+ "DECIMAL(9, 3) - standard",
+ StructType(Seq(StructField("f1", DecimalType(9, 3)))),
+ """message root {
+ | optional int32 f1 (DECIMAL(9, 3));
+ |}
+ """.stripMargin,
+ followParquetFormatSpec = true)
+
+ testSchema(
+ "DECIMAL(18, 3) - standard",
+ StructType(Seq(StructField("f1", DecimalType(18, 3)))),
+ """message root {
+ | optional int64 f1 (DECIMAL(18, 3));
+ |}
+ """.stripMargin,
+ followParquetFormatSpec = true)
+
+ testSchema(
+ "DECIMAL(19, 3) - standard",
+ StructType(Seq(StructField("f1", DecimalType(19, 3)))),
+ """message root {
+ | optional fixed_len_byte_array(9) f1 (DECIMAL(19, 3));
+ |}
+ """.stripMargin,
+ followParquetFormatSpec = true)
+
+ testSchema(
+ "DECIMAL(1, 0) - prior to 1.4.x",
+ StructType(Seq(StructField("f1", DecimalType(1, 0)))),
+ """message root {
+ | optional fixed_len_byte_array(1) f1 (DECIMAL(1, 0));
+ |}
+ """.stripMargin)
+
+ testSchema(
+ "DECIMAL(8, 3) - prior to 1.4.x",
+ StructType(Seq(StructField("f1", DecimalType(8, 3)))),
+ """message root {
+ | optional fixed_len_byte_array(4) f1 (DECIMAL(8, 3));
+ |}
+ """.stripMargin)
+
+ testSchema(
+ "DECIMAL(9, 3) - prior to 1.4.x",
+ StructType(Seq(StructField("f1", DecimalType(9, 3)))),
+ """message root {
+ | optional fixed_len_byte_array(5) f1 (DECIMAL(9, 3));
+ |}
+ """.stripMargin)
+
+ testSchema(
+ "DECIMAL(18, 3) - prior to 1.4.x",
+ StructType(Seq(StructField("f1", DecimalType(18, 3)))),
+ """message root {
+ | optional fixed_len_byte_array(8) f1 (DECIMAL(18, 3));
+ |}
+ """.stripMargin)
}
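
Editorial note (not part of this commit): the "- standard" decimal cases above follow the
Parquet format spec's thresholds -- precisions up to 9 fit in an int32, precisions up to 18
fit in an int64, and anything wider falls back to a fixed_len_byte_array just large enough
for the requested precision. The sketch below only illustrates that mapping; the helper
names are hypothetical, not APIs introduced by this patch, and the byte lengths shown in the
"- prior to 1.4.x" cases come from the legacy writer and are not derived here.

    // Sketch only: the precision-to-physical-type mapping exercised by the
    // "- standard" decimal tests above.  Helper names are hypothetical.
    object DecimalPhysicalTypeSketch {
      // Smallest FIXED_LEN_BYTE_ARRAY length that can hold `precision` decimal
      // digits as a signed two's-complement value.
      def minBytesForPrecision(precision: Int): Int = {
        var numBytes = 1
        while (math.pow(2, 8 * numBytes - 1) < math.pow(10, precision)) {
          numBytes += 1
        }
        numBytes
      }

      def physicalTypeFor(precision: Int): String = precision match {
        case p if p <= 9  => "int32"   // e.g. DECIMAL(1, 0), DECIMAL(8, 3), DECIMAL(9, 3)
        case p if p <= 18 => "int64"   // e.g. DECIMAL(18, 3)
        case p => s"fixed_len_byte_array(${minBytesForPrecision(p)})"  // e.g. DECIMAL(19, 3) -> 9 bytes
      }

      def main(args: Array[String]): Unit = {
        Seq(1, 8, 9, 18, 19).foreach(p => println(s"DECIMAL($p, _) -> ${physicalTypeFor(p)}"))
      }
    }
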
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index a2e666586c..f0aad8dbbe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -638,7 +638,7 @@ class SQLQuerySuite extends QueryTest {
test("SPARK-5203 union with different decimal precision") {
Seq.empty[(Decimal, Decimal)]
.toDF("d1", "d2")
- .select($"d1".cast(DecimalType(10, 15)).as("d"))
+ .select($"d1".cast(DecimalType(10, 5)).as("d"))
.registerTempTable("dn")
sql("select d from dn union all select d * 2 from dn")