path: root/sql/core/src/main
author     Dilip Biswal <dbiswal@us.ibm.com>      2017-04-04 09:53:05 +0900
committer  Takuya UESHIN <ueshin@databricks.com>  2017-04-04 09:53:05 +0900
commit     3bfb639cb7352aec572ef6686d3471bd78748ffa (patch)
tree       9d2054aecd8b3356a4ec7a38543def25a3ee380e /sql/core/src/main
parent     e7877fd4728ed41e440d7c4d8b6b02bd0d9e873e (diff)
[SPARK-10364][SQL] Support Parquet logical type TIMESTAMP_MILLIS
## What changes were proposed in this pull request?

**Description** from JIRA

The TimestampType in Spark SQL is of microsecond precision. Ideally, we should convert Spark SQL timestamp values into Parquet TIMESTAMP_MICROS, but unfortunately parquet-mr hasn't supported it yet.

For the read path, we should be able to read TIMESTAMP_MILLIS Parquet values and pad a 0 microsecond part onto the values read.

For the write path, we currently write timestamps as INT96, similar to Impala and Hive. One alternative is a separate SQL option that lets users write Spark SQL timestamp values as TIMESTAMP_MILLIS; in that case the microsecond part is, of course, truncated.

## How was this patch tested?

Added new tests in ParquetQuerySuite and ParquetIOSuite.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #15332 from dilipbiswal/parquet-time-millis.
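A minimal usage sketch (not part of this patch) of the new option end to end. The config key `spark.sql.parquet.int64AsTimestampMillis` is the one added by this change; the session setup, column name, and output path are hypothetical:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("timestamp-millis-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Write timestamps as INT64 annotated with TIMESTAMP_MILLIS instead of the default INT96;
// the microsecond fraction is truncated on write.
spark.conf.set("spark.sql.parquet.int64AsTimestampMillis", "true")

Seq(Timestamp.valueOf("2017-04-04 09:53:05.123456"))
  .toDF("ts")
  .write.mode("overwrite").parquet("/tmp/ts_millis")

// On read, the stored millisecond values are padded back to microsecond precision.
spark.read.parquet("/tmp/ts_millis").show(false)
```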
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java |  1
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java          | 27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala             | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala           |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala        | 25
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala           | 15
6 files changed, 79 insertions(+), 12 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index bf87174835..eb97118872 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -197,6 +197,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
config.set("spark.sql.parquet.binaryAsString", "false");
config.set("spark.sql.parquet.int96AsTimestamp", "false");
config.set("spark.sql.parquet.writeLegacyFormat", "false");
+ config.set("spark.sql.parquet.int64AsTimestampMillis", "false");
this.file = new Path(path);
long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index cb51cb499e..9d641b5287 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -28,6 +28,7 @@ import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveType;
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;
@@ -155,9 +156,13 @@ public class VectorizedColumnReader {
// Read and decode dictionary ids.
defColumn.readIntegers(
num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+
+ // Timestamp values encoded as INT64 can't be lazily decoded, as we need to post-process
+ // the values to add microsecond precision.
if (column.hasDictionary() || (rowId == 0 &&
(descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32 ||
- descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 ||
+ (descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 &&
+ column.dataType() != DataTypes.TimestampType) ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.BINARY))) {
@@ -250,7 +255,15 @@ public class VectorizedColumnReader {
column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
}
}
- } else {
+ } else if (column.dataType() == DataTypes.TimestampType) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ if (!column.isNullAt(i)) {
+ column.putLong(i,
+ DateTimeUtils.fromMillis(dictionary.decodeToLong(dictionaryIds.getDictId(i))));
+ }
+ }
+ }
+ else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
break;
@@ -362,7 +375,15 @@ public class VectorizedColumnReader {
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType())) {
defColumn.readLongs(
- num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ } else if (column.dataType() == DataTypes.TimestampType) {
+ for (int i = 0; i < num; i++) {
+ if (defColumn.readInteger() == maxDefLevel) {
+ column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
+ } else {
+ column.putNull(rowId + i);
+ }
+ }
} else {
throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType());
}
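The VectorizedColumnReader changes above disable lazy dictionary decoding for INT64 timestamp columns: every dictionary value has to be rescaled from milliseconds to microseconds before it lands in the column vector. A standalone sketch of that eager decode step, with hypothetical names (the real code goes through `DateTimeUtils.fromMillis` and the vectorized value readers):

```scala
// Sketch (assumption): eagerly materialize dictionary-encoded TIMESTAMP_MILLIS values,
// rescaling each millisecond value to microseconds so no lazy dictionary ids remain.
def decodeTimestampMillisColumn(dictionaryIds: Array[Int], dictionary: Array[Long]): Array[Long] =
  dictionaryIds.map(id => dictionary(id) * 1000L) // ms -> µs, zero-padded microsecond part
```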
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 062aa5c8ea..2f3a2c62b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -125,6 +125,10 @@ class ParquetFileFormat
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
sparkSession.sessionState.conf.writeLegacyParquetFormat.toString)
+ conf.set(
+ SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key,
+ sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis.toString)
+
// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
@@ -300,6 +304,9 @@ class ParquetFileFormat
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+ hadoopConf.setBoolean(
+ SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key,
+ sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis)
// Try to push down filters when filter push-down is enabled.
val pushed =
@@ -410,7 +417,8 @@ object ParquetFileFormat extends Logging {
val converter = new ParquetSchemaConverter(
sparkSession.sessionState.conf.isParquetBinaryAsString,
sparkSession.sessionState.conf.isParquetBinaryAsString,
- sparkSession.sessionState.conf.writeLegacyParquetFormat)
+ sparkSession.sessionState.conf.writeLegacyParquetFormat,
+ sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis)
converter.convert(schema)
}
@@ -510,6 +518,7 @@ object ParquetFileFormat extends Logging {
sparkSession: SparkSession): Option[StructType] = {
val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
+ val writeTimestampInMillis = sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis
val writeLegacyParquetFormat = sparkSession.sessionState.conf.writeLegacyParquetFormat
val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
@@ -554,7 +563,8 @@ object ParquetFileFormat extends Logging {
new ParquetSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
- writeLegacyParquetFormat = writeLegacyParquetFormat)
+ writeLegacyParquetFormat = writeLegacyParquetFormat,
+ writeTimestampInMillis = writeTimestampInMillis)
if (footers.isEmpty) {
Iterator.empty
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 33dcf2f3fd..32e6c60cd9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
-import org.apache.parquet.schema.{GroupType, MessageType, Type}
+import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type}
import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64}
@@ -252,6 +252,13 @@ private[parquet] class ParquetRowConverter(
case StringType =>
new ParquetStringConverter(updater)
+ case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
+ new ParquetPrimitiveConverter(updater) {
+ override def addLong(value: Long): Unit = {
+ updater.setLong(DateTimeUtils.fromMillis(value))
+ }
+ }
+
case TimestampType =>
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
new ParquetPrimitiveConverter(updater) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 66d4027edf..0b805e4362 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -51,22 +51,29 @@ import org.apache.spark.sql.types._
* and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]].
* When set to false, use standard format defined in parquet-format spec. This argument only
* affects Parquet write path.
+ * @param writeTimestampInMillis Whether to write timestamp values as INT64 annotated by logical
+ * type TIMESTAMP_MILLIS.
+ *
*/
private[parquet] class ParquetSchemaConverter(
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
- writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) {
+ writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
+ writeTimestampInMillis: Boolean = SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.defaultValue.get) {
def this(conf: SQLConf) = this(
assumeBinaryIsString = conf.isParquetBinaryAsString,
assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
- writeLegacyParquetFormat = conf.writeLegacyParquetFormat)
+ writeLegacyParquetFormat = conf.writeLegacyParquetFormat,
+ writeTimestampInMillis = conf.isParquetINT64AsTimestampMillis)
def this(conf: Configuration) = this(
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
- SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean)
+ SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean,
+ writeTimestampInMillis = conf.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key).toBoolean)
+
/**
* Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
@@ -158,7 +165,7 @@ private[parquet] class ParquetSchemaConverter(
case INT_64 | null => LongType
case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS)
case UINT_64 => typeNotSupported()
- case TIMESTAMP_MILLIS => typeNotImplemented()
+ case TIMESTAMP_MILLIS => TimestampType
case _ => illegalType()
}
@@ -370,10 +377,16 @@ private[parquet] class ParquetSchemaConverter(
// we may resort to microsecond precision in the future.
//
// For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's
- // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
- // hasn't implemented `TIMESTAMP_MICROS` yet.
+ // currently not implemented because parquet-mr 1.8.1 (the version we're currently using)
+ // hasn't implemented `TIMESTAMP_MICROS` yet. It does support TIMESTAMP_MILLIS, so we
+ // encode timestamp values as INT64 annotated with TIMESTAMP_MILLIS when
+ // 'spark.sql.parquet.int64AsTimestampMillis' is set.
//
// TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that.
+
+ case TimestampType if writeTimestampInMillis =>
+ Types.primitive(INT64, repetition).as(TIMESTAMP_MILLIS).named(field.name)
+
case TimestampType =>
Types.primitive(INT96, repetition).named(field.name)
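The schema mapping above is the heart of the write path: with the flag enabled, Catalyst `TimestampType` becomes `INT64` annotated with `TIMESTAMP_MILLIS` instead of `INT96`. A small sketch using parquet-mr's `Types` builder to show the two shapes (the field name is hypothetical; the builder chain mirrors the one in the hunk):

```scala
import org.apache.parquet.schema.{OriginalType, Types}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{INT64, INT96}
import org.apache.parquet.schema.Type.Repetition.OPTIONAL

// With spark.sql.parquet.int64AsTimestampMillis = true:
val millisField = Types.primitive(INT64, OPTIONAL).as(OriginalType.TIMESTAMP_MILLIS).named("event_time")
// -> optional int64 event_time (TIMESTAMP_MILLIS)

// Default behaviour (INT96, matching Impala/Hive and earlier Spark releases):
val int96Field = Types.primitive(INT96, OPTIONAL).named("event_time")
// -> optional int96 event_time
```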
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index a31d2b9c37..38b0e33937 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -66,6 +66,9 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
// Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions
private var writeLegacyParquetFormat: Boolean = _
+ // Whether to write timestamp values with millisecond precision.
+ private var writeTimestampInMillis: Boolean = _
+
// Reusable byte array used to write timestamps as Parquet INT96 values
private val timestampBuffer = new Array[Byte](12)
@@ -80,6 +83,13 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
assert(configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key) != null)
configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean
}
+
+ this.writeTimestampInMillis = {
+ assert(configuration.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key) != null)
+ configuration.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key).toBoolean
+ }
+
+
this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)
val messageType = new ParquetSchemaConverter(configuration).convert(schema)
@@ -153,6 +163,11 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
recordConsumer.addBinary(
Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))
+ case TimestampType if writeTimestampInMillis =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
+ recordConsumer.addLong(millis)
+
case TimestampType =>
(row: SpecializedGetters, ordinal: Int) => {
// TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it
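On the write path the microsecond value stored in the row is divided down to milliseconds before it is handed to the record consumer, so sub-millisecond precision is lost; the read path multiplies back up and pads zeros. A minimal sketch of the two conversions this patch relies on (the actual helpers, `DateTimeUtils.fromMillis` and `DateTimeUtils.toMillis`, live in sql/catalyst and are outside this diff; plain scaling is assumed here):

```scala
// Sketch (assumption): the µs <-> ms conversions used by the read and write paths.
def microsFromMillis(millis: Long): Long = millis * 1000L // read path: pad a zero microsecond part
def millisFromMicros(micros: Long): Long = micros / 1000L // write path: truncate the microsecond part

assert(microsFromMillis(1491264785123L) == 1491264785123000L)
assert(millisFromMicros(1491264785123456L) == 1491264785123L) // the trailing 456 µs are dropped
```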