diff options
author | Burak Yavuz <brkyvz@gmail.com> | 2016-04-05 13:18:39 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-04-05 13:18:39 -0700 |
commit | 9ee5c257176d5c7989031d260e74e3eca530c120 (patch) | |
tree | c100eb39f8190d73400cdad6e959744168a0838e /python | |
parent | 72544d6f2a72b9e44e0a32de1fb379e3342be5c3 (diff) | |
download | spark-9ee5c257176d5c7989031d260e74e3eca530c120.tar.gz spark-9ee5c257176d5c7989031d260e74e3eca530c120.tar.bz2 spark-9ee5c257176d5c7989031d260e74e3eca530c120.zip |
[SPARK-14353] Dataset Time Window `window` API for Python, and SQL
## What changes were proposed in this pull request?
The `window` function was added to Dataset with [this PR](https://github.com/apache/spark/pull/12008).
This PR adds the Python, and SQL, API for this function.
With this PR, SQL, Java, and Scala will share the same APIs as in users can use:
- `window(timeColumn, windowDuration)`
- `window(timeColumn, windowDuration, slideDuration)`
- `window(timeColumn, windowDuration, slideDuration, startTime)`
In Python, users can access all APIs above, but in addition they can do
- In Python:
`window(timeColumn, windowDuration, startTime=...)`
that is, they can provide the startTime without providing the `slideDuration`. In this case, we will generate tumbling windows.
## How was this patch tested?
Unit tests + manual tests
Author: Burak Yavuz <brkyvz@gmail.com>
Closes #12136 from brkyvz/python-windows.
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/sql/functions.py | 49 |
1 files changed, 49 insertions, 0 deletions
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 3b20ba5177..5017ab5b36 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1053,6 +1053,55 @@ def to_utc_timestamp(timestamp, tz): return Column(sc._jvm.functions.to_utc_timestamp(_to_java_column(timestamp), tz)) +@since(2.0) +@ignore_unicode_prefix +def window(timeColumn, windowDuration, slideDuration=None, startTime=None): + """Bucketize rows into one or more time windows given a timestamp specifying column. Window + starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window + [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in + the order of months are not supported. + + The time column must be of TimestampType. + + Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid + interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'. + If the `slideDuration` is not provided, the windows will be tumbling windows. + + The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start + window intervals. For example, in order to have hourly tumbling windows that start 15 minutes + past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`. + + The output column will be a struct called 'window' by default with the nested columns 'start' + and 'end', where 'start' and 'end' will be of `TimestampType`. + + >>> df = sqlContext.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val") + >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum")) + >>> w.select(w.window.start.cast("string").alias("start"), + ... w.window.end.cast("string").alias("end"), "sum").collect() + [Row(start=u'2016-03-11 09:00:05', end=u'2016-03-11 09:00:10', sum=1)] + """ + def check_string_field(field, fieldName): + if not field or type(field) is not str: + raise TypeError("%s should be provided as a string" % fieldName) + + sc = SparkContext._active_spark_context + time_col = _to_java_column(timeColumn) + check_string_field(windowDuration, "windowDuration") + if slideDuration and startTime: + check_string_field(slideDuration, "slideDuration") + check_string_field(startTime, "startTime") + res = sc._jvm.functions.window(time_col, windowDuration, slideDuration, startTime) + elif slideDuration: + check_string_field(slideDuration, "slideDuration") + res = sc._jvm.functions.window(time_col, windowDuration, slideDuration) + elif startTime: + check_string_field(startTime, "startTime") + res = sc._jvm.functions.window(time_col, windowDuration, windowDuration, startTime) + else: + res = sc._jvm.functions.window(time_col, windowDuration) + return Column(res) + + # ---------------------------- misc functions ---------------------------------- @since(1.5) |