aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2016-04-05 13:18:39 -0700
committerDavies Liu <davies.liu@gmail.com>2016-04-05 13:18:39 -0700
commit9ee5c257176d5c7989031d260e74e3eca530c120 (patch)
treec100eb39f8190d73400cdad6e959744168a0838e /python
parent72544d6f2a72b9e44e0a32de1fb379e3342be5c3 (diff)
downloadspark-9ee5c257176d5c7989031d260e74e3eca530c120.tar.gz
spark-9ee5c257176d5c7989031d260e74e3eca530c120.tar.bz2
spark-9ee5c257176d5c7989031d260e74e3eca530c120.zip
[SPARK-14353] Dataset Time Window `window` API for Python, and SQL
## What changes were proposed in this pull request? The `window` function was added to Dataset with [this PR](https://github.com/apache/spark/pull/12008). This PR adds the Python and SQL APIs for this function. With this PR, SQL, Java, and Scala will share the same APIs, in that users can use: - `window(timeColumn, windowDuration)` - `window(timeColumn, windowDuration, slideDuration)` - `window(timeColumn, windowDuration, slideDuration, startTime)` In Python, users can access all of the APIs above, but in addition they can do - In Python: `window(timeColumn, windowDuration, startTime=...)` that is, they can provide the `startTime` without providing the `slideDuration`. In this case, we will generate tumbling windows. ## How was this patch tested? Unit tests + manual tests Author: Burak Yavuz <brkyvz@gmail.com> Closes #12136 from brkyvz/python-windows.
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/sql/functions.py49
1 file changed, 49 insertions, 0 deletions
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 3b20ba5177..5017ab5b36 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1053,6 +1053,55 @@ def to_utc_timestamp(timestamp, tz):
return Column(sc._jvm.functions.to_utc_timestamp(_to_java_column(timestamp), tz))
@since(2.0)
@ignore_unicode_prefix
def window(timeColumn, windowDuration, slideDuration=None, startTime=None):
    """Bucketize rows into one or more time windows given a timestamp specifying column. Window
    starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
    [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
    the order of months are not supported.

    The time column must be of TimestampType.

    Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid
    interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'.
    If the `slideDuration` is not provided, the windows will be tumbling windows.

    The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start
    window intervals. For example, in order to have hourly tumbling windows that start 15 minutes
    past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`.

    The output column will be a struct called 'window' by default with the nested columns 'start'
    and 'end', where 'start' and 'end' will be of `TimestampType`.

    :param timeColumn: column or column name holding the timestamp to bucketize.
    :param windowDuration: interval string giving the width of each window.
    :param slideDuration: optional interval string giving the sliding step; when omitted the
        windows are tumbling (slide == width).
    :param startTime: optional interval string offsetting window boundaries from the epoch.
    :raises TypeError: if any provided duration/offset is not a non-empty string.

    >>> df = sqlContext.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val")
    >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))
    >>> w.select(w.window.start.cast("string").alias("start"),
    ...          w.window.end.cast("string").alias("end"), "sum").collect()
    [Row(start=u'2016-03-11 09:00:05', end=u'2016-03-11 09:00:10', sum=1)]
    """
    def check_string_field(field, fieldName):
        # isinstance (rather than an exact `type(field) is str` comparison) accepts
        # str subclasses as well; `not field` additionally rejects None and the
        # empty string, which are never valid interval specifications.
        if not field or not isinstance(field, str):
            raise TypeError("%s should be provided as a string" % fieldName)

    sc = SparkContext._active_spark_context
    time_col = _to_java_column(timeColumn)
    check_string_field(windowDuration, "windowDuration")
    # Branch on `is not None` rather than truthiness so that an accidental
    # empty-string argument reaches the validator and raises TypeError instead
    # of being silently ignored (e.g. slideDuration="" with startTime set).
    if slideDuration is not None and startTime is not None:
        check_string_field(slideDuration, "slideDuration")
        check_string_field(startTime, "startTime")
        res = sc._jvm.functions.window(time_col, windowDuration, slideDuration, startTime)
    elif slideDuration is not None:
        check_string_field(slideDuration, "slideDuration")
        res = sc._jvm.functions.window(time_col, windowDuration, slideDuration)
    elif startTime is not None:
        check_string_field(startTime, "startTime")
        # Tumbling window with an offset: the slide equals the window width.
        res = sc._jvm.functions.window(time_col, windowDuration, windowDuration, startTime)
    else:
        res = sc._jvm.functions.window(time_col, windowDuration)
    return Column(res)
+
# ---------------------------- misc functions ----------------------------------
@since(1.5)