aboutsummaryrefslogtreecommitdiff
path: root/R/pkg/R/functions.R
diff options
context:
space:
mode:
Diffstat (limited to 'R/pkg/R/functions.R')
-rw-r--r--R/pkg/R/functions.R63
1 files changed, 63 insertions, 0 deletions
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index d9c10b4a4b..db877b2d63 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2131,6 +2131,69 @@ setMethod("from_unixtime", signature(x = "Column"),
column(jc)
})
+#' window
+#'
+#' Bucketize rows into one or more time windows given a timestamp specifying column. Window
+#' starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
+#' [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
+#' the order of months are not supported.
+#'
+#' The time column must be of TimestampType.
+#'
+#' Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid
+#' interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'.
+#' If the `slideDuration` is not provided, the windows will be tumbling windows.
+#'
+#' The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start
+#' window intervals. For example, in order to have hourly tumbling windows that start 15 minutes
+#' past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`.
+#'
+#' The output column will be a struct called 'window' by default with the nested columns 'start'
+#' and 'end'.
+#'
+#' @family datetime_funcs
+#' @rdname window
+#' @name window
+#' @export
+#' @examples
+#'\dontrun{
+#' # One minute windows every 15 seconds 10 seconds after the minute, e.g. 09:00:10-09:01:10,
+#' # 09:00:25-09:01:25, 09:00:40-09:01:40, ...
+#' window(df$time, "1 minute", "15 seconds", "10 seconds")
+#'
+#' # One minute tumbling windows 15 seconds after the minute, e.g. 09:00:15-09:01:15,
+#' # 09:01:15-09:02:15...
+#' window(df$time, "1 minute", startTime = "15 seconds")
+#'
+#' # Thirty second windows every 10 seconds, e.g. 09:00:00-09:00:30, 09:00:10-09:00:40, ...
+#' window(df$time, "30 seconds", "10 seconds")
+#'}
+setMethod("window", signature(x = "Column"),
+ function(x, windowDuration, slideDuration = NULL, startTime = NULL) {
+ stopifnot(is.character(windowDuration))
+ if (!is.null(slideDuration) && !is.null(startTime)) {
+ stopifnot(is.character(slideDuration) && is.character(startTime))
+ jc <- callJStatic("org.apache.spark.sql.functions",
+ "window",
+ x@jc, windowDuration, slideDuration, startTime)
+ } else if (!is.null(slideDuration)) {
+ stopifnot(is.character(slideDuration))
+ jc <- callJStatic("org.apache.spark.sql.functions",
+ "window",
+ x@jc, windowDuration, slideDuration)
+ } else if (!is.null(startTime)) {
+ stopifnot(is.character(startTime))
+ jc <- callJStatic("org.apache.spark.sql.functions",
+ "window",
+ x@jc, windowDuration, windowDuration, startTime)
+ } else {
+ jc <- callJStatic("org.apache.spark.sql.functions",
+ "window",
+ x@jc, windowDuration)
+ }
+ column(jc)
+ })
+
#' locate
#'
#' Locate the position of the first occurrence of substr.