author    Daoyuan Wang <daoyuan.wang@intel.com>  2015-11-11 20:36:21 -0800
committer Davies Liu <davies.liu@gmail.com>      2015-11-11 20:36:21 -0800
commit    39b1e36fbc284ba999e1c00b20d1b0b5de6b40b2
tree      43f4407cfe3410852fb68747504d5aea8668d865
parent    e49e723392b8a64d30bd90944a748eb6f5ef3a8a
[SPARK-11396] [SQL] add native implementation of datetime function to_unix_timestamp
`to_unix_timestamp` is the deterministic version of `unix_timestamp`: it accepts at least one parameter. Since the behavior here is otherwise the same as `unix_timestamp`, I think a DataFrame API is not necessary here.

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #9347 from adrian-wang/to_unix_timestamp.
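As a quick illustration of the difference (a hedged usage sketch, not part of this patch; `df` is assumed to be any DataFrame with a string column "ss" in the default yyyy-MM-dd HH:mm:ss form):

  // to_unix_timestamp always takes at least the input expression:
  df.selectExpr("to_unix_timestamp(ss)")                        // default pattern
  df.selectExpr("to_unix_timestamp(ss, 'yyyy-MM-dd HH:mm:ss')") // explicit pattern
  // By contrast, unix_timestamp() with no arguments evaluates to the
  // current timestamp, which is what makes it non-deterministic.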
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala      |  1
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala | 24
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala | 36
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala                            | 21
4 files changed, 77 insertions(+), 5 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index dfa749d1af..870808aa56 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -262,6 +262,7 @@ object FunctionRegistry {
expression[Quarter]("quarter"),
expression[Second]("second"),
expression[ToDate]("to_date"),
+ expression[ToUnixTimestamp]("to_unix_timestamp"),
expression[ToUTCTimestamp]("to_utc_timestamp"),
expression[TruncDate]("trunc"),
expression[UnixTimestamp]("unix_timestamp"),
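The hunk above registers the new expression under the SQL name "to_unix_timestamp". For context, here is a minimal, self-contained sketch of this name-to-builder registry pattern (an illustration only, not Spark's real code; the actual expression[T](name) helper picks a case class constructor via reflection based on argument count):

  trait Expr
  case class Lit(s: String) extends Expr
  case class ToUnixTs(time: Expr, format: Expr) extends Expr

  object MiniRegistry {
    type Builder = Seq[Expr] => Expr
    private val builders = scala.collection.mutable.Map.empty[String, Builder]
    def register(name: String)(b: Builder): Unit = builders(name) = b
    def lookup(name: String, args: Seq[Expr]): Expr =
      builders.getOrElse(name, sys.error(s"undefined function: $name"))(args)
  }

  // The one-argument form falls back to the default pattern, mirroring the
  // auxiliary constructor added in datetimeExpressions.scala below.
  MiniRegistry.register("to_unix_timestamp") {
    case Seq(t)    => ToUnixTs(t, Lit("yyyy-MM-dd HH:mm:ss"))
    case Seq(t, f) => ToUnixTs(t, f)
  }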
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 13cc6bb6f2..03c39f8404 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -299,7 +299,20 @@ case class DateFormatClass(left: Expression, right: Expression) extends BinaryEx
}
/**
- * Converts time string with given pattern
+ * Converts time string with given pattern.
+ * Deterministic version of [[UnixTimestamp]], must have at least one parameter.
+ */
+case class ToUnixTimestamp(timeExp: Expression, format: Expression) extends UnixTime {
+ override def left: Expression = timeExp
+ override def right: Expression = format
+
+ def this(time: Expression) = {
+ this(time, Literal("yyyy-MM-dd HH:mm:ss"))
+ }
+}
+
+/**
+ * Converts time string with given pattern.
* (see [http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html])
* to Unix time stamp (in seconds), returns null if fail.
* Note that hive Language Manual says it returns 0 if fail, but in fact it returns null.
@@ -308,9 +321,7 @@ case class DateFormatClass(left: Expression, right: Expression) extends BinaryEx
* If the first parameter is a Date or Timestamp instead of String, we will ignore the
* second parameter.
*/
-case class UnixTimestamp(timeExp: Expression, format: Expression)
- extends BinaryExpression with ExpectsInputTypes {
-
+case class UnixTimestamp(timeExp: Expression, format: Expression) extends UnixTime {
override def left: Expression = timeExp
override def right: Expression = format
@@ -321,6 +332,9 @@ case class UnixTimestamp(timeExp: Expression, format: Expression)
def this() = {
this(CurrentTimestamp())
}
+}
+
+abstract class UnixTime extends BinaryExpression with ExpectsInputTypes {
override def inputTypes: Seq[AbstractDataType] =
Seq(TypeCollection(StringType, DateType, TimestampType), StringType)
@@ -347,7 +361,7 @@ case class UnixTimestamp(timeExp: Expression, format: Expression)
null
}
case StringType =>
- val f = format.eval(input)
+ val f = right.eval(input)
if (f == null) {
null
} else {
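The net effect of this hunk is a small pull-up refactoring: the evaluation logic shared by both expressions moves into the new abstract base class UnixTime, and UnixTimestamp and the new ToUnixTimestamp become thin case classes over it. A stripped-down sketch of the shape (an assumption-laden simplification, not the real expression code, which also handles Date/Timestamp inputs and code generation):

  import java.text.SimpleDateFormat
  import scala.util.control.NonFatal

  // Shared string-to-epoch-seconds logic; returns null on any parse or
  // pattern failure, matching the null-on-failure behaviour documented above.
  abstract class MiniUnixTime {
    def left: String   // time string
    def right: String  // format pattern
    def eval(): Any =
      try new SimpleDateFormat(right).parse(left).getTime / 1000L
      catch { case NonFatal(_) => null }
  }

  case class MiniUnixTimestamp(left: String, right: String) extends MiniUnixTime
  case class MiniToUnixTimestamp(left: String, right: String) extends MiniUnixTime

For example, MiniToUnixTimestamp("1970-01-01 00:00:00", "yyyy-MM-dd HH:mm:ss").eval() yields 0L in a UTC JVM, while an unparseable input yields null.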
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index 610d39e849..53c66d8a75 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -465,6 +465,42 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format")), null)
}
+ test("to_unix_timestamp") {
+ val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+ val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
+ val sdf2 = new SimpleDateFormat(fmt2)
+ val fmt3 = "yy-MM-dd"
+ val sdf3 = new SimpleDateFormat(fmt3)
+ val date1 = Date.valueOf("2015-07-24")
+ checkEvaluation(
+ ToUnixTimestamp(Literal(sdf1.format(new Timestamp(0))), Literal("yyyy-MM-dd HH:mm:ss")), 0L)
+ checkEvaluation(ToUnixTimestamp(
+ Literal(sdf1.format(new Timestamp(1000000))), Literal("yyyy-MM-dd HH:mm:ss")), 1000L)
+ checkEvaluation(
+ ToUnixTimestamp(Literal(new Timestamp(1000000)), Literal("yyyy-MM-dd HH:mm:ss")), 1000L)
+ checkEvaluation(
+ ToUnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss")),
+ DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1)) / 1000L)
+ checkEvaluation(
+ ToUnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), Literal(fmt2)), -1000L)
+ checkEvaluation(ToUnixTimestamp(
+ Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3)),
+ DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24"))) / 1000L)
+ val t1 = ToUnixTimestamp(
+ CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
+ val t2 = ToUnixTimestamp(
+ CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
+ assert(t2 - t1 <= 1)
+ checkEvaluation(
+ ToUnixTimestamp(Literal.create(null, DateType), Literal.create(null, StringType)), null)
+ checkEvaluation(
+ ToUnixTimestamp(Literal.create(null, DateType), Literal("yyyy-MM-dd HH:mm:ss")), null)
+ checkEvaluation(ToUnixTimestamp(
+ Literal(date1), Literal.create(null, StringType)), date1.getTime / 1000L)
+ checkEvaluation(
+ ToUnixTimestamp(Literal("2015-07-24"), Literal("not a valid format")), null)
+ }
+
test("datediff") {
checkEvaluation(
DateDiff(Literal(Date.valueOf("2015-07-24")), Literal(Date.valueOf("2015-07-21"))), 3)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 9080c53c49..1266d534cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -444,6 +444,27 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
}
+ test("to_unix_timestamp") {
+ val date1 = Date.valueOf("2015-07-24")
+ val date2 = Date.valueOf("2015-07-25")
+ val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3")
+ val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2")
+ val s1 = "2015/07/24 10:00:00.5"
+ val s2 = "2015/07/25 02:02:02.6"
+ val ss1 = "2015-07-24 10:00:00"
+ val ss2 = "2015-07-25 02:02:02"
+ val fmt = "yyyy/MM/dd HH:mm:ss.S"
+ val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss")
+ checkAnswer(df.selectExpr("to_unix_timestamp(ts)"), Seq(
+ Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+ checkAnswer(df.selectExpr("to_unix_timestamp(ss)"), Seq(
+ Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+ checkAnswer(df.selectExpr(s"to_unix_timestamp(d, '$fmt')"), Seq(
+ Row(date1.getTime / 1000L), Row(date2.getTime / 1000L)))
+ checkAnswer(df.selectExpr(s"to_unix_timestamp(s, '$fmt')"), Seq(
+ Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+ }
+
test("datediff") {
val df = Seq(
(Date.valueOf("2015-07-24"), Timestamp.valueOf("2015-07-24 01:00:00"),