aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2016-04-01 13:19:24 -0700
committerMichael Armbrust <michael@databricks.com>2016-04-01 13:19:24 -0700
commit1b829ce13990b40fd8d7c9efcc2ae55c4dbc861c (patch)
tree3dd5b6bfd14b9eafde58bed77dc89ae43712a599 /sql/core
parent1e886159849e3918445d3fdc3c4cef86c6c1a236 (diff)
downloadspark-1b829ce13990b40fd8d7c9efcc2ae55c4dbc861c.tar.gz
spark-1b829ce13990b40fd8d7c9efcc2ae55c4dbc861c.tar.bz2
spark-1b829ce13990b40fd8d7c9efcc2ae55c4dbc861c.zip
[SPARK-14160] Time Windowing functions for Datasets
## What changes were proposed in this pull request? This PR adds the function `window` as a column expression. `window` can be used to bucket rows into time windows given a time column. With this expression, performing time series analysis on batch data, as well as streaming data should become much more simpler. ### Usage Assume the following schema: `sensor_id, measurement, timestamp` To average 5 minute data every 1 minute (window length of 5 minutes, slide duration of 1 minute), we will use: ```scala df.groupBy(window("timestamp", “5 minutes”, “1 minute”), "sensor_id") .agg(mean("measurement").as("avg_meas")) ``` This will generate windows such as: ``` 09:00:00-09:05:00 09:01:00-09:06:00 09:02:00-09:07:00 ... ``` Intervals will start at every `slideDuration` starting at the unix epoch (1970-01-01 00:00:00 UTC). To start intervals at a different point of time, e.g. 30 seconds after a minute, the `startTime` parameter can be used. ```scala df.groupBy(window("timestamp", “5 minutes”, “1 minute”, "30 second"), "sensor_id") .agg(mean("measurement").as("avg_meas")) ``` This will generate windows such as: ``` 09:00:30-09:05:30 09:01:30-09:06:30 09:02:30-09:07:30 ... ``` Support for Python will be made in a follow up PR after this. ## How was this patch tested? This patch has some basic unit tests for the `TimeWindow` expression testing that the parameters pass validation, and it also has some unit/integration tests testing the correctness of the windowing and usability in complex operations (multi-column grouping, multi-column projections, joins). Author: Burak Yavuz <brkyvz@gmail.com> Author: Michael Armbrust <michael@databricks.com> Closes #12008 from brkyvz/df-time-window.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/functions.scala137
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala242
2 files changed, 379 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 7ce15e3f35..74906050ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2550,6 +2550,143 @@ object functions {
ToUTCTimestamp(ts.expr, Literal(tz))
}
+ /**
+ * Bucketize rows into one or more time windows given a timestamp specifying column. Window
+ * starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
+ * [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
+ * the order of months are not supported. The following example takes the average stock price for
+ * a one minute window every 10 seconds starting 5 seconds after the hour:
+ *
+ * {{{
+ * val df = ... // schema => timestamp: TimestampType, stockId: StringType, price: DoubleType
+ * df.groupBy(window($"time", "1 minute", "10 seconds", "5 seconds"), $"stockId")
+ * .agg(mean("price"))
+ * }}}
+ *
+ * The windows will look like:
+ *
+ * {{{
+ * 09:00:05-09:01:05
+ * 09:00:15-09:01:15
+ * 09:00:25-09:01:25 ...
+ * }}}
+ *
+ * For a continuous query, you may use the function `current_timestamp` to generate windows on
+ * processing time.
+ *
+ * @param timeColumn The column or the expression to use as the timestamp for windowing by time.
+ * The time can be as TimestampType or LongType, however when using LongType,
+ * the time must be given in seconds.
+ * @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
+ * `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
+ * valid duration identifiers.
+ * @param slideDuration A string specifying the sliding interval of the window, e.g. `1 minute`.
+ * A new window will be generated every `slideDuration`. Must be less than
+ * or equal to the `windowDuration`. Check
+ * [[org.apache.spark.unsafe.types.CalendarInterval]] for valid duration
+ * identifiers.
+ * @param startTime The offset with respect to 1970-01-01 00:00:00 UTC with which to start
+ * window intervals. For example, in order to have hourly tumbling windows that
+ * start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide
+ * `startTime` as `15 minutes`.
+ *
+ * @group datetime_funcs
+ * @since 2.0.0
+ */
+ @Experimental
+ def window(
+ timeColumn: Column,
+ windowDuration: String,
+ slideDuration: String,
+ startTime: String): Column = {
+ withExpr {
+ TimeWindow(timeColumn.expr, windowDuration, slideDuration, startTime)
+ }.as("window")
+ }
+
+
+ /**
+ * Bucketize rows into one or more time windows given a timestamp specifying column. Window
+ * starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
+ * [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
+ * the order of months are not supported. The windows start beginning at 1970-01-01 00:00:00 UTC.
+ * The following example takes the average stock price for a one minute window every 10 seconds:
+ *
+ * {{{
+ * val df = ... // schema => timestamp: TimestampType, stockId: StringType, price: DoubleType
+ * df.groupBy(window($"time", "1 minute", "10 seconds"), $"stockId")
+ * .agg(mean("price"))
+ * }}}
+ *
+ * The windows will look like:
+ *
+ * {{{
+ * 09:00:00-09:01:00
+ * 09:00:10-09:01:10
+ * 09:00:20-09:01:20 ...
+ * }}}
+ *
+ * For a continuous query, you may use the function `current_timestamp` to generate windows on
+ * processing time.
+ *
+ * @param timeColumn The column or the expression to use as the timestamp for windowing by time.
+ * The time can be as TimestampType or LongType, however when using LongType,
+ * the time must be given in seconds.
+ * @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
+ * `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
+ * valid duration identifiers.
+ * @param slideDuration A string specifying the sliding interval of the window, e.g. `1 minute`.
+ * A new window will be generated every `slideDuration`. Must be less than
+ * or equal to the `windowDuration`. Check
+ * [[org.apache.spark.unsafe.types.CalendarInterval]] for valid duration.
+ *
+ * @group datetime_funcs
+ * @since 2.0.0
+ */
+ @Experimental
+ def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column = {
+ window(timeColumn, windowDuration, slideDuration, "0 second")
+ }
+
+ /**
+ * Generates tumbling time windows given a timestamp specifying column. Window
+ * starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
+ * [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
+ * the order of months are not supported. The windows start beginning at 1970-01-01 00:00:00 UTC.
+ * The following example takes the average stock price for a one minute tumbling window:
+ *
+ * {{{
+ * val df = ... // schema => timestamp: TimestampType, stockId: StringType, price: DoubleType
+ * df.groupBy(window($"time", "1 minute"), $"stockId")
+ * .agg(mean("price"))
+ * }}}
+ *
+ * The windows will look like:
+ *
+ * {{{
+ * 09:00:00-09:01:00
+ * 09:01:00-09:02:00
+ * 09:02:00-09:03:00 ...
+ * }}}
+ *
+ * For a continuous query, you may use the function `current_timestamp` to generate windows on
+ * processing time.
+ *
+ * @param timeColumn The column or the expression to use as the timestamp for windowing by time.
+ * The time can be as TimestampType or LongType, however when using LongType,
+ * the time must be given in seconds.
+ * @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
+ * `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
+ * valid duration identifiers.
+ *
+ * @group datetime_funcs
+ * @since 2.0.0
+ */
+ @Experimental
+ def window(timeColumn: Column, windowDuration: String): Column = {
+ window(timeColumn, windowDuration, windowDuration, "0 second")
+ }
+
//////////////////////////////////////////////////////////////////////////////////////////////
// Collection functions
//////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
new file mode 100644
index 0000000000..e8103a31d5
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.TimeZone
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StringType
+
+class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
+
+ import testImplicits._
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+ }
+
+ override def afterEach(): Unit = {
+ super.beforeEach()
+ TimeZone.setDefault(null)
+ }
+
+ test("tumbling window groupBy statement") {
+ val df = Seq(
+ ("2016-03-27 19:39:34", 1, "a"),
+ ("2016-03-27 19:39:56", 2, "a"),
+ ("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
+ checkAnswer(
+ df.groupBy(window($"time", "10 seconds"))
+ .agg(count("*").as("counts"))
+ .orderBy($"window.start".asc)
+ .select("counts"),
+ Seq(Row(1), Row(1), Row(1))
+ )
+ }
+
+ test("tumbling window groupBy statement with startTime") {
+ val df = Seq(
+ ("2016-03-27 19:39:34", 1, "a"),
+ ("2016-03-27 19:39:56", 2, "a"),
+ ("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
+
+ checkAnswer(
+ df.groupBy(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"id")
+ .agg(count("*").as("counts"))
+ .orderBy($"window.start".asc)
+ .select("counts"),
+ Seq(Row(1), Row(1), Row(1)))
+ }
+
+ test("tumbling window with multi-column projection") {
+ val df = Seq(
+ ("2016-03-27 19:39:34", 1, "a"),
+ ("2016-03-27 19:39:56", 2, "a"),
+ ("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
+
+ checkAnswer(
+ df.select(window($"time", "10 seconds"), $"value")
+ .orderBy($"window.start".asc)
+ .select($"window.start".cast("string"), $"window.end".cast("string"), $"value"),
+ Seq(
+ Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", 4),
+ Row("2016-03-27 19:39:30", "2016-03-27 19:39:40", 1),
+ Row("2016-03-27 19:39:50", "2016-03-27 19:40:00", 2)
+ )
+ )
+ }
+
+ test("sliding window grouping") {
+ val df = Seq(
+ ("2016-03-27 19:39:34", 1, "a"),
+ ("2016-03-27 19:39:56", 2, "a"),
+ ("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
+
+ checkAnswer(
+ df.groupBy(window($"time", "10 seconds", "3 seconds", "0 second"))
+ .agg(count("*").as("counts"))
+ .orderBy($"window.start".asc)
+ .select($"window.start".cast("string"), $"window.end".cast("string"), $"counts"),
+ // 2016-03-27 19:39:27 UTC -> 4 bins
+ // 2016-03-27 19:39:34 UTC -> 3 bins
+ // 2016-03-27 19:39:56 UTC -> 3 bins
+ Seq(
+ Row("2016-03-27 19:39:18", "2016-03-27 19:39:28", 1),
+ Row("2016-03-27 19:39:21", "2016-03-27 19:39:31", 1),
+ Row("2016-03-27 19:39:24", "2016-03-27 19:39:34", 1),
+ Row("2016-03-27 19:39:27", "2016-03-27 19:39:37", 2),
+ Row("2016-03-27 19:39:30", "2016-03-27 19:39:40", 1),
+ Row("2016-03-27 19:39:33", "2016-03-27 19:39:43", 1),
+ Row("2016-03-27 19:39:48", "2016-03-27 19:39:58", 1),
+ Row("2016-03-27 19:39:51", "2016-03-27 19:40:01", 1),
+ Row("2016-03-27 19:39:54", "2016-03-27 19:40:04", 1))
+ )
+ }
+
+ test("sliding window projection") {
+ val df = Seq(
+ ("2016-03-27 19:39:34", 1, "a"),
+ ("2016-03-27 19:39:56", 2, "a"),
+ ("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
+
+ checkAnswer(
+ df.select(window($"time", "10 seconds", "3 seconds", "0 second"), $"value")
+ .orderBy($"window.start".asc, $"value".desc).select("value"),
+ // 2016-03-27 19:39:27 UTC -> 4 bins
+ // 2016-03-27 19:39:34 UTC -> 3 bins
+ // 2016-03-27 19:39:56 UTC -> 3 bins
+ Seq(Row(4), Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
+ )
+ }
+
+ test("windowing combined with explode expression") {
+ val df = Seq(
+ ("2016-03-27 19:39:34", 1, Seq("a", "b")),
+ ("2016-03-27 19:39:56", 2, Seq("a", "c", "d"))).toDF("time", "value", "ids")
+
+ checkAnswer(
+ df.select(window($"time", "10 seconds"), $"value", explode($"ids"))
+ .orderBy($"window.start".asc).select("value"),
+ // first window exploded to two rows for "a", and "b", second window exploded to 3 rows
+ Seq(Row(1), Row(1), Row(2), Row(2), Row(2))
+ )
+ }
+
+ test("null timestamps") {
+ val df = Seq(
+ ("2016-03-27 09:00:05", 1),
+ ("2016-03-27 09:00:32", 2),
+ (null, 3),
+ (null, 4)).toDF("time", "value")
+
+ checkDataset(
+ df.select(window($"time", "10 seconds"), $"value")
+ .orderBy($"window.start".asc)
+ .select("value")
+ .as[Int],
+ 1, 2) // null columns are dropped
+ }
+
+ test("time window joins") {
+ val df = Seq(
+ ("2016-03-27 09:00:05", 1),
+ ("2016-03-27 09:00:32", 2),
+ (null, 3),
+ (null, 4)).toDF("time", "value")
+
+ val df2 = Seq(
+ ("2016-03-27 09:00:02", 3),
+ ("2016-03-27 09:00:35", 6)).toDF("time", "othervalue")
+
+ checkAnswer(
+ df.select(window($"time", "10 seconds"), $"value").join(
+ df2.select(window($"time", "10 seconds"), $"othervalue"), Seq("window"))
+ .groupBy("window")
+ .agg((sum("value") + sum("othervalue")).as("total"))
+ .orderBy($"window.start".asc).select("total"),
+ Seq(Row(4), Row(8)))
+ }
+
+ test("negative timestamps") {
+ val df4 = Seq(
+ ("1970-01-01 00:00:02", 1),
+ ("1970-01-01 00:00:12", 2)).toDF("time", "value")
+ checkAnswer(
+ df4.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
+ .orderBy($"window.start".asc)
+ .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+ Seq(
+ Row("1969-12-31 23:59:55", "1970-01-01 00:00:05", 1),
+ Row("1970-01-01 00:00:05", "1970-01-01 00:00:15", 2))
+ )
+ }
+
+ test("multiple time windows in a single operator throws nice exception") {
+ val df = Seq(
+ ("2016-03-27 09:00:02", 3),
+ ("2016-03-27 09:00:35", 6)).toDF("time", "value")
+ val e = intercept[AnalysisException] {
+ df.select(window($"time", "10 second"), window($"time", "15 second")).collect()
+ }
+ assert(e.getMessage.contains(
+ "Multiple time window expressions would result in a cartesian product"))
+ }
+
+ test("aliased windows") {
+ val df = Seq(
+ ("2016-03-27 19:39:34", 1, Seq("a", "b")),
+ ("2016-03-27 19:39:56", 2, Seq("a", "c", "d"))).toDF("time", "value", "ids")
+
+ checkAnswer(
+ df.select(window($"time", "10 seconds").as("time_window"), $"value")
+ .orderBy($"time_window.start".asc)
+ .select("value"),
+ Seq(Row(1), Row(2))
+ )
+ }
+
+ test("millisecond precision sliding windows") {
+ val df = Seq(
+ ("2016-03-27 09:00:00.41", 3),
+ ("2016-03-27 09:00:00.62", 6),
+ ("2016-03-27 09:00:00.715", 8)).toDF("time", "value")
+ checkAnswer(
+ df.groupBy(window($"time", "200 milliseconds", "40 milliseconds", "0 milliseconds"))
+ .agg(count("*").as("counts"))
+ .orderBy($"window.start".asc)
+ .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"counts"),
+ Seq(
+ Row("2016-03-27 09:00:00.24", "2016-03-27 09:00:00.44", 1),
+ Row("2016-03-27 09:00:00.28", "2016-03-27 09:00:00.48", 1),
+ Row("2016-03-27 09:00:00.32", "2016-03-27 09:00:00.52", 1),
+ Row("2016-03-27 09:00:00.36", "2016-03-27 09:00:00.56", 1),
+ Row("2016-03-27 09:00:00.4", "2016-03-27 09:00:00.6", 1),
+ Row("2016-03-27 09:00:00.44", "2016-03-27 09:00:00.64", 1),
+ Row("2016-03-27 09:00:00.48", "2016-03-27 09:00:00.68", 1),
+ Row("2016-03-27 09:00:00.52", "2016-03-27 09:00:00.72", 2),
+ Row("2016-03-27 09:00:00.56", "2016-03-27 09:00:00.76", 2),
+ Row("2016-03-27 09:00:00.6", "2016-03-27 09:00:00.8", 2),
+ Row("2016-03-27 09:00:00.64", "2016-03-27 09:00:00.84", 1),
+ Row("2016-03-27 09:00:00.68", "2016-03-27 09:00:00.88", 1))
+ )
+ }
+}