path: root/sql/hive
author    Sameer Agarwal <sameer@databricks.com>    2016-03-23 12:13:32 -0700
committer Yin Huai <yhuai@databricks.com>           2016-03-23 12:13:32 -0700
commit    0a64294fcb4b64bfe095c63c3a494e0f40e22743 (patch)
tree      7ea8206553f976dae3154d79a98677b56d442cf9 /sql/hive
parent    02d9c352c72a16725322678ef174c5c6e9f2c617 (diff)
[SPARK-14015][SQL] Support TimestampType in vectorized parquet reader
## What changes were proposed in this pull request?

This PR adds support for TimestampType in the vectorized parquet reader.

## How was this patch tested?

1. `VectorizedColumnReader` initially had a gating condition on `primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96` that made us fall back on parquet-mr for handling timestamps. This condition is now removed.
2. The `ParquetHadoopFsRelationSuite` (which tests all supported hive types, including `TimestampType`) fails when the gating condition is removed (https://github.com/apache/spark/pull/11808) and should now pass with this change. Similarly, the `ParquetHiveCompatibilitySuite.SPARK-10177 timestamp` test, which fails when the gating condition is removed, should now pass as well.
3. Added tests in `HadoopFsRelationTest` that exercise both the dictionary-encoded and non-encoded versions across all supported data types.

Author: Sameer Agarwal <sameer@databricks.com>

Closes #11882 from sameeragarwal/timestamp-parquet.
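To make the behavior concrete, here is a minimal sketch of the round trip the new tests cover: writing a `TimestampType` column to Parquet and reading it back, which after this patch goes through the vectorized reader instead of falling back to parquet-mr. This is not code from the patch; `sqlContext` and `path` are stand-ins for whatever SQLContext and temporary directory the caller provides.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.SQLContext

// Hypothetical helper, not part of the patch. Assumes a working SQLContext and a
// writable temporary path supplied by the caller.
def timestampRoundTrip(sqlContext: SQLContext, path: String): Unit = {
  import sqlContext.implicits._

  // Write a DataFrame with a TimestampType column to Parquet.
  // 1458748800000L is just an arbitrary base epoch-millis value.
  val df = (1 to 10)
    .map(i => (i, new Timestamp(1458748800000L + i * 1000L)))
    .toDF("index", "ts")
  df.write.mode("overwrite").parquet(path)

  // Read it back. With this patch the vectorized reader decodes the INT96-encoded
  // timestamps directly rather than falling back to parquet-mr.
  val loaded = sqlContext.read.parquet(path).orderBy("index")
  assert(loaded.collect().toSeq == df.collect().toSeq)
}
```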
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala  82
1 file changed, 47 insertions(+), 35 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 7e5506ee4a..e842caf5be 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -116,44 +116,56 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
new MyDenseVectorUDT()
).filter(supportsDataType)
- for (dataType <- supportedDataTypes) {
- test(s"test all data types - $dataType") {
- withTempPath { file =>
- val path = file.getCanonicalPath
-
- val dataGenerator = RandomDataGenerator.forType(
- dataType = dataType,
- nullable = true,
- new Random(System.nanoTime())
- ).getOrElse {
- fail(s"Failed to create data generator for schema $dataType")
+ try {
+ for (dataType <- supportedDataTypes) {
+ for (parquetDictionaryEncodingEnabled <- Seq(true, false)) {
+ test(s"test all data types - $dataType with parquet.enable.dictionary = " +
+ s"$parquetDictionaryEncodingEnabled") {
+
+ hadoopConfiguration.setBoolean("parquet.enable.dictionary",
+ parquetDictionaryEncodingEnabled)
+
+ withTempPath { file =>
+ val path = file.getCanonicalPath
+
+ val dataGenerator = RandomDataGenerator.forType(
+ dataType = dataType,
+ nullable = true,
+ new Random(System.nanoTime())
+ ).getOrElse {
+ fail(s"Failed to create data generator for schema $dataType")
+ }
+
+ // Create a DF for the schema with random data. The index field is used to sort the
+ // DataFrame. This is a workaround for SPARK-10591.
+ val schema = new StructType()
+ .add("index", IntegerType, nullable = false)
+ .add("col", dataType, nullable = true)
+ val rdd =
+ sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+ val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
+
+ df.write
+ .mode("overwrite")
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .save(path)
+
+ val loadedDF = sqlContext
+ .read
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .schema(df.schema)
+ .load(path)
+ .orderBy("index")
+
+ checkAnswer(loadedDF, df)
+ }
}
-
- // Create a DF for the schema with random data. The index field is used to sort the
- // DataFrame. This is a workaround for SPARK-10591.
- val schema = new StructType()
- .add("index", IntegerType, nullable = false)
- .add("col", dataType, nullable = true)
- val rdd = sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
- val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
-
- df.write
- .mode("overwrite")
- .format(dataSourceName)
- .option("dataSchema", df.schema.json)
- .save(path)
-
- val loadedDF = sqlContext
- .read
- .format(dataSourceName)
- .option("dataSchema", df.schema.json)
- .schema(df.schema)
- .load(path)
- .orderBy("index")
-
- checkAnswer(loadedDF, df)
}
}
+ } finally {
+ hadoopConfiguration.unset("parquet.enable.dictionary")
}
test("save()/load() - non-partitioned table - Overwrite") {