author:    Dilip Biswal <dbiswal@us.ibm.com>  2017-04-04 09:53:05 +0900
committer: Takuya UESHIN <ueshin@databricks.com>  2017-04-04 09:53:05 +0900
commit:    3bfb639cb7352aec572ef6686d3471bd78748ffa
tree:      9d2054aecd8b3356a4ec7a38543def25a3ee380e /sql/core/src/main
parent:    e7877fd4728ed41e440d7c4d8b6b02bd0d9e873e
[SPARK-10364][SQL] Support Parquet logical type TIMESTAMP_MILLIS
## What changes were proposed in this pull request?
**Description** from JIRA:

The TimestampType in Spark SQL is of microsecond precision. Ideally, we should convert Spark SQL timestamp values into Parquet TIMESTAMP_MICROS, but unfortunately parquet-mr doesn't support that logical type yet.

For the read path, we should be able to read TIMESTAMP_MILLIS Parquet values and pad a zero microsecond part onto the values we read.

For the write path, we currently write timestamps as INT96, the same as Impala and Hive. One alternative is a separate SQL option that lets users write Spark SQL timestamp values as TIMESTAMP_MILLIS; of course, the microsecond part is then truncated.
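As an illustration of the intended usage, here is a minimal sketch of both paths from a `spark-shell` session (the option key `spark.sql.parquet.int64AsTimestampMillis` is the one this patch wires up; the output path and example values are hypothetical):

```scala
import java.sql.Timestamp
import spark.implicits._  // `spark` is the shell's SparkSession

// Opt in to writing timestamps as INT64 annotated with TIMESTAMP_MILLIS
// instead of the default INT96; the microsecond part is truncated on write.
spark.conf.set("spark.sql.parquet.int64AsTimestampMillis", "true")

val df = Seq(Timestamp.valueOf("2017-04-04 09:53:05.123456")).toDF("ts")
df.write.parquet("/tmp/ts_millis")  // hypothetical path

// The read path pads a zero microsecond part back on, so the value
// comes back as 2017-04-04 09:53:05.123 (the .000456 part is gone).
spark.read.parquet("/tmp/ts_millis").show(false)
```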
## How was this patch tested?
Added new tests in ParquetQuerySuite and ParquetIOSuite.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes #15332 from dilipbiswal/parquet-time-millis.
Diffstat (limited to 'sql/core/src/main')
6 files changed, 79 insertions, 12 deletions
```diff
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index bf87174835..eb97118872 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -197,6 +197,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     config.set("spark.sql.parquet.binaryAsString", "false");
     config.set("spark.sql.parquet.int96AsTimestamp", "false");
     config.set("spark.sql.parquet.writeLegacyFormat", "false");
+    config.set("spark.sql.parquet.int64AsTimestampMillis", "false");
     this.file = new Path(path);
     long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index cb51cb499e..9d641b5287 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -28,6 +28,7 @@ import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveType;
 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;
@@ -155,9 +156,13 @@ public class VectorizedColumnReader {
       // Read and decode dictionary ids.
       defColumn.readIntegers(
           num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+
+      // Timestamp values encoded as INT64 can't be lazily decoded as we need to post process
+      // the values to add microseconds precision.
       if (column.hasDictionary() || (rowId == 0 &&
           (descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32 ||
-          descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 ||
+          (descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 &&
+            column.dataType() != DataTypes.TimestampType) ||
           descriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT ||
           descriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE ||
           descriptor.getType() == PrimitiveType.PrimitiveTypeName.BINARY))) {
@@ -250,7 +255,15 @@ public class VectorizedColumnReader {
             column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
           }
         }
-      } else {
+      } else if (column.dataType() == DataTypes.TimestampType) {
+        for (int i = rowId; i < rowId + num; ++i) {
+          if (!column.isNullAt(i)) {
+            column.putLong(i,
+              DateTimeUtils.fromMillis(dictionary.decodeToLong(dictionaryIds.getDictId(i))));
+          }
+        }
+      }
+      else {
         throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
       }
       break;
@@ -362,7 +375,15 @@ public class VectorizedColumnReader {
     if (column.dataType() == DataTypes.LongType ||
         DecimalType.is64BitDecimalType(column.dataType())) {
       defColumn.readLongs(
-        num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+        num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+    } else if (column.dataType() == DataTypes.TimestampType) {
+      for (int i = 0; i < num; i++) {
+        if (defColumn.readInteger() == maxDefLevel) {
+          column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
+        } else {
+          column.putNull(rowId + i);
+        }
+      }
     } else {
       throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType());
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 062aa5c8ea..2f3a2c62b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -125,6 +125,10 @@ class ParquetFileFormat
       SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
       sparkSession.sessionState.conf.writeLegacyParquetFormat.toString)
 
+    conf.set(
+      SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key,
+      sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis.toString)
+
     // Sets compression scheme
     conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
@@ -300,6 +304,9 @@ class ParquetFileFormat
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key,
+      sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis)
 
     // Try to push down filters when filter push-down is enabled.
     val pushed =
@@ -410,7 +417,8 @@ object ParquetFileFormat extends Logging {
     val converter = new ParquetSchemaConverter(
       sparkSession.sessionState.conf.isParquetBinaryAsString,
       sparkSession.sessionState.conf.isParquetBinaryAsString,
-      sparkSession.sessionState.conf.writeLegacyParquetFormat)
+      sparkSession.sessionState.conf.writeLegacyParquetFormat,
+      sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis)
 
     converter.convert(schema)
   }
@@ -510,6 +518,7 @@ object ParquetFileFormat extends Logging {
       sparkSession: SparkSession): Option[StructType] = {
     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
+    val writeTimestampInMillis = sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis
     val writeLegacyParquetFormat = sparkSession.sessionState.conf.writeLegacyParquetFormat
     val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
@@ -554,7 +563,8 @@ object ParquetFileFormat extends Logging {
         new ParquetSchemaConverter(
           assumeBinaryIsString = assumeBinaryIsString,
           assumeInt96IsTimestamp = assumeInt96IsTimestamp,
-          writeLegacyParquetFormat = writeLegacyParquetFormat)
+          writeLegacyParquetFormat = writeLegacyParquetFormat,
+          writeTimestampInMillis = writeTimestampInMillis)
 
         if (footers.isEmpty) {
           Iterator.empty
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 33dcf2f3fd..32e6c60cd9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
-import org.apache.parquet.schema.{GroupType, MessageType, Type}
+import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type}
 import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64}
@@ -252,6 +252,13 @@ private[parquet] class ParquetRowConverter(
       case StringType =>
         new ParquetStringConverter(updater)
 
+      case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
+        new ParquetPrimitiveConverter(updater) {
+          override def addLong(value: Long): Unit = {
+            updater.setLong(DateTimeUtils.fromMillis(value))
+          }
+        }
+
       case TimestampType =>
         // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
         new ParquetPrimitiveConverter(updater) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 66d4027edf..0b805e4362 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -51,22 +51,29 @@ import org.apache.spark.sql.types._
  * and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]].
  * When set to false, use standard format defined in parquet-format spec. This argument only
  * affects Parquet write path.
+ * @param writeTimestampInMillis Whether to write timestamp values as INT64 annotated by logical
+ *                               type TIMESTAMP_MILLIS.
+ *
  */
 private[parquet] class ParquetSchemaConverter(
     assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
     assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
-    writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) {
+    writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
+    writeTimestampInMillis: Boolean = SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
     assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
-    writeLegacyParquetFormat = conf.writeLegacyParquetFormat)
+    writeLegacyParquetFormat = conf.writeLegacyParquetFormat,
+    writeTimestampInMillis = conf.isParquetINT64AsTimestampMillis)
 
   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
     assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
     writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
-      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean)
+      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean,
+    writeTimestampInMillis = conf.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key).toBoolean)
+
 
   /**
    * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
@@ -158,7 +165,7 @@ private[parquet] class ParquetSchemaConverter(
       case INT_64 | null => LongType
       case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS)
       case UINT_64 => typeNotSupported()
-      case TIMESTAMP_MILLIS => typeNotImplemented()
+      case TIMESTAMP_MILLIS => TimestampType
       case _ => illegalType()
     }
@@ -370,10 +377,16 @@ private[parquet] class ParquetSchemaConverter(
       // we may resort to microsecond precision in the future.
       //
       // For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's
-      // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
-      // hasn't implemented `TIMESTAMP_MICROS` yet.
+      // currently not implemented yet because parquet-mr 1.8.1 (the version we're currently using)
+      // hasn't implemented `TIMESTAMP_MICROS` yet, however it supports TIMESTAMP_MILLIS. We will
+      // encode timestamp values as TIMESTAMP_MILLIS annotating INT64 if
+      // 'spark.sql.parquet.int64AsTimestampMillis' is set.
       //
       // TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that.
+
+      case TimestampType if writeTimestampInMillis =>
+        Types.primitive(INT64, repetition).as(TIMESTAMP_MILLIS).named(field.name)
+
       case TimestampType =>
         Types.primitive(INT96, repetition).named(field.name)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index a31d2b9c37..38b0e33937 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -66,6 +66,9 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
   // Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions
   private var writeLegacyParquetFormat: Boolean = _
 
+  // Whether to write timestamp value with milliseconds precision.
+  private var writeTimestampInMillis: Boolean = _
+
   // Reusable byte array used to write timestamps as Parquet INT96 values
   private val timestampBuffer = new Array[Byte](12)
@@ -80,6 +83,13 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
       assert(configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key) != null)
       configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean
     }
+
+    this.writeTimestampInMillis = {
+      assert(configuration.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key) != null)
+      configuration.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key).toBoolean
+    }
+
+
     this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)
 
     val messageType = new ParquetSchemaConverter(configuration).convert(schema)
@@ -153,6 +163,11 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
         recordConsumer.addBinary(
           Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))
 
+      case TimestampType if writeTimestampInMillis =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
+          recordConsumer.addLong(millis)
+
       case TimestampType =>
         (row: SpecializedGetters, ordinal: Int) => {
           // TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it
```
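The arithmetic behind the read and write paths above is small enough to show on its own. A standalone sketch follows; the shipped helpers are `DateTimeUtils.fromMillis` and `DateTimeUtils.toMillis` in `org.apache.spark.sql.catalyst.util`, and the floor-division handling of pre-epoch values here is an assumption, not a copy of that code:

```scala
// Sketch of the millis <-> micros conversions the patch relies on
// (assumed semantics; the real code lives in DateTimeUtils).
object TimestampMillisConversions {
  // Read path: pad a Parquet TIMESTAMP_MILLIS value out to Spark SQL's
  // microsecond precision by appending a zero microsecond part.
  def fromMillis(millis: Long): Long = millis * 1000L

  // Write path: truncate a Spark SQL timestamp (microseconds since epoch)
  // to millisecond precision. Math.floorDiv (an assumption) keeps pre-epoch
  // values correct: -1 microsecond should map to -1 millisecond, not 0.
  def toMillis(micros: Long): Long = Math.floorDiv(micros, 1000L)

  def main(args: Array[String]): Unit = {
    assert(fromMillis(123L) == 123000L)   // 123 ms -> 123000 us
    assert(toMillis(123456L) == 123L)     // the 456 us remainder is dropped
    assert(toMillis(-1L) == -1L)          // rounds toward negative infinity
  }
}
```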