diff options
author | Yijie Shen <henry.yijieshen@gmail.com> | 2015-07-08 20:20:17 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-07-08 20:20:17 -0700 |
commit | a290814877308c6fa9b0f78b1a81145db7651ca4 (patch) | |
tree | 2ed969329abf881ed157734259cdbba93bbb3afb | |
parent | 28fa01e2ba146e823489f6d81c5eb3a76b20c71f (diff) | |
download | spark-a290814877308c6fa9b0f78b1a81145db7651ca4.tar.gz spark-a290814877308c6fa9b0f78b1a81145db7651ca4.tar.bz2 spark-a290814877308c6fa9b0f78b1a81145db7651ca4.zip |
[SPARK-8866][SQL] use 1us precision for timestamp type
JIRA: https://issues.apache.org/jira/browse/SPARK-8866
Author: Yijie Shen <henry.yijieshen@gmail.com>
Closes #7283 from yijieshen/micro_timestamp and squashes the following commits:
dc735df [Yijie Shen] update CastSuite to avoid round error
714eaea [Yijie Shen] add timestamp_udf into blacklist due to precision lose
c3ca2f4 [Yijie Shen] fix unhandled case in CurrentTimestamp
8d4aa6b [Yijie Shen] use 1us precision for timestamp type
11 files changed, 50 insertions, 50 deletions
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 7e64cb0b54..fecfe6d71e 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -775,7 +775,7 @@ def _python_to_sql_converter(dataType): if dt: seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo else time.mktime(dt.timetuple())) - return int(seconds * 1e7 + dt.microsecond * 10) + return int(seconds * 1e6 + dt.microsecond) return to_posix_timstamp else: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 662ceeca77..567feca713 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -186,7 +186,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case ByteType => buildCast[Byte](_, b => longToTimestamp(b.toLong)) case DateType => - buildCast[Int](_, d => DateTimeUtils.daysToMillis(d) * 10000) + buildCast[Int](_, d => DateTimeUtils.daysToMillis(d) * 1000) // TimestampWritable.decimalToTimestamp case DecimalType() => buildCast[Decimal](_, d => decimalToTimestamp(d)) @@ -207,16 +207,16 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w } private[this] def decimalToTimestamp(d: Decimal): Long = { - (d.toBigDecimal * 10000000L).longValue() + (d.toBigDecimal * 1000000L).longValue() } - // converting milliseconds to 100ns - private[this] def longToTimestamp(t: Long): Long = t * 10000L - // converting 100ns to seconds - private[this] def timestampToLong(ts: Long): Long = math.floor(ts.toDouble / 10000000L).toLong - // converting 100ns to seconds in double + // converting milliseconds to us + private[this] def longToTimestamp(t: Long): Long = t * 1000L + // converting us to seconds + private[this] def timestampToLong(ts: Long): Long = math.floor(ts.toDouble / 1000000L).toLong + // converting us to seconds in double private[this] def timestampToDouble(ts: Long): Double = { - ts / 10000000.0 + ts / 1000000.0 } // DateConverter @@ -229,7 +229,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case TimestampType => // throw valid precision more than seconds, according to Hive. // Timestamp.nanos is in 0 to 999,999,999, no more than a second. - buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 10000L)) + buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 1000L)) // Hive throws this exception as a Semantic Exception // It is never possible to compare result when hive return with exception, // so we can return null diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala index a492b966a5..dd5ec330a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala @@ -51,6 +51,6 @@ case class CurrentTimestamp() extends LeafExpression { override def dataType: DataType = TimestampType override def eval(input: InternalRow): Any = { - System.currentTimeMillis() * 10000L + System.currentTimeMillis() * 1000L } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 4269ad5d56..c1ddee3ef0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -34,8 +34,8 @@ object DateTimeUtils { // see http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian final val JULIAN_DAY_OF_EPOCH = 2440587 // and .5 final val SECONDS_PER_DAY = 60 * 60 * 24L - final val HUNDRED_NANOS_PER_SECOND = 1000L * 1000L * 10L - final val NANOS_PER_SECOND = HUNDRED_NANOS_PER_SECOND * 100 + final val MICROS_PER_SECOND = 1000L * 1000L + final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L // Java TimeZone has no mention of thread safety. Use thread local instance to be safe. @@ -77,8 +77,8 @@ object DateTimeUtils { threadLocalDateFormat.get.format(toJavaDate(days)) // Converts Timestamp to string according to Hive TimestampWritable convention. - def timestampToString(num100ns: Long): String = { - val ts = toJavaTimestamp(num100ns) + def timestampToString(us: Long): String = { + val ts = toJavaTimestamp(us) val timestampString = ts.toString val formatted = threadLocalTimestampFormat.get.format(ts) @@ -132,52 +132,52 @@ object DateTimeUtils { } /** - * Returns a java.sql.Timestamp from number of 100ns since epoch. + * Returns a java.sql.Timestamp from number of micros since epoch. */ - def toJavaTimestamp(num100ns: Long): Timestamp = { + def toJavaTimestamp(us: Long): Timestamp = { // setNanos() will overwrite the millisecond part, so the milliseconds should be // cut off at seconds - var seconds = num100ns / HUNDRED_NANOS_PER_SECOND - var nanos = num100ns % HUNDRED_NANOS_PER_SECOND + var seconds = us / MICROS_PER_SECOND + var micros = us % MICROS_PER_SECOND // setNanos() can not accept negative value - if (nanos < 0) { - nanos += HUNDRED_NANOS_PER_SECOND + if (micros < 0) { + micros += MICROS_PER_SECOND seconds -= 1 } val t = new Timestamp(seconds * 1000) - t.setNanos(nanos.toInt * 100) + t.setNanos(micros.toInt * 1000) t } /** - * Returns the number of 100ns since epoch from java.sql.Timestamp. + * Returns the number of micros since epoch from java.sql.Timestamp. */ def fromJavaTimestamp(t: Timestamp): Long = { if (t != null) { - t.getTime() * 10000L + (t.getNanos().toLong / 100) % 10000L + t.getTime() * 1000L + (t.getNanos().toLong / 1000) % 1000L } else { 0L } } /** - * Returns the number of 100ns (hundred of nanoseconds) since epoch from Julian day + * Returns the number of microseconds since epoch from Julian day * and nanoseconds in a day */ def fromJulianDay(day: Int, nanoseconds: Long): Long = { // use Long to avoid rounding errors val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY - SECONDS_PER_DAY / 2 - seconds * HUNDRED_NANOS_PER_SECOND + nanoseconds / 100L + seconds * MICROS_PER_SECOND + nanoseconds / 1000L } /** - * Returns Julian day and nanoseconds in a day from the number of 100ns (hundred of nanoseconds) + * Returns Julian day and nanoseconds in a day from the number of microseconds */ - def toJulianDay(num100ns: Long): (Int, Long) = { - val seconds = num100ns / HUNDRED_NANOS_PER_SECOND + SECONDS_PER_DAY / 2 + def toJulianDay(us: Long): (Int, Long) = { + val seconds = us / MICROS_PER_SECOND + SECONDS_PER_DAY / 2 val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH val secondsInDay = seconds % SECONDS_PER_DAY - val nanos = (num100ns % HUNDRED_NANOS_PER_SECOND) * 100L + val nanos = (us % MICROS_PER_SECOND) * 1000L (day.toInt, secondsInDay * NANOS_PER_SECOND + nanos) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala index 518961e383..919fdd470b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala @@ -293,15 +293,15 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { } test("cast from timestamp") { - val millis = 15 * 1000 + 2 - val seconds = millis * 1000 + 2 + val millis = 15 * 1000 + 3 + val seconds = millis * 1000 + 3 val ts = new Timestamp(millis) val tss = new Timestamp(seconds) checkEvaluation(cast(ts, ShortType), 15.toShort) checkEvaluation(cast(ts, IntegerType), 15) checkEvaluation(cast(ts, LongType), 15.toLong) - checkEvaluation(cast(ts, FloatType), 15.002f) - checkEvaluation(cast(ts, DoubleType), 15.002) + checkEvaluation(cast(ts, FloatType), 15.003f) + checkEvaluation(cast(ts, DoubleType), 15.003) checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts)) checkEvaluation(cast(cast(tss, IntegerType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts)) @@ -317,7 +317,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { Decimal(1)) // A test for higher precision than millis - checkEvaluation(cast(cast(0.0000001, TimestampType), DoubleType), 0.0000001) + checkEvaluation(cast(cast(0.000001, TimestampType), DoubleType), 0.000001) checkEvaluation(cast(Double.NaN, TimestampType), null) checkEvaluation(cast(1.0 / 0.0, TimestampType), null) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 1d4a60c81e..f63ac191e7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -24,11 +24,11 @@ import org.apache.spark.SparkFunSuite class DateTimeUtilsSuite extends SparkFunSuite { - test("timestamp and 100ns") { + test("timestamp and us") { val now = new Timestamp(System.currentTimeMillis()) - now.setNanos(100) + now.setNanos(1000) val ns = DateTimeUtils.fromJavaTimestamp(now) - assert(ns % 10000000L === 1) + assert(ns % 1000000L === 1) assert(DateTimeUtils.toJavaTimestamp(ns) === now) List(-111111111111L, -1L, 0, 1L, 111111111111L).foreach { t => @@ -38,7 +38,7 @@ class DateTimeUtilsSuite extends SparkFunSuite { } } - test("100ns and julian day") { + test("us and julian day") { val (d, ns) = DateTimeUtils.toJulianDay(0) assert(d === DateTimeUtils.JULIAN_DAY_OF_EPOCH) assert(ns === DateTimeUtils.SECONDS_PER_DAY / 2 * DateTimeUtils.NANOS_PER_SECOND) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala index 4b8ab63b5a..381e7ed544 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala @@ -67,10 +67,10 @@ private[sql] object JacksonParser { DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime) case (VALUE_STRING, TimestampType) => - DateTimeUtils.stringToTime(parser.getText).getTime * 10000L + DateTimeUtils.stringToTime(parser.getText).getTime * 1000L case (VALUE_NUMBER_INT, TimestampType) => - parser.getLongValue * 10000L + parser.getLongValue * 1000L case (_, StringType) => val writer = new ByteArrayOutputStream() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index 01ba05cbd1..b392a51bf7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -401,9 +401,9 @@ private[sql] object JsonRDD extends Logging { private def toTimestamp(value: Any): Long = { value match { - case value: java.lang.Integer => value.asInstanceOf[Int].toLong * 10000L - case value: java.lang.Long => value * 10000L - case value: java.lang.String => DateTimeUtils.stringToTime(value).getTime * 10000L + case value: java.lang.Integer => value.asInstanceOf[Int].toLong * 1000L + case value: java.lang.Long => value * 1000L + case value: java.lang.String => DateTimeUtils.stringToTime(value).getTime * 1000L } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 69ab1c292d..566a52dc1b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -326,7 +326,7 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter { assert(cal.get(Calendar.HOUR) === 11) assert(cal.get(Calendar.MINUTE) === 22) assert(cal.get(Calendar.SECOND) === 33) - assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543500) + assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543000) } test("test DATE types") { diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 415a81644c..c884c39928 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -254,9 +254,10 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { // the answer is sensitive for jdk version "udf_java_method", - // Spark SQL use Long for TimestampType, lose the precision under 100ns + // Spark SQL use Long for TimestampType, lose the precision under 1us "timestamp_1", - "timestamp_2" + "timestamp_2", + "timestamp_udf" ) /** @@ -803,7 +804,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "timestamp_comparison", "timestamp_lazy", "timestamp_null", - "timestamp_udf", "touch", "transform_ppr1", "transform_ppr2", diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 4cba17524a..a8f2ee37cb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -267,7 +267,7 @@ private[hive] trait HiveInspectors { poi.getWritableConstantValue.getHiveDecimal) case poi: WritableConstantTimestampObjectInspector => val t = poi.getWritableConstantValue - t.getSeconds * 10000000L + t.getNanos / 100L + t.getSeconds * 1000000L + t.getNanos / 1000L case poi: WritableConstantIntObjectInspector => poi.getWritableConstantValue.get() case poi: WritableConstantDoubleObjectInspector => @@ -332,7 +332,7 @@ private[hive] trait HiveInspectors { case x: DateObjectInspector => DateTimeUtils.fromJavaDate(x.getPrimitiveJavaObject(data)) case x: TimestampObjectInspector if x.preferWritable() => val t = x.getPrimitiveWritableObject(data) - t.getSeconds * 10000000L + t.getNanos / 100 + t.getSeconds * 1000000L + t.getNanos / 1000L case ti: TimestampObjectInspector => DateTimeUtils.fromJavaTimestamp(ti.getPrimitiveJavaObject(data)) case _ => pi.getPrimitiveJavaObject(data) |