Diffstat (limited to 'python/pyspark/sql')
-rw-r--r--  python/pyspark/sql/functions.py  49
1 file changed, 49 insertions, 0 deletions
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 3b20ba5177..5017ab5b36 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1053,6 +1053,55 @@ def to_utc_timestamp(timestamp, tz):
return Column(sc._jvm.functions.to_utc_timestamp(_to_java_column(timestamp), tz))
+@since(2.0)
+@ignore_unicode_prefix
+def window(timeColumn, windowDuration, slideDuration=None, startTime=None):
+ """Bucketize rows into one or more time windows given a timestamp specifying column. Window
+ starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
+ [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
+ the order of months are not supported.
+
+ The time column must be of TimestampType.
+
+ Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid
+ interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'.
+ If the `slideDuration` is not provided, the windows will be tumbling windows.
+
+ The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start
+ window intervals. For example, in order to have hourly tumbling windows that start 15 minutes
+ past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`.
+
+ The output column will be a struct called 'window' by default with the nested columns 'start'
+ and 'end', where 'start' and 'end' will be of `TimestampType`.
+
+ >>> df = sqlContext.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val")
+ >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))
+ >>> w.select(w.window.start.cast("string").alias("start"),
+ ... w.window.end.cast("string").alias("end"), "sum").collect()
+ [Row(start=u'2016-03-11 09:00:05', end=u'2016-03-11 09:00:10', sum=1)]
+ """
+    def check_string_field(field, fieldName):
+        if not field or type(field) is not str:
+            raise TypeError("%s should be provided as a string" % fieldName)
+
+    sc = SparkContext._active_spark_context
+    time_col = _to_java_column(timeColumn)
+    check_string_field(windowDuration, "windowDuration")
+    if slideDuration and startTime:
+        check_string_field(slideDuration, "slideDuration")
+        check_string_field(startTime, "startTime")
+        res = sc._jvm.functions.window(time_col, windowDuration, slideDuration, startTime)
+    elif slideDuration:
+        check_string_field(slideDuration, "slideDuration")
+        res = sc._jvm.functions.window(time_col, windowDuration, slideDuration)
+    elif startTime:
+        check_string_field(startTime, "startTime")
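+        # no explicit slideDuration: reuse windowDuration so the windows tumble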
+        res = sc._jvm.functions.window(time_col, windowDuration, windowDuration, startTime)
+    else:
+        res = sc._jvm.functions.window(time_col, windowDuration)
+    return Column(res)
+
+
# ---------------------------- misc functions ----------------------------------
@since(1.5)
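
For reference, a minimal usage sketch (not part of the patch) exercising the sliding-window
and `startTime` forms that the docstring describes but the doctest does not cover. It assumes
a SQLContext named `sqlContext`, as in the doctest, and the sample rows are illustrative:

    from pyspark.sql.functions import window, sum

    df = sqlContext.createDataFrame(
        [("2016-03-11 09:00:07", 1), ("2016-03-11 09:00:11", 2)]).toDF("date", "val")

    # Sliding windows: 10-second windows advancing every 5 seconds, so each row
    # falls into two overlapping windows.
    sliding = df.groupBy(window("date", "10 seconds", "5 seconds")).agg(sum("val"))

    # Hourly tumbling windows offset 15 minutes past the hour, i.e.
    # [09:15, 10:15), [10:15, 11:15), ...
    offset = df.groupBy(window("date", "1 hour", startTime="15 minutes")).agg(sum("val"))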
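
The branching in the body simply selects among the JVM `functions.window` overloads; a quick
sketch of the mapping (argument values are illustrative):

    window("date", "10 seconds")                  # window(col, dur): tumbling
    window("date", "10 seconds", "5 seconds")     # window(col, dur, slide): sliding
    window("date", "1 hour", None, "15 minutes")  # window(col, dur, dur, start): tumbling,
                                                  # offset 15 minutes past the hour
    window("date", 10)                            # raises TypeError: windowDuration should
                                                  # be provided as a string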