diff options
author | Davies Liu <davies@databricks.com> | 2015-08-01 21:46:46 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-08-01 21:46:46 -0700 |
commit | c1b0cbd762d78bedca0ab564cf9ca0970b7b99d2 (patch) | |
tree | cbdb4059022abbf2cc502d1fd920f3c29f6e2618 /sql/catalyst/src/main/scala | |
parent | 00cd92f32f17ca57d47aa2dcc716eb707aaee799 (diff) | |
download | spark-c1b0cbd762d78bedca0ab564cf9ca0970b7b99d2.tar.gz spark-c1b0cbd762d78bedca0ab564cf9ca0970b7b99d2.tar.bz2 spark-c1b0cbd762d78bedca0ab564cf9ca0970b7b99d2.zip |
[SPARK-8185] [SPARK-8188] [SPARK-8191] [SQL] function datediff, to_utc_timestamp, from_utc_timestamp
This PR is based on #7643 , thanks to adrian-wang
Author: Davies Liu <davies@databricks.com>
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes #7847 from davies/datediff and squashes the following commits:
74333d7 [Davies Liu] fix bug
22d8a8c [Davies Liu] optimize
85cdd21 [Davies Liu] remove unnecessary tests
241d90c [Davies Liu] Merge branch 'master' of github.com:apache/spark into datediff
e9dc0f5 [Davies Liu] fix datediff/to_utc_timestamp/from_utc_timestamp
c360447 [Daoyuan Wang] function datediff, to_utc_timestamp, from_utc_timestamp (commits merged)
Diffstat (limited to 'sql/catalyst/src/main/scala')
3 files changed, 138 insertions, 1 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 8fafd7778a..bc08466461 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -211,6 +211,7 @@ object FunctionRegistry { expression[AddMonths]("add_months"), expression[CurrentDate]("current_date"), expression[CurrentTimestamp]("current_timestamp"), + expression[DateDiff]("datediff"), expression[DateAdd]("date_add"), expression[DateFormatClass]("date_format"), expression[DateSub]("date_sub"), @@ -218,6 +219,7 @@ object FunctionRegistry { expression[DayOfYear]("dayofyear"), expression[DayOfMonth]("dayofmonth"), expression[FromUnixTime]("from_unixtime"), + expression[FromUTCTimestamp]("from_utc_timestamp"), expression[Hour]("hour"), expression[LastDay]("last_day"), expression[Minute]("minute"), @@ -227,6 +229,7 @@ object FunctionRegistry { expression[Quarter]("quarter"), expression[Second]("second"), expression[ToDate]("to_date"), + expression[ToUTCTimestamp]("to_utc_timestamp"), expression[TruncDate]("trunc"), expression[UnixTimestamp]("unix_timestamp"), expression[WeekOfYear]("weekofyear"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala index 07dea5b470..32dc9b7682 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala @@ -619,6 +619,53 @@ case class TimeAdd(start: Expression, interval: Expression) } /** + * Assumes given timestamp is UTC and converts to given timezone. + */ +case class FromUTCTimestamp(left: Expression, right: Expression) + extends BinaryExpression with ImplicitCastInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType) + override def dataType: DataType = TimestampType + override def prettyName: String = "from_utc_timestamp" + + override def nullSafeEval(time: Any, timezone: Any): Any = { + DateTimeUtils.fromUTCTime(time.asInstanceOf[Long], + timezone.asInstanceOf[UTF8String].toString) + } + + override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + val dtu = DateTimeUtils.getClass.getName.stripSuffix("$") + if (right.foldable) { + val tz = right.eval() + if (tz == null) { + s""" + |boolean ${ev.isNull} = true; + |long ${ev.primitive} = 0; + """.stripMargin + } else { + val tzTerm = ctx.freshName("tz") + val tzClass = classOf[TimeZone].getName + ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""") + val eval = left.gen(ctx) + s""" + |${eval.code} + |boolean ${ev.isNull} = ${eval.isNull}; + |long ${ev.primitive} = 0; + |if (!${ev.isNull}) { + | ${ev.primitive} = ${eval.primitive} + + | ${tzTerm}.getOffset(${eval.primitive} / 1000) * 1000L; + |} + """.stripMargin + } + } else { + defineCodeGen(ctx, ev, (timestamp, format) => { + s"""$dtu.fromUTCTime($timestamp, $format.toString())""" + }) + } + } +} + +/** * Subtracts an interval from timestamp. */ case class TimeSub(start: Expression, interval: Expression) @@ -697,6 +744,53 @@ case class MonthsBetween(date1: Expression, date2: Expression) } /** + * Assumes given timestamp is in given timezone and converts to UTC. + */ +case class ToUTCTimestamp(left: Expression, right: Expression) + extends BinaryExpression with ImplicitCastInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType) + override def dataType: DataType = TimestampType + override def prettyName: String = "to_utc_timestamp" + + override def nullSafeEval(time: Any, timezone: Any): Any = { + DateTimeUtils.toUTCTime(time.asInstanceOf[Long], + timezone.asInstanceOf[UTF8String].toString) + } + + override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + val dtu = DateTimeUtils.getClass.getName.stripSuffix("$") + if (right.foldable) { + val tz = right.eval() + if (tz == null) { + s""" + |boolean ${ev.isNull} = true; + |long ${ev.primitive} = 0; + """.stripMargin + } else { + val tzTerm = ctx.freshName("tz") + val tzClass = classOf[TimeZone].getName + ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""") + val eval = left.gen(ctx) + s""" + |${eval.code} + |boolean ${ev.isNull} = ${eval.isNull}; + |long ${ev.primitive} = 0; + |if (!${ev.isNull}) { + | ${ev.primitive} = ${eval.primitive} - + | ${tzTerm}.getOffset(${eval.primitive} / 1000) * 1000L; + |} + """.stripMargin + } + } else { + defineCodeGen(ctx, ev, (timestamp, format) => { + s"""$dtu.toUTCTime($timestamp, $format.toString())""" + }) + } + } +} + +/** * Returns the date part of a timestamp or string. */ case class ToDate(child: Expression) extends UnaryExpression with ImplicitCastInputTypes { @@ -714,7 +808,7 @@ case class ToDate(child: Expression) extends UnaryExpression with ImplicitCastIn } } -/* +/** * Returns date truncated to the unit specified by the format. */ case class TruncDate(date: Expression, format: Expression) @@ -783,3 +877,23 @@ case class TruncDate(date: Expression, format: Expression) } } } + +/** + * Returns the number of days from startDate to endDate. + */ +case class DateDiff(endDate: Expression, startDate: Expression) + extends BinaryExpression with ImplicitCastInputTypes { + + override def left: Expression = endDate + override def right: Expression = startDate + override def inputTypes: Seq[AbstractDataType] = Seq(DateType, DateType) + override def dataType: DataType = IntegerType + + override def nullSafeEval(end: Any, start: Any): Any = { + end.asInstanceOf[Int] - start.asInstanceOf[Int] + } + + override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + defineCodeGen(ctx, ev, (end, start) => s"$end - $start") + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 032ed8a56a..6a98f4d9c5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -814,4 +814,24 @@ object DateTimeUtils { } } } + + /** + * Returns a timestamp of given timezone from utc timestamp, with the same string + * representation in their timezone. + */ + def fromUTCTime(time: Long, timeZone: String): Long = { + val tz = TimeZone.getTimeZone(timeZone) + val offset = tz.getOffset(time / 1000L) + time + offset * 1000L + } + + /** + * Returns a utc timestamp from a given timestamp from a given timezone, with the same + * string representation in their timezone. + */ + def toUTCTime(time: Long, timeZone: String): Long = { + val tz = TimeZone.getTimeZone(timeZone) + val offset = tz.getOffset(time / 1000L) + time - offset * 1000L + } } |