author     Daoyuan Wang <daoyuan.wang@intel.com>   2015-02-03 12:06:06 -0800
committer  Michael Armbrust <michael@databricks.com>   2015-02-03 12:06:06 -0800
commit     0c20ce69fb4bcb1cec5313a9d072826c5588cbbc (patch)
tree       5aeb2c2c72403440a6dd3bef398fbce92d5a5d7f
parent     4204a1271d5bff4dd64f46eed9ee80b30081f9dc (diff)
[SPARK-4987] [SQL] parquet timestamp type support
Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #3820 from adrian-wang/parquettimestamp and squashes the following commits:

b1e2a0d [Daoyuan Wang] fix for nanos
4dadef1 [Daoyuan Wang] fix wrong read
93f438d [Daoyuan Wang] parquet timestamp support
-rw-r--r--  docs/sql-programming-guide.md                                                   |  9
-rw-r--r--  pom.xml                                                                         |  1
-rw-r--r--  sql/core/pom.xml                                                                |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                      |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala     | 94
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala      |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala  | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala      |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala         | 52
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala           |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala   | 69

11 files changed, 239 insertions, 26 deletions
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index be8c5c2c15..22664b419f 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -581,6 +581,15 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
</td>
</tr>
<tr>
+ <td><code>spark.sql.parquet.int96AsTimestamp</code></td>
+ <td>true</td>
+ <td>
+    Some Parquet-producing systems, in particular Impala, store Timestamp as INT96. Spark also
+    stores Timestamp as INT96 to avoid losing the precision of the nanoseconds field. This
+    flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
+ </td>
+</tr>
+<tr>
<td><code>spark.sql.parquet.cacheMetadata</code></td>
<td>true</td>
<td>
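As a quick illustration of the new flag (a minimal sketch, assuming an existing `SQLContext` named `sqlContext`, per the `setConf` note in the hunk header above):

```scala
// Sketch only: turn off INT96-as-Timestamp before reading Parquet data whose
// INT96 columns should NOT be interpreted as timestamps.
sqlContext.setConf("spark.sql.parquet.int96AsTimestamp", "false")

// Restore the default, so INT96 columns written by Impala or Spark come back
// as TimestampType.
sqlContext.setConf("spark.sql.parquet.int96AsTimestamp", "true")
```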
diff --git a/pom.xml b/pom.xml
index e25eced877..542efbaf06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,6 +149,7 @@
<scala.binary.version>2.10</scala.binary.version>
<jline.version>${scala.version}</jline.version>
<jline.groupid>org.scala-lang</jline.groupid>
+ <jodd.version>3.6.3</jodd.version>
<codehaus.jackson.version>1.8.8</codehaus.jackson.version>
<snappy.version>1.1.1.6</snappy.version>
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 1a0c77d282..03a5c9e7c2 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -67,6 +67,11 @@
<version>2.3.0</version>
</dependency>
<dependency>
+ <groupId>org.jodd</groupId>
+ <artifactId>jodd-core</artifactId>
+ <version>${jodd.version}</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 561a91d2d6..7fe17944a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -33,6 +33,7 @@ private[spark] object SQLConf {
val DIALECT = "spark.sql.dialect"
val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
+ val PARQUET_INT96_AS_TIMESTAMP = "spark.sql.parquet.int96AsTimestamp"
val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
@@ -144,6 +145,12 @@ private[sql] class SQLConf extends Serializable {
getConf(PARQUET_BINARY_AS_STRING, "false").toBoolean
/**
+   * When set to true, we always treat INT96 values in Parquet files as timestamps.
+ */
+ private[spark] def isParquetINT96AsTimestamp: Boolean =
+ getConf(PARQUET_INT96_AS_TIMESTAMP, "true").toBoolean
+
+ /**
* When set to true, partition pruning for in-memory columnar tables is enabled.
*/
private[spark] def inMemoryPartitionPruning: Boolean =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 10df8c3310..d87ddfeabd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -17,8 +17,12 @@
package org.apache.spark.sql.parquet
+import java.sql.Timestamp
+import java.util.{TimeZone, Calendar}
+
import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
+import jodd.datetime.JDateTime
import parquet.column.Dictionary
import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
import parquet.schema.MessageType
@@ -26,6 +30,7 @@ import parquet.schema.MessageType
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.parquet.CatalystConverter.FieldType
import org.apache.spark.sql.types._
+import org.apache.spark.sql.parquet.timestamp.NanoTime
/**
* Collection of converters of Parquet types (group and primitive types) that
@@ -123,6 +128,12 @@ private[sql] object CatalystConverter {
parent.updateDecimal(fieldIndex, value, d)
}
}
+ case TimestampType => {
+ new CatalystPrimitiveConverter(parent, fieldIndex) {
+ override def addBinary(value: Binary): Unit =
+ parent.updateTimestamp(fieldIndex, value)
+ }
+ }
// All other primitive types use the default converter
case ctype: PrimitiveType => { // note: need the type tag here!
new CatalystPrimitiveConverter(parent, fieldIndex)
@@ -197,9 +208,11 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
updateField(fieldIndex, value)
- protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
+ protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
+ updateField(fieldIndex, readTimestamp(value))
+
+ protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit =
updateField(fieldIndex, readDecimal(new Decimal(), value, ctype))
- }
protected[parquet] def isRootConverter: Boolean = parent == null
@@ -232,6 +245,13 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
unscaled = (unscaled << (64 - numBits)) >> (64 - numBits)
dest.set(unscaled, precision, scale)
}
+
+ /**
+ * Read a Timestamp value from a Parquet Int96Value
+ */
+ protected[parquet] def readTimestamp(value: Binary): Timestamp = {
+ CatalystTimestampConverter.convertToTimestamp(value)
+ }
}
/**
@@ -384,6 +404,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
current.setString(fieldIndex, value)
+ override protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
+ current.update(fieldIndex, readTimestamp(value))
+
override protected[parquet] def updateDecimal(
fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
var decimal = current(fieldIndex).asInstanceOf[Decimal]
@@ -454,6 +477,73 @@ private[parquet] object CatalystArrayConverter {
val INITIAL_ARRAY_SIZE = 20
}
+private[parquet] object CatalystTimestampConverter {
+  // TODO: most of this comes from Hive 0.14.
+  // The Hive code might have some issues, so we need to keep an eye on it.
+  // We also use NanoTime and Int96Values from parquet-examples,
+  // and jodd to convert between NanoTime and Timestamp.
+ val parquetTsCalendar = new ThreadLocal[Calendar]
+ def getCalendar = {
+ // this is a cache for the calendar instance.
+ if (parquetTsCalendar.get == null) {
+ parquetTsCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT")))
+ }
+ parquetTsCalendar.get
+ }
+ val NANOS_PER_SECOND: Long = 1000000000
+ val SECONDS_PER_MINUTE: Long = 60
+ val MINUTES_PER_HOUR: Long = 60
+ val NANOS_PER_MILLI: Long = 1000000
+
+ def convertToTimestamp(value: Binary): Timestamp = {
+ val nt = NanoTime.fromBinary(value)
+ val timeOfDayNanos = nt.getTimeOfDayNanos
+ val julianDay = nt.getJulianDay
+ val jDateTime = new JDateTime(julianDay.toDouble)
+ val calendar = getCalendar
+ calendar.set(Calendar.YEAR, jDateTime.getYear)
+ calendar.set(Calendar.MONTH, jDateTime.getMonth - 1)
+ calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay)
+
+    // written in imperative style
+ var remainder = timeOfDayNanos
+ calendar.set(
+ Calendar.HOUR_OF_DAY,
+ (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)).toInt)
+ remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)
+ calendar.set(
+ Calendar.MINUTE, (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE)).toInt)
+ remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE)
+ calendar.set(Calendar.SECOND, (remainder / NANOS_PER_SECOND).toInt)
+ val nanos = remainder % NANOS_PER_SECOND
+ val ts = new Timestamp(calendar.getTimeInMillis)
+ ts.setNanos(nanos.toInt)
+ ts
+ }
+
+ def convertFromTimestamp(ts: Timestamp): Binary = {
+ val calendar = getCalendar
+ calendar.setTime(ts)
+ val jDateTime = new JDateTime(calendar.get(Calendar.YEAR),
+ calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH))
+    // Hive 0.14 didn't set the hour before getting the day number, even though the day
+    // number depends on the hour, since the Julian day number increments at 12:00 GMT.
+    // Here we just follow what Hive does.
+ val julianDay = jDateTime.getJulianDayNumber
+
+ val hour = calendar.get(Calendar.HOUR_OF_DAY)
+ val minute = calendar.get(Calendar.MINUTE)
+ val second = calendar.get(Calendar.SECOND)
+ val nanos = ts.getNanos
+    // Hive 0.14 uses the hours directly, which might be wrong, since a Julian day starts
+    // at 12:00. Here we just follow what Hive does.
+ val nanosOfDay = nanos + second * NANOS_PER_SECOND +
+ minute * NANOS_PER_SECOND * SECONDS_PER_MINUTE +
+ hour * NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR
+ NanoTime(julianDay, nanosOfDay).toBinary
+ }
+}
+
/**
* A `parquet.io.api.GroupConverter` that converts a single-element groups that
* match the characteristics of an array (see
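The converter above goes through jodd's `JDateTime` and a GMT `Calendar`, but the core arithmetic is just an offset from the Julian day of the Unix epoch plus a nanoseconds-of-day split. A self-contained sketch of that arithmetic (my own illustration, not the patch's code; it ignores the noon-start and calendar-cutover subtleties the comments mention):

```scala
import java.sql.Timestamp
import java.util.concurrent.TimeUnit

object JulianSketch {
  // Julian Day Number of the Unix epoch, 1970-01-01.
  val JulianDayOfEpoch = 2440588L
  val NanosPerSecond   = 1000000000L
  val NanosPerMilli    = 1000000L

  // (julianDay, timeOfDayNanos) -> java.sql.Timestamp in UTC.
  def toTimestamp(julianDay: Int, timeOfDayNanos: Long): Timestamp = {
    val epochDays   = julianDay - JulianDayOfEpoch
    val epochMillis = TimeUnit.DAYS.toMillis(epochDays) + timeOfDayNanos / NanosPerMilli
    val ts = new Timestamp(epochMillis)
    // setNanos replaces the whole sub-second part, preserving the nanosecond
    // precision that motivated the INT96 encoding in the first place.
    ts.setNanos((timeOfDayNanos % NanosPerSecond).toInt)
    ts
  }

  def main(args: Array[String]): Unit = {
    // 2457057 is the Julian Day Number of 2015-02-03; prints that instant
    // rendered in the JVM's default time zone.
    println(toTimestamp(2457057, 0L))
  }
}
```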
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index a54485e719..b0db9943a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -65,8 +65,8 @@ private[sql] case class ParquetRelation(
ParquetTypesConverter.readSchemaFromFile(
new Path(path.split(",").head),
conf,
- sqlContext.conf.isParquetBinaryAsString)
-
+ sqlContext.conf.isParquetBinaryAsString,
+ sqlContext.conf.isParquetINT96AsTimestamp)
lazy val attributeMap = AttributeMap(output.map(o => o -> o))
override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index fd63ad8144..3fb1cc4105 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -83,7 +83,8 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
// TODO: Why it can be null?
if (schema == null) {
log.debug("falling back to Parquet read schema")
- schema = ParquetTypesConverter.convertToAttributes(parquetSchema, false)
+ schema = ParquetTypesConverter.convertToAttributes(
+ parquetSchema, false, true)
}
log.debug(s"list of attributes that will be read: $schema")
new RowRecordMaterializer(parquetSchema, schema)
@@ -184,12 +185,12 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
case t @ StructType(_) => writeStruct(
t,
value.asInstanceOf[CatalystConverter.StructScalaType[_]])
- case _ => writePrimitive(schema.asInstanceOf[PrimitiveType], value)
+ case _ => writePrimitive(schema.asInstanceOf[NativeType], value)
}
}
}
- private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = {
+ private[parquet] def writePrimitive(schema: DataType, value: Any): Unit = {
if (value != null) {
schema match {
case StringType => writer.addBinary(
@@ -202,6 +203,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
case IntegerType => writer.addInteger(value.asInstanceOf[Int])
case ShortType => writer.addInteger(value.asInstanceOf[Short])
case LongType => writer.addLong(value.asInstanceOf[Long])
+ case TimestampType => writeTimestamp(value.asInstanceOf[java.sql.Timestamp])
case ByteType => writer.addInteger(value.asInstanceOf[Byte])
case DoubleType => writer.addDouble(value.asInstanceOf[Double])
case FloatType => writer.addFloat(value.asInstanceOf[Float])
@@ -307,6 +309,10 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
writer.addBinary(Binary.fromByteArray(scratchBytes, 0, numBytes))
}
+ private[parquet] def writeTimestamp(ts: java.sql.Timestamp): Unit = {
+ val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp(ts)
+ writer.addBinary(binaryNanoTime)
+ }
}
// Optimized for non-nested rows
@@ -351,6 +357,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
case DoubleType => writer.addDouble(record.getDouble(index))
case FloatType => writer.addFloat(record.getFloat(index))
case BooleanType => writer.addBoolean(record.getBoolean(index))
+ case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp])
case d: DecimalType =>
if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
sys.error(s"Unsupported datatype $d, cannot write to consumer")
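With the read and write sides in place, a timestamp column should round-trip through Parquet. A rough sketch against the SchemaRDD API of this era (names and path are hypothetical; assumes an existing `SparkContext` named `sc`):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SQLContext

case class Event(name: String, at: Timestamp)

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD

val events = sc.parallelize(Seq(Event("start", new Timestamp(System.currentTimeMillis()))))
// TimestampType columns are now written as INT96 ...
events.saveAsParquetFile("/tmp/events.parquet")

// ... and, while spark.sql.parquet.int96AsTimestamp is true (the default),
// read back as TimestampType.
val readBack = sqlContext.parquetFile("/tmp/events.parquet")
readBack.printSchema()
```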
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
index d5993656e0..e4a10aa2ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.test.TestSQLContext
import parquet.example.data.{GroupWriter, Group}
-import parquet.example.data.simple.SimpleGroup
+import parquet.example.data.simple.{NanoTime, SimpleGroup}
import parquet.hadoop.{ParquetReader, ParquetFileReader, ParquetWriter}
import parquet.hadoop.api.WriteSupport
import parquet.hadoop.api.WriteSupport.WriteContext
@@ -63,6 +63,7 @@ private[sql] object ParquetTestData {
optional int64 mylong;
optional float myfloat;
optional double mydouble;
+ optional int96 mytimestamp;
}"""
// field names for test assertion error messages
@@ -72,7 +73,8 @@ private[sql] object ParquetTestData {
"mystring:String",
"mylong:Long",
"myfloat:Float",
- "mydouble:Double"
+ "mydouble:Double",
+ "mytimestamp:Timestamp"
)
val subTestSchema =
@@ -98,6 +100,7 @@ private[sql] object ParquetTestData {
optional int64 myoptlong;
optional float myoptfloat;
optional double myoptdouble;
+ optional int96 mytimestamp;
}
"""
@@ -236,6 +239,7 @@ private[sql] object ParquetTestData {
record.add(3, i.toLong << 33)
record.add(4, 2.5F)
record.add(5, 4.5D)
+ record.add(6, new NanoTime(1,2))
writer.write(record)
}
writer.close()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 6d8c682ccc..f1d4ff2387 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -54,7 +54,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
def toPrimitiveDataType(
parquetType: ParquetPrimitiveType,
- binaryAsString: Boolean): DataType = {
+ binaryAsString: Boolean,
+ int96AsTimestamp: Boolean): DataType = {
val originalType = parquetType.getOriginalType
val decimalInfo = parquetType.getDecimalMetadata
parquetType.getPrimitiveTypeName match {
@@ -66,6 +67,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
case ParquetPrimitiveTypeName.FLOAT => FloatType
case ParquetPrimitiveTypeName.INT32 => IntegerType
case ParquetPrimitiveTypeName.INT64 => LongType
+ case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType
case ParquetPrimitiveTypeName.INT96 =>
// TODO: add BigInteger type? TODO(andre) use DecimalType instead????
sys.error("Potential loss of precision: cannot convert INT96")
@@ -103,7 +105,9 @@ private[parquet] object ParquetTypesConverter extends Logging {
* @param parquetType The type to convert.
* @return The corresponding Catalyst type.
*/
- def toDataType(parquetType: ParquetType, isBinaryAsString: Boolean): DataType = {
+ def toDataType(parquetType: ParquetType,
+ isBinaryAsString: Boolean,
+ isInt96AsTimestamp: Boolean): DataType = {
def correspondsToMap(groupType: ParquetGroupType): Boolean = {
if (groupType.getFieldCount != 1 || groupType.getFields.apply(0).isPrimitive) {
false
@@ -125,7 +129,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
}
if (parquetType.isPrimitive) {
- toPrimitiveDataType(parquetType.asPrimitiveType, isBinaryAsString)
+ toPrimitiveDataType(parquetType.asPrimitiveType, isBinaryAsString, isInt96AsTimestamp)
} else {
val groupType = parquetType.asGroupType()
parquetType.getOriginalType match {
@@ -137,9 +141,12 @@ private[parquet] object ParquetTypesConverter extends Logging {
if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) {
val bag = field.asGroupType()
assert(bag.getFieldCount == 1)
- ArrayType(toDataType(bag.getFields.apply(0), isBinaryAsString), containsNull = true)
+ ArrayType(
+ toDataType(bag.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp),
+ containsNull = true)
} else {
- ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
+ ArrayType(
+ toDataType(field, isBinaryAsString, isInt96AsTimestamp), containsNull = false)
}
}
case ParquetOriginalType.MAP => {
@@ -152,8 +159,10 @@ private[parquet] object ParquetTypesConverter extends Logging {
"Parquet Map type malformatted: nested group should have 2 (key, value) fields!")
assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
- val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
- val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
+ val keyType =
+ toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp)
+ val valueType =
+ toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString, isInt96AsTimestamp)
MapType(keyType, valueType,
keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED)
}
@@ -163,8 +172,10 @@ private[parquet] object ParquetTypesConverter extends Logging {
val keyValueGroup = groupType.getFields.apply(0).asGroupType()
assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
- val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
- val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
+ val keyType =
+ toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp)
+ val valueType =
+ toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString, isInt96AsTimestamp)
MapType(keyType, valueType,
keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED)
} else if (correspondsToArray(groupType)) { // ArrayType
@@ -172,16 +183,19 @@ private[parquet] object ParquetTypesConverter extends Logging {
if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) {
val bag = field.asGroupType()
assert(bag.getFieldCount == 1)
- ArrayType(toDataType(bag.getFields.apply(0), isBinaryAsString), containsNull = true)
+ ArrayType(
+ toDataType(bag.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp),
+ containsNull = true)
} else {
- ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
+ ArrayType(
+ toDataType(field, isBinaryAsString, isInt96AsTimestamp), containsNull = false)
}
} else { // everything else: StructType
val fields = groupType
.getFields
.map(ptype => new StructField(
ptype.getName,
- toDataType(ptype, isBinaryAsString),
+ toDataType(ptype, isBinaryAsString, isInt96AsTimestamp),
ptype.getRepetition != Repetition.REQUIRED))
StructType(fields)
}
@@ -210,6 +224,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
case ShortType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
case ByteType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
case LongType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT64))
+ case TimestampType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT96))
case DecimalType.Fixed(precision, scale) if precision <= 18 =>
// TODO: for now, our writer only supports decimals that fit in a Long
Some(ParquetTypeInfo(ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
@@ -345,7 +360,9 @@ private[parquet] object ParquetTypesConverter extends Logging {
}
}
- def convertToAttributes(parquetSchema: ParquetType, isBinaryAsString: Boolean): Seq[Attribute] = {
+ def convertToAttributes(parquetSchema: ParquetType,
+ isBinaryAsString: Boolean,
+ isInt96AsTimestamp: Boolean): Seq[Attribute] = {
parquetSchema
.asGroupType()
.getFields
@@ -353,7 +370,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
field =>
new AttributeReference(
field.getName,
- toDataType(field, isBinaryAsString),
+ toDataType(field, isBinaryAsString, isInt96AsTimestamp),
field.getRepetition != Repetition.REQUIRED)())
}
@@ -476,7 +493,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
def readSchemaFromFile(
origPath: Path,
conf: Option[Configuration],
- isBinaryAsString: Boolean): Seq[Attribute] = {
+ isBinaryAsString: Boolean,
+ isInt96AsTimestamp: Boolean): Seq[Attribute] = {
val keyValueMetadata: java.util.Map[String, String] =
readMetaData(origPath, conf)
.getFileMetaData
@@ -485,7 +503,9 @@ private[parquet] object ParquetTypesConverter extends Logging {
convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
} else {
val attributes = convertToAttributes(
- readMetaData(origPath, conf).getFileMetaData.getSchema, isBinaryAsString)
+ readMetaData(origPath, conf).getFileMetaData.getSchema,
+ isBinaryAsString,
+ isInt96AsTimestamp)
log.info(s"Falling back to schema conversion from Parquet types; result: $attributes")
attributes
}
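A hypothetical, much-reduced illustration of how the new flag steers the primitive mapping above (keyed on the Parquet type name as a plain String so the snippet stays self-contained; this is not the real `toPrimitiveDataType` signature):

```scala
import org.apache.spark.sql.types._

def primitiveToCatalyst(parquetTypeName: String, int96AsTimestamp: Boolean): DataType =
  parquetTypeName match {
    case "INT32"                     => IntegerType
    case "INT64"                     => LongType
    case "INT96" if int96AsTimestamp => TimestampType
    case "INT96"                     =>
      sys.error("Potential loss of precision: cannot convert INT96")
    case "FLOAT"                     => FloatType
    case "DOUBLE"                    => DoubleType
    case "BOOLEAN"                   => BooleanType
    case other                       => sys.error(s"Unsupported Parquet type: $other")
  }
```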
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 1e794cad73..179c0d6b22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -136,7 +136,8 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
ParquetTypesConverter.readSchemaFromFile(
partitions.head.files.head.getPath,
Some(sparkContext.hadoopConfiguration),
- sqlContext.conf.isParquetBinaryAsString))
+ sqlContext.conf.isParquetBinaryAsString,
+ sqlContext.conf.isParquetINT96AsTimestamp))
val dataIncludesKey =
partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
new file mode 100644
index 0000000000..8871616844
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet.timestamp
+
+import java.nio.{ByteBuffer, ByteOrder}
+
+import parquet.Preconditions
+import parquet.io.api.{Binary, RecordConsumer}
+
+private[parquet] class NanoTime extends Serializable {
+ private var julianDay = 0
+ private var timeOfDayNanos = 0L
+
+ def set(julianDay: Int, timeOfDayNanos: Long) = {
+ this.julianDay = julianDay
+ this.timeOfDayNanos = timeOfDayNanos
+ this
+ }
+
+ def getJulianDay: Int = julianDay
+
+ def getTimeOfDayNanos: Long = timeOfDayNanos
+
+ def toBinary: Binary = {
+ val buf = ByteBuffer.allocate(12)
+ buf.order(ByteOrder.LITTLE_ENDIAN)
+ buf.putLong(timeOfDayNanos)
+ buf.putInt(julianDay)
+ buf.flip()
+ Binary.fromByteBuffer(buf)
+ }
+
+ def writeValue(recordConsumer: RecordConsumer) {
+ recordConsumer.addBinary(toBinary)
+ }
+
+ override def toString =
+ "NanoTime{julianDay=" + julianDay + ", timeOfDayNanos=" + timeOfDayNanos + "}"
+}
+
+object NanoTime {
+ def fromBinary(bytes: Binary): NanoTime = {
+ Preconditions.checkArgument(bytes.length() == 12, "Must be 12 bytes")
+ val buf = bytes.toByteBuffer
+ buf.order(ByteOrder.LITTLE_ENDIAN)
+ val timeOfDayNanos = buf.getLong
+ val julianDay = buf.getInt
+ new NanoTime().set(julianDay, timeOfDayNanos)
+ }
+
+ def apply(julianDay: Int, timeOfDayNanos: Long): NanoTime = {
+ new NanoTime().set(julianDay, timeOfDayNanos)
+ }
+}
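The 12-byte layout (an 8-byte little-endian nanoseconds-of-day followed by a 4-byte little-endian Julian day) can be sanity-checked with a round trip through the class above. A sketch; since `NanoTime` is `private[parquet]`, such a snippet would have to live under that package:

```scala
package org.apache.spark.sql.parquet.timestamp

object NanoTimeRoundTrip {
  def main(args: Array[String]): Unit = {
    // Encode a (julianDay, timeOfDayNanos) pair into the INT96 binary layout
    // and decode it again.
    val original = NanoTime(2457057, 123456789L)
    val binary   = original.toBinary
    val decoded  = NanoTime.fromBinary(binary)

    assert(decoded.getJulianDay == 2457057)
    assert(decoded.getTimeOfDayNanos == 123456789L)
    println(decoded)  // NanoTime{julianDay=2457057, timeOfDayNanos=123456789}
  }
}
```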