Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala | 61
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala | 289
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 2
6 files changed, 231 insertions, 148 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index b9fb90d964..e7bbc7d5db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -290,9 +290,9 @@ private[spark] object SQLConf {
defaultValue = Some(true),
doc = "Enables Parquet filter push-down optimization when set to true.")
- val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf(
- key = "spark.sql.parquet.followParquetFormatSpec",
- defaultValue = Some(false),
+ val PARQUET_WRITE_LEGACY_FORMAT = booleanConf(
+ key = "spark.sql.parquet.writeLegacyFormat",
+ defaultValue = Some(true),
doc = "Whether to follow Parquet's format specification when converting Parquet schema to " +
"Spark SQL schema and vice versa.",
isPublic = false)
@@ -304,8 +304,7 @@ private[spark] object SQLConf {
"subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
"of org.apache.parquet.hadoop.ParquetOutputCommitter. NOTE: 1. Instead of SQLConf, this " +
"option must be set in Hadoop Configuration. 2. This option overrides " +
- "\"spark.sql.sources.outputCommitterClass\"."
- )
+ "\"spark.sql.sources.outputCommitterClass\".")
val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
defaultValue = Some(false),
@@ -491,7 +490,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)
- private[spark] def followParquetFormatSpec: Boolean = getConf(PARQUET_FOLLOW_PARQUET_FORMAT_SPEC)
+ private[spark] def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)
private[spark] def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
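For reference, the renamed flag is toggled like any other SQLConf entry. A minimal usage sketch, assuming a live SQLContext named sqlContext (not part of this patch):

  // Keep the Spark 1.4-compatible (legacy) Parquet layout -- the new default.
  sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
  // Or write files using the layout defined by the parquet-format spec.
  sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "false")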
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 9502b835a5..5325698034 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -263,7 +263,7 @@ private[parquet] object CatalystReadSupport {
private def clipParquetGroupFields(
parquetRecord: GroupType, structType: StructType): Seq[Type] = {
val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
- val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
+ val toParquet = new CatalystSchemaConverter(writeLegacyParquetFormat = false)
structType.map { f =>
parquetFieldMap
.get(f.name)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 97ffeb08aa..6904fc736c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -41,34 +41,31 @@ import org.apache.spark.sql.{AnalysisException, SQLConf}
* @constructor
* @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL
* [[StringType]] fields when converting Parquet a [[MessageType]] to Spark SQL
- * [[StructType]].
+ * [[StructType]]. This argument only affects Parquet read path.
* @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
* [[TimestampType]] fields when converting Parquet a [[MessageType]] to Spark SQL
* [[StructType]]. Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which
* has optional nanosecond precision, but different from `TIME_MILLS` and `TIMESTAMP_MILLIS`
- * described in Parquet format spec.
- * @param followParquetFormatSpec Whether to generate standard DECIMAL, LIST, and MAP structure when
- * converting Spark SQL [[StructType]] to Parquet [[MessageType]]. For Spark 1.4.x and
- * prior versions, Spark SQL only supports decimals with a max precision of 18 digits, and
- * uses non-standard LIST and MAP structure. Note that the current Parquet format spec is
- * backwards-compatible with these settings. If this argument is set to `false`, we fallback
- * to old style non-standard behaviors.
+ * described in Parquet format spec. This argument only affects Parquet read path.
+ * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4
+ * and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]].
+ * When set to false, use standard format defined in parquet-format spec. This argument only
+ * affects Parquet write path.
*/
private[parquet] class CatalystSchemaConverter(
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
- followParquetFormatSpec: Boolean = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get
-) {
+ writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) {
def this(conf: SQLConf) = this(
assumeBinaryIsString = conf.isParquetBinaryAsString,
assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
- followParquetFormatSpec = conf.followParquetFormatSpec)
+ writeLegacyParquetFormat = conf.writeLegacyParquetFormat)
def this(conf: Configuration) = this(
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
- followParquetFormatSpec = conf.get(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key).toBoolean)
+ writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean)
/**
* Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
@@ -371,15 +368,15 @@ private[parquet] class CatalystSchemaConverter(
case BinaryType =>
Types.primitive(BINARY, repetition).named(field.name)
- // =====================================
- // Decimals (for Spark version <= 1.4.x)
- // =====================================
+ // ======================
+ // Decimals (legacy mode)
+ // ======================
// Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and
// always store decimals in fixed-length byte arrays. To keep compatibility with these older
// versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated
// by `DECIMAL`.
- case DecimalType.Fixed(precision, scale) if !followParquetFormatSpec =>
+ case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat =>
Types
.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
.as(DECIMAL)
@@ -388,13 +385,13 @@ private[parquet] class CatalystSchemaConverter(
.length(CatalystSchemaConverter.minBytesForPrecision(precision))
.named(field.name)
- // =====================================
- // Decimals (follow Parquet format spec)
- // =====================================
+ // ========================
+ // Decimals (standard mode)
+ // ========================
// Uses INT32 for 1 <= precision <= 9
case DecimalType.Fixed(precision, scale)
- if precision <= MAX_PRECISION_FOR_INT32 && followParquetFormatSpec =>
+ if precision <= MAX_PRECISION_FOR_INT32 && !writeLegacyParquetFormat =>
Types
.primitive(INT32, repetition)
.as(DECIMAL)
@@ -404,7 +401,7 @@ private[parquet] class CatalystSchemaConverter(
// Uses INT64 for 1 <= precision <= 18
case DecimalType.Fixed(precision, scale)
- if precision <= MAX_PRECISION_FOR_INT64 && followParquetFormatSpec =>
+ if precision <= MAX_PRECISION_FOR_INT64 && !writeLegacyParquetFormat =>
Types
.primitive(INT64, repetition)
.as(DECIMAL)
@@ -413,7 +410,7 @@ private[parquet] class CatalystSchemaConverter(
.named(field.name)
// Uses FIXED_LEN_BYTE_ARRAY for all other precisions
- case DecimalType.Fixed(precision, scale) if followParquetFormatSpec =>
+ case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat =>
Types
.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
.as(DECIMAL)
@@ -422,15 +419,15 @@ private[parquet] class CatalystSchemaConverter(
.length(CatalystSchemaConverter.minBytesForPrecision(precision))
.named(field.name)
- // ===================================================
- // ArrayType and MapType (for Spark versions <= 1.4.x)
- // ===================================================
+ // ===================================
+ // ArrayType and MapType (legacy mode)
+ // ===================================
// Spark 1.4.x and prior versions convert `ArrayType` with nullable elements into a 3-level
// `LIST` structure. This behavior is somewhat a hybrid of parquet-hive and parquet-avro
// (1.6.0rc3): the 3-level structure is similar to parquet-hive while the 3rd level element
// field name "array" is borrowed from parquet-avro.
- case ArrayType(elementType, nullable @ true) if !followParquetFormatSpec =>
+ case ArrayType(elementType, nullable @ true) if writeLegacyParquetFormat =>
// <list-repetition> group <name> (LIST) {
// optional group bag {
// repeated <element-type> array;
@@ -448,7 +445,7 @@ private[parquet] class CatalystSchemaConverter(
// Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level
// LIST structure. This behavior mimics parquet-avro (1.6.0rc3). Note that this case is
// covered by the backwards-compatibility rules implemented in `isElementType()`.
- case ArrayType(elementType, nullable @ false) if !followParquetFormatSpec =>
+ case ArrayType(elementType, nullable @ false) if writeLegacyParquetFormat =>
// <list-repetition> group <name> (LIST) {
// repeated <element-type> element;
// }
@@ -460,7 +457,7 @@ private[parquet] class CatalystSchemaConverter(
// Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by
// MAP_KEY_VALUE. This is covered by `convertGroupField(field: GroupType): DataType`.
- case MapType(keyType, valueType, valueContainsNull) if !followParquetFormatSpec =>
+ case MapType(keyType, valueType, valueContainsNull) if writeLegacyParquetFormat =>
// <map-repetition> group <name> (MAP) {
// repeated group map (MAP_KEY_VALUE) {
// required <key-type> key;
@@ -473,11 +470,11 @@ private[parquet] class CatalystSchemaConverter(
convertField(StructField("key", keyType, nullable = false)),
convertField(StructField("value", valueType, valueContainsNull)))
- // ==================================================
- // ArrayType and MapType (follow Parquet format spec)
- // ==================================================
+ // =====================================
+ // ArrayType and MapType (standard mode)
+ // =====================================
- case ArrayType(elementType, containsNull) if followParquetFormatSpec =>
+ case ArrayType(elementType, containsNull) if !writeLegacyParquetFormat =>
// <list-repetition> group <name> (LIST) {
// repeated group list {
// <element-repetition> <element-type> element;
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 953fcab126..8a9c0e733a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -287,7 +287,7 @@ private[sql] class ParquetRelation(
val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
- val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
+ val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat
// Parquet row group size. We will use this value as the value for
// mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
@@ -305,7 +305,7 @@ private[sql] class ParquetRelation(
parquetFilterPushDown,
assumeBinaryIsString,
assumeInt96IsTimestamp,
- followParquetFormatSpec) _
+ writeLegacyParquetFormat) _
// Create the function to set input paths at the driver side.
val setInputPaths =
@@ -531,7 +531,7 @@ private[sql] object ParquetRelation extends Logging {
parquetFilterPushDown: Boolean,
assumeBinaryIsString: Boolean,
assumeInt96IsTimestamp: Boolean,
- followParquetFormatSpec: Boolean)(job: Job): Unit = {
+ writeLegacyParquetFormat: Boolean)(job: Job): Unit = {
val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
@@ -561,7 +561,7 @@ private[sql] object ParquetRelation extends Logging {
// Sets flags for Parquet schema conversion
conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
- conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, followParquetFormatSpec)
+ conf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, writeLegacyParquetFormat)
overrideMinSplitSize(parquetBlockSize, conf)
}
@@ -586,7 +586,7 @@ private[sql] object ParquetRelation extends Logging {
val converter = new CatalystSchemaConverter(
sqlContext.conf.isParquetBinaryAsString,
sqlContext.conf.isParquetBinaryAsString,
- sqlContext.conf.followParquetFormatSpec)
+ sqlContext.conf.writeLegacyParquetFormat)
converter.convert(schema)
}
@@ -720,7 +720,7 @@ private[sql] object ParquetRelation extends Logging {
filesToTouch: Seq[FileStatus], sqlContext: SQLContext): Option[StructType] = {
val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
- val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
+ val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat
val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)
// !! HACK ALERT !!
@@ -760,7 +760,7 @@ private[sql] object ParquetRelation extends Logging {
new CatalystSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
- followParquetFormatSpec = followParquetFormatSpec)
+ writeLegacyParquetFormat = writeLegacyParquetFormat)
footers.map { footer =>
ParquetRelation.readSchemaFromFooter(footer, converter)
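The initialization functions above only push the flag into the Hadoop Configuration; CatalystSchemaConverter's Configuration constructor reads it back on the task side. A sketch of that round trip, assuming a plain Hadoop Configuration (not part of this patch):

  import org.apache.hadoop.conf.Configuration
  import org.apache.spark.sql.SQLConf

  val hadoopConf = new Configuration()
  hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, true)
  hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, true)
  hadoopConf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, true)

  // Rebuilds the converter with the same three flags, mirroring initializeLocalJobFunc above.
  val converter = new CatalystSchemaConverter(hadoopConf)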
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 5a8f772c32..f17fb36f25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -22,7 +22,6 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.parquet.schema.MessageTypeParser
-import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -35,32 +34,29 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
protected def testSchemaInference[T <: Product: ClassTag: TypeTag](
testName: String,
messageType: String,
- binaryAsString: Boolean = true,
- int96AsTimestamp: Boolean = true,
- followParquetFormatSpec: Boolean = false,
- isThriftDerived: Boolean = false): Unit = {
+ binaryAsString: Boolean,
+ int96AsTimestamp: Boolean,
+ writeLegacyParquetFormat: Boolean): Unit = {
testSchema(
testName,
StructType.fromAttributes(ScalaReflection.attributesFor[T]),
messageType,
binaryAsString,
int96AsTimestamp,
- followParquetFormatSpec,
- isThriftDerived)
+ writeLegacyParquetFormat)
}
protected def testParquetToCatalyst(
testName: String,
sqlSchema: StructType,
parquetSchema: String,
- binaryAsString: Boolean = true,
- int96AsTimestamp: Boolean = true,
- followParquetFormatSpec: Boolean = false,
- isThriftDerived: Boolean = false): Unit = {
+ binaryAsString: Boolean,
+ int96AsTimestamp: Boolean,
+ writeLegacyParquetFormat: Boolean): Unit = {
val converter = new CatalystSchemaConverter(
assumeBinaryIsString = binaryAsString,
assumeInt96IsTimestamp = int96AsTimestamp,
- followParquetFormatSpec = followParquetFormatSpec)
+ writeLegacyParquetFormat = writeLegacyParquetFormat)
test(s"sql <= parquet: $testName") {
val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema))
@@ -78,14 +74,13 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
testName: String,
sqlSchema: StructType,
parquetSchema: String,
- binaryAsString: Boolean = true,
- int96AsTimestamp: Boolean = true,
- followParquetFormatSpec: Boolean = false,
- isThriftDerived: Boolean = false): Unit = {
+ binaryAsString: Boolean,
+ int96AsTimestamp: Boolean,
+ writeLegacyParquetFormat: Boolean): Unit = {
val converter = new CatalystSchemaConverter(
assumeBinaryIsString = binaryAsString,
assumeInt96IsTimestamp = int96AsTimestamp,
- followParquetFormatSpec = followParquetFormatSpec)
+ writeLegacyParquetFormat = writeLegacyParquetFormat)
test(s"sql => parquet: $testName") {
val actual = converter.convert(sqlSchema)
@@ -99,10 +94,9 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
testName: String,
sqlSchema: StructType,
parquetSchema: String,
- binaryAsString: Boolean = true,
- int96AsTimestamp: Boolean = true,
- followParquetFormatSpec: Boolean = false,
- isThriftDerived: Boolean = false): Unit = {
+ binaryAsString: Boolean,
+ int96AsTimestamp: Boolean,
+ writeLegacyParquetFormat: Boolean): Unit = {
testCatalystToParquet(
testName,
@@ -110,8 +104,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
parquetSchema,
binaryAsString,
int96AsTimestamp,
- followParquetFormatSpec,
- isThriftDerived)
+ writeLegacyParquetFormat)
testParquetToCatalyst(
testName,
@@ -119,8 +112,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
parquetSchema,
binaryAsString,
int96AsTimestamp,
- followParquetFormatSpec,
- isThriftDerived)
+ writeLegacyParquetFormat)
}
}
@@ -137,7 +129,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
| optional binary _6;
|}
""".stripMargin,
- binaryAsString = false)
+ binaryAsString = false,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testSchemaInference[(Byte, Short, Int, Long, java.sql.Date)](
"logical integral types",
@@ -149,7 +143,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
| required int64 _4 (INT_64);
| optional int32 _5 (DATE);
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testSchemaInference[Tuple1[String]](
"string",
@@ -158,7 +155,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
| optional binary _1 (UTF8);
|}
""".stripMargin,
- binaryAsString = true)
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testSchemaInference[Tuple1[String]](
"binary enum as string",
@@ -166,7 +165,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
|message root {
| optional binary _1 (ENUM);
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testSchemaInference[Tuple1[Seq[Int]]](
"non-nullable array - non-standard",
@@ -176,7 +178,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
| repeated int32 array;
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testSchemaInference[Tuple1[Seq[Int]]](
"non-nullable array - standard",
@@ -189,7 +194,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- followParquetFormatSpec = true)
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
testSchemaInference[Tuple1[Seq[Integer]]](
"nullable array - non-standard",
@@ -201,7 +208,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testSchemaInference[Tuple1[Seq[Integer]]](
"nullable array - standard",
@@ -214,7 +224,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- followParquetFormatSpec = true)
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
testSchemaInference[Tuple1[Map[Int, String]]](
"map - standard",
@@ -228,7 +240,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- followParquetFormatSpec = true)
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
testSchemaInference[Tuple1[Map[Int, String]]](
"map - non-standard",
@@ -241,7 +255,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testSchemaInference[Tuple1[Pair[Int, String]]](
"struct",
@@ -253,7 +270,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- followParquetFormatSpec = true)
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
testSchemaInference[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]](
"deeply nested type - non-standard",
@@ -276,7 +295,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testSchemaInference[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]](
"deeply nested type - standard",
@@ -300,7 +322,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- followParquetFormatSpec = true)
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
testSchemaInference[(Option[Int], Map[Int, Option[Double]])](
"optional types",
@@ -315,36 +339,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- followParquetFormatSpec = true)
-
- // Parquet files generated by parquet-thrift are already handled by the schema converter, but
- // let's leave this test here until both read path and write path are all updated.
- ignore("thrift generated parquet schema") {
- // Test for SPARK-4520 -- ensure that thrift generated parquet schema is generated
- // as expected from attributes
- testSchemaInference[(
- Array[Byte], Array[Byte], Array[Byte], Seq[Int], Map[Array[Byte], Seq[Int]])](
- "thrift generated parquet schema",
- """
- |message root {
- | optional binary _1 (UTF8);
- | optional binary _2 (UTF8);
- | optional binary _3 (UTF8);
- | optional group _4 (LIST) {
- | repeated int32 _4_tuple;
- | }
- | optional group _5 (MAP) {
- | repeated group map (MAP_KEY_VALUE) {
- | required binary key (UTF8);
- | optional group value (LIST) {
- | repeated int32 value_tuple;
- | }
- | }
- | }
- |}
- """.stripMargin,
- isThriftDerived = true)
- }
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
}
class ParquetSchemaSuite extends ParquetSchemaTest {
@@ -470,7 +467,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with nullable element type - 2",
@@ -486,7 +486,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 1 - standard",
@@ -499,7 +502,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 2",
@@ -512,7 +518,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 3",
@@ -523,7 +532,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| repeated int32 element;
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 4",
@@ -544,7 +556,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 5 - parquet-avro style",
@@ -563,7 +578,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 6 - parquet-thrift style",
@@ -582,7 +600,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type 7 - " +
@@ -592,7 +613,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
"""message root {
| repeated int32 f1;
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type 8 - " +
@@ -612,7 +636,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| required int32 c2;
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
// =======================================================
// Tests for converting Catalyst ArrayType to Parquet LIST
@@ -633,7 +660,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- followParquetFormatSpec = true)
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
testCatalystToParquet(
"Backwards-compatibility: LIST with nullable element type - 2 - prior to 1.4.x",
@@ -649,7 +678,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testCatalystToParquet(
"Backwards-compatibility: LIST with non-nullable element type - 1 - standard",
@@ -666,7 +698,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- followParquetFormatSpec = true)
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
testCatalystToParquet(
"Backwards-compatibility: LIST with non-nullable element type - 2 - prior to 1.4.x",
@@ -680,7 +714,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| repeated int32 array;
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
// ====================================================
// Tests for converting Parquet Map to Catalyst MapType
@@ -701,7 +738,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testParquetToCatalyst(
"Backwards-compatibility: MAP with non-nullable value type - 2",
@@ -718,7 +758,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testParquetToCatalyst(
"Backwards-compatibility: MAP with non-nullable value type - 3 - prior to 1.4.x",
@@ -735,7 +778,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testParquetToCatalyst(
"Backwards-compatibility: MAP with nullable value type - 1 - standard",
@@ -752,7 +798,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testParquetToCatalyst(
"Backwards-compatibility: MAP with nullable value type - 2",
@@ -769,7 +818,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testParquetToCatalyst(
"Backwards-compatibility: MAP with nullable value type - 3 - parquet-avro style",
@@ -786,7 +838,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
// ====================================================
// Tests for converting Catalyst MapType to Parquet Map
@@ -808,7 +863,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- followParquetFormatSpec = true)
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
testCatalystToParquet(
"Backwards-compatibility: MAP with non-nullable value type - 2 - prior to 1.4.x",
@@ -825,7 +882,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testCatalystToParquet(
"Backwards-compatibility: MAP with nullable value type - 1 - standard",
@@ -843,7 +903,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- followParquetFormatSpec = true)
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
testCatalystToParquet(
"Backwards-compatibility: MAP with nullable value type - 3 - prior to 1.4.x",
@@ -860,7 +922,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
| }
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
// =================================
// Tests for conversion for decimals
@@ -873,7 +938,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| optional int32 f1 (DECIMAL(1, 0));
|}
""".stripMargin,
- followParquetFormatSpec = true)
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
testSchema(
"DECIMAL(8, 3) - standard",
@@ -882,7 +949,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| optional int32 f1 (DECIMAL(8, 3));
|}
""".stripMargin,
- followParquetFormatSpec = true)
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
testSchema(
"DECIMAL(9, 3) - standard",
@@ -891,7 +960,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| optional int32 f1 (DECIMAL(9, 3));
|}
""".stripMargin,
- followParquetFormatSpec = true)
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
testSchema(
"DECIMAL(18, 3) - standard",
@@ -900,7 +971,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| optional int64 f1 (DECIMAL(18, 3));
|}
""".stripMargin,
- followParquetFormatSpec = true)
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
testSchema(
"DECIMAL(19, 3) - standard",
@@ -909,7 +982,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| optional fixed_len_byte_array(9) f1 (DECIMAL(19, 3));
|}
""".stripMargin,
- followParquetFormatSpec = true)
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
testSchema(
"DECIMAL(1, 0) - prior to 1.4.x",
@@ -917,7 +992,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
"""message root {
| optional fixed_len_byte_array(1) f1 (DECIMAL(1, 0));
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testSchema(
"DECIMAL(8, 3) - prior to 1.4.x",
@@ -925,7 +1003,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
"""message root {
| optional fixed_len_byte_array(4) f1 (DECIMAL(8, 3));
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testSchema(
"DECIMAL(9, 3) - prior to 1.4.x",
@@ -933,7 +1014,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
"""message root {
| optional fixed_len_byte_array(5) f1 (DECIMAL(9, 3));
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
testSchema(
"DECIMAL(18, 3) - prior to 1.4.x",
@@ -941,7 +1025,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
"""message root {
| optional fixed_len_byte_array(8) f1 (DECIMAL(18, 3));
|}
- """.stripMargin)
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true)
private def testSchemaClipping(
testName: String,
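To spot-check one of the decimal fixtures above outside the test harness, a converter can be invoked directly (a sketch reusing values the suite already asserts):

  import org.apache.spark.sql.types._

  val standard = new CatalystSchemaConverter(writeLegacyParquetFormat = false)
  standard.convert(StructType(Seq(StructField("f1", DecimalType(18, 3)))))
  //   standard mode: optional int64 f1 (DECIMAL(18, 3))

  val legacy = new CatalystSchemaConverter(writeLegacyParquetFormat = true)
  legacy.convert(StructType(Seq(StructField("f1", DecimalType(18, 3)))))
  //   legacy mode: optional fixed_len_byte_array(8) f1 (DECIMAL(18, 3))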
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 7d8104f935..ff2d0a35e3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -620,7 +620,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
val conf = Seq(
HiveContext.CONVERT_METASTORE_PARQUET.key -> "false",
SQLConf.PARQUET_BINARY_AS_STRING.key -> "true",
- SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key -> "true")
+ SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false")
withSQLConf(conf: _*) {
sql(