author     Davies Liu <davies@databricks.com>     2015-06-10 16:55:39 -0700
committer  Reynold Xin <rxin@databricks.com>      2015-06-10 16:55:39 -0700
commit     37719e0cd0b00cc5ffee0ebe1652d465a574db0f (patch)
tree       326c8178ed25ef17135ed2978c6dbfaf9e7593e3
parent     b928f543845ddd39e914a0e8f0b0205fd86100c5 (diff)
[SPARK-8189] [SQL] use Long for TimestampType in SQL
This PR changes the internal type of TimestampType to Long for efficiency, which means it will lose precision below 100ns.

Author: Davies Liu <davies@databricks.com>

Closes #6733 from davies/timestamp and squashes the following commits:

d9565fa [Davies Liu] remove print
65cf2f1 [Davies Liu] fix Timestamp in SparkR
86fecfb [Davies Liu] disable two timestamp tests
8f77ee0 [Davies Liu] fix scala style
246ee74 [Davies Liu] address comments
309d2e1 [Davies Liu] use Long for TimestampType in SQL
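For reference, the encoding behind this change can be sketched as a pair of conversions between java.sql.Timestamp and a Long counting 100ns intervals since the epoch. The sketch below is not part of the patch; it is a minimal standalone Scala mirror of the DateUtils.fromJavaTimestamp/toJavaTimestamp helpers added in this commit, with illustrative names:

import java.sql.Timestamp

// Sketch of the internal representation: a timestamp is stored as the number
// of 100ns intervals since the epoch, so anything finer than 100ns is lost.
object TimestampEncodingSketch {
  private val HUNDRED_NANOS_PER_SECOND = 10000000L

  // java.sql.Timestamp -> number of 100ns intervals since epoch
  def fromJavaTimestamp(t: Timestamp): Long =
    if (t == null) 0L
    else t.getTime * 10000L + (t.getNanos.toLong / 100) % 10000L

  // number of 100ns intervals since epoch -> java.sql.Timestamp
  def toJavaTimestamp(num100ns: Long): Timestamp = {
    var seconds = num100ns / HUNDRED_NANOS_PER_SECOND
    var nanos = num100ns % HUNDRED_NANOS_PER_SECOND
    if (nanos < 0) {                    // setNanos() cannot take a negative value
      nanos += HUNDRED_NANOS_PER_SECOND
      seconds -= 1
    }
    val ts = new Timestamp(seconds * 1000)
    ts.setNanos(nanos.toInt * 100)
    ts
  }

  def main(args: Array[String]): Unit = {
    val now = new Timestamp(System.currentTimeMillis())
    now.setNanos(100)                   // 100ns granularity survives the round trip
    assert(toJavaTimestamp(fromJavaTimestamp(now)) == now)
  }
}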
-rw-r--r-- core/src/main/scala/org/apache/spark/api/r/SerDe.scala | 17
-rw-r--r-- python/pyspark/sql/types.py | 11
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java | 6
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala | 8
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala | 13
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala | 62
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala | 1
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 4
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala | 10
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala | 15
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala | 6
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala | 44
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala | 10
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala | 11
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala | 40
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala | 2
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala | 21
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala | 19
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala | 17
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala | 2
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala | 7
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala | 10
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala | 5
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala | 10
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala | 9
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala | 10
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala | 2
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala | 2
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala | 11
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala | 9
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 2
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala | 14
-rw-r--r-- sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala | 8
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala | 20
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 4
-rw-r--r-- sql/hive/src/test/resources/golden/timestamp cast #5-0-dbd7bcd167d322d6617b884c02c7f247 | 2
36 files changed, 272 insertions, 172 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index f8e3f1a790..56adc857d4 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -18,7 +18,7 @@
package org.apache.spark.api.r
import java.io.{DataInputStream, DataOutputStream}
-import java.sql.{Date, Time}
+import java.sql.{Timestamp, Date, Time}
import scala.collection.JavaConversions._
@@ -107,9 +107,12 @@ private[spark] object SerDe {
Date.valueOf(readString(in))
}
- def readTime(in: DataInputStream): Time = {
- val t = in.readDouble()
- new Time((t * 1000L).toLong)
+ def readTime(in: DataInputStream): Timestamp = {
+ val seconds = in.readDouble()
+ val sec = Math.floor(seconds).toLong
+ val t = new Timestamp(sec * 1000L)
+ t.setNanos(((seconds - sec) * 1e9).toInt)
+ t
}
def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
@@ -227,6 +230,9 @@ private[spark] object SerDe {
case "java.sql.Time" =>
writeType(dos, "time")
writeTime(dos, value.asInstanceOf[Time])
+ case "java.sql.Timestamp" =>
+ writeType(dos, "time")
+ writeTime(dos, value.asInstanceOf[Timestamp])
case "[B" =>
writeType(dos, "raw")
writeBytes(dos, value.asInstanceOf[Array[Byte]])
@@ -289,6 +295,9 @@ private[spark] object SerDe {
out.writeDouble(value.getTime.toDouble / 1000.0)
}
+ def writeTime(out: DataOutputStream, value: Timestamp): Unit = {
+ out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble / 1e9)
+ }
// NOTE: Only works for ASCII right now
def writeString(out: DataOutputStream, value: String): Unit = {
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index b6ec6137c9..8f286b631f 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -19,6 +19,7 @@ import sys
import decimal
import time
import datetime
+import calendar
import keyword
import warnings
import json
@@ -654,6 +655,8 @@ def _need_python_to_sql_conversion(dataType):
_need_python_to_sql_conversion(dataType.valueType)
elif isinstance(dataType, UserDefinedType):
return True
+ elif isinstance(dataType, TimestampType):
+ return True
else:
return False
@@ -707,6 +710,14 @@ def _python_to_sql_converter(dataType):
return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
elif isinstance(dataType, UserDefinedType):
return lambda obj: dataType.serialize(obj)
+ elif isinstance(dataType, TimestampType):
+
+ def to_posix_timstamp(dt):
+ if dt.tzinfo is None:
+ return int(time.mktime(dt.timetuple()) * 1e7 + dt.microsecond * 10)
+ else:
+ return int(calendar.timegm(dt.utctimetuple()) * 1e7 + dt.microsecond * 10)
+ return to_posix_timstamp
else:
raise ValueError("Unexpected type %r" % dataType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java b/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java
index d138b43a34..6584882a62 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql;
import java.math.BigDecimal;
import java.sql.Date;
+import java.sql.Timestamp;
import java.util.List;
import scala.collection.Seq;
@@ -104,6 +105,11 @@ public abstract class BaseRow implements Row {
}
@Override
+ public Timestamp getTimestamp(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public <T> Seq<T> getSeq(int i) {
throw new UnsupportedOperationException();
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
index 0d460b634d..8aaf5d7d89 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -260,10 +260,16 @@ trait Row extends Serializable {
*
* @throws ClassCastException when data type does not match.
*/
- // TODO(davies): This is not the right default implementation, we use Int as Date internally
def getDate(i: Int): java.sql.Date = apply(i).asInstanceOf[java.sql.Date]
/**
+ * Returns the value at position i of date type as java.sql.Timestamp.
+ *
+ * @throws ClassCastException when data type does not match.
+ */
+ def getTimestamp(i: Int): java.sql.Timestamp = apply(i).asInstanceOf[java.sql.Timestamp]
+
+ /**
* Returns the value at position i of array type as a Scala Seq.
*
* @throws ClassCastException when data type does not match.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 2e7b4c236d..beb82dbc08 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst
import java.lang.{Iterable => JavaIterable}
import java.math.{BigDecimal => JavaBigDecimal}
-import java.sql.Date
+import java.sql.{Timestamp, Date}
import java.util.{Map => JavaMap}
import javax.annotation.Nullable
@@ -58,6 +58,7 @@ object CatalystTypeConverters {
case structType: StructType => StructConverter(structType)
case StringType => StringConverter
case DateType => DateConverter
+ case TimestampType => TimestampConverter
case dt: DecimalType => BigDecimalConverter
case BooleanType => BooleanConverter
case ByteType => ByteConverter
@@ -274,6 +275,15 @@ object CatalystTypeConverters {
override def toScalaImpl(row: Row, column: Int): Date = toScala(row.getInt(column))
}
+ private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] {
+ override def toCatalystImpl(scalaValue: Timestamp): Long =
+ DateUtils.fromJavaTimestamp(scalaValue)
+ override def toScala(catalystValue: Any): Timestamp =
+ if (catalystValue == null) null
+ else DateUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
+ override def toScalaImpl(row: Row, column: Int): Timestamp = toScala(row.getLong(column))
+ }
+
private object BigDecimalConverter extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
override def toCatalystImpl(scalaValue: Any): Decimal = scalaValue match {
case d: BigDecimal => Decimal(d)
@@ -367,6 +377,7 @@ object CatalystTypeConverters {
def convertToCatalyst(a: Any): Any = a match {
case s: String => StringConverter.toCatalyst(s)
case d: Date => DateConverter.toCatalyst(d)
+ case t: Timestamp => TimestampConverter.toCatalyst(t)
case d: BigDecimal => BigDecimalConverter.toCatalyst(d)
case d: JavaBigDecimal => BigDecimalConverter.toCatalyst(d)
case seq: Seq[Any] => seq.map(convertToCatalyst)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 18102d1acb..8d93957fea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -113,7 +113,8 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
private[this] def castToString(from: DataType): Any => Any = from match {
case BinaryType => buildCast[Array[Byte]](_, UTF8String(_))
case DateType => buildCast[Int](_, d => UTF8String(DateUtils.toString(d)))
- case TimestampType => buildCast[Timestamp](_, t => UTF8String(timestampToString(t)))
+ case TimestampType => buildCast[Long](_,
+ t => UTF8String(timestampToString(DateUtils.toJavaTimestamp(t))))
case _ => buildCast[Any](_, o => UTF8String(o.toString))
}
@@ -127,7 +128,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case StringType =>
buildCast[UTF8String](_, _.length() != 0)
case TimestampType =>
- buildCast[Timestamp](_, t => t.getTime() != 0 || t.getNanos() != 0)
+ buildCast[Long](_, t => t != 0)
case DateType =>
// Hive would return null when cast from date to boolean
buildCast[Int](_, d => null)
@@ -158,20 +159,21 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
if (periodIdx != -1 && n.length() - periodIdx > 9) {
n = n.substring(0, periodIdx + 10)
}
- try Timestamp.valueOf(n) catch { case _: java.lang.IllegalArgumentException => null }
+ try DateUtils.fromJavaTimestamp(Timestamp.valueOf(n))
+ catch { case _: java.lang.IllegalArgumentException => null }
})
case BooleanType =>
- buildCast[Boolean](_, b => new Timestamp(if (b) 1 else 0))
+ buildCast[Boolean](_, b => (if (b) 1L else 0))
case LongType =>
- buildCast[Long](_, l => new Timestamp(l))
+ buildCast[Long](_, l => longToTimestamp(l))
case IntegerType =>
- buildCast[Int](_, i => new Timestamp(i))
+ buildCast[Int](_, i => longToTimestamp(i.toLong))
case ShortType =>
- buildCast[Short](_, s => new Timestamp(s))
+ buildCast[Short](_, s => longToTimestamp(s.toLong))
case ByteType =>
- buildCast[Byte](_, b => new Timestamp(b))
+ buildCast[Byte](_, b => longToTimestamp(b.toLong))
case DateType =>
- buildCast[Int](_, d => new Timestamp(DateUtils.toJavaDate(d).getTime))
+ buildCast[Int](_, d => DateUtils.toMillisSinceEpoch(d) * 10000)
// TimestampWritable.decimalToTimestamp
case DecimalType() =>
buildCast[Decimal](_, d => decimalToTimestamp(d))
@@ -191,25 +193,17 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
})
}
- private[this] def decimalToTimestamp(d: Decimal) = {
- val seconds = Math.floor(d.toDouble).toLong
- val bd = (d.toBigDecimal - seconds) * 1000000000
- val nanos = bd.intValue()
-
- val millis = seconds * 1000
- val t = new Timestamp(millis)
-
- // remaining fractional portion as nanos
- t.setNanos(nanos)
- t
+ private[this] def decimalToTimestamp(d: Decimal): Long = {
+ (d.toBigDecimal * 10000000L).longValue()
}
- // Timestamp to long, converting milliseconds to seconds
- private[this] def timestampToLong(ts: Timestamp) = Math.floor(ts.getTime / 1000.0).toLong
-
- private[this] def timestampToDouble(ts: Timestamp) = {
- // First part is the seconds since the beginning of time, followed by nanosecs.
- Math.floor(ts.getTime / 1000.0).toLong + ts.getNanos.toDouble / 1000000000
+ // converting milliseconds to 100ns
+ private[this] def longToTimestamp(t: Long): Long = t * 10000L
+ // converting 100ns to seconds
+ private[this] def timestampToLong(ts: Long): Long = math.floor(ts.toDouble / 10000000L).toLong
+ // converting 100ns to seconds in double
+ private[this] def timestampToDouble(ts: Long): Double = {
+ ts / 10000000.0
}
// Converts Timestamp to string according to Hive TimestampWritable convention
@@ -234,7 +228,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case TimestampType =>
// throw valid precision more than seconds, according to Hive.
// Timestamp.nanos is in 0 to 999,999,999, no more than a second.
- buildCast[Timestamp](_, t => DateUtils.millisToDays(t.getTime))
+ buildCast[Long](_, t => DateUtils.millisToDays(t / 10000L))
// Hive throws this exception as a Semantic Exception
// It is never possible to compare result when hive return with exception,
// so we can return null
@@ -253,7 +247,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case DateType =>
buildCast[Int](_, d => null)
case TimestampType =>
- buildCast[Timestamp](_, t => timestampToLong(t))
+ buildCast[Long](_, t => timestampToLong(t))
case x: NumericType =>
b => x.numeric.asInstanceOf[Numeric[Any]].toLong(b)
}
@@ -269,7 +263,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case DateType =>
buildCast[Int](_, d => null)
case TimestampType =>
- buildCast[Timestamp](_, t => timestampToLong(t).toInt)
+ buildCast[Long](_, t => timestampToLong(t).toInt)
case x: NumericType =>
b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b)
}
@@ -285,7 +279,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case DateType =>
buildCast[Int](_, d => null)
case TimestampType =>
- buildCast[Timestamp](_, t => timestampToLong(t).toShort)
+ buildCast[Long](_, t => timestampToLong(t).toShort)
case x: NumericType =>
b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b).toShort
}
@@ -301,7 +295,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case DateType =>
buildCast[Int](_, d => null)
case TimestampType =>
- buildCast[Timestamp](_, t => timestampToLong(t).toByte)
+ buildCast[Long](_, t => timestampToLong(t).toByte)
case x: NumericType =>
b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b).toByte
}
@@ -334,7 +328,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
buildCast[Int](_, d => null) // date can't cast to decimal in Hive
case TimestampType =>
// Note that we lose precision here.
- buildCast[Timestamp](_, t => changePrecision(Decimal(timestampToDouble(t)), target))
+ buildCast[Long](_, t => changePrecision(Decimal(timestampToDouble(t)), target))
case DecimalType() =>
b => changePrecision(b.asInstanceOf[Decimal].clone(), target)
case LongType =>
@@ -358,7 +352,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case DateType =>
buildCast[Int](_, d => null)
case TimestampType =>
- buildCast[Timestamp](_, t => timestampToDouble(t))
+ buildCast[Long](_, t => timestampToDouble(t))
case x: NumericType =>
b => x.numeric.asInstanceOf[Numeric[Any]].toDouble(b)
}
@@ -374,7 +368,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case DateType =>
buildCast[Int](_, d => null)
case TimestampType =>
- buildCast[Timestamp](_, t => timestampToDouble(t).toFloat)
+ buildCast[Long](_, t => timestampToDouble(t).toFloat)
case x: NumericType =>
b => x.numeric.asInstanceOf[Numeric[Any]].toFloat(b)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
index aa4099e4d7..2c884517d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
@@ -203,6 +203,7 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR
case BooleanType => new MutableBoolean
case LongType => new MutableLong
case DateType => new MutableInt // We use INT for DATE internally
+ case TimestampType => new MutableLong // We use Long for Timestamp internally
case _ => new MutableAny
}.toArray)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index e95682f952..80aa8fa056 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -122,7 +122,7 @@ class CodeGenContext {
case BinaryType => "byte[]"
case StringType => stringType
case DateType => "int"
- case TimestampType => "java.sql.Timestamp"
+ case TimestampType => "long"
case dt: OpenHashSetUDT if dt.elementType == IntegerType => classOf[IntegerHashSet].getName
case dt: OpenHashSetUDT if dt.elementType == LongType => classOf[LongHashSet].getName
case _ => "Object"
@@ -140,6 +140,7 @@ class CodeGenContext {
case FloatType => "Float"
case BooleanType => "Boolean"
case DateType => "Integer"
+ case TimestampType => "Long"
case _ => javaType(dt)
}
@@ -155,6 +156,7 @@ class CodeGenContext {
case DoubleType => "-1.0"
case IntegerType => "-1"
case DateType => "-1"
+ case TimestampType => "-1L"
case _ => "null"
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
index 7caf4aaab8..274429cd1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
@@ -73,7 +73,9 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
val specificAccessorFunctions = ctx.nativeTypes.map { dataType =>
val cases = expressions.zipWithIndex.map {
- case (e, i) if e.dataType == dataType =>
+ case (e, i) if e.dataType == dataType
+ || dataType == IntegerType && e.dataType == DateType
+ || dataType == LongType && e.dataType == TimestampType =>
s"case $i: return c$i;"
case _ => ""
}.mkString("\n ")
@@ -96,7 +98,9 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
val specificMutatorFunctions = ctx.nativeTypes.map { dataType =>
val cases = expressions.zipWithIndex.map {
- case (e, i) if e.dataType == dataType =>
+ case (e, i) if e.dataType == dataType
+ || dataType == IntegerType && e.dataType == DateType
+ || dataType == LongType && e.dataType == TimestampType =>
s"case $i: { c$i = value; return; }"
case _ => ""
}.mkString("\n")
@@ -119,7 +123,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
val nonNull = e.dataType match {
case BooleanType => s"$col ? 0 : 1"
case ByteType | ShortType | IntegerType | DateType => s"$col"
- case LongType => s"$col ^ ($col >>> 32)"
+ case LongType | TimestampType => s"$col ^ ($col >>> 32)"
case FloatType => s"Float.floatToIntBits($col)"
case DoubleType =>
s"(int)(Double.doubleToLongBits($col) ^ (Double.doubleToLongBits($col) >>> 32))"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index 297b35b4da..833c08a293 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -37,7 +37,7 @@ object Literal {
case d: BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
case d: Decimal => Literal(d, DecimalType.Unlimited)
- case t: Timestamp => Literal(t, TimestampType)
+ case t: Timestamp => Literal(DateUtils.fromJavaTimestamp(t), TimestampType)
case d: Date => Literal(DateUtils.fromJavaDate(d), DateType)
case a: Array[Byte] => Literal(a, BinaryType)
case null => Literal(null, NullType)
@@ -100,7 +100,7 @@ case class Literal protected (value: Any, dataType: DataType) extends LeafExpres
ev.isNull = "false"
ev.primitive = value.toString
""
- case FloatType => // This must go before NumericType
+ case FloatType =>
val v = value.asInstanceOf[Float]
if (v.isNaN || v.isInfinite) {
super.genCode(ctx, ev)
@@ -109,7 +109,7 @@ case class Literal protected (value: Any, dataType: DataType) extends LeafExpres
ev.primitive = s"${value}f"
""
}
- case DoubleType => // This must go before NumericType
+ case DoubleType =>
val v = value.asInstanceOf[Double]
if (v.isNaN || v.isInfinite) {
super.genCode(ctx, ev)
@@ -118,15 +118,18 @@ case class Literal protected (value: Any, dataType: DataType) extends LeafExpres
ev.primitive = s"${value}"
""
}
-
- case ByteType | ShortType => // This must go before NumericType
+ case ByteType | ShortType =>
ev.isNull = "false"
ev.primitive = s"(${ctx.javaType(dataType)})$value"
""
- case dt: NumericType if !dt.isInstanceOf[DecimalType] =>
+ case IntegerType | DateType =>
ev.isNull = "false"
ev.primitive = value.toString
""
+ case TimestampType | LongType =>
+ ev.isNull = "false"
+ ev.primitive = s"${value}L"
+ ""
// eval() version may be faster for non-primitive types
case other =>
super.genCode(ctx, ev)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 3cbdfdfb13..2c49352874 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -254,9 +254,9 @@ abstract class BinaryComparison extends BinaryExpression with Predicate {
case dt: NumericType if ctx.isNativeType(dt) => defineCodeGen (ctx, ev, {
(c1, c3) => s"$c1 $symbol $c3"
})
- case TimestampType =>
- // java.sql.Timestamp does not have compare()
- super.genCode(ctx, ev)
+ case DateType | TimestampType => defineCodeGen (ctx, ev, {
+ (c1, c3) => s"$c1 $symbol $c3"
+ })
case other => defineCodeGen (ctx, ev, {
(c1, c2) => s"$c1.compare($c2) $symbol 0"
})
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
index ad649acf53..5cadc141af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.util
-import java.sql.Date
+import java.sql.{Timestamp, Date}
import java.text.SimpleDateFormat
import java.util.{Calendar, TimeZone}
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Cast
*/
object DateUtils {
private val MILLIS_PER_DAY = 86400000
+ private val HUNDRED_NANOS_PER_SECOND = 10000000L
// Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
@@ -45,17 +46,17 @@ object DateUtils {
((millisLocal + LOCAL_TIMEZONE.get().getOffset(millisLocal)) / MILLIS_PER_DAY).toInt
}
- private def toMillisSinceEpoch(days: Int): Long = {
+ def toMillisSinceEpoch(days: Int): Long = {
val millisUtc = days.toLong * MILLIS_PER_DAY
millisUtc - LOCAL_TIMEZONE.get().getOffset(millisUtc)
}
- def fromJavaDate(date: java.sql.Date): Int = {
+ def fromJavaDate(date: Date): Int = {
javaDateToDays(date)
}
- def toJavaDate(daysSinceEpoch: Int): java.sql.Date = {
- new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch))
+ def toJavaDate(daysSinceEpoch: Int): Date = {
+ new Date(toMillisSinceEpoch(daysSinceEpoch))
}
def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days))
@@ -64,9 +65,9 @@ object DateUtils {
if (!s.contains('T')) {
// JDBC escape string
if (s.contains(' ')) {
- java.sql.Timestamp.valueOf(s)
+ Timestamp.valueOf(s)
} else {
- java.sql.Date.valueOf(s)
+ Date.valueOf(s)
}
} else if (s.endsWith("Z")) {
// this is zero timezone of ISO8601
@@ -87,4 +88,33 @@ object DateUtils {
ISO8601GMT.parse(s)
}
}
+
+ /**
+ * Return a java.sql.Timestamp from number of 100ns since epoch
+ */
+ def toJavaTimestamp(num100ns: Long): Timestamp = {
+ // setNanos() will overwrite the millisecond part, so the milliseconds should be
+ // cut off at seconds
+ var seconds = num100ns / HUNDRED_NANOS_PER_SECOND
+ var nanos = num100ns % HUNDRED_NANOS_PER_SECOND
+ // setNanos() can not accept negative value
+ if (nanos < 0) {
+ nanos += HUNDRED_NANOS_PER_SECOND
+ seconds -= 1
+ }
+ val t = new Timestamp(seconds * 1000)
+ t.setNanos(nanos.toInt * 100)
+ t
+ }
+
+ /**
+ * Return the number of 100ns since epoch from java.sql.Timestamp.
+ */
+ def fromJavaTimestamp(t: Timestamp): Long = {
+ if (t != null) {
+ t.getTime() * 10000L + (t.getNanos().toLong / 100) % 10000L
+ } else {
+ 0L
+ }
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
index aebabfc475..a558641fcf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.types
-import java.sql.Timestamp
-
import scala.math.Ordering
import scala.reflect.runtime.universe.typeTag
@@ -38,18 +36,16 @@ class TimestampType private() extends AtomicType {
// The companion object and this class is separated so the companion object also subclasses
// this type. Otherwise, the companion object would be of type "TimestampType$" in byte code.
// Defined with a private constructor so the companion object is the only possible instantiation.
- private[sql] type InternalType = Timestamp
+ private[sql] type InternalType = Long
@transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
- private[sql] val ordering = new Ordering[InternalType] {
- def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)
- }
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
/**
* The default size of a value of the TimestampType is 12 bytes.
*/
- override def defaultSize: Int = 12
+ override def defaultSize: Int = 8
private[spark] override def asNullable: TimestampType = this
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
index 5bc7c30eee..3aca94db3b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
import java.sql.{Timestamp, Date}
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
/**
@@ -137,7 +138,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(cast(cast(sd, DateType), StringType), sd)
checkEvaluation(cast(cast(d, StringType), DateType), 0)
checkEvaluation(cast(cast(nts, TimestampType), StringType), nts)
- checkEvaluation(cast(cast(ts, StringType), TimestampType), ts)
+ checkEvaluation(cast(cast(ts, StringType), TimestampType), DateUtils.fromJavaTimestamp(ts))
// all convert to string type to check
checkEvaluation(cast(cast(cast(nts, TimestampType), DateType), StringType), sd)
@@ -269,9 +270,9 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(cast(ts, LongType), 15.toLong)
checkEvaluation(cast(ts, FloatType), 15.002f)
checkEvaluation(cast(ts, DoubleType), 15.002)
- checkEvaluation(cast(cast(tss, ShortType), TimestampType), ts)
- checkEvaluation(cast(cast(tss, IntegerType), TimestampType), ts)
- checkEvaluation(cast(cast(tss, LongType), TimestampType), ts)
+ checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateUtils.fromJavaTimestamp(ts))
+ checkEvaluation(cast(cast(tss, IntegerType), TimestampType), DateUtils.fromJavaTimestamp(ts))
+ checkEvaluation(cast(cast(tss, LongType), TimestampType), DateUtils.fromJavaTimestamp(ts))
checkEvaluation(
cast(cast(millis.toFloat / 1000, TimestampType), FloatType),
millis.toFloat / 1000)
@@ -283,7 +284,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
Decimal(1))
// A test for higher precision than millis
- checkEvaluation(cast(cast(0.00000001, TimestampType), DoubleType), 0.00000001)
+ checkEvaluation(cast(cast(0.0000001, TimestampType), DoubleType), 0.0000001)
checkEvaluation(cast(Double.NaN, TimestampType), null)
checkEvaluation(cast(1.0 / 0.0, TimestampType), null)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
new file mode 100644
index 0000000000..a4245545ff
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.sql.Timestamp
+
+import org.apache.spark.SparkFunSuite
+
+
+class DateUtilsSuite extends SparkFunSuite {
+
+ test("timestamp") {
+ val now = new Timestamp(System.currentTimeMillis())
+ now.setNanos(100)
+ val ns = DateUtils.fromJavaTimestamp(now)
+ assert(ns % 10000000L == 1)
+ assert(DateUtils.toJavaTimestamp(ns) == now)
+
+ List(-111111111111L, -1L, 0, 1L, 111111111111L).foreach { t =>
+ val ts = DateUtils.toJavaTimestamp(t)
+ assert(DateUtils.fromJavaTimestamp(ts) == t)
+ assert(DateUtils.toJavaTimestamp(DateUtils.fromJavaTimestamp(ts)) == ts)
+ }
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
index 261c4fcad2..077c0ad70a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -190,7 +190,7 @@ class DataTypeSuite extends SparkFunSuite {
checkDefaultSize(DecimalType(10, 5), 4096)
checkDefaultSize(DecimalType.Unlimited, 4096)
checkDefaultSize(DateType, 4)
- checkDefaultSize(TimestampType, 12)
+ checkDefaultSize(TimestampType, 8)
checkDefaultSize(StringType, 4096)
checkDefaultSize(BinaryType, 4096)
checkDefaultSize(ArrayType(DoubleType, true), 800)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index b0f983c180..83881a3687 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -17,10 +17,8 @@
package org.apache.spark.sql.columnar
-import java.sql.Timestamp
-
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.types._
private[sql] class ColumnStatisticsSchema(a: Attribute) extends Serializable {
@@ -234,22 +232,7 @@ private[sql] class StringColumnStats extends ColumnStats {
private[sql] class DateColumnStats extends IntColumnStats
-private[sql] class TimestampColumnStats extends ColumnStats {
- protected var upper: Timestamp = null
- protected var lower: Timestamp = null
-
- override def gatherStats(row: Row, ordinal: Int): Unit = {
- super.gatherStats(row, ordinal)
- if (!row.isNullAt(ordinal)) {
- val value = row(ordinal).asInstanceOf[Timestamp]
- if (upper == null || value.compareTo(upper) > 0) upper = value
- if (lower == null || value.compareTo(lower) < 0) lower = value
- sizeInBytes += TIMESTAMP.defaultSize
- }
- }
-
- override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
-}
+private[sql] class TimestampColumnStats extends LongColumnStats
private[sql] class BinaryColumnStats extends ColumnStats {
override def gatherStats(row: Row, ordinal: Int): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
index 20be5ca9d0..c9c4d630fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.columnar
import java.nio.ByteBuffer
-import java.sql.Timestamp
import scala.reflect.runtime.universe.TypeTag
@@ -355,22 +354,20 @@ private[sql] object DATE extends NativeColumnType(DateType, 8, 4) {
}
}
-private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 12) {
- override def extract(buffer: ByteBuffer): Timestamp = {
- val timestamp = new Timestamp(buffer.getLong())
- timestamp.setNanos(buffer.getInt())
- timestamp
+private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 8) {
+ override def extract(buffer: ByteBuffer): Long = {
+ buffer.getLong
}
- override def append(v: Timestamp, buffer: ByteBuffer): Unit = {
- buffer.putLong(v.getTime).putInt(v.getNanos)
+ override def append(v: Long, buffer: ByteBuffer): Unit = {
+ buffer.putLong(v)
}
- override def getField(row: Row, ordinal: Int): Timestamp = {
- row(ordinal).asInstanceOf[Timestamp]
+ override def getField(row: Row, ordinal: Int): Long = {
+ row(ordinal).asInstanceOf[Long]
}
- override def setField(row: MutableRow, ordinal: Int, value: Timestamp): Unit = {
+ override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = {
row(ordinal) = value
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 256d527d7b..60f3b2d539 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -20,14 +20,13 @@ package org.apache.spark.sql.execution
import java.io._
import java.math.{BigDecimal, BigInteger}
import java.nio.ByteBuffer
-import java.sql.Timestamp
import scala.reflect.ClassTag
-import org.apache.spark.serializer._
import org.apache.spark.Logging
+import org.apache.spark.serializer._
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.{SpecificMutableRow, MutableRow, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, MutableRow, SpecificMutableRow}
import org.apache.spark.sql.types._
/**
@@ -304,11 +303,7 @@ private[sql] object SparkSqlSerializer2 {
out.writeByte(NULL)
} else {
out.writeByte(NOT_NULL)
- val timestamp = row.getAs[java.sql.Timestamp](i)
- val time = timestamp.getTime
- val nanos = timestamp.getNanos
- out.writeLong(time - (nanos / 1000000)) // Write the milliseconds value.
- out.writeInt(nanos) // Write the nanoseconds part.
+ out.writeLong(row.getAs[Long](i))
}
case StringType =>
@@ -429,11 +424,7 @@ private[sql] object SparkSqlSerializer2 {
if (in.readByte() == NULL) {
mutableRow.setNullAt(i)
} else {
- val time = in.readLong() // Read the milliseconds value.
- val nanos = in.readInt() // Read the nanoseconds part.
- val timestamp = new Timestamp(time)
- timestamp.setNanos(nanos)
- mutableRow.update(i, timestamp)
+ mutableRow.update(i, in.readLong())
}
case StringType =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index dffb265601..720b529d59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -170,6 +170,8 @@ package object debug {
case (_: Short, ShortType) =>
case (_: Boolean, BooleanType) =>
case (_: Double, DoubleType) =>
+ case (_: Int, DateType) =>
+ case (_: Long, TimestampType) =>
case (v, udt: UserDefinedType[_]) => typeCheck(v, udt.sqlType)
case (d, t) => sys.error(s"Invalid data found: got $d (${d.getClass}) expected $t")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 3425879047..955b478a48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -148,6 +148,7 @@ object EvaluatePython {
case (ud, udt: UserDefinedType[_]) => toJava(udt.serialize(ud), udt.sqlType)
case (date: Int, DateType) => DateUtils.toJavaDate(date)
+ case (t: Long, TimestampType) => DateUtils.toJavaTimestamp(t)
case (s: UTF8String, StringType) => s.toString
// Pyrolite can handle Timestamp and Decimal
@@ -186,10 +187,12 @@ object EvaluatePython {
}): Row
case (c: java.util.Calendar, DateType) =>
- DateUtils.fromJavaDate(new java.sql.Date(c.getTime().getTime()))
+ DateUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis))
case (c: java.util.Calendar, TimestampType) =>
- new java.sql.Timestamp(c.getTime().getTime())
+ c.getTimeInMillis * 10000L
+ case (t: java.sql.Timestamp, TimestampType) =>
+ DateUtils.fromJavaTimestamp(t)
case (_, udt: UserDefinedType[_]) =>
fromJava(obj, udt.sqlType)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index db68b9c86d..9028d5ed72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -385,7 +385,7 @@ private[sql] class JDBCRDD(
// DateUtils.fromJavaDate does not handle null value, so we need to check it.
val dateVal = rs.getDate(pos)
if (dateVal != null) {
- mutableRow.update(i, DateUtils.fromJavaDate(dateVal))
+ mutableRow.setInt(i, DateUtils.fromJavaDate(dateVal))
} else {
mutableRow.update(i, null)
}
@@ -417,7 +417,13 @@ private[sql] class JDBCRDD(
case LongConversion => mutableRow.setLong(i, rs.getLong(pos))
// TODO(davies): use getBytes for better performance, if the encoding is UTF-8
case StringConversion => mutableRow.setString(i, rs.getString(pos))
- case TimestampConversion => mutableRow.update(i, rs.getTimestamp(pos))
+ case TimestampConversion =>
+ val t = rs.getTimestamp(pos)
+ if (t != null) {
+ mutableRow.setLong(i, DateUtils.fromJavaTimestamp(t))
+ } else {
+ mutableRow.update(i, null)
+ }
case BinaryConversion => mutableRow.update(i, rs.getBytes(pos))
case BinaryLongConversion => {
val bytes = rs.getBytes(pos)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
index 0e22375805..4e07cf36ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.json
import java.io.ByteArrayOutputStream
-import java.sql.Timestamp
import scala.collection.Map
@@ -65,10 +64,10 @@ private[sql] object JacksonParser {
DateUtils.millisToDays(DateUtils.stringToTime(parser.getText).getTime)
case (VALUE_STRING, TimestampType) =>
- new Timestamp(DateUtils.stringToTime(parser.getText).getTime)
+ DateUtils.stringToTime(parser.getText).getTime * 10000L
case (VALUE_NUMBER_INT, TimestampType) =>
- new Timestamp(parser.getLongValue)
+ parser.getLongValue * 10000L
case (_, StringType) =>
val writer = new ByteArrayOutputStream()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 7e1e21f5fb..fb0d137bdb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.json
-import java.sql.Timestamp
-
import scala.collection.Map
import scala.collection.convert.Wrappers.{JListWrapper, JMapWrapper}
@@ -398,11 +396,11 @@ private[sql] object JsonRDD extends Logging {
}
}
- private def toTimestamp(value: Any): Timestamp = {
+ private def toTimestamp(value: Any): Long = {
value match {
- case value: java.lang.Integer => new Timestamp(value.asInstanceOf[Int].toLong)
- case value: java.lang.Long => new Timestamp(value)
- case value: java.lang.String => toTimestamp(DateUtils.stringToTime(value).getTime)
+ case value: java.lang.Integer => value.asInstanceOf[Int].toLong * 10000L
+ case value: java.lang.Long => value * 10000L
+ case value: java.lang.String => DateUtils.stringToTime(value).getTime * 10000L
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 85c2ce740f..ddc5097f88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -28,6 +28,7 @@ import org.apache.parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Co
import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.parquet.CatalystConverter.FieldType
import org.apache.spark.sql.types._
import org.apache.spark.sql.parquet.timestamp.NanoTime
@@ -266,8 +267,8 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
/**
* Read a Timestamp value from a Parquet Int96Value
*/
- protected[parquet] def readTimestamp(value: Binary): Timestamp = {
- CatalystTimestampConverter.convertToTimestamp(value)
+ protected[parquet] def readTimestamp(value: Binary): Long = {
+ DateUtils.fromJavaTimestamp(CatalystTimestampConverter.convertToTimestamp(value))
}
}
@@ -401,7 +402,7 @@ private[parquet] class CatalystPrimitiveRowConverter(
current.setInt(fieldIndex, value)
override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
- current.update(fieldIndex, value)
+ current.setInt(fieldIndex, value)
override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
current.setLong(fieldIndex, value)
@@ -425,7 +426,7 @@ private[parquet] class CatalystPrimitiveRowConverter(
current.update(fieldIndex, UTF8String(value))
override protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
- current.update(fieldIndex, readTimestamp(value))
+ current.setLong(fieldIndex, readTimestamp(value))
override protected[parquet] def updateDecimal(
fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 89db408b1c..e03dbdec04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -29,6 +29,7 @@ import org.apache.parquet.schema.MessageType
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
+import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
/**
@@ -204,7 +205,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
case IntegerType => writer.addInteger(value.asInstanceOf[Int])
case ShortType => writer.addInteger(value.asInstanceOf[Short])
case LongType => writer.addLong(value.asInstanceOf[Long])
- case TimestampType => writeTimestamp(value.asInstanceOf[java.sql.Timestamp])
+ case TimestampType => writeTimestamp(value.asInstanceOf[Long])
case ByteType => writer.addInteger(value.asInstanceOf[Byte])
case DoubleType => writer.addDouble(value.asInstanceOf[Double])
case FloatType => writer.addFloat(value.asInstanceOf[Float])
@@ -311,8 +312,9 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
writer.addBinary(Binary.fromByteArray(scratchBytes, 0, numBytes))
}
- private[parquet] def writeTimestamp(ts: java.sql.Timestamp): Unit = {
- val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp(ts)
+ private[parquet] def writeTimestamp(ts: Long): Unit = {
+ val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp(
+ DateUtils.toJavaTimestamp(ts))
writer.addBinary(binaryNanoTime)
}
}
@@ -357,7 +359,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
case FloatType => writer.addFloat(record.getFloat(index))
case BooleanType => writer.addBoolean(record.getBoolean(index))
case DateType => writer.addInteger(record.getInt(index))
- case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp])
+ case TimestampType => writeTimestamp(record(index).asInstanceOf[Long])
case d: DecimalType =>
if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
sys.error(s"Unsupported datatype $d, cannot write to consumer")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 72e60d9aa7..17a3cec48b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark.Accumulators
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.columnar._
-import org.apache.spark.storage.{RDDBlockId, StorageLevel}
+import org.apache.spark.storage.{StorageLevel, RDDBlockId}
case class BigData(s: String)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
index 339e719f39..16836628cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
@@ -31,7 +31,7 @@ class ColumnStatsSuite extends SparkFunSuite {
testColumnStats(classOf[FixedDecimalColumnStats], FIXED_DECIMAL(15, 10), Row(null, null, 0))
testColumnStats(classOf[StringColumnStats], STRING, Row(null, null, 0))
testColumnStats(classOf[DateColumnStats], DATE, Row(Int.MaxValue, Int.MinValue, 0))
- testColumnStats(classOf[TimestampColumnStats], TIMESTAMP, Row(null, null, 0))
+ testColumnStats(classOf[TimestampColumnStats], TIMESTAMP, Row(Long.MaxValue, Long.MinValue, 0))
def testColumnStats[T <: AtomicType, U <: ColumnStats](
columnStatsClass: Class[U],
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
index a1e76eaa98..8421e670ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -18,17 +18,16 @@
package org.apache.spark.sql.columnar
import java.nio.ByteBuffer
-import java.sql.Timestamp
-import com.esotericsoftware.kryo.{Serializer, Kryo}
import com.esotericsoftware.kryo.io.{Input, Output}
-import org.apache.spark.serializer.KryoRegistrator
+import com.esotericsoftware.kryo.{Kryo, Serializer}
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.columnar.ColumnarTestUtils._
import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.spark.sql.types._
+import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
class ColumnTypeSuite extends SparkFunSuite with Logging {
val DEFAULT_BUFFER_SIZE = 512
@@ -36,7 +35,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
test("defaultSize") {
val checks = Map(
INT -> 4, SHORT -> 2, LONG -> 8, BYTE -> 1, DOUBLE -> 8, FLOAT -> 4,
- FIXED_DECIMAL(15, 10) -> 8, BOOLEAN -> 1, STRING -> 8, DATE -> 4, TIMESTAMP -> 12,
+ FIXED_DECIMAL(15, 10) -> 8, BOOLEAN -> 1, STRING -> 8, DATE -> 4, TIMESTAMP -> 8,
BINARY -> 16, GENERIC -> 16)
checks.foreach { case (columnType, expectedSize) =>
@@ -69,7 +68,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
checkActualSize(BOOLEAN, true, 1)
checkActualSize(STRING, UTF8String("hello"), 4 + "hello".getBytes("utf-8").length)
checkActualSize(DATE, 0, 4)
- checkActualSize(TIMESTAMP, new Timestamp(0L), 12)
+ checkActualSize(TIMESTAMP, 0L, 8)
val binary = Array.fill[Byte](4)(0: Byte)
checkActualSize(BINARY, binary, 4 + 4)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
index 75d993e563..c5d38595c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
@@ -17,14 +17,12 @@
package org.apache.spark.sql.columnar
-import java.sql.Timestamp
-
import scala.collection.immutable.HashSet
import scala.util.Random
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.types.{UTF8String, DataType, Decimal, AtomicType}
+import org.apache.spark.sql.types.{AtomicType, DataType, Decimal, UTF8String}
object ColumnarTestUtils {
def makeNullRow(length: Int): GenericMutableRow = {
@@ -52,10 +50,7 @@ object ColumnarTestUtils {
case BOOLEAN => Random.nextBoolean()
case BINARY => randomBytes(Random.nextInt(32))
case DATE => Random.nextInt()
- case TIMESTAMP =>
- val timestamp = new Timestamp(Random.nextLong())
- timestamp.setNanos(Random.nextInt(999999999))
- timestamp
+ case TIMESTAMP => Random.nextLong()
case _ =>
// Using a random one-element map instead of an arbitrary object
Map(Random.nextInt() -> Random.nextString(Random.nextInt(32)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 49d348c3ed..69ab1c292d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -326,7 +326,7 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter {
assert(cal.get(Calendar.HOUR) === 11)
assert(cal.get(Calendar.MINUTE) === 22)
assert(cal.get(Calendar.SECOND) === 33)
- assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543543)
+ assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543500)
}
test("test DATE types") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index d889c7be17..fca24364fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -76,21 +76,25 @@ class JsonSuite extends QueryTest with TestJsonData {
checkTypePromotion(
Decimal(doubleNumber), enforceCorrectType(doubleNumber, DecimalType.Unlimited))
- checkTypePromotion(new Timestamp(intNumber), enforceCorrectType(intNumber, TimestampType))
- checkTypePromotion(new Timestamp(intNumber.toLong),
+ checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(intNumber)),
+ enforceCorrectType(intNumber, TimestampType))
+ checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)),
enforceCorrectType(intNumber.toLong, TimestampType))
val strTime = "2014-09-30 12:34:56"
- checkTypePromotion(Timestamp.valueOf(strTime), enforceCorrectType(strTime, TimestampType))
+ checkTypePromotion(DateUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
+ enforceCorrectType(strTime, TimestampType))
val strDate = "2014-10-15"
checkTypePromotion(
DateUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
- checkTypePromotion(new Timestamp(3601000), enforceCorrectType(ISO8601Time1, TimestampType))
+ checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(3601000)),
+ enforceCorrectType(ISO8601Time1, TimestampType))
checkTypePromotion(DateUtils.millisToDays(3601000), enforceCorrectType(ISO8601Time1, DateType))
val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
- checkTypePromotion(new Timestamp(10801000), enforceCorrectType(ISO8601Time2, TimestampType))
+ checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(10801000)),
+ enforceCorrectType(ISO8601Time2, TimestampType))
checkTypePromotion(DateUtils.millisToDays(10801000), enforceCorrectType(ISO8601Time2, DateType))
}
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 0693c7ea5b..82c0b49459 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -252,7 +252,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"load_dyn_part14.*", // These work alone but fail when run with other tests...
// the answer is sensitive for jdk version
- "udf_java_method"
+ "udf_java_method",
+
+ // Spark SQL use Long for TimestampType, lose the precision under 100ns
+ "timestamp_1",
+ "timestamp_2"
)
/**
@@ -795,8 +799,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"stats_publisher_error_1",
"subq2",
"tablename_with_select",
- "timestamp_1",
- "timestamp_2",
"timestamp_3",
"timestamp_comparison",
"timestamp_lazy",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index c466203cd0..1f14cba78f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -250,7 +250,8 @@ private[hive] trait HiveInspectors {
PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector,
poi.getWritableConstantValue.getHiveDecimal)
case poi: WritableConstantTimestampObjectInspector =>
- poi.getWritableConstantValue.getTimestamp.clone()
+ val t = poi.getWritableConstantValue
+ t.getSeconds * 10000000L + t.getNanos / 100L
case poi: WritableConstantIntObjectInspector =>
poi.getWritableConstantValue.get()
case poi: WritableConstantDoubleObjectInspector =>
@@ -313,11 +314,11 @@ private[hive] trait HiveInspectors {
case x: DateObjectInspector if x.preferWritable() =>
DateUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get())
case x: DateObjectInspector => DateUtils.fromJavaDate(x.getPrimitiveJavaObject(data))
- // org.apache.hadoop.hive.serde2.io.TimestampWritable.set will reset current time object
- // if next timestamp is null, so Timestamp object is cloned
case x: TimestampObjectInspector if x.preferWritable() =>
- x.getPrimitiveWritableObject(data).getTimestamp.clone()
- case ti: TimestampObjectInspector => ti.getPrimitiveJavaObject(data).clone()
+ val t = x.getPrimitiveWritableObject(data)
+ t.getSeconds * 10000000L + t.getNanos / 100
+ case ti: TimestampObjectInspector =>
+ DateUtils.fromJavaTimestamp(ti.getPrimitiveJavaObject(data))
case _ => pi.getPrimitiveJavaObject(data)
}
case li: ListObjectInspector =>
@@ -356,6 +357,9 @@ private[hive] trait HiveInspectors {
case _: JavaDateObjectInspector =>
(o: Any) => DateUtils.toJavaDate(o.asInstanceOf[Int])
+ case _: JavaTimestampObjectInspector =>
+ (o: Any) => DateUtils.toJavaTimestamp(o.asInstanceOf[Long])
+
case soi: StandardStructObjectInspector =>
val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
(o: Any) => {
@@ -465,7 +469,7 @@ private[hive] trait HiveInspectors {
case _: DateObjectInspector if x.preferWritable() => getDateWritable(a)
case _: DateObjectInspector => DateUtils.toJavaDate(a.asInstanceOf[Int])
case _: TimestampObjectInspector if x.preferWritable() => getTimestampWritable(a)
- case _: TimestampObjectInspector => a.asInstanceOf[java.sql.Timestamp]
+ case _: TimestampObjectInspector => DateUtils.toJavaTimestamp(a.asInstanceOf[Long])
}
case x: SettableStructObjectInspector =>
val fieldRefs = x.getAllStructFieldRefs
@@ -727,7 +731,7 @@ private[hive] trait HiveInspectors {
TypeInfoFactory.voidTypeInfo, null)
private def getStringWritable(value: Any): hadoopIo.Text =
- if (value == null) null else new hadoopIo.Text(value.asInstanceOf[UTF8String].toString)
+ if (value == null) null else new hadoopIo.Text(value.asInstanceOf[UTF8String].getBytes)
private def getIntWritable(value: Any): hadoopIo.IntWritable =
if (value == null) null else new hadoopIo.IntWritable(value.asInstanceOf[Int])
@@ -776,7 +780,7 @@ private[hive] trait HiveInspectors {
if (value == null) {
null
} else {
- new hiveIo.TimestampWritable(value.asInstanceOf[java.sql.Timestamp])
+ new hiveIo.TimestampWritable(DateUtils.toJavaTimestamp(value.asInstanceOf[Long]))
}
private def getDecimalWritable(value: Any): hiveIo.HiveDecimalWritable =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 334bfccc9d..d3c82d8c2e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -363,10 +363,10 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
row.update(ordinal, HiveShim.toCatalystDecimal(oi, value))
case oi: TimestampObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) =>
- row.update(ordinal, oi.getPrimitiveJavaObject(value).clone())
+ row.setLong(ordinal, DateUtils.fromJavaTimestamp(oi.getPrimitiveJavaObject(value)))
case oi: DateObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) =>
- row.update(ordinal, DateUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
+ row.setInt(ordinal, DateUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
case oi: BinaryObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) =>
row.update(ordinal, oi.getPrimitiveJavaObject(value))
diff --git a/sql/hive/src/test/resources/golden/timestamp cast #5-0-dbd7bcd167d322d6617b884c02c7f247 b/sql/hive/src/test/resources/golden/timestamp cast #5-0-dbd7bcd167d322d6617b884c02c7f247
index 27de46fdf2..84a31a5a69 100644
--- a/sql/hive/src/test/resources/golden/timestamp cast #5-0-dbd7bcd167d322d6617b884c02c7f247
+++ b/sql/hive/src/test/resources/golden/timestamp cast #5-0-dbd7bcd167d322d6617b884c02c7f247
@@ -1 +1 @@
--0.0010000000000000009
+-0.001