author    hyukjinkwon <gurwls223@gmail.com>    2015-11-17 14:35:00 +0800
committer Cheng Lian <lian@databricks.com>    2015-11-17 14:35:00 +0800
commit    75d202073143d5a7f943890d8682b5b0cf9e3092 (patch)
tree      f74f69d621d657e94cb20629e3ac21a2720203ac
parent    fbad920dbfd6f389dea852cdc159cacb0f4f6997 (diff)
[SPARK-11694][FOLLOW-UP] Clean up imports, use a common function for metadata and add a test for FIXED_LEN_BYTE_ARRAY
As discussed in https://github.com/apache/spark/pull/9660 and https://github.com/apache/spark/pull/9060, I cleaned up unused imports, added a test for fixed-length byte arrays, and used a common function for writing Parquet metadata. For the fixed-length byte array test, I checked the encoding types with [parquet-tools](https://github.com/Parquet/parquet-mr/tree/master/parquet-tools).

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #9754 from HyukjinKwon/SPARK-11694-followup.
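The shared `writeMetadata(parquetSchema, path, conf)` helper that the updated tests call is not part of this diff. A minimal sketch of what it might look like is below; the object name `ParquetTestMetadata` is hypothetical, and the body simply re-packages the `FileMetaData`/`Footer`/`ParquetFileWriter.writeMetadataFile` sequence removed from the test suite in the hunks that follow.

```scala
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.{Footer, ParquetFileWriter}
import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
import org.apache.parquet.schema.MessageType

// Hypothetical home for the shared helper; in the real patch it lives in the
// existing Parquet test utilities rather than in a standalone object.
object ParquetTestMetadata {
  // Writes a Parquet summary metadata file for the given schema at `path`,
  // mirroring the inline code removed from ParquetIOSuite below.
  def writeMetadata(parquetSchema: MessageType, path: Path, configuration: Configuration): Unit = {
    val extraMetadata = Map.empty[String, String].asJava
    // "Spark" is the created-by string used by the removed inline code.
    val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
    val parquetMetadata = new ParquetMetadata(fileMetadata, Collections.emptyList())
    val footer = new Footer(path, parquetMetadata)
    ParquetFileWriter.writeMetadataFile(configuration, path, Seq(footer).asJava)
  }
}
```

With such a helper in place, each test only needs the schema, a path, and the Hadoop configuration, which is exactly the three-argument call sites introduced in the diff.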
-rw-r--r--  sql/core/src/test/resources/dec-in-fixed-len.parquet                                             | bin 0 -> 460 bytes
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala  | 42
2 files changed, 15 insertions, 27 deletions
diff --git a/sql/core/src/test/resources/dec-in-fixed-len.parquet b/sql/core/src/test/resources/dec-in-fixed-len.parquet
new file mode 100644
index 0000000000..6ad37d5639
--- /dev/null
+++ b/sql/core/src/test/resources/dec-in-fixed-len.parquet
Binary files differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index a148facd05..177ab42f77 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.datasources.parquet
-import java.util.Collections
-
import org.apache.parquet.column.{Encoding, ParquetProperties}
import scala.collection.JavaConverters._
@@ -33,7 +31,7 @@ import org.apache.parquet.example.data.{Group, GroupWriter}
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.{MessageType, MessageTypeParser}
@@ -243,15 +241,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
""".stripMargin)
withTempPath { location =>
- val extraMetadata = Map.empty[String, String].asJava
- val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
val path = new Path(location.getCanonicalPath)
- val footer = List(
- new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))
- ).asJava
-
- ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer)
-
+ val conf = sparkContext.hadoopConfiguration
+ writeMetadata(parquetSchema, path, conf)
val errorMessage = intercept[Throwable] {
sqlContext.read.parquet(path.toString).printSchema()
}.toString
@@ -267,20 +259,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
|}
""".stripMargin)
+ val expectedSparkTypes = Seq(StringType, BinaryType)
+
withTempPath { location =>
- val extraMetadata = Map.empty[String, String].asJava
- val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
val path = new Path(location.getCanonicalPath)
- val footer = List(
- new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))
- ).asJava
-
- ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer)
-
- val jsonDataType = sqlContext.read.parquet(path.toString).schema(0).dataType
- assert(jsonDataType === StringType)
- val bsonDataType = sqlContext.read.parquet(path.toString).schema(1).dataType
- assert(bsonDataType === BinaryType)
+ val conf = sparkContext.hadoopConfiguration
+ writeMetadata(parquetSchema, path, conf)
+ val sparkTypes = sqlContext.read.parquet(path.toString).schema.map(_.dataType)
+ assert(sparkTypes === expectedSparkTypes)
}
}
@@ -607,10 +593,12 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec))
}
- // TODO Adds test case for reading dictionary encoded decimals written as `FIXED_LEN_BYTE_ARRAY`
- // The Parquet writer version Spark 1.6 and prior versions use is `PARQUET_1_0`, which doesn't
- // provide dictionary encoding support for `FIXED_LEN_BYTE_ARRAY`. Should add a test here once
- // we upgrade to `PARQUET_2_0`.
+ test("read dictionary encoded decimals written as FIXED_LEN_BYTE_ARRAY") {
+ checkAnswer(
+ // Decimal column in this file is encoded using plain dictionary
+ readResourceParquetFile("dec-in-fixed-len.parquet"),
+ sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
+ }
}
class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
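The commit message says the encoding of the new dec-in-fixed-len.parquet fixture was verified with parquet-tools. As an illustrative alternative only (the helper name `usesPlainDictionary` is made up here; the footer calls are standard parquet-mr APIs, not code from this patch), a test could also assert the encoding programmatically:

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.Encoding
import org.apache.parquet.hadoop.ParquetFileReader

// Hypothetical check, not part of the patch.
object ParquetEncodingCheck {
  // Returns true if any column chunk in the file reports PLAIN_DICTIONARY encoding.
  def usesPlainDictionary(file: Path, conf: Configuration): Boolean = {
    val footer = ParquetFileReader.readFooter(conf, file)
    footer.getBlocks.asScala.exists { block =>
      block.getColumns.asScala.exists(_.getEncodings.contains(Encoding.PLAIN_DICTIONARY))
    }
  }
}
```

Running parquet-tools' meta command against the file prints the same per-column encoding information, which appears to be how the author inspected the fixture.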