Diffstat (limited to 'sql/core')
-rw-r--r--   sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java   1
-rw-r--r--   sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java   27
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala   14
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala   9
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala   25
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala   15
-rw-r--r--   sql/core/src/test/resources/test-data/timemillis-in-i64.parquet   bin 0 -> 517 bytes
-rw-r--r--   sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala   16
-rw-r--r--   sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala   73
-rw-r--r--   sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala   33
10 files changed, 193 insertions, 20 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index bf87174835..eb97118872 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -197,6 +197,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
config.set("spark.sql.parquet.binaryAsString", "false");
config.set("spark.sql.parquet.int96AsTimestamp", "false");
config.set("spark.sql.parquet.writeLegacyFormat", "false");
+ config.set("spark.sql.parquet.int64AsTimestampMillis", "false");
this.file = new Path(path);
long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index cb51cb499e..9d641b5287 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -28,6 +28,7 @@ import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveType;
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;
@@ -155,9 +156,13 @@ public class VectorizedColumnReader {
// Read and decode dictionary ids.
defColumn.readIntegers(
num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+
+ // Timestamp values encoded as INT64 can't be lazily decoded, because we need to post-process
+ // the values to convert them to microsecond precision.
if (column.hasDictionary() || (rowId == 0 &&
(descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32 ||
- descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 ||
+ (descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 &&
+ column.dataType() != DataTypes.TimestampType) ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.BINARY))) {
@@ -250,7 +255,15 @@ public class VectorizedColumnReader {
column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
}
}
- } else {
+ } else if (column.dataType() == DataTypes.TimestampType) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ if (!column.isNullAt(i)) {
+ column.putLong(i,
+ DateTimeUtils.fromMillis(dictionary.decodeToLong(dictionaryIds.getDictId(i))));
+ }
+ }
+ }
+ else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
break;
@@ -362,7 +375,15 @@ public class VectorizedColumnReader {
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType())) {
defColumn.readLongs(
- num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ } else if (column.dataType() == DataTypes.TimestampType) {
+ for (int i = 0; i < num; i++) {
+ if (defColumn.readInteger() == maxDefLevel) {
+ column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
+ } else {
+ column.putNull(rowId + i);
+ }
+ }
} else {
throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType());
}
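
Both branches added above rewrite the stored millisecond longs into Spark's internal microsecond representation via DateTimeUtils.fromMillis, which is also why the lazy dictionary-id path is disabled for INT64 timestamps. A minimal sketch of the conversion (the literal values are illustrative, not taken from the patch):

    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    // Parquet TIMESTAMP_MILLIS stores epoch milliseconds in an INT64; Spark's internal
    // TimestampType uses epoch microseconds, so each decoded long is scaled by 1000.
    val storedMillis = 10L                                        // value as read from the file
    val internalMicros = DateTimeUtils.fromMillis(storedMillis)   // 10000L, Spark's in-memory value

    // Because every value needs this rewrite, the reader cannot keep raw dictionary ids and
    // decode them lazily later, hence the INT64 + TimestampType exclusion above.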
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 062aa5c8ea..2f3a2c62b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -125,6 +125,10 @@ class ParquetFileFormat
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
sparkSession.sessionState.conf.writeLegacyParquetFormat.toString)
+ conf.set(
+ SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key,
+ sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis.toString)
+
// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
@@ -300,6 +304,9 @@ class ParquetFileFormat
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+ hadoopConf.setBoolean(
+ SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key,
+ sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis)
// Try to push down filters when filter push-down is enabled.
val pushed =
@@ -410,7 +417,8 @@ object ParquetFileFormat extends Logging {
val converter = new ParquetSchemaConverter(
sparkSession.sessionState.conf.isParquetBinaryAsString,
sparkSession.sessionState.conf.isParquetBinaryAsString,
- sparkSession.sessionState.conf.writeLegacyParquetFormat)
+ sparkSession.sessionState.conf.writeLegacyParquetFormat,
+ sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis)
converter.convert(schema)
}
@@ -510,6 +518,7 @@ object ParquetFileFormat extends Logging {
sparkSession: SparkSession): Option[StructType] = {
val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
+ val writeTimestampInMillis = sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis
val writeLegacyParquetFormat = sparkSession.sessionState.conf.writeLegacyParquetFormat
val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
@@ -554,7 +563,8 @@ object ParquetFileFormat extends Logging {
new ParquetSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
- writeLegacyParquetFormat = writeLegacyParquetFormat)
+ writeLegacyParquetFormat = writeLegacyParquetFormat,
+ writeTimestampInMillis = writeTimestampInMillis)
if (footers.isEmpty) {
Iterator.empty
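
The flag travels from SQLConf into the Hadoop configuration on both the write path and the read path shown above. A hedged usage sketch, assuming an active SparkSession named `spark` and an illustrative output path:

    // Enable writing TimestampType columns as INT64/TIMESTAMP_MILLIS instead of INT96.
    spark.conf.set("spark.sql.parquet.int64AsTimestampMillis", "true")

    // Subsequent writes pick the flag up through the Hadoop configuration set above.
    spark.range(3)
      .selectExpr("CAST(id AS TIMESTAMP) AS ts")
      .write.mode("overwrite").parquet("/tmp/ts-millis")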
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 33dcf2f3fd..32e6c60cd9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
-import org.apache.parquet.schema.{GroupType, MessageType, Type}
+import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type}
import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64}
@@ -252,6 +252,13 @@ private[parquet] class ParquetRowConverter(
case StringType =>
new ParquetStringConverter(updater)
+ case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
+ new ParquetPrimitiveConverter(updater) {
+ override def addLong(value: Long): Unit = {
+ updater.setLong(DateTimeUtils.fromMillis(value))
+ }
+ }
+
case TimestampType =>
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
new ParquetPrimitiveConverter(updater) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 66d4027edf..0b805e4362 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -51,22 +51,29 @@ import org.apache.spark.sql.types._
* and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]].
* When set to false, use standard format defined in parquet-format spec. This argument only
* affects Parquet write path.
+ * @param writeTimestampInMillis Whether to write timestamp values as INT64 annotated with the
+ * TIMESTAMP_MILLIS logical type.
+ *
*/
private[parquet] class ParquetSchemaConverter(
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
- writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) {
+ writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
+ writeTimestampInMillis: Boolean = SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.defaultValue.get) {
def this(conf: SQLConf) = this(
assumeBinaryIsString = conf.isParquetBinaryAsString,
assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
- writeLegacyParquetFormat = conf.writeLegacyParquetFormat)
+ writeLegacyParquetFormat = conf.writeLegacyParquetFormat,
+ writeTimestampInMillis = conf.isParquetINT64AsTimestampMillis)
def this(conf: Configuration) = this(
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
- SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean)
+ SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean,
+ writeTimestampInMillis = conf.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key).toBoolean)
+
/**
* Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
@@ -158,7 +165,7 @@ private[parquet] class ParquetSchemaConverter(
case INT_64 | null => LongType
case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS)
case UINT_64 => typeNotSupported()
- case TIMESTAMP_MILLIS => typeNotImplemented()
+ case TIMESTAMP_MILLIS => TimestampType
case _ => illegalType()
}
@@ -370,10 +377,16 @@ private[parquet] class ParquetSchemaConverter(
// we may resort to microsecond precision in the future.
//
// For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's
- // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
- // hasn't implemented `TIMESTAMP_MICROS` yet.
+ // not implemented yet because parquet-mr 1.8.1 (the version we're currently using) hasn't
+ // implemented `TIMESTAMP_MICROS` yet. However, it does support TIMESTAMP_MILLIS, so we
+ // encode timestamp values as INT64 annotated with TIMESTAMP_MILLIS when
+ // 'spark.sql.parquet.int64AsTimestampMillis' is set.
//
// TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that.
+
+ case TimestampType if writeTimestampInMillis =>
+ Types.primitive(INT64, repetition).as(TIMESTAMP_MILLIS).named(field.name)
+
case TimestampType =>
Types.primitive(INT96, repetition).named(field.name)
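
With writeTimestampInMillis set, a Catalyst TimestampType field maps to INT64 annotated with TIMESTAMP_MILLIS rather than the legacy unannotated INT96. A small sketch using the same parquet-mr Types builder as the code above (the field name `ts` is illustrative):

    import org.apache.parquet.schema.{OriginalType, Type, Types}
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{INT64, INT96}

    // writeTimestampInMillis = true  ->  optional int64 ts (TIMESTAMP_MILLIS)
    val millisField = Types.primitive(INT64, Type.Repetition.OPTIONAL)
      .as(OriginalType.TIMESTAMP_MILLIS)
      .named("ts")

    // flag off (default)            ->  optional int96 ts
    val int96Field = Types.primitive(INT96, Type.Repetition.OPTIONAL).named("ts")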
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index a31d2b9c37..38b0e33937 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -66,6 +66,9 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
// Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions
private var writeLegacyParquetFormat: Boolean = _
+ // Whether to write timestamp values with millisecond precision.
+ private var writeTimestampInMillis: Boolean = _
+
// Reusable byte array used to write timestamps as Parquet INT96 values
private val timestampBuffer = new Array[Byte](12)
@@ -80,6 +83,13 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
assert(configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key) != null)
configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean
}
+
+ this.writeTimestampInMillis = {
+ assert(configuration.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key) != null)
+ configuration.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key).toBoolean
+ }
+
+
this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)
val messageType = new ParquetSchemaConverter(configuration).convert(schema)
@@ -153,6 +163,11 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
recordConsumer.addBinary(
Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))
+ case TimestampType if writeTimestampInMillis =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
+ recordConsumer.addLong(millis)
+
case TimestampType =>
(row: SpecializedGetters, ordinal: Int) => {
// TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it
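
On the write path the internal microsecond value is scaled down with DateTimeUtils.toMillis, so sub-millisecond digits are dropped; the truncation test further down exercises exactly this. A minimal worked example (the literal values are illustrative):

    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    // Fractional part of 10:11:12.123456 expressed in microseconds.
    val micros = 123456L
    val writtenMillis = DateTimeUtils.toMillis(micros)        // 123  -- the trailing .456 is lost
    val readBack = DateTimeUtils.fromMillis(writtenMillis)    // 123000 -> reads back as .123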
diff --git a/sql/core/src/test/resources/test-data/timemillis-in-i64.parquet b/sql/core/src/test/resources/test-data/timemillis-in-i64.parquet
new file mode 100644
index 0000000000..d3c39e2c26
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/timemillis-in-i64.parquet
Binary files differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index dbdcd230a4..57a0af1dda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -107,11 +107,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
| required binary g(ENUM);
| required binary h(DECIMAL(32,0));
| required fixed_len_byte_array(32) i(DECIMAL(32,0));
+ | required int64 j(TIMESTAMP_MILLIS);
|}
""".stripMargin)
val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0),
- DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0))
+ DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0),
+ TimestampType)
withTempPath { location =>
val path = new Path(location.getCanonicalPath)
@@ -607,6 +609,18 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
+ test("read dictionary and plain encoded timestamp_millis written as INT64") {
+ ("true" :: "false" :: Nil).foreach { vectorized =>
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+ checkAnswer(
+ // The timestamp column in this file is encoded using a combination of plain
+ // and dictionary encodings.
+ readResourceParquetFile("test-data/timemillis-in-i64.parquet"),
+ (1 to 3).map(i => Row(new java.sql.Timestamp(10))))
+ }
+ }
+ }
+
test("SPARK-12589 copy() on rows returned from reader works for strings") {
withTempPath { dir =>
val data = (1, "abc") ::(2, "helloabcde") :: Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 200e356c72..c36609586c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
+import java.sql.Timestamp
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetOutputFormat
@@ -162,6 +163,78 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
+ test("SPARK-10634 timestamp written and read as INT64 - TIMESTAMP_MILLIS") {
+ val data = (1 to 10).map(i => Row(i, new java.sql.Timestamp(i)))
+ val schema = StructType(List(StructField("d", IntegerType, false),
+ StructField("time", TimestampType, false)).toArray)
+ withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "true") {
+ withTempPath { file =>
+ val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+ df.write.parquet(file.getCanonicalPath)
+ ("true" :: "false" :: Nil).foreach { vectorized =>
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+ val df2 = spark.read.parquet(file.getCanonicalPath)
+ checkAnswer(df2, df.collect().toSeq)
+ }
+ }
+ }
+ }
+ }
+
+ test("SPARK-10634 timestamp written and read as INT64 - truncation") {
+ withTable("ts") {
+ sql("create table ts (c1 int, c2 timestamp) using parquet")
+ sql("insert into ts values (1, '2016-01-01 10:11:12.123456')")
+ sql("insert into ts values (2, null)")
+ sql("insert into ts values (3, '1965-01-01 10:11:12.123456')")
+ checkAnswer(
+ sql("select * from ts"),
+ Seq(
+ Row(1, Timestamp.valueOf("2016-01-01 10:11:12.123456")),
+ Row(2, null),
+ Row(3, Timestamp.valueOf("1965-01-01 10:11:12.123456"))))
+ }
+
+ // The microsecond portion is truncated when written as TIMESTAMP_MILLIS.
+ withTable("ts") {
+ withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "true") {
+ sql("create table ts (c1 int, c2 timestamp) using parquet")
+ sql("insert into ts values (1, '2016-01-01 10:11:12.123456')")
+ sql("insert into ts values (2, null)")
+ sql("insert into ts values (3, '1965-01-01 10:11:12.125456')")
+ sql("insert into ts values (4, '1965-01-01 10:11:12.125')")
+ sql("insert into ts values (5, '1965-01-01 10:11:12.1')")
+ sql("insert into ts values (6, '1965-01-01 10:11:12.123456789')")
+ sql("insert into ts values (7, '0001-01-01 00:00:00.000000')")
+ checkAnswer(
+ sql("select * from ts"),
+ Seq(
+ Row(1, Timestamp.valueOf("2016-01-01 10:11:12.123")),
+ Row(2, null),
+ Row(3, Timestamp.valueOf("1965-01-01 10:11:12.125")),
+ Row(4, Timestamp.valueOf("1965-01-01 10:11:12.125")),
+ Row(5, Timestamp.valueOf("1965-01-01 10:11:12.1")),
+ Row(6, Timestamp.valueOf("1965-01-01 10:11:12.123")),
+ Row(7, Timestamp.valueOf("0001-01-01 00:00:00.000"))))
+
+ // Read timestamps that were encoded as TIMESTAMP_MILLIS annotated as INT64
+ // with PARQUET_INT64_AS_TIMESTAMP_MILLIS set to false.
+ withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "false") {
+ checkAnswer(
+ sql("select * from ts"),
+ Seq(
+ Row(1, Timestamp.valueOf("2016-01-01 10:11:12.123")),
+ Row(2, null),
+ Row(3, Timestamp.valueOf("1965-01-01 10:11:12.125")),
+ Row(4, Timestamp.valueOf("1965-01-01 10:11:12.125")),
+ Row(5, Timestamp.valueOf("1965-01-01 10:11:12.1")),
+ Row(6, Timestamp.valueOf("1965-01-01 10:11:12.123")),
+ Row(7, Timestamp.valueOf("0001-01-01 00:00:00.000"))))
+ }
+ }
+ }
+ }
+
test("Enabling/disabling merging partfiles when merging parquet schema") {
def testSchemaMerging(expectedColumnNumber: Int): Unit = {
withTempDir { dir =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 6aa940afbb..ce992674d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -53,11 +53,13 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
parquetSchema: String,
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
- writeLegacyParquetFormat: Boolean): Unit = {
+ writeLegacyParquetFormat: Boolean,
+ int64AsTimestampMillis: Boolean = false): Unit = {
val converter = new ParquetSchemaConverter(
assumeBinaryIsString = binaryAsString,
assumeInt96IsTimestamp = int96AsTimestamp,
- writeLegacyParquetFormat = writeLegacyParquetFormat)
+ writeLegacyParquetFormat = writeLegacyParquetFormat,
+ writeTimestampInMillis = int64AsTimestampMillis)
test(s"sql <= parquet: $testName") {
val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema))
@@ -77,11 +79,13 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
parquetSchema: String,
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
- writeLegacyParquetFormat: Boolean): Unit = {
+ writeLegacyParquetFormat: Boolean,
+ int64AsTimestampMillis: Boolean = false): Unit = {
val converter = new ParquetSchemaConverter(
assumeBinaryIsString = binaryAsString,
assumeInt96IsTimestamp = int96AsTimestamp,
- writeLegacyParquetFormat = writeLegacyParquetFormat)
+ writeLegacyParquetFormat = writeLegacyParquetFormat,
+ writeTimestampInMillis = int64AsTimestampMillis)
test(s"sql => parquet: $testName") {
val actual = converter.convert(sqlSchema)
@@ -97,7 +101,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
parquetSchema: String,
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
- writeLegacyParquetFormat: Boolean): Unit = {
+ writeLegacyParquetFormat: Boolean,
+ int64AsTimestampMillis: Boolean = false): Unit = {
testCatalystToParquet(
testName,
@@ -105,7 +110,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
parquetSchema,
binaryAsString,
int96AsTimestamp,
- writeLegacyParquetFormat)
+ writeLegacyParquetFormat,
+ int64AsTimestampMillis)
testParquetToCatalyst(
testName,
@@ -113,7 +119,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
parquetSchema,
binaryAsString,
int96AsTimestamp,
- writeLegacyParquetFormat)
+ writeLegacyParquetFormat,
+ int64AsTimestampMillis)
}
}
@@ -965,6 +972,18 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
+ testSchema(
+ "Timestamp written and read as INT64 with TIMESTAMP_MILLIS",
+ StructType(Seq(StructField("f1", TimestampType))),
+ """message root {
+ | optional INT64 f1 (TIMESTAMP_MILLIS);
+ |}
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = false,
+ writeLegacyParquetFormat = true,
+ int64AsTimestampMillis = true)
+
private def testSchemaClipping(
testName: String,
parquetSchema: String,