aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-06-10 16:55:39 -0700
committerReynold Xin <rxin@databricks.com>2015-06-10 16:55:39 -0700
commit37719e0cd0b00cc5ffee0ebe1652d465a574db0f (patch)
tree326c8178ed25ef17135ed2978c6dbfaf9e7593e3 /core
parentb928f543845ddd39e914a0e8f0b0205fd86100c5 (diff)
downloadspark-37719e0cd0b00cc5ffee0ebe1652d465a574db0f.tar.gz
spark-37719e0cd0b00cc5ffee0ebe1652d465a574db0f.tar.bz2
spark-37719e0cd0b00cc5ffee0ebe1652d465a574db0f.zip
[SPARK-8189] [SQL] use Long for TimestampType in SQL
This PR changes to use Long as the internal type for TimestampType for efficiency, which means it will lose the precision below 100ns. Author: Davies Liu <davies@databricks.com> Closes #6733 from davies/timestamp and squashes the following commits: d9565fa [Davies Liu] remove print 65cf2f1 [Davies Liu] fix Timestamp in SparkR 86fecfb [Davies Liu] disable two timestamp tests 8f77ee0 [Davies Liu] fix scala style 246ee74 [Davies Liu] address comments 309d2e1 [Davies Liu] use Long for TimestampType in SQL
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/api/r/SerDe.scala17
1 file changed, 13 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index f8e3f1a790..56adc857d4 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -18,7 +18,7 @@
package org.apache.spark.api.r
import java.io.{DataInputStream, DataOutputStream}
-import java.sql.{Date, Time}
+import java.sql.{Timestamp, Date, Time}
import scala.collection.JavaConversions._
@@ -107,9 +107,12 @@ private[spark] object SerDe {
Date.valueOf(readString(in))
}
- def readTime(in: DataInputStream): Time = {
- val t = in.readDouble()
- new Time((t * 1000L).toLong)
+ def readTime(in: DataInputStream): Timestamp = {
+ val seconds = in.readDouble()
+ val sec = Math.floor(seconds).toLong
+ val t = new Timestamp(sec * 1000L)
+ t.setNanos(((seconds - sec) * 1e9).toInt)
+ t
}
def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
@@ -227,6 +230,9 @@ private[spark] object SerDe {
case "java.sql.Time" =>
writeType(dos, "time")
writeTime(dos, value.asInstanceOf[Time])
+ case "java.sql.Timestamp" =>
+ writeType(dos, "time")
+ writeTime(dos, value.asInstanceOf[Timestamp])
case "[B" =>
writeType(dos, "raw")
writeBytes(dos, value.asInstanceOf[Array[Byte]])
@@ -289,6 +295,9 @@ private[spark] object SerDe {
out.writeDouble(value.getTime.toDouble / 1000.0)
}
+ def writeTime(out: DataOutputStream, value: Timestamp): Unit = {
+ out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble / 1e9)
+ }
// NOTE: Only works for ASCII right now
def writeString(out: DataOutputStream, value: String): Unit = {