author     Daoyuan Wang <daoyuan.wang@intel.com>    2015-07-30 19:22:38 -0700
committer  Reynold Xin <rxin@databricks.com>        2015-07-30 19:22:38 -0700
commit     83670fc9e6fc9c7a6ae68dfdd3f9335ea72f4ab0 (patch)
tree       6dcb87133b02c1f9e847dbcb9b5bf667a7dadd3f /sql
parent     9307f5653d19a6a2fda355a675ca9ea97e35611b (diff)
[SPARK-8176] [SPARK-8197] [SQL] function to_date/ trunc
This PR is based on #6988, thanks to adrian-wang. It brings two SQL functions: to_date() and trunc().

Closes #6988

Author: Daoyuan Wang <daoyuan.wang@intel.com>
Author: Davies Liu <davies@databricks.com>

Closes #7805 from davies/to_date and squashes the following commits:

2c7beba [Davies Liu] Merge branch 'master' of github.com:apache/spark into to_date
310dd55 [Daoyuan Wang] remove dup test in rebase
980b092 [Daoyuan Wang] resolve rebase conflict
a476c5a [Daoyuan Wang] address comments from davies
d44ea5f [Daoyuan Wang] function to_date, trunc
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala         |  2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala     | 88
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala                | 34
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala  | 29
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NonFoldableLiteral.scala    |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/functions.scala                                      | 16
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala                             | 44
7 files changed, 215 insertions(+), 2 deletions(-)
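
As a quick orientation before the diff, a minimal usage sketch of the two new functions from the DataFrame and SQL APIs; the column name and input data are illustrative (they mirror DateFunctionsSuite below), and a SQLContext with its implicits in scope is assumed:

    import java.sql.Timestamp
    import sqlContext.implicits._                           // assumed in scope
    import org.apache.spark.sql.functions.{col, to_date, trunc}

    val df = Seq(Timestamp.valueOf("2015-07-22 10:00:00")).toDF("t")
    df.select(to_date(col("t"))).collect()       // [2015-07-22]
    df.select(trunc(col("t"), "MM")).collect()   // [2015-07-01]
    df.selectExpr("trunc(t, 'YEAR')").collect()  // [2015-01-01]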
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 6c7c481fab..1bf7204a25 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -223,6 +223,8 @@ object FunctionRegistry {
expression[NextDay]("next_day"),
expression[Quarter]("quarter"),
expression[Second]("second"),
+ expression[ToDate]("to_date"),
+ expression[TruncDate]("trunc"),
expression[UnixTimestamp]("unix_timestamp"),
expression[WeekOfYear]("weekofyear"),
expression[Year]("year"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
index 9795673ee0..6e7613340c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
@@ -507,7 +507,6 @@ case class FromUnixTime(sec: Expression, format: Expression)
})
}
}
-
}
/**
@@ -696,3 +695,90 @@ case class MonthsBetween(date1: Expression, date2: Expression)
})
}
}
+
+/**
+ * Returns the date part of a timestamp or string.
+ */
+case class ToDate(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {
+
+ // Spark's implicit casting will accept strings in both date and timestamp
+ // format, as well as TimestampType.
+ override def inputTypes: Seq[AbstractDataType] = Seq(DateType)
+
+ override def dataType: DataType = DateType
+
+ override def eval(input: InternalRow): Any = child.eval(input)
+
+ override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+ defineCodeGen(ctx, ev, d => d)
+ }
+}
+
+/**
+ * Returns date truncated to the unit specified by the format.
+ */
+case class TruncDate(date: Expression, format: Expression)
+ extends BinaryExpression with ImplicitCastInputTypes {
+ override def left: Expression = date
+ override def right: Expression = format
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(DateType, StringType)
+ override def dataType: DataType = DateType
+ override def prettyName: String = "trunc"
+
+ lazy val minItemConst = DateTimeUtils.parseTruncLevel(format.eval().asInstanceOf[UTF8String])
+
+ override def eval(input: InternalRow): Any = {
+ val minItem = if (format.foldable) {
+ minItemConst
+ } else {
+ DateTimeUtils.parseTruncLevel(format.eval().asInstanceOf[UTF8String])
+ }
+ if (minItem == -1) {
+ // unknown format
+ null
+ } else {
+ val d = date.eval(input)
+ if (d == null) {
+ null
+ } else {
+ DateTimeUtils.truncDate(d.asInstanceOf[Int], minItem)
+ }
+ }
+ }
+
+ override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+ val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+
+ if (format.foldable) {
+ if (minItemConst == -1) {
+ s"""
+ boolean ${ev.isNull} = true;
+ ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)};
+ """
+ } else {
+ val d = date.gen(ctx)
+ s"""
+ ${d.code}
+ boolean ${ev.isNull} = ${d.isNull};
+ ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)};
+ if (!${ev.isNull}) {
+ ${ev.primitive} = $dtu.truncDate(${d.primitive}, $minItemConst);
+ }
+ """
+ }
+ } else {
+ nullSafeCodeGen(ctx, ev, (dateVal, fmt) => {
+ val form = ctx.freshName("form")
+ s"""
+ int $form = $dtu.parseTruncLevel($fmt);
+ if ($form == -1) {
+ ${ev.isNull} = true;
+ } else {
+ ${ev.primitive} = $dtu.truncDate($dateVal, $form);
+ }
+ """
+ })
+ }
+ }
+}
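
The `format.foldable` branch above is the interesting part: for a literal format string the trunc level is resolved once, through `minItemConst`, and baked into the generated code rather than re-parsed for every row. A rough sketch of the same idea as a plain function (the helper name `compileTrunc` is hypothetical, not part of the patch):

    import org.apache.spark.sql.catalyst.util.DateTimeUtils
    import org.apache.spark.unsafe.types.UTF8String

    // Parse the trunc level once, then reuse the closure for every row.
    def compileTrunc(fmt: UTF8String): Option[Int => Int] = {
      val level = DateTimeUtils.parseTruncLevel(fmt)
      if (level == -1) None // unknown format: the result is always null
      else Some(days => DateTimeUtils.truncDate(days, level))
    }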
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 53abdf6618..5a7c25b8d5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -779,4 +779,38 @@ object DateTimeUtils {
}
date + (lastDayOfMonthInYear - dayInYear)
}
+
+ private val TRUNC_TO_YEAR = 1
+ private val TRUNC_TO_MONTH = 2
+ private val TRUNC_INVALID = -1
+
+ /**
+ * Returns the trunc date from original date and trunc level.
+ * Trunc level should be generated using `parseTruncLevel()` and should only be 1 or 2.
+ */
+ def truncDate(d: Int, level: Int): Int = {
+ if (level == TRUNC_TO_YEAR) {
+ d - DateTimeUtils.getDayInYear(d) + 1
+ } else if (level == TRUNC_TO_MONTH) {
+ d - DateTimeUtils.getDayOfMonth(d) + 1
+ } else {
+ throw new Exception(s"Invalid trunc level: $level")
+ }
+ }
+
+ /**
+ * Returns the truncate level: TRUNC_TO_YEAR, TRUNC_TO_MONTH, or TRUNC_INVALID;
+ * TRUNC_INVALID means an unsupported truncate level.
+ */
+ def parseTruncLevel(format: UTF8String): Int = {
+ if (format == null) {
+ TRUNC_INVALID
+ } else {
+ format.toString.toUpperCase match {
+ case "YEAR" | "YYYY" | "YY" => TRUNC_TO_YEAR
+ case "MON" | "MONTH" | "MM" => TRUNC_TO_MONTH
+ case _ => TRUNC_INVALID
+ }
+ }
+ }
}
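
DateType values are stored as days since the Unix epoch, so the truncation above is plain integer arithmetic on that day count. A quick sanity check of the year case with java.time, outside of Spark:

    import java.time.LocalDate
    import java.time.temporal.ChronoUnit

    val epoch = LocalDate.of(1970, 1, 1)
    // 2015-07-22 as days since the epoch: the Int that DateType carries
    val d = ChronoUnit.DAYS.between(epoch, LocalDate.of(2015, 7, 22)).toInt // 16638
    val dayInYear = LocalDate.of(2015, 7, 22).getDayOfYear                  // 203
    epoch.plusDays(d - dayInYear + 1)                                       // 2015-01-01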
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index 887e43621a..6c15c05da3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -351,6 +351,34 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
NextDay(Literal(Date.valueOf("2015-07-23")), Literal.create(null, StringType)), null)
}
+ test("function to_date") {
+ checkEvaluation(
+ ToDate(Literal(Date.valueOf("2015-07-22"))),
+ DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-22")))
+ checkEvaluation(ToDate(Literal.create(null, DateType)), null)
+ }
+
+ test("function trunc") {
+ def testTrunc(input: Date, fmt: String, expected: Date): Unit = {
+ checkEvaluation(TruncDate(Literal.create(input, DateType), Literal.create(fmt, StringType)),
+ expected)
+ checkEvaluation(
+ TruncDate(Literal.create(input, DateType), NonFoldableLiteral.create(fmt, StringType)),
+ expected)
+ }
+ val date = Date.valueOf("2015-07-22")
+ Seq("yyyy", "YYYY", "year", "YEAR", "yy", "YY").foreach{ fmt =>
+ testTrunc(date, fmt, Date.valueOf("2015-01-01"))
+ }
+ Seq("month", "MONTH", "mon", "MON", "mm", "MM").foreach { fmt =>
+ testTrunc(date, fmt, Date.valueOf("2015-07-01"))
+ }
+ testTrunc(date, "DD", null)
+ testTrunc(date, null, null)
+ testTrunc(null, "MON", null)
+ testTrunc(null, null, null)
+ }
+
test("from_unixtime") {
val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
@@ -405,5 +433,4 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(
UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format")), null)
}
-
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NonFoldableLiteral.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NonFoldableLiteral.scala
index 0559fb80e7..31ecf4a9e8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NonFoldableLiteral.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NonFoldableLiteral.scala
@@ -47,4 +47,8 @@ object NonFoldableLiteral {
val lit = Literal(value)
NonFoldableLiteral(lit.value, lit.dataType)
}
+ def create(value: Any, dataType: DataType): NonFoldableLiteral = {
+ val lit = Literal.create(value, dataType)
+ NonFoldableLiteral(lit.value, lit.dataType)
+ }
}
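
The new overload exists because `Literal(value)` infers the type from the value and so cannot represent a typed null, while `Literal.create(value, dataType)` takes the type explicitly; the null cases in the trunc test above depend on this:

    // Literal(null) would carry NullType; create() keeps the declared type.
    NonFoldableLiteral.create(null, StringType) // a non-foldable typed null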
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 168894d661..46dc4605a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2181,6 +2181,22 @@ object functions {
*/
def unix_timestamp(s: Column, p: String): Column = UnixTimestamp(s.expr, Literal(p))
+ /**
+ * Converts the column into DateType.
+ *
+ * @group datetime_funcs
+ * @since 1.5.0
+ */
+ def to_date(e: Column): Column = ToDate(e.expr)
+
+ /**
+ * Returns date truncated to the unit specified by the format.
+ *
+ * @group datetime_funcs
+ * @since 1.5.0
+ */
+ def trunc(date: Column, format: String): Column = TruncDate(date.expr, Literal(format))
+
//////////////////////////////////////////////////////////////////////////////////////////////
// Collection functions
//////////////////////////////////////////////////////////////////////////////////////////////
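
Note that `trunc` takes the format as a plain String rather than a Column: wrapped in `Literal(format)` it is foldable, so `TruncDate` resolves the trunc level once at codegen time (the `format.foldable` branch shown earlier). Reusing the df from the first sketch:

    df.select(trunc(col("t"), "MM"))  // foldable: level parsed once per query
    // A format coming from a column (hypothetical column name fmt) would
    // instead take the per-row parseTruncLevel path:
    df.selectExpr("trunc(t, fmt)")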
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index b7267c4131..8c596fad74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -345,6 +345,50 @@ class DateFunctionsSuite extends QueryTest {
Seq(Row(Date.valueOf("2015-07-30")), Row(Date.valueOf("2015-07-30"))))
}
+ test("function to_date") {
+ val d1 = Date.valueOf("2015-07-22")
+ val d2 = Date.valueOf("2015-07-01")
+ val t1 = Timestamp.valueOf("2015-07-22 10:00:00")
+ val t2 = Timestamp.valueOf("2014-12-31 23:59:59")
+ val s1 = "2015-07-22 10:00:00"
+ val s2 = "2014-12-31"
+ val df = Seq((d1, t1, s1), (d2, t2, s2)).toDF("d", "t", "s")
+
+ checkAnswer(
+ df.select(to_date(col("t"))),
+ Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31"))))
+ checkAnswer(
+ df.select(to_date(col("d"))),
+ Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2015-07-01"))))
+ checkAnswer(
+ df.select(to_date(col("s"))),
+ Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31"))))
+
+ checkAnswer(
+ df.selectExpr("to_date(t)"),
+ Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31"))))
+ checkAnswer(
+ df.selectExpr("to_date(d)"),
+ Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2015-07-01"))))
+ checkAnswer(
+ df.selectExpr("to_date(s)"),
+ Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31"))))
+ }
+
+ test("function trunc") {
+ val df = Seq(
+ (1, Timestamp.valueOf("2015-07-22 10:00:00")),
+ (2, Timestamp.valueOf("2014-12-31 00:00:00"))).toDF("i", "t")
+
+ checkAnswer(
+ df.select(trunc(col("t"), "YY")),
+ Seq(Row(Date.valueOf("2015-01-01")), Row(Date.valueOf("2014-01-01"))))
+
+ checkAnswer(
+ df.selectExpr("trunc(t, 'Month')"),
+ Seq(Row(Date.valueOf("2015-07-01")), Row(Date.valueOf("2014-12-01"))))
+ }
+
test("from_unixtime") {
val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"