diff options
author | Dilip Biswal <dbiswal@us.ibm.com> | 2017-04-04 09:53:05 +0900 |
---|---|---|
committer | Takuya UESHIN <ueshin@databricks.com> | 2017-04-04 09:53:05 +0900 |
commit | 3bfb639cb7352aec572ef6686d3471bd78748ffa (patch) | |
tree | 9d2054aecd8b3356a4ec7a38543def25a3ee380e /sql/core/src/test | |
parent | e7877fd4728ed41e440d7c4d8b6b02bd0d9e873e (diff) | |
download | spark-3bfb639cb7352aec572ef6686d3471bd78748ffa.tar.gz spark-3bfb639cb7352aec572ef6686d3471bd78748ffa.tar.bz2 spark-3bfb639cb7352aec572ef6686d3471bd78748ffa.zip |
[SPARK-10364][SQL] Support Parquet logical type TIMESTAMP_MILLIS
## What changes were proposed in this pull request?
**Description** from JIRA
The TimestampType in Spark SQL is of microsecond precision. Ideally, we should convert Spark SQL timestamp values into Parquet TIMESTAMP_MICROS. But unfortunately parquet-mr hasn't supported it yet.
For the read path, we should be able to read TIMESTAMP_MILLIS Parquet values and pad a 0 microsecond part to read values.
For the write path, currently we are writing timestamps as INT96, similar to Impala and Hive. One alternative is that, we can have a separate SQL option to let users be able to write Spark SQL timestamp values as TIMESTAMP_MILLIS. Of course, in this way the microsecond part will be truncated.
## How was this patch tested?
Added new tests in ParquetQuerySuite and ParquetIOSuite
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes #15332 from dilipbiswal/parquet-time-millis.
Diffstat (limited to 'sql/core/src/test')
4 files changed, 114 insertions, 8 deletions
diff --git a/sql/core/src/test/resources/test-data/timemillis-in-i64.parquet b/sql/core/src/test/resources/test-data/timemillis-in-i64.parquet Binary files differnew file mode 100644 index 0000000000..d3c39e2c26 --- /dev/null +++ b/sql/core/src/test/resources/test-data/timemillis-in-i64.parquet diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index dbdcd230a4..57a0af1dda 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -107,11 +107,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { | required binary g(ENUM); | required binary h(DECIMAL(32,0)); | required fixed_len_byte_array(32) i(DECIMAL(32,0)); + | required int64 j(TIMESTAMP_MILLIS); |} """.stripMargin) val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0), - DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0)) + DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0), + TimestampType) withTempPath { location => val path = new Path(location.getCanonicalPath) @@ -607,6 +609,18 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } + test("read dictionary and plain encoded timestamp_millis written as INT64") { + ("true" :: "false" :: Nil).foreach { vectorized => + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) { + checkAnswer( + // timestamp column in this file is encoded using combination of plain + // and dictionary encodings. + readResourceParquetFile("test-data/timemillis-in-i64.parquet"), + (1 to 3).map(i => Row(new java.sql.Timestamp(10)))) + } + } + } + test("SPARK-12589 copy() on rows returned from reader works for strings") { withTempPath { dir => val data = (1, "abc") ::(2, "helloabcde") :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 200e356c72..c36609586c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.File +import java.sql.Timestamp import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.parquet.hadoop.ParquetOutputFormat @@ -162,6 +163,78 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } + test("SPARK-10634 timestamp written and read as INT64 - TIMESTAMP_MILLIS") { + val data = (1 to 10).map(i => Row(i, new java.sql.Timestamp(i))) + val schema = StructType(List(StructField("d", IntegerType, false), + StructField("time", TimestampType, false)).toArray) + withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "true") { + withTempPath { file => + val df = spark.createDataFrame(sparkContext.parallelize(data), schema) + df.write.parquet(file.getCanonicalPath) + ("true" :: "false" :: Nil).foreach { vectorized => + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) { + val df2 = spark.read.parquet(file.getCanonicalPath) + checkAnswer(df2, df.collect().toSeq) + } + } + } + } + } + + test("SPARK-10634 timestamp written and read as INT64 - truncation") { + withTable("ts") { + sql("create table ts (c1 int, c2 timestamp) using parquet") + sql("insert into ts values (1, '2016-01-01 10:11:12.123456')") + sql("insert into ts values (2, null)") + sql("insert into ts values (3, '1965-01-01 10:11:12.123456')") + checkAnswer( + sql("select * from ts"), + Seq( + Row(1, Timestamp.valueOf("2016-01-01 10:11:12.123456")), + Row(2, null), + Row(3, Timestamp.valueOf("1965-01-01 10:11:12.123456")))) + } + + // The microsecond portion is truncated when written as TIMESTAMP_MILLIS. + withTable("ts") { + withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "true") { + sql("create table ts (c1 int, c2 timestamp) using parquet") + sql("insert into ts values (1, '2016-01-01 10:11:12.123456')") + sql("insert into ts values (2, null)") + sql("insert into ts values (3, '1965-01-01 10:11:12.125456')") + sql("insert into ts values (4, '1965-01-01 10:11:12.125')") + sql("insert into ts values (5, '1965-01-01 10:11:12.1')") + sql("insert into ts values (6, '1965-01-01 10:11:12.123456789')") + sql("insert into ts values (7, '0001-01-01 00:00:00.000000')") + checkAnswer( + sql("select * from ts"), + Seq( + Row(1, Timestamp.valueOf("2016-01-01 10:11:12.123")), + Row(2, null), + Row(3, Timestamp.valueOf("1965-01-01 10:11:12.125")), + Row(4, Timestamp.valueOf("1965-01-01 10:11:12.125")), + Row(5, Timestamp.valueOf("1965-01-01 10:11:12.1")), + Row(6, Timestamp.valueOf("1965-01-01 10:11:12.123")), + Row(7, Timestamp.valueOf("0001-01-01 00:00:00.000")))) + + // Read timestamps that were encoded as TIMESTAMP_MILLIS annotated as INT64 + // with PARQUET_INT64_AS_TIMESTAMP_MILLIS set to false. + withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "false") { + checkAnswer( + sql("select * from ts"), + Seq( + Row(1, Timestamp.valueOf("2016-01-01 10:11:12.123")), + Row(2, null), + Row(3, Timestamp.valueOf("1965-01-01 10:11:12.125")), + Row(4, Timestamp.valueOf("1965-01-01 10:11:12.125")), + Row(5, Timestamp.valueOf("1965-01-01 10:11:12.1")), + Row(6, Timestamp.valueOf("1965-01-01 10:11:12.123")), + Row(7, Timestamp.valueOf("0001-01-01 00:00:00.000")))) + } + } + } + } + test("Enabling/disabling merging partfiles when merging parquet schema") { def testSchemaMerging(expectedColumnNumber: Int): Unit = { withTempDir { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 6aa940afbb..ce992674d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -53,11 +53,13 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { parquetSchema: String, binaryAsString: Boolean, int96AsTimestamp: Boolean, - writeLegacyParquetFormat: Boolean): Unit = { + writeLegacyParquetFormat: Boolean, + int64AsTimestampMillis: Boolean = false): Unit = { val converter = new ParquetSchemaConverter( assumeBinaryIsString = binaryAsString, assumeInt96IsTimestamp = int96AsTimestamp, - writeLegacyParquetFormat = writeLegacyParquetFormat) + writeLegacyParquetFormat = writeLegacyParquetFormat, + writeTimestampInMillis = int64AsTimestampMillis) test(s"sql <= parquet: $testName") { val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema)) @@ -77,11 +79,13 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { parquetSchema: String, binaryAsString: Boolean, int96AsTimestamp: Boolean, - writeLegacyParquetFormat: Boolean): Unit = { + writeLegacyParquetFormat: Boolean, + int64AsTimestampMillis: Boolean = false): Unit = { val converter = new ParquetSchemaConverter( assumeBinaryIsString = binaryAsString, assumeInt96IsTimestamp = int96AsTimestamp, - writeLegacyParquetFormat = writeLegacyParquetFormat) + writeLegacyParquetFormat = writeLegacyParquetFormat, + writeTimestampInMillis = int64AsTimestampMillis) test(s"sql => parquet: $testName") { val actual = converter.convert(sqlSchema) @@ -97,7 +101,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { parquetSchema: String, binaryAsString: Boolean, int96AsTimestamp: Boolean, - writeLegacyParquetFormat: Boolean): Unit = { + writeLegacyParquetFormat: Boolean, + int64AsTimestampMillis: Boolean = false): Unit = { testCatalystToParquet( testName, @@ -105,7 +110,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { parquetSchema, binaryAsString, int96AsTimestamp, - writeLegacyParquetFormat) + writeLegacyParquetFormat, + int64AsTimestampMillis) testParquetToCatalyst( testName, @@ -113,7 +119,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { parquetSchema, binaryAsString, int96AsTimestamp, - writeLegacyParquetFormat) + writeLegacyParquetFormat, + int64AsTimestampMillis) } } @@ -965,6 +972,18 @@ class ParquetSchemaSuite extends ParquetSchemaTest { int96AsTimestamp = true, writeLegacyParquetFormat = true) + testSchema( + "Timestamp written and read as INT64 with TIMESTAMP_MILLIS", + StructType(Seq(StructField("f1", TimestampType))), + """message root { + | optional INT64 f1 (TIMESTAMP_MILLIS); + |} + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = false, + writeLegacyParquetFormat = true, + int64AsTimestampMillis = true) + private def testSchemaClipping( testName: String, parquetSchema: String, |