diff options
9 files changed, 297 insertions, 61 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 8fafd7778a..bc08466461 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -211,6 +211,7 @@ object FunctionRegistry { expression[AddMonths]("add_months"), expression[CurrentDate]("current_date"), expression[CurrentTimestamp]("current_timestamp"), + expression[DateDiff]("datediff"), expression[DateAdd]("date_add"), expression[DateFormatClass]("date_format"), expression[DateSub]("date_sub"), @@ -218,6 +219,7 @@ object FunctionRegistry { expression[DayOfYear]("dayofyear"), expression[DayOfMonth]("dayofmonth"), expression[FromUnixTime]("from_unixtime"), + expression[FromUTCTimestamp]("from_utc_timestamp"), expression[Hour]("hour"), expression[LastDay]("last_day"), expression[Minute]("minute"), @@ -227,6 +229,7 @@ object FunctionRegistry { expression[Quarter]("quarter"), expression[Second]("second"), expression[ToDate]("to_date"), + expression[ToUTCTimestamp]("to_utc_timestamp"), expression[TruncDate]("trunc"), expression[UnixTimestamp]("unix_timestamp"), expression[WeekOfYear]("weekofyear"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala index 07dea5b470..32dc9b7682 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala @@ -619,6 +619,53 @@ case class TimeAdd(start: Expression, interval: Expression) } /** + * Assumes given timestamp is UTC and converts to given timezone. + */ +case class FromUTCTimestamp(left: Expression, right: Expression) + extends BinaryExpression with ImplicitCastInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType) + override def dataType: DataType = TimestampType + override def prettyName: String = "from_utc_timestamp" + + override def nullSafeEval(time: Any, timezone: Any): Any = { + DateTimeUtils.fromUTCTime(time.asInstanceOf[Long], + timezone.asInstanceOf[UTF8String].toString) + } + + override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + val dtu = DateTimeUtils.getClass.getName.stripSuffix("$") + if (right.foldable) { + val tz = right.eval() + if (tz == null) { + s""" + |boolean ${ev.isNull} = true; + |long ${ev.primitive} = 0; + """.stripMargin + } else { + val tzTerm = ctx.freshName("tz") + val tzClass = classOf[TimeZone].getName + ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""") + val eval = left.gen(ctx) + s""" + |${eval.code} + |boolean ${ev.isNull} = ${eval.isNull}; + |long ${ev.primitive} = 0; + |if (!${ev.isNull}) { + | ${ev.primitive} = ${eval.primitive} + + | ${tzTerm}.getOffset(${eval.primitive} / 1000) * 1000L; + |} + """.stripMargin + } + } else { + defineCodeGen(ctx, ev, (timestamp, format) => { + s"""$dtu.fromUTCTime($timestamp, $format.toString())""" + }) + } + } +} + +/** * Subtracts an interval from timestamp. */ case class TimeSub(start: Expression, interval: Expression) @@ -697,6 +744,53 @@ case class MonthsBetween(date1: Expression, date2: Expression) } /** + * Assumes given timestamp is in given timezone and converts to UTC. + */ +case class ToUTCTimestamp(left: Expression, right: Expression) + extends BinaryExpression with ImplicitCastInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType) + override def dataType: DataType = TimestampType + override def prettyName: String = "to_utc_timestamp" + + override def nullSafeEval(time: Any, timezone: Any): Any = { + DateTimeUtils.toUTCTime(time.asInstanceOf[Long], + timezone.asInstanceOf[UTF8String].toString) + } + + override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + val dtu = DateTimeUtils.getClass.getName.stripSuffix("$") + if (right.foldable) { + val tz = right.eval() + if (tz == null) { + s""" + |boolean ${ev.isNull} = true; + |long ${ev.primitive} = 0; + """.stripMargin + } else { + val tzTerm = ctx.freshName("tz") + val tzClass = classOf[TimeZone].getName + ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""") + val eval = left.gen(ctx) + s""" + |${eval.code} + |boolean ${ev.isNull} = ${eval.isNull}; + |long ${ev.primitive} = 0; + |if (!${ev.isNull}) { + | ${ev.primitive} = ${eval.primitive} - + | ${tzTerm}.getOffset(${eval.primitive} / 1000) * 1000L; + |} + """.stripMargin + } + } else { + defineCodeGen(ctx, ev, (timestamp, format) => { + s"""$dtu.toUTCTime($timestamp, $format.toString())""" + }) + } + } +} + +/** * Returns the date part of a timestamp or string. */ case class ToDate(child: Expression) extends UnaryExpression with ImplicitCastInputTypes { @@ -714,7 +808,7 @@ case class ToDate(child: Expression) extends UnaryExpression with ImplicitCastIn } } -/* +/** * Returns date truncated to the unit specified by the format. */ case class TruncDate(date: Expression, format: Expression) @@ -783,3 +877,23 @@ case class TruncDate(date: Expression, format: Expression) } } } + +/** + * Returns the number of days from startDate to endDate. + */ +case class DateDiff(endDate: Expression, startDate: Expression) + extends BinaryExpression with ImplicitCastInputTypes { + + override def left: Expression = endDate + override def right: Expression = startDate + override def inputTypes: Seq[AbstractDataType] = Seq(DateType, DateType) + override def dataType: DataType = IntegerType + + override def nullSafeEval(end: Any, start: Any): Any = { + end.asInstanceOf[Int] - start.asInstanceOf[Int] + } + + override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + defineCodeGen(ctx, ev, (end, start) => s"$end - $start") + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 032ed8a56a..6a98f4d9c5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -814,4 +814,24 @@ object DateTimeUtils { } } } + + /** + * Returns a timestamp of given timezone from utc timestamp, with the same string + * representation in their timezone. + */ + def fromUTCTime(time: Long, timeZone: String): Long = { + val tz = TimeZone.getTimeZone(timeZone) + val offset = tz.getOffset(time / 1000L) + time + offset * 1000L + } + + /** + * Returns a utc timestamp from a given timestamp from a given timezone, with the same + * string representation in their timezone. + */ + def toUTCTime(time: Long, timeZone: String): Long = { + val tz = TimeZone.getTimeZone(timeZone) + val offset = tz.getOffset(time / 1000L) + time - offset * 1000L + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala index 6c15c05da3..3bff8e012a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala @@ -23,8 +23,8 @@ import java.util.Calendar import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.unsafe.types.CalendarInterval import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { @@ -48,15 +48,13 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("DayOfYear") { val sdfDay = new SimpleDateFormat("D") - (1998 to 2002).foreach { y => - (0 to 3).foreach { m => - (0 to 5).foreach { i => - val c = Calendar.getInstance() - c.set(y, m, 28, 0, 0, 0) - c.add(Calendar.DATE, i) - checkEvaluation(DayOfYear(Literal(new Date(c.getTimeInMillis))), - sdfDay.format(c.getTime).toInt) - } + (0 to 3).foreach { m => + (0 to 5).foreach { i => + val c = Calendar.getInstance() + c.set(2000, m, 28, 0, 0, 0) + c.add(Calendar.DATE, i) + checkEvaluation(DayOfYear(Literal(new Date(c.getTimeInMillis))), + sdfDay.format(c.getTime).toInt) } } checkEvaluation(DayOfYear(Literal.create(null, DateType)), null) @@ -433,4 +431,58 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation( UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format")), null) } + + test("datediff") { + checkEvaluation( + DateDiff(Literal(Date.valueOf("2015-07-24")), Literal(Date.valueOf("2015-07-21"))), 3) + checkEvaluation( + DateDiff(Literal(Date.valueOf("2015-07-21")), Literal(Date.valueOf("2015-07-24"))), -3) + checkEvaluation(DateDiff(Literal.create(null, DateType), Literal(Date.valueOf("2015-07-24"))), + null) + checkEvaluation(DateDiff(Literal(Date.valueOf("2015-07-24")), Literal.create(null, DateType)), + null) + checkEvaluation( + DateDiff(Literal.create(null, DateType), Literal.create(null, DateType)), + null) + } + + test("to_utc_timestamp") { + def test(t: String, tz: String, expected: String): Unit = { + checkEvaluation( + ToUTCTimestamp( + Literal.create(if (t != null) Timestamp.valueOf(t) else null, TimestampType), + Literal.create(tz, StringType)), + if (expected != null) Timestamp.valueOf(expected) else null) + checkEvaluation( + ToUTCTimestamp( + Literal.create(if (t != null) Timestamp.valueOf(t) else null, TimestampType), + NonFoldableLiteral.create(tz, StringType)), + if (expected != null) Timestamp.valueOf(expected) else null) + } + test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00") + test("2015-01-24 00:00:00", "PST", "2015-01-24 08:00:00") + test(null, "UTC", null) + test("2015-07-24 00:00:00", null, null) + test(null, null, null) + } + + test("from_utc_timestamp") { + def test(t: String, tz: String, expected: String): Unit = { + checkEvaluation( + FromUTCTimestamp( + Literal.create(if (t != null) Timestamp.valueOf(t) else null, TimestampType), + Literal.create(tz, StringType)), + if (expected != null) Timestamp.valueOf(expected) else null) + checkEvaluation( + FromUTCTimestamp( + Literal.create(if (t != null) Timestamp.valueOf(t) else null, TimestampType), + NonFoldableLiteral.create(tz, StringType)), + if (expected != null) Timestamp.valueOf(expected) else null) + } + test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00") + test("2015-01-24 00:00:00", "PST", "2015-01-23 16:00:00") + test(null, "UTC", null) + test("2015-07-24 00:00:00", null, null) + test(null, null, null) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 60d2bcfe13..d18fa4df13 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -398,4 +398,26 @@ class DateTimeUtilsSuite extends SparkFunSuite { c2.set(1996, 2, 31, 0, 0, 0) assert(monthsBetween(c1.getTimeInMillis * 1000L, c2.getTimeInMillis * 1000L) === 11) } + + test("from UTC timestamp") { + def test(utc: String, tz: String, expected: String): Unit = { + assert(toJavaTimestamp(fromUTCTime(fromJavaTimestamp(Timestamp.valueOf(utc)), tz)).toString + === expected) + } + test("2011-12-25 09:00:00.123456", "UTC", "2011-12-25 09:00:00.123456") + test("2011-12-25 09:00:00.123456", "JST", "2011-12-25 18:00:00.123456") + test("2011-12-25 09:00:00.123456", "PST", "2011-12-25 01:00:00.123456") + test("2011-12-25 09:00:00.123456", "Asia/Shanghai", "2011-12-25 17:00:00.123456") + } + + test("to UTC timestamp") { + def test(utc: String, tz: String, expected: String): Unit = { + assert(toJavaTimestamp(toUTCTime(fromJavaTimestamp(Timestamp.valueOf(utc)), tz)).toString + === expected) + } + test("2011-12-25 09:00:00.123456", "UTC", "2011-12-25 09:00:00.123456") + test("2011-12-25 18:00:00.123456", "JST", "2011-12-25 09:00:00.123456") + test("2011-12-25 01:00:00.123456", "PST", "2011-12-25 09:00:00.123456") + test("2011-12-25 17:00:00.123456", "Asia/Shanghai", "2011-12-25 09:00:00.123456") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 818aa109f3..197cd3de61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2020,6 +2020,13 @@ object functions { def date_sub(start: Column, days: Int): Column = DateSub(start.expr, Literal(days)) /** + * Returns the number of days from `start` to `end`. + * @group datetime_funcs + * @since 1.5.0 + */ + def datediff(end: Column, start: Column): Column = DateDiff(end.expr, start.expr) + + /** * Extracts the year as an integer from a given date/timestamp/string. * @group datetime_funcs * @since 1.5.0 @@ -2238,6 +2245,21 @@ object functions { */ def trunc(date: Column, format: String): Column = TruncDate(date.expr, Literal(format)) + /** + * Assumes given timestamp is UTC and converts to given timezone. + * @group datetime_funcs + * @since 1.5.0 + */ + def from_utc_timestamp(ts: Column, tz: String): Column = + FromUTCTimestamp(ts.expr, Literal(tz).expr) + + /** + * Assumes given timestamp is in given timezone and converts to UTC. + * @group datetime_funcs + * @since 1.5.0 + */ + def to_utc_timestamp(ts: Column, tz: String): Column = ToUTCTimestamp(ts.expr, Literal(tz).expr) + ////////////////////////////////////////////////////////////////////////////////////////////// // Collection functions ////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index 8c596fad74..0850f5cf77 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -445,4 +445,51 @@ class DateFunctionsSuite extends QueryTest { Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L))) } + test("datediff") { + val df = Seq( + (Date.valueOf("2015-07-24"), Timestamp.valueOf("2015-07-24 01:00:00"), + "2015-07-23", "2015-07-23 03:00:00"), + (Date.valueOf("2015-07-25"), Timestamp.valueOf("2015-07-25 02:00:00"), + "2015-07-24", "2015-07-24 04:00:00") + ).toDF("a", "b", "c", "d") + checkAnswer(df.select(datediff(col("a"), col("b"))), Seq(Row(0), Row(0))) + checkAnswer(df.select(datediff(col("a"), col("c"))), Seq(Row(1), Row(1))) + checkAnswer(df.select(datediff(col("d"), col("b"))), Seq(Row(-1), Row(-1))) + checkAnswer(df.selectExpr("datediff(a, d)"), Seq(Row(1), Row(1))) + } + + test("from_utc_timestamp") { + val df = Seq( + (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00"), + (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00") + ).toDF("a", "b") + checkAnswer( + df.select(from_utc_timestamp(col("a"), "PST")), + Seq( + Row(Timestamp.valueOf("2015-07-23 17:00:00")), + Row(Timestamp.valueOf("2015-07-24 17:00:00")))) + checkAnswer( + df.select(from_utc_timestamp(col("b"), "PST")), + Seq( + Row(Timestamp.valueOf("2015-07-23 17:00:00")), + Row(Timestamp.valueOf("2015-07-24 17:00:00")))) + } + + test("to_utc_timestamp") { + val df = Seq( + (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00"), + (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00") + ).toDF("a", "b") + checkAnswer( + df.select(to_utc_timestamp(col("a"), "PST")), + Seq( + Row(Timestamp.valueOf("2015-07-24 07:00:00")), + Row(Timestamp.valueOf("2015-07-25 07:00:00")))) + checkAnswer( + df.select(to_utc_timestamp(col("b"), "PST")), + Seq( + Row(Timestamp.valueOf("2015-07-24 07:00:00")), + Row(Timestamp.valueOf("2015-07-25 07:00:00")))) + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala index 1c1be0c3cc..ab5da6ee79 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala @@ -176,58 +176,12 @@ class StringFunctionsSuite extends QueryTest { test("string substring_index function") { val df = Seq(("www.apache.org", ".", "zz")).toDF("a", "b", "c") checkAnswer( - df.select(substring_index($"a", ".", 3)), - Row("www.apache.org")) - checkAnswer( df.select(substring_index($"a", ".", 2)), Row("www.apache")) checkAnswer( - df.select(substring_index($"a", ".", 1)), - Row("www")) - checkAnswer( - df.select(substring_index($"a", ".", 0)), - Row("")) - checkAnswer( - df.select(substring_index(lit("www.apache.org"), ".", -1)), - Row("org")) - checkAnswer( - df.select(substring_index(lit("www.apache.org"), ".", -2)), - Row("apache.org")) - checkAnswer( - df.select(substring_index(lit("www.apache.org"), ".", -3)), - Row("www.apache.org")) - // str is empty string - checkAnswer( - df.select(substring_index(lit(""), ".", 1)), - Row("")) - // empty string delim - checkAnswer( - df.select(substring_index(lit("www.apache.org"), "", 1)), - Row("")) - // delim does not exist in str - checkAnswer( - df.select(substring_index(lit("www.apache.org"), "#", 1)), - Row("www.apache.org")) - // delim is 2 chars - checkAnswer( - df.select(substring_index(lit("www||apache||org"), "||", 2)), - Row("www||apache")) - checkAnswer( - df.select(substring_index(lit("www||apache||org"), "||", -2)), - Row("apache||org")) - // null - checkAnswer( - df.select(substring_index(lit(null), "||", 2)), - Row(null)) - checkAnswer( - df.select(substring_index(lit("www.apache.org"), null, 2)), - Row(null)) - // non ascii chars - // scalastyle:off - checkAnswer( - df.selectExpr("""substring_index("大千世界大千世界", "千", 2)"""), - Row("大千世界大")) - // scalastyle:on + df.selectExpr("substring_index(a, '.', 2)"), + Row("www.apache") + ) } test("string locate function") { diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index ec959cb219..53d5b22b52 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -263,6 +263,9 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "timestamp_2", "timestamp_udf", + // Hive returns string from UTC formatted timestamp, spark returns timestamp type + "date_udf", + // Unlike Hive, we do support log base in (0, 1.0], therefore disable this "udf7" ) @@ -397,7 +400,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "date_comparison", "date_join1", "date_serde", - "date_udf", "decimal_1", "decimal_4", "decimal_join", |