aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main/scala
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-08-01 21:46:46 -0700
committerDavies Liu <davies.liu@gmail.com>2015-08-01 21:46:46 -0700
commitc1b0cbd762d78bedca0ab564cf9ca0970b7b99d2 (patch)
treecbdb4059022abbf2cc502d1fd920f3c29f6e2618 /sql/catalyst/src/main/scala
parent00cd92f32f17ca57d47aa2dcc716eb707aaee799 (diff)
downloadspark-c1b0cbd762d78bedca0ab564cf9ca0970b7b99d2.tar.gz
spark-c1b0cbd762d78bedca0ab564cf9ca0970b7b99d2.tar.bz2
spark-c1b0cbd762d78bedca0ab564cf9ca0970b7b99d2.zip
[SPARK-8185] [SPARK-8188] [SPARK-8191] [SQL] function datediff, to_utc_timestamp, from_utc_timestamp
This PR is based on #7643 , thanks to adrian-wang Author: Davies Liu <davies@databricks.com> Author: Daoyuan Wang <daoyuan.wang@intel.com> Closes #7847 from davies/datediff and squashes the following commits: 74333d7 [Davies Liu] fix bug 22d8a8c [Davies Liu] optimize 85cdd21 [Davies Liu] remove unnecessary tests 241d90c [Davies Liu] Merge branch 'master' of github.com:apache/spark into datediff e9dc0f5 [Davies Liu] fix datediff/to_utc_timestamp/from_utc_timestamp c360447 [Daoyuan Wang] function datediff, to_utc_timestamp, from_utc_timestamp (commits merged)
Diffstat (limited to 'sql/catalyst/src/main/scala')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala3
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala116
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala20
3 files changed, 138 insertions, 1 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 8fafd7778a..bc08466461 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -211,6 +211,7 @@ object FunctionRegistry {
expression[AddMonths]("add_months"),
expression[CurrentDate]("current_date"),
expression[CurrentTimestamp]("current_timestamp"),
+ expression[DateDiff]("datediff"),
expression[DateAdd]("date_add"),
expression[DateFormatClass]("date_format"),
expression[DateSub]("date_sub"),
@@ -218,6 +219,7 @@ object FunctionRegistry {
expression[DayOfYear]("dayofyear"),
expression[DayOfMonth]("dayofmonth"),
expression[FromUnixTime]("from_unixtime"),
+ expression[FromUTCTimestamp]("from_utc_timestamp"),
expression[Hour]("hour"),
expression[LastDay]("last_day"),
expression[Minute]("minute"),
@@ -227,6 +229,7 @@ object FunctionRegistry {
expression[Quarter]("quarter"),
expression[Second]("second"),
expression[ToDate]("to_date"),
+ expression[ToUTCTimestamp]("to_utc_timestamp"),
expression[TruncDate]("trunc"),
expression[UnixTimestamp]("unix_timestamp"),
expression[WeekOfYear]("weekofyear"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
index 07dea5b470..32dc9b7682 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
@@ -619,6 +619,53 @@ case class TimeAdd(start: Expression, interval: Expression)
}
/**
+ * Assumes given timestamp is UTC and converts to given timezone.
+ */
+case class FromUTCTimestamp(left: Expression, right: Expression)
+ extends BinaryExpression with ImplicitCastInputTypes {
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType)
+ override def dataType: DataType = TimestampType
+ override def prettyName: String = "from_utc_timestamp"
+
+ override def nullSafeEval(time: Any, timezone: Any): Any = {
+ DateTimeUtils.fromUTCTime(time.asInstanceOf[Long],
+ timezone.asInstanceOf[UTF8String].toString)
+ }
+
+ override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+ val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+ if (right.foldable) {
+ val tz = right.eval()
+ if (tz == null) {
+ s"""
+ |boolean ${ev.isNull} = true;
+ |long ${ev.primitive} = 0;
+ """.stripMargin
+ } else {
+ val tzTerm = ctx.freshName("tz")
+ val tzClass = classOf[TimeZone].getName
+ ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""")
+ val eval = left.gen(ctx)
+ s"""
+ |${eval.code}
+ |boolean ${ev.isNull} = ${eval.isNull};
+ |long ${ev.primitive} = 0;
+ |if (!${ev.isNull}) {
+ | ${ev.primitive} = ${eval.primitive} +
+ | ${tzTerm}.getOffset(${eval.primitive} / 1000) * 1000L;
+ |}
+ """.stripMargin
+ }
+ } else {
+ defineCodeGen(ctx, ev, (timestamp, format) => {
+ s"""$dtu.fromUTCTime($timestamp, $format.toString())"""
+ })
+ }
+ }
+}
+
+/**
* Subtracts an interval from timestamp.
*/
case class TimeSub(start: Expression, interval: Expression)
@@ -697,6 +744,53 @@ case class MonthsBetween(date1: Expression, date2: Expression)
}
/**
+ * Assumes given timestamp is in given timezone and converts to UTC.
+ */
+case class ToUTCTimestamp(left: Expression, right: Expression)
+ extends BinaryExpression with ImplicitCastInputTypes {
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType)
+ override def dataType: DataType = TimestampType
+ override def prettyName: String = "to_utc_timestamp"
+
+ override def nullSafeEval(time: Any, timezone: Any): Any = {
+ DateTimeUtils.toUTCTime(time.asInstanceOf[Long],
+ timezone.asInstanceOf[UTF8String].toString)
+ }
+
+ override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+ val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+ if (right.foldable) {
+ val tz = right.eval()
+ if (tz == null) {
+ s"""
+ |boolean ${ev.isNull} = true;
+ |long ${ev.primitive} = 0;
+ """.stripMargin
+ } else {
+ val tzTerm = ctx.freshName("tz")
+ val tzClass = classOf[TimeZone].getName
+ ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""")
+ val eval = left.gen(ctx)
+ s"""
+ |${eval.code}
+ |boolean ${ev.isNull} = ${eval.isNull};
+ |long ${ev.primitive} = 0;
+ |if (!${ev.isNull}) {
+ | ${ev.primitive} = ${eval.primitive} -
+ | ${tzTerm}.getOffset(${eval.primitive} / 1000) * 1000L;
+ |}
+ """.stripMargin
+ }
+ } else {
+ defineCodeGen(ctx, ev, (timestamp, format) => {
+ s"""$dtu.toUTCTime($timestamp, $format.toString())"""
+ })
+ }
+ }
+}
+
+/**
* Returns the date part of a timestamp or string.
*/
case class ToDate(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {
@@ -714,7 +808,7 @@ case class ToDate(child: Expression) extends UnaryExpression with ImplicitCastIn
}
}
-/*
+/**
* Returns date truncated to the unit specified by the format.
*/
case class TruncDate(date: Expression, format: Expression)
@@ -783,3 +877,23 @@ case class TruncDate(date: Expression, format: Expression)
}
}
}
+
+/**
+ * Returns the number of days from startDate to endDate.
+ */
+case class DateDiff(endDate: Expression, startDate: Expression)
+ extends BinaryExpression with ImplicitCastInputTypes {
+
+ override def left: Expression = endDate
+ override def right: Expression = startDate
+ override def inputTypes: Seq[AbstractDataType] = Seq(DateType, DateType)
+ override def dataType: DataType = IntegerType
+
+ override def nullSafeEval(end: Any, start: Any): Any = {
+ end.asInstanceOf[Int] - start.asInstanceOf[Int]
+ }
+
+ override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+ defineCodeGen(ctx, ev, (end, start) => s"$end - $start")
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 032ed8a56a..6a98f4d9c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -814,4 +814,24 @@ object DateTimeUtils {
}
}
}
+
+ /**
+ * Returns a timestamp of given timezone from utc timestamp, with the same string
+ * representation in their timezone.
+ */
+ def fromUTCTime(time: Long, timeZone: String): Long = {
+ val tz = TimeZone.getTimeZone(timeZone)
+ val offset = tz.getOffset(time / 1000L)
+ time + offset * 1000L
+ }
+
+ /**
+ * Returns a utc timestamp from a given timestamp from a given timezone, with the same
+ * string representation in their timezone.
+ */
+ def toUTCTime(time: Long, timeZone: String): Long = {
+ val tz = TimeZone.getTimeZone(timeZone)
+ val offset = tz.getOffset(time / 1000L)
+ time - offset * 1000L
+ }
}