Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala   2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala        34
2 files changed, 35 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
index 483363d2c1..6862dea5e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
@@ -429,7 +429,7 @@ private[parquet] object CatalystWriteSupport {
   def setSchema(schema: StructType, configuration: Configuration): Unit = {
     schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
     configuration.set(SPARK_ROW_SCHEMA, schema.json)
-    configuration.set(
+    configuration.setIfUnset(
       ParquetOutputFormat.WRITER_VERSION,
       ParquetProperties.WriterVersion.PARQUET_1_0.toString)
   }
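
The one-line change above works because Hadoop's Configuration.setIfUnset only writes a value when the key is not already set, so a writer version chosen by the user is no longer silently overwritten by Spark's PARQUET_1_0 default. Below is a minimal sketch of the difference, assuming only hadoop-common on the classpath; the key string is spelled out literally for illustration (Spark refers to it via ParquetOutputFormat.WRITER_VERSION).

import org.apache.hadoop.conf.Configuration

object SetIfUnsetSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration(false)

    // A user explicitly asks for the v2 writer before Spark applies its default.
    conf.set("parquet.writer.version", "PARQUET_2_0")

    // Old behaviour: set() unconditionally clobbers the user's choice.
    // conf.set("parquet.writer.version", "PARQUET_1_0")

    // New behaviour: setIfUnset() is a no-op when the key already has a value,
    // so the default only applies when the user said nothing.
    conf.setIfUnset("parquet.writer.version", "PARQUET_1_0")

    println(conf.get("parquet.writer.version")) // prints PARQUET_2_0
  }
}
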
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 78df363ade..2aa5dca847 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.util.Collections
+import org.apache.parquet.column.{Encoding, ParquetProperties}
+
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
@@ -534,6 +536,38 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
+ test("SPARK-11044 Parquet writer version fixed as version1 ") {
+ // For dictionary encoding, Parquet changes the encoding types according to its writer
+ // version. So, this test checks one of the encoding types in order to ensure that
+ // the file is written with writer version2.
+ withTempPath { dir =>
+ val clonedConf = new Configuration(hadoopConfiguration)
+ try {
+ // Write a Parquet file with writer version2.
+ hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION,
+ ParquetProperties.WriterVersion.PARQUET_2_0.toString)
+
+ // By default, dictionary encoding is enabled from Parquet 1.2.0 but
+ // it is enabled just in case.
+ hadoopConfiguration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, true)
+ val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
+ sqlContext.range(1 << 16).selectExpr("(id % 4) AS i")
+ .coalesce(1).write.mode("overwrite").parquet(path)
+
+ val blockMetadata = readFooter(new Path(path), hadoopConfiguration).getBlocks.asScala.head
+ val columnChunkMetadata = blockMetadata.getColumns.asScala.head
+
+ // If the file is written with version2, this should include
+ // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY
+ assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
+ } finally {
+ // Manually clear the hadoop configuration for other tests.
+ hadoopConfiguration.clear()
+ clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
+ }
+ }
+ }
+
test("read dictionary encoded decimals written as INT32") {
checkAnswer(
// Decimal column in this file is encoded using plain dictionary
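
A sketch (not part of the patch) of what the fix enables from user code: with setIfUnset in place, an explicitly configured writer version now survives CatalystWriteSupport.setSchema. This assumes a Spark 1.x sqlContext as in the suite above; the output path is illustrative.

import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat

// Ask for the v2 writer before any Parquet write. Previously this setting
// was reset to PARQUET_1_0 inside setSchema; now it is honored.
sqlContext.sparkContext.hadoopConfiguration.set(
  ParquetOutputFormat.WRITER_VERSION,
  ParquetProperties.WriterVersion.PARQUET_2_0.toString)

sqlContext.range(1 << 16).write.parquet("/tmp/parquet-v2-demo")  // illustrative path
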