aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorDaoyuan Wang <daoyuan.wang@intel.com>2015-07-27 21:08:56 -0700
committerReynold Xin <rxin@databricks.com>2015-07-27 21:08:56 -0700
commit2e7f99a004f08a42e86f6f603e4ba35cb52561c4 (patch)
treed563529bce742f218fdd7559cefee53ead919d78 /sql
parentdaa1964b6098f79100def78451bda181b5c92198 (diff)
downloadspark-2e7f99a004f08a42e86f6f603e4ba35cb52561c4.tar.gz
spark-2e7f99a004f08a42e86f6f603e4ba35cb52561c4.tar.bz2
spark-2e7f99a004f08a42e86f6f603e4ba35cb52561c4.zip
[SPARK-8195] [SPARK-8196] [SQL] udf next_day last_day
next_day returns the first date later than the given date that falls on the specified day of the week. last_day returns the last day of the month to which the given date belongs. Author: Daoyuan Wang <daoyuan.wang@intel.com> Closes #6986 from adrian-wang/udfnlday and squashes the following commits: ef7e3da [Daoyuan Wang] fix 02b3426 [Daoyuan Wang] address 2 comments dc69630 [Daoyuan Wang] address comments from rxin 8846086 [Daoyuan Wang] address comments from rxin d09bcce [Daoyuan Wang] multi fix 1a9de3d [Daoyuan Wang] function next_day and last_day
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala72
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala46
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala28
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/functions.scala17
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala22
6 files changed, 188 insertions, 1 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index aa05f448d1..61ee6f6f71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -219,8 +219,10 @@ object FunctionRegistry {
expression[DayOfYear]("dayofyear"),
expression[DayOfMonth]("dayofmonth"),
expression[Hour]("hour"),
- expression[Month]("month"),
+ expression[LastDay]("last_day"),
expression[Minute]("minute"),
+ expression[Month]("month"),
+ expression[NextDay]("next_day"),
expression[Quarter]("quarter"),
expression[Second]("second"),
expression[WeekOfYear]("weekofyear"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
index 9e55f0546e..b00a1b26fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
@@ -265,3 +265,75 @@ case class DateFormatClass(left: Expression, right: Expression) extends BinaryEx
})
}
}
+
+/**
+ * Computes the final calendar day of the month that contains the input date.
+ */
+case class LastDay(startDate: Expression) extends UnaryExpression with ImplicitCastInputTypes {
+
+  override def prettyName: String = "last_day"
+
+  override def child: Expression = startDate
+
+  override def dataType: DataType = DateType
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(DateType)
+
+  // The date value arrives as an Int; the month arithmetic is delegated to DateTimeUtils.
+  override def nullSafeEval(date: Any): Any =
+    DateTimeUtils.getLastDayOfMonth(date.asInstanceOf[Int])
+
+  override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    // The generated code calls the same DateTimeUtils helper as the interpreted path.
+    val utilClass = DateTimeUtils.getClass.getName.stripSuffix("$")
+    defineCodeGen(ctx, ev, input => s"$utilClass.getLastDayOfMonth($input)")
+  }
+}
+
+/**
+ * Returns the first date which is later than startDate and falls on the given dayOfWeek.
+ * For example, NextDay(2015-07-27, Sunday) would return 2015-08-02, which is the first
+ * Sunday later than 2015-07-27.
+ *
+ * dayOfWeek is parsed with DateTimeUtils.getDayOfWeekFromString; when it is not a recognized
+ * day name the expression evaluates to null.
+ */
+case class NextDay(startDate: Expression, dayOfWeek: Expression)
+  extends BinaryExpression with ImplicitCastInputTypes {
+
+  override def left: Expression = startDate
+  override def right: Expression = dayOfWeek
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(DateType, StringType)
+
+  override def dataType: DataType = DateType
+
+  override def nullSafeEval(start: Any, dayOfW: Any): Any = {
+    val dow = DateTimeUtils.getDayOfWeekFromString(dayOfW.asInstanceOf[UTF8String])
+    if (dow == -1) {
+      // Unrecognized day name.
+      null
+    } else {
+      val sd = start.asInstanceOf[Int]
+      DateTimeUtils.getNextDateForDayOfWeek(sd, dow)
+    }
+  }
+
+  override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    nullSafeCodeGen(ctx, ev, (sd, dowS) => {
+      val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+      if (right.foldable) {
+        // The day of week is a constant, so resolve it once while generating code. A foldable
+        // expression can still evaluate to null (e.g. a null literal); treat that like an
+        // unrecognized name instead of dereferencing it.
+        val constDow = dayOfWeek.eval(InternalRow.empty).asInstanceOf[UTF8String]
+        val dowVal = if (constDow eq null) -1 else DateTimeUtils.getDayOfWeekFromString(constDow)
+        if (dowVal == -1) {
+          s"${ev.isNull} = true;"
+        } else {
+          s"${ev.primitive} = $dtu.getNextDateForDayOfWeek($sd, $dowVal);"
+        }
+      } else {
+        // The day of week varies per row, so parse it in the generated code.
+        val dow = ctx.freshName("dow")
+        s"""
+          int $dow = $dtu.getDayOfWeekFromString($dowS);
+          if ($dow == -1) {
+            ${ev.isNull} = true;
+          } else {
+            ${ev.primitive} = $dtu.getNextDateForDayOfWeek($sd, $dow);
+          }
+        """
+      }
+    })
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 07412e73b6..2e28fb9af9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -573,4 +573,50 @@ object DateTimeUtils {
dayInYear - 334
}
}
+
+  /**
+   * Returns Day of week from String. Starting from Thursday, marked as 0.
+   * (Because 1970-01-01 is Thursday).
+   *
+   * Accepts the 2-letter, 3-letter or full English day name in any letter case.
+   * Returns -1 when the string is not one of those names.
+   */
+  def getDayOfWeekFromString(string: UTF8String): Int = {
+    // Upper-case with a fixed locale. With the JVM default locale the mapping can change:
+    // under a Turkish locale "i" becomes a dotted capital, so "fri" would not match "FRI".
+    val dowString = string.toString.toUpperCase(java.util.Locale.ROOT)
+    dowString match {
+      case "SU" | "SUN" | "SUNDAY" => 3
+      case "MO" | "MON" | "MONDAY" => 4
+      case "TU" | "TUE" | "TUESDAY" => 5
+      case "WE" | "WED" | "WEDNESDAY" => 6
+      case "TH" | "THU" | "THURSDAY" => 0
+      case "FR" | "FRI" | "FRIDAY" => 1
+      case "SA" | "SAT" | "SATURDAY" => 2
+      case _ => -1
+    }
+  }
+
+  /**
+   * Finds the earliest date strictly after startDate that falls on dayOfWeek.
+   * dayOfWeek is an integer in [0, 6], where 0 is Thursday, 1 is Friday, and so on.
+   */
+  def getNextDateForDayOfWeek(startDate: Int, dayOfWeek: Int): Int = {
+    val nextDay = startDate + 1
+    // Scala's % keeps the sign of the dividend, so shift the remainder into [0, 6].
+    val remainder = (dayOfWeek - nextDay) % 7
+    val daysToAdd = (remainder + 7) % 7
+    nextDay + daysToAdd
+  }
+
+  /**
+   * Number of days in each month of a non-leap year, indexed by month - 1.
+   */
+  private[this] val daysInNormalYear = Array(31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
+
+  /**
+   * Returns last day of the month for the given date. The date is expressed in days
+   * since 1.1.1970.
+   */
+  def getLastDayOfMonth(date: Int): Int = {
+    val month = getMonth(date)
+    // The table holds non-leap lengths only; February gains one day in a leap year.
+    val leapAdjustment = if (month == 2 && isLeapYear(getYear(date))) 1 else 0
+    val monthLength = daysInNormalYear(month - 1) + leapAdjustment
+    date - getDayOfMonth(date) + monthLength
+  }
+
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index bdba6ce891..4d2d33765a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -22,6 +22,7 @@ import java.text.SimpleDateFormat
import java.util.Calendar
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.{StringType, TimestampType, DateType}
class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -246,4 +247,31 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}
+  test("last_day") {
+    // (input date, expected last day of its month): one case per calendar month, plus
+    // February in both a non-leap year (2015) and a leap year (2016).
+    Seq(
+      "2015-02-28" -> "2015-02-28",
+      "2015-03-27" -> "2015-03-31",
+      "2015-04-26" -> "2015-04-30",
+      "2015-05-25" -> "2015-05-31",
+      "2015-06-24" -> "2015-06-30",
+      "2015-07-23" -> "2015-07-31",
+      "2015-08-01" -> "2015-08-31",
+      "2015-09-02" -> "2015-09-30",
+      "2015-10-03" -> "2015-10-31",
+      "2015-11-04" -> "2015-11-30",
+      "2015-12-05" -> "2015-12-31",
+      "2016-01-06" -> "2016-01-31",
+      "2016-02-07" -> "2016-02-29"
+    ).foreach { case (input, expected) =>
+      checkEvaluation(LastDay(Literal(Date.valueOf(input))), Date.valueOf(expected))
+    }
+  }
+
+  test("next_day") {
+    // 2015-07-23 is a Thursday, so the next Thursday strictly after it is 2015-07-30.
+    val start = Literal(Date.valueOf("2015-07-23"))
+    val expected = DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-30"))
+    // The 3-letter, full and 2-letter spellings must all resolve to the same day.
+    Seq("Thu", "THURSDAY", "th").foreach { dayName =>
+      checkEvaluation(NextDay(start, Literal(dayName)), expected)
+    }
+  }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index cab3db609d..d18558b510 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2033,6 +2033,13 @@ object functions {
def hour(columnName: String): Column = hour(Column(columnName))
/**
+ * Returns the last day of the month which the given date belongs to.
+ * For example, input "2015-07-23" returns "2015-07-31" since July 31 is the last day of
+ * July 2015.
+ * @group datetime_funcs
+ * @since 1.5.0
+ */
+ def last_day(e: Column): Column = LastDay(e.expr)
+
+ /**
* Extracts the minutes as an integer from a given date/timestamp/string.
* @group datetime_funcs
* @since 1.5.0
@@ -2047,6 +2054,16 @@ object functions {
def minute(columnName: String): Column = minute(Column(columnName))
/**
+ * Returns the first date which is later than the given date sd and falls on the day of the
+ * week named by dayOfWeek.
+ * For example, `next_day('2015-07-27', "Sunday")` would return 2015-08-02, which is the
+ * first Sunday later than 2015-07-27. The parameter dayOfWeek could be 2-letter, 3-letter,
+ * or full name of the day of the week (e.g. Mo, tue, FRIDAY); letter case is ignored.
+ * @group datetime_funcs
+ * @since 1.5.0
+ */
+ def next_day(sd: Column, dayOfWeek: String): Column = NextDay(sd.expr, lit(dayOfWeek).expr)
+
+ /**
* Extracts the seconds as an integer from a given date/timestamp/string.
* @group datetime_funcs
* @since 1.5.0
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 9e80ae8692..ff1c7562dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -184,4 +184,26 @@ class DateFunctionsSuite extends QueryTest {
Row(15, 15, 15))
}
+  test("function last_day") {
+    // Both input rows fall in July 2015, so every result row is 2015-07-31.
+    val endOfJuly = Date.valueOf("2015-07-31")
+    val expected = Seq(Row(endOfJuly), Row(endOfJuly))
+
+    // Date-formatted strings.
+    val dateStrings = Seq((1, "2015-07-23"), (2, "2015-07-24")).toDF("i", "d")
+    checkAnswer(dateStrings.select(last_day(col("d"))), expected)
+
+    // Timestamp-formatted strings.
+    val tsStrings = Seq((1, "2015-07-23 00:11:22"), (2, "2015-07-24 11:22:33")).toDF("i", "t")
+    checkAnswer(tsStrings.select(last_day(col("t"))), expected)
+  }
+
+  test("function next_day") {
+    val df1 = Seq(("mon", "2015-07-23"), ("tuesday", "2015-07-20")).toDF("dow", "d")
+    val df2 = Seq(("th", "2015-07-23 00:11:22"), ("xx", "2015-07-24 11:22:33")).toDF("dow", "t")
+    // 2015-07-23 is a Thursday and 2015-07-20 a Monday; the next Monday after both is 07-27.
+    checkAnswer(
+      df1.select(next_day(col("d"), "MONDAY")),
+      Seq(Row(Date.valueOf("2015-07-27")), Row(Date.valueOf("2015-07-27"))))
+    // next_day takes the day of week as a literal, so the "dow" column (including its invalid
+    // "xx" value) is never read: both rows are resolved against "th". 2015-07-24 is a Friday,
+    // so its next Thursday is also 2015-07-30, not null.
+    checkAnswer(
+      df2.select(next_day(col("t"), "th")),
+      Seq(Row(Date.valueOf("2015-07-30")), Row(Date.valueOf("2015-07-30"))))
+  }
+
}