|  |  |  |
|---|---|---|
| author | Sameer Agarwal <sameer@databricks.com> | 2016-03-23 12:13:32 -0700 |
| committer | Yin Huai <yhuai@databricks.com> | 2016-03-23 12:13:32 -0700 |
| commit | 0a64294fcb4b64bfe095c63c3a494e0f40e22743 (patch) | |
| tree | 7ea8206553f976dae3154d79a98677b56d442cf9 /sql/hive | |
| parent | 02d9c352c72a16725322678ef174c5c6e9f2c617 (diff) | |
[SPARK-14015][SQL] Support TimestampType in vectorized parquet reader
## What changes were proposed in this pull request?
This PR adds support for `TimestampType` in the vectorized parquet reader.
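For context, Parquet stores Spark's `TimestampType` values using the physical INT96 type: eight little-endian bytes of nanoseconds-of-day followed by a four-byte Julian day number. The sketch below is not part of this patch; it is a minimal, self-contained illustration of that decoding step, with all names invented for the example (Spark's real logic lives in the Java `VectorizedColumnReader` and in `DateTimeUtils`).

```scala
import java.nio.{ByteBuffer, ByteOrder}

object Int96DecodeSketch {
  // 1970-01-01 expressed as a Julian day number.
  private val JulianDayOfEpoch = 2440588L
  private val MicrosPerDay = 24L * 60 * 60 * 1000 * 1000

  /** Decodes a 12-byte INT96 value into microseconds since the Unix epoch. */
  def int96ToMicros(bytes: Array[Byte]): Long = {
    require(bytes.length == 12, s"INT96 values are exactly 12 bytes, got ${bytes.length}")
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    val nanosOfDay = buf.getLong // first 8 bytes: nanoseconds within the day
    val julianDay = buf.getInt   // last 4 bytes: Julian day number
    (julianDay - JulianDayOfEpoch) * MicrosPerDay + nanosOfDay / 1000
  }
}
```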
## How was this patch tested?
1. `VectorizedColumnReader` initially had a gating condition on `primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96` that made us fall back on parquet-mr for handling timestamps. This condition is now removed (see the sketch after this list).
2. The `ParquetHadoopFsRelationSuite` (which tests all supported Hive types, including `TimestampType`) fails when the gating condition alone is removed (https://github.com/apache/spark/pull/11808) and should now pass with this change. Similarly, the `ParquetHiveCompatibilitySuite.SPARK-10177 timestamp` test, which fails when the gating condition is removed, should now pass as well.
3. Added tests in `HadoopFsRelationTest` that exercise both the dictionary-encoded and non-encoded paths across all supported data types.
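For reference, here is a hedged Scala sketch of the kind of check point 1 describes; the real code is Java inside `VectorizedColumnReader`, and the object and method names below are invented for illustration.

```scala
import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

object FallbackCheckSketch {
  // Hypothetical predicate: before this patch, a check along these lines
  // routed INT96 columns (the physical type backing TimestampType) to the
  // row-by-row parquet-mr path instead of the vectorized reader.
  def requiresParquetMrFallback(primitiveType: PrimitiveType): Boolean =
    primitiveType.getPrimitiveTypeName == PrimitiveTypeName.INT96
}
```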
Author: Sameer Agarwal <sameer@databricks.com>
Closes #11882 from sameeragarwal/timestamp-parquet.
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 82 |
1 file changed, 47 insertions, 35 deletions
```diff
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 7e5506ee4a..e842caf5be 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -116,44 +116,56 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     new MyDenseVectorUDT()
   ).filter(supportsDataType)
 
-  for (dataType <- supportedDataTypes) {
-    test(s"test all data types - $dataType") {
-      withTempPath { file =>
-        val path = file.getCanonicalPath
-
-        val dataGenerator = RandomDataGenerator.forType(
-          dataType = dataType,
-          nullable = true,
-          new Random(System.nanoTime())
-        ).getOrElse {
-          fail(s"Failed to create data generator for schema $dataType")
+  try {
+    for (dataType <- supportedDataTypes) {
+      for (parquetDictionaryEncodingEnabled <- Seq(true, false)) {
+        test(s"test all data types - $dataType with parquet.enable.dictionary = " +
+          s"$parquetDictionaryEncodingEnabled") {
+
+          hadoopConfiguration.setBoolean("parquet.enable.dictionary",
+            parquetDictionaryEncodingEnabled)
+
+          withTempPath { file =>
+            val path = file.getCanonicalPath
+
+            val dataGenerator = RandomDataGenerator.forType(
+              dataType = dataType,
+              nullable = true,
+              new Random(System.nanoTime())
+            ).getOrElse {
+              fail(s"Failed to create data generator for schema $dataType")
+            }
+
+            // Create a DF for the schema with random data. The index field is used to sort the
+            // DataFrame. This is a workaround for SPARK-10591.
+            val schema = new StructType()
+              .add("index", IntegerType, nullable = false)
+              .add("col", dataType, nullable = true)
+            val rdd =
+              sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+            val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
+
+            df.write
+              .mode("overwrite")
+              .format(dataSourceName)
+              .option("dataSchema", df.schema.json)
+              .save(path)
+
+            val loadedDF = sqlContext
+              .read
+              .format(dataSourceName)
+              .option("dataSchema", df.schema.json)
+              .schema(df.schema)
+              .load(path)
+              .orderBy("index")
+
+            checkAnswer(loadedDF, df)
+          }
         }
-
-        // Create a DF for the schema with random data. The index field is used to sort the
-        // DataFrame. This is a workaround for SPARK-10591.
-        val schema = new StructType()
-          .add("index", IntegerType, nullable = false)
-          .add("col", dataType, nullable = true)
-        val rdd = sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
-        val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
-
-        df.write
-          .mode("overwrite")
-          .format(dataSourceName)
-          .option("dataSchema", df.schema.json)
-          .save(path)
-
-        val loadedDF = sqlContext
-          .read
-          .format(dataSourceName)
-          .option("dataSchema", df.schema.json)
-          .schema(df.schema)
-          .load(path)
-          .orderBy("index")
-
-        checkAnswer(loadedDF, df)
-      }
+      }
     }
+  } finally {
+    hadoopConfiguration.unset("parquet.enable.dictionary")
   }
 
   test("save()/load() - non-partitioned table - Overwrite") {
```
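To see the new behavior end to end, here is a minimal round-trip sketch (not from the patch) for a Spark 1.6/2.0-era spark-shell, assuming a `sqlContext` is in scope; the output path is illustrative.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, TimestampType}

// Write a timestamp column to Parquet, then read it back. With this patch the
// read side can stay on the vectorized reader instead of falling back to
// parquet-mr for the INT96-encoded column.
val schema = new StructType().add("ts", TimestampType, nullable = true)
val rows = sqlContext.sparkContext.parallelize(Seq(
  Row(new Timestamp(0L)),
  Row(Timestamp.valueOf("2016-03-23 12:13:32"))))
val df = sqlContext.createDataFrame(rows, schema)

df.write.mode("overwrite").parquet("/tmp/timestamp-roundtrip")
sqlContext.read.parquet("/tmp/timestamp-roundtrip").orderBy("ts").show()
```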