author     hyukjinkwon <gurwls223@gmail.com>        2015-11-16 21:30:10 +0800
committer  Cheng Lian <lian@databricks.com>         2015-11-16 21:30:10 +0800
commit     7f8eb3bf6ed64eefc5472f5c5fb02e2db1e3f618 (patch)
tree       013233de657ccef8d7af9b7db4825a87d2db5990 /sql
parent     42de5253f327bd7ee258b0efb5024f3847fa3b51 (diff)
[SPARK-11044][SQL] Parquet writer version fixed as version1
https://issues.apache.org/jira/browse/SPARK-11044

Spark writes a Parquet file only with writer version1, ignoring the writer version given by the user. So, in this PR, the writer version is kept if one is given, and version1 is set as the default otherwise.

Author: hyukjinkwon <gurwls223@gmail.com>
Author: HyukjinKwon <gurwls223@gmail.com>

Closes #9060 from HyukjinKwon/SPARK-11044.
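Usage illustration (not part of this commit's diff): with the patch applied, a writer version chosen by the user is kept instead of being overwritten. The sketch below assumes an existing SQLContext named sqlContext and a DataFrame named df; the property key and enum come from parquet-mr.

    import org.apache.parquet.column.ParquetProperties
    import org.apache.parquet.hadoop.ParquetOutputFormat

    // Request the Parquet v2 writer. Before this patch, CatalystWriteSupport.setSchema()
    // unconditionally reset this key to PARQUET_1_0; after it, the user's value survives.
    sqlContext.sparkContext.hadoopConfiguration.set(
      ParquetOutputFormat.WRITER_VERSION,
      ParquetProperties.WriterVersion.PARQUET_2_0.toString)

    // Subsequent Parquet writes now use the requested writer version.
    df.write.parquet("/tmp/parquet-v2-output")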
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala       | 34
2 files changed, 35 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
index 483363d2c1..6862dea5e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
@@ -429,7 +429,7 @@ private[parquet] object CatalystWriteSupport {
   def setSchema(schema: StructType, configuration: Configuration): Unit = {
     schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
     configuration.set(SPARK_ROW_SCHEMA, schema.json)
-    configuration.set(
+    configuration.setIfUnset(
       ParquetOutputFormat.WRITER_VERSION,
       ParquetProperties.WriterVersion.PARQUET_1_0.toString)
   }
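The one-line fix relies on Hadoop's Configuration.setIfUnset, which only writes the supplied default when the key is still absent. A minimal standalone sketch of that contract (the literal "parquet.writer.version" key stands in for ParquetOutputFormat.WRITER_VERSION):

    import org.apache.hadoop.conf.Configuration

    val conf = new Configuration(false)
    conf.set("parquet.writer.version", "PARQUET_2_0")         // value chosen by the user
    conf.setIfUnset("parquet.writer.version", "PARQUET_1_0")  // no-op: the key is already set
    assert(conf.get("parquet.writer.version") == "PARQUET_2_0")

    val fresh = new Configuration(false)
    fresh.setIfUnset("parquet.writer.version", "PARQUET_1_0") // key absent, so the default applies
    assert(fresh.get("parquet.writer.version") == "PARQUET_1_0")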
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 78df363ade..2aa5dca847 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.util.Collections
+import org.apache.parquet.column.{Encoding, ParquetProperties}
+
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
@@ -534,6 +536,38 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-11044 Parquet writer version fixed as version1 ") {
+    // For dictionary encoding, Parquet changes the encoding types according to its writer
+    // version. So, this test checks one of the encoding types in order to ensure that
+    // the file is written with writer version2.
+    withTempPath { dir =>
+      val clonedConf = new Configuration(hadoopConfiguration)
+      try {
+        // Write a Parquet file with writer version2.
+        hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION,
+          ParquetProperties.WriterVersion.PARQUET_2_0.toString)
+
+        // By default, dictionary encoding is enabled from Parquet 1.2.0 but
+        // it is enabled just in case.
+        hadoopConfiguration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, true)
+        val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
+        sqlContext.range(1 << 16).selectExpr("(id % 4) AS i")
+          .coalesce(1).write.mode("overwrite").parquet(path)
+
+        val blockMetadata = readFooter(new Path(path), hadoopConfiguration).getBlocks.asScala.head
+        val columnChunkMetadata = blockMetadata.getColumns.asScala.head
+
+        // If the file is written with version2, this should include
+        // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY
+        assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
+      } finally {
+        // Manually clear the hadoop configuration for other tests.
+        hadoopConfiguration.clear()
+        clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
+      }
+    }
+  }
+
   test("read dictionary encoded decimals written as INT32") {
     checkAnswer(
       // Decimal column in this file is encoded using plain dictionary