path: root/sql/core
author    Burak Yavuz <brkyvz@gmail.com>  2016-04-05 13:18:39 -0700
committer Davies Liu <davies.liu@gmail.com>  2016-04-05 13:18:39 -0700
commit    9ee5c257176d5c7989031d260e74e3eca530c120 (patch)
tree      c100eb39f8190d73400cdad6e959744168a0838e /sql/core
parent    72544d6f2a72b9e44e0a32de1fb379e3342be5c3 (diff)
download  spark-9ee5c257176d5c7989031d260e74e3eca530c120.tar.gz
          spark-9ee5c257176d5c7989031d260e74e3eca530c120.tar.bz2
          spark-9ee5c257176d5c7989031d260e74e3eca530c120.zip
[SPARK-14353] Dataset Time Window `window` API for Python, and SQL
## What changes were proposed in this pull request?

The `window` function was added to Dataset with [this PR](https://github.com/apache/spark/pull/12008). This PR adds the Python and SQL APIs for that function.

With this PR, SQL, Java, and Scala share the same APIs; users can call:
- `window(timeColumn, windowDuration)`
- `window(timeColumn, windowDuration, slideDuration)`
- `window(timeColumn, windowDuration, slideDuration, startTime)`

In Python, users can access all of the APIs above and can additionally call `window(timeColumn, windowDuration, startTime=...)`, i.e. provide a `startTime` without providing a `slideDuration`. In that case, tumbling windows are generated.

## How was this patch tested?

Unit tests + manual tests.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #12136 from brkyvz/python-windows.
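As an illustration only (not part of this patch), here is a minimal Scala sketch of the three shared call shapes listed above; the DataFrame `df`, its TimestampType column `time`, and its numeric column `value` are assumed for the example:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, window}

// `df` is an assumed DataFrame with a TimestampType column "time" and a
// numeric column "value"; it is not defined anywhere in this patch.
def windowCallShapes(df: DataFrame): Unit = {
  // window(timeColumn, windowDuration): tumbling 10-second windows.
  df.groupBy(window(col("time"), "10 seconds")).sum("value").show()

  // window(timeColumn, windowDuration, slideDuration):
  // 10-second windows starting every 5 seconds (overlapping).
  df.groupBy(window(col("time"), "10 seconds", "5 seconds")).sum("value").show()

  // window(timeColumn, windowDuration, slideDuration, startTime):
  // tumbling 10-second windows shifted by a 5-second offset.
  df.groupBy(window(col("time"), "10 seconds", "10 seconds", "5 seconds")).sum("value").show()
}
```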
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/functions.scala                   |  9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala | 57
2 files changed, 60 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index da58ba2add..5bc0034cb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2574,8 +2574,7 @@ object functions {
* processing time.
*
* @param timeColumn The column or the expression to use as the timestamp for windowing by time.
- * The time can be as TimestampType or LongType, however when using LongType,
- * the time must be given in seconds.
+ * The time column must be of TimestampType.
* @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
* `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
* valid duration identifiers.
@@ -2629,8 +2628,7 @@ object functions {
* processing time.
*
* @param timeColumn The column or the expression to use as the timestamp for windowing by time.
- * The time can be as TimestampType or LongType, however when using LongType,
- * the time must be given in seconds.
+ * The time column must be of TimestampType.
* @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
* `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
* valid duration identifiers.
@@ -2672,8 +2670,7 @@ object functions {
* processing time.
*
* @param timeColumn The column or the expression to use as the timestamp for windowing by time.
- * The time can be as TimestampType or LongType, however when using LongType,
- * the time must be given in seconds.
+ * The time column must be of TimestampType.
* @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
* `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
* valid duration identifiers.
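The docstring change above narrows the contract of `timeColumn` to TimestampType only. A hedged sketch (column names and data are illustrative, not from this patch) of adapting a LongType epoch-seconds column before calling `window`:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, window}

// `events` is an assumed DataFrame with a LongType column "epochSeconds"
// (seconds since the Unix epoch) and a numeric column "value".
def windowEpochSeconds(events: DataFrame): DataFrame = {
  // Casting the LongType seconds value to "timestamp" produces the
  // TimestampType column that the documented contract now requires.
  val withTs = events.withColumn("time", col("epochSeconds").cast("timestamp"))
  withTs.groupBy(window(col("time"), "10 minutes")).count()
}
```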
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index e8103a31d5..06584ec21e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -239,4 +239,61 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
Row("2016-03-27 09:00:00.68", "2016-03-27 09:00:00.88", 1))
)
}
+
+ private def withTempTable(f: String => Unit): Unit = {
+ val tableName = "temp"
+ Seq(
+ ("2016-03-27 19:39:34", 1),
+ ("2016-03-27 19:39:56", 2),
+ ("2016-03-27 19:39:27", 4)).toDF("time", "value").registerTempTable(tableName)
+ try {
+ f(tableName)
+ } finally {
+ sqlContext.dropTempTable(tableName)
+ }
+ }
+
+ test("time window in SQL with single string expression") {
+ withTempTable { table =>
+ checkAnswer(
+ sqlContext.sql(s"""select window(time, "10 seconds"), value from $table""")
+ .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+ Seq(
+ Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", 4),
+ Row("2016-03-27 19:39:30", "2016-03-27 19:39:40", 1),
+ Row("2016-03-27 19:39:50", "2016-03-27 19:40:00", 2)
+ )
+ )
+ }
+ }
+
+ test("time window in SQL with with two expressions") {
+ withTempTable { table =>
+ checkAnswer(
+ sqlContext.sql(
+ s"""select window(time, "10 seconds", 10000000), value from $table""")
+ .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+ Seq(
+ Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", 4),
+ Row("2016-03-27 19:39:30", "2016-03-27 19:39:40", 1),
+ Row("2016-03-27 19:39:50", "2016-03-27 19:40:00", 2)
+ )
+ )
+ }
+ }
+
+ test("time window in SQL with with three expressions") {
+ withTempTable { table =>
+ checkAnswer(
+ sqlContext.sql(
+ s"""select window(time, "10 seconds", 10000000, "5 seconds"), value from $table""")
+ .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+ Seq(
+ Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 1),
+ Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 4),
+ Row("2016-03-27 19:39:55", "2016-03-27 19:40:05", 2)
+ )
+ )
+ }
+ }
}
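For context, a hedged sketch of DataFrame-API calls roughly equivalent to the SQL expressions exercised in the tests above (`df` stands in for the registered (time, value) temp table and is assumed here). The bare `10000000` literal used as the slide duration in SQL appears to be interpreted as microseconds, i.e. a 10-second slide, which matches the expected window boundaries:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, window}

// Rough DataFrame equivalents of the three SQL forms tested above; `df` is
// an assumed stand-in for the temp table. Note that the table's "time"
// column holds strings, so an explicit .cast("timestamp") may be needed here.
def sqlFormEquivalents(df: DataFrame): Unit = {
  // select window(time, "10 seconds"), value
  df.select(window(col("time"), "10 seconds"), col("value"))

  // select window(time, "10 seconds", 10000000), value
  // (10,000,000 microseconds, i.e. a 10-second slide)
  df.select(window(col("time"), "10 seconds", "10 seconds"), col("value"))

  // select window(time, "10 seconds", 10000000, "5 seconds"), value
  df.select(window(col("time"), "10 seconds", "10 seconds", "5 seconds"), col("value"))
}
```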