author    Dilip Biswal <dbiswal@us.ibm.com>  2017-04-04 09:53:05 +0900
committer Takuya UESHIN <ueshin@databricks.com>  2017-04-04 09:53:05 +0900
commit    3bfb639cb7352aec572ef6686d3471bd78748ffa (patch)
tree      9d2054aecd8b3356a4ec7a38543def25a3ee380e /sql/core/src/test
parent    e7877fd4728ed41e440d7c4d8b6b02bd0d9e873e (diff)
[SPARK-10364][SQL] Support Parquet logical type TIMESTAMP_MILLIS
## What changes were proposed in this pull request?

**Description** from JIRA

The TimestampType in Spark SQL is of microsecond precision. Ideally, we should convert Spark SQL timestamp values into Parquet TIMESTAMP_MICROS, but unfortunately parquet-mr hasn't supported it yet.

For the read path, we should be able to read TIMESTAMP_MILLIS Parquet values and pad a 0 microsecond part onto the read values.

For the write path, we currently write timestamps as INT96, similar to Impala and Hive. One alternative is a separate SQL option that lets users write Spark SQL timestamp values as TIMESTAMP_MILLIS; of course, the microsecond part is then truncated.

## How was this patch tested?

Added new tests in ParquetQuerySuite and ParquetIOSuite.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #15332 from dilipbiswal/parquet-time-millis.
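As a usage illustration, here is a minimal sketch of enabling the new write-path option. The string key is assumed from the SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS entry exercised in the tests below, and `df` stands in for any DataFrame with a timestamp column:

```scala
// Hedged sketch: opt in to TIMESTAMP_MILLIS output instead of the legacy INT96.
// The key is assumed to back SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.
spark.conf.set("spark.sql.parquet.int64AsTimestampMillis", "true")

// `df` is a placeholder for any DataFrame containing a TimestampType column.
df.write.parquet("/tmp/ts-millis")  // timestamps land as int64 (TIMESTAMP_MILLIS);
                                    // the microsecond part is truncated
```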
Diffstat (limited to 'sql/core/src/test')
-rw-r--r-- sql/core/src/test/resources/test-data/timemillis-in-i64.parquet | bin 0 -> 517 bytes
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala | 16
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala | 73
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala | 33
4 files changed, 114 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/test/resources/test-data/timemillis-in-i64.parquet b/sql/core/src/test/resources/test-data/timemillis-in-i64.parquet
new file mode 100644
index 0000000000..d3c39e2c26
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/timemillis-in-i64.parquet
Binary files differ
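Since the fixture is binary, the diff cannot show its contents. One hedged way to produce a similar TIMESTAMP_MILLIS-annotated file (the checked-in fixture, whose timestamp column mixes plain and dictionary encodings, may well have been generated differently):

```scala
import java.sql.Timestamp

// Assumes a SparkSession named `spark`; the output path is hypothetical.
import spark.implicits._
spark.conf.set("spark.sql.parquet.int64AsTimestampMillis", "true")
Seq.fill(3)(new Timestamp(10))               // three rows, 10 ms after the epoch,
  .toDF("time")                              // matching what the read test expects
  .write.parquet("/tmp/timemillis-in-i64")
```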
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index dbdcd230a4..57a0af1dda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -107,11 +107,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
| required binary g(ENUM);
| required binary h(DECIMAL(32,0));
| required fixed_len_byte_array(32) i(DECIMAL(32,0));
+ | required int64 j(TIMESTAMP_MILLIS);
|}
""".stripMargin)
val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0),
- DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0))
+ DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0),
+ TimestampType)
withTempPath { location =>
val path = new Path(location.getCanonicalPath)
@@ -607,6 +609,18 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
+ test("read dictionary and plain encoded timestamp_millis written as INT64") {
+ ("true" :: "false" :: Nil).foreach { vectorized =>
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+ checkAnswer(
+ // timestamp column in this file is encoded using a combination of plain
+ // and dictionary encodings.
+ readResourceParquetFile("test-data/timemillis-in-i64.parquet"),
+ (1 to 3).map(i => Row(new java.sql.Timestamp(10))))
+ }
+ }
+ }
+
test("SPARK-12589 copy() on rows returned from reader works for strings") {
withTempPath { dir =>
val data = (1, "abc") ::(2, "helloabcde") :: Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 200e356c72..c36609586c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
+import java.sql.Timestamp
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetOutputFormat
@@ -162,6 +163,78 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
+ test("SPARK-10634 timestamp written and read as INT64 - TIMESTAMP_MILLIS") {
+ val data = (1 to 10).map(i => Row(i, new java.sql.Timestamp(i)))
+ val schema = StructType(List(StructField("d", IntegerType, false),
+ StructField("time", TimestampType, false)).toArray)
+ withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "true") {
+ withTempPath { file =>
+ val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+ df.write.parquet(file.getCanonicalPath)
+ ("true" :: "false" :: Nil).foreach { vectorized =>
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+ val df2 = spark.read.parquet(file.getCanonicalPath)
+ checkAnswer(df2, df.collect().toSeq)
+ }
+ }
+ }
+ }
+ }
+
+ test("SPARK-10634 timestamp written and read as INT64 - truncation") {
+ withTable("ts") {
+ sql("create table ts (c1 int, c2 timestamp) using parquet")
+ sql("insert into ts values (1, '2016-01-01 10:11:12.123456')")
+ sql("insert into ts values (2, null)")
+ sql("insert into ts values (3, '1965-01-01 10:11:12.123456')")
+ checkAnswer(
+ sql("select * from ts"),
+ Seq(
+ Row(1, Timestamp.valueOf("2016-01-01 10:11:12.123456")),
+ Row(2, null),
+ Row(3, Timestamp.valueOf("1965-01-01 10:11:12.123456"))))
+ }
+
+ // The microsecond portion is truncated when written as TIMESTAMP_MILLIS.
+ withTable("ts") {
+ withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "true") {
+ sql("create table ts (c1 int, c2 timestamp) using parquet")
+ sql("insert into ts values (1, '2016-01-01 10:11:12.123456')")
+ sql("insert into ts values (2, null)")
+ sql("insert into ts values (3, '1965-01-01 10:11:12.125456')")
+ sql("insert into ts values (4, '1965-01-01 10:11:12.125')")
+ sql("insert into ts values (5, '1965-01-01 10:11:12.1')")
+ sql("insert into ts values (6, '1965-01-01 10:11:12.123456789')")
+ sql("insert into ts values (7, '0001-01-01 00:00:00.000000')")
+ checkAnswer(
+ sql("select * from ts"),
+ Seq(
+ Row(1, Timestamp.valueOf("2016-01-01 10:11:12.123")),
+ Row(2, null),
+ Row(3, Timestamp.valueOf("1965-01-01 10:11:12.125")),
+ Row(4, Timestamp.valueOf("1965-01-01 10:11:12.125")),
+ Row(5, Timestamp.valueOf("1965-01-01 10:11:12.1")),
+ Row(6, Timestamp.valueOf("1965-01-01 10:11:12.123")),
+ Row(7, Timestamp.valueOf("0001-01-01 00:00:00.000"))))
+
+ // Read back timestamps that were written as INT64 annotated with TIMESTAMP_MILLIS,
+ // this time with PARQUET_INT64_AS_TIMESTAMP_MILLIS set to false.
+ withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "false") {
+ checkAnswer(
+ sql("select * from ts"),
+ Seq(
+ Row(1, Timestamp.valueOf("2016-01-01 10:11:12.123")),
+ Row(2, null),
+ Row(3, Timestamp.valueOf("1965-01-01 10:11:12.125")),
+ Row(4, Timestamp.valueOf("1965-01-01 10:11:12.125")),
+ Row(5, Timestamp.valueOf("1965-01-01 10:11:12.1")),
+ Row(6, Timestamp.valueOf("1965-01-01 10:11:12.123")),
+ Row(7, Timestamp.valueOf("0001-01-01 00:00:00.000"))))
+ }
+ }
+ }
+ }
+
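The pre-epoch (1965) rows are the interesting case in the test above: for instants before 1970 the internal microsecond count is negative, and dropping the sub-millisecond part has to floor rather than truncate toward zero. A hedged sketch of that behavior (illustrative; not necessarily Spark's actual implementation):

```scala
// Flooring keeps pre-epoch values on the correct millisecond boundary;
// plain integer division would shift them forward by one millisecond.
def truncateToMillis(micros: Long): Long = Math.floorDiv(micros, 1000L)

truncateToMillis(-1501L)  // -2 (floors toward negative infinity)
-1501L / 1000L            // -1 (truncates toward zero: wrong boundary)
```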
test("Enabling/disabling merging partfiles when merging parquet schema") {
def testSchemaMerging(expectedColumnNumber: Int): Unit = {
withTempDir { dir =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 6aa940afbb..ce992674d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -53,11 +53,13 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
parquetSchema: String,
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
- writeLegacyParquetFormat: Boolean): Unit = {
+ writeLegacyParquetFormat: Boolean,
+ int64AsTimestampMillis: Boolean = false): Unit = {
val converter = new ParquetSchemaConverter(
assumeBinaryIsString = binaryAsString,
assumeInt96IsTimestamp = int96AsTimestamp,
- writeLegacyParquetFormat = writeLegacyParquetFormat)
+ writeLegacyParquetFormat = writeLegacyParquetFormat,
+ writeTimestampInMillis = int64AsTimestampMillis)
test(s"sql <= parquet: $testName") {
val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema))
@@ -77,11 +79,13 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
parquetSchema: String,
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
- writeLegacyParquetFormat: Boolean): Unit = {
+ writeLegacyParquetFormat: Boolean,
+ int64AsTimestampMillis: Boolean = false): Unit = {
val converter = new ParquetSchemaConverter(
assumeBinaryIsString = binaryAsString,
assumeInt96IsTimestamp = int96AsTimestamp,
- writeLegacyParquetFormat = writeLegacyParquetFormat)
+ writeLegacyParquetFormat = writeLegacyParquetFormat,
+ writeTimestampInMillis = int64AsTimestampMillis)
test(s"sql => parquet: $testName") {
val actual = converter.convert(sqlSchema)
@@ -97,7 +101,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
parquetSchema: String,
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
- writeLegacyParquetFormat: Boolean): Unit = {
+ writeLegacyParquetFormat: Boolean,
+ int64AsTimestampMillis: Boolean = false): Unit = {
testCatalystToParquet(
testName,
@@ -105,7 +110,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
parquetSchema,
binaryAsString,
int96AsTimestamp,
- writeLegacyParquetFormat)
+ writeLegacyParquetFormat,
+ int64AsTimestampMillis)
testParquetToCatalyst(
testName,
@@ -113,7 +119,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
parquetSchema,
binaryAsString,
int96AsTimestamp,
- writeLegacyParquetFormat)
+ writeLegacyParquetFormat,
+ int64AsTimestampMillis)
}
}
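For context, the flag threaded through these helpers selects the Parquet type that ParquetSchemaConverter emits for Catalyst's TimestampType. A hedged sketch of that choice (illustrative only, not the actual converter source):

```scala
// Illustrative mapping for TimestampType under each setting.
def timestampParquetType(writeTimestampInMillis: Boolean): String =
  if (writeTimestampInMillis) "int64 (TIMESTAMP_MILLIS)"  // millis; µs truncated
  else "int96"                                            // legacy Impala/Hive encoding
```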
@@ -965,6 +972,18 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
+ testSchema(
+ "Timestamp written and read as INT64 with TIMESTAMP_MILLIS",
+ StructType(Seq(StructField("f1", TimestampType))),
+ """message root {
+ | optional int64 f1 (TIMESTAMP_MILLIS);
+ |}
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = false,
+ writeLegacyParquetFormat = true,
+ int64AsTimestampMillis = true)
+
private def testSchemaClipping(
testName: String,
parquetSchema: String,