From 1146c534d6c3806f3e920043ba06838ef02cd7e8 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 5 Apr 2016 17:21:41 -0700 Subject: [SPARK-14353] Dataset Time Window `window` API for R ## What changes were proposed in this pull request? The `window` function was added to Dataset with [this PR](https://github.com/apache/spark/pull/12008). This PR adds the R API for this function. With this PR, SQL, Java, and Scala will share the same APIs as in users can use: - `window(timeColumn, windowDuration)` - `window(timeColumn, windowDuration, slideDuration)` - `window(timeColumn, windowDuration, slideDuration, startTime)` In Python and R, users can access all APIs above, but in addition they can do - In R: `window(timeColumn, windowDuration, startTime=...)` that is, they can provide the startTime without providing the `slideDuration`. In this case, we will generate tumbling windows. ## How was this patch tested? Unit tests + manual tests Author: Burak Yavuz Closes #12141 from brkyvz/R-windows. --- R/pkg/NAMESPACE | 1 + R/pkg/R/functions.R | 63 +++++++++++++++++++++++++++++++ R/pkg/R/generics.R | 4 ++ R/pkg/inst/tests/testthat/test_context.R | 2 +- R/pkg/inst/tests/testthat/test_sparkSQL.R | 36 ++++++++++++++++++ 5 files changed, 105 insertions(+), 1 deletion(-) (limited to 'R/pkg') diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index fa3fb0b09a..f48c61c1d5 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -265,6 +265,7 @@ exportMethods("%in%", "var_samp", "weekofyear", "when", + "window", "year") exportClasses("GroupedData") diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index d9c10b4a4b..db877b2d63 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -2131,6 +2131,69 @@ setMethod("from_unixtime", signature(x = "Column"), column(jc) }) +#' window +#' +#' Bucketize rows into one or more time windows given a timestamp specifying column. Window +#' starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window +#' [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in +#' the order of months are not supported. +#' +#' The time column must be of TimestampType. +#' +#' Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid +#' interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'. +#' If the `slideDuration` is not provided, the windows will be tumbling windows. +#' +#' The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start +#' window intervals. For example, in order to have hourly tumbling windows that start 15 minutes +#' past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`. +#' +#' The output column will be a struct called 'window' by default with the nested columns 'start' +#' and 'end'. +#' +#' @family datetime_funcs +#' @rdname window +#' @name window +#' @export +#' @examples +#'\dontrun{ +#' # One minute windows every 15 seconds 10 seconds after the minute, e.g. 09:00:10-09:01:10, +#' # 09:00:25-09:01:25, 09:00:40-09:01:40, ... +#' window(df$time, "1 minute", "15 seconds", "10 seconds") +#' +#' # One minute tumbling windows 15 seconds after the minute, e.g. 09:00:15-09:01:15, +#' # 09:01:15-09:02:15... +#' window(df$time, "1 minute", startTime = "15 seconds") +#' +#' # Thirty second windows every 10 seconds, e.g. 09:00:00-09:00:30, 09:00:10-09:00:40, ... +#' window(df$time, "30 seconds", "10 seconds") +#'} +setMethod("window", signature(x = "Column"), + function(x, windowDuration, slideDuration = NULL, startTime = NULL) { + stopifnot(is.character(windowDuration)) + if (!is.null(slideDuration) && !is.null(startTime)) { + stopifnot(is.character(slideDuration) && is.character(startTime)) + jc <- callJStatic("org.apache.spark.sql.functions", + "window", + x@jc, windowDuration, slideDuration, startTime) + } else if (!is.null(slideDuration)) { + stopifnot(is.character(slideDuration)) + jc <- callJStatic("org.apache.spark.sql.functions", + "window", + x@jc, windowDuration, slideDuration) + } else if (!is.null(startTime)) { + stopifnot(is.character(startTime)) + jc <- callJStatic("org.apache.spark.sql.functions", + "window", + x@jc, windowDuration, windowDuration, startTime) + } else { + jc <- callJStatic("org.apache.spark.sql.functions", + "window", + x@jc, windowDuration) + } + column(jc) + }) + #' locate #' #' Locate the position of the first occurrence of substr. diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index c6990f4748..ecdeea5ec4 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1152,6 +1152,10 @@ setGeneric("var_samp", function(x) { standardGeneric("var_samp") }) #' @export setGeneric("weekofyear", function(x) { standardGeneric("weekofyear") }) +#' @rdname window +#' @export +setGeneric("window", function(x, ...) { standardGeneric("window") }) + #' @rdname year #' @export setGeneric("year", function(x) { standardGeneric("year") }) diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R index ad3f9722a4..6e06c974c2 100644 --- a/R/pkg/inst/tests/testthat/test_context.R +++ b/R/pkg/inst/tests/testthat/test_context.R @@ -26,7 +26,7 @@ test_that("Check masked functions", { maskedBySparkR <- masked[funcSparkROrEmpty] namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var", "colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset", - "summary", "transform", "drop") + "summary", "transform", "drop", "window") expect_equal(length(maskedBySparkR), length(namesOfMasked)) expect_equal(sort(maskedBySparkR), sort(namesOfMasked)) # above are those reported as masked when `library(SparkR)` diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index eef365b42e..22eb3ec984 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1204,6 +1204,42 @@ test_that("greatest() and least() on a DataFrame", { expect_equal(collect(select(df, least(df$a, df$b)))[, 1], c(1, 3)) }) +test_that("time windowing (window()) with all inputs", { + df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) + df$window <- window(df$t, "5 seconds", "5 seconds", "0 seconds") + local <- collect(df)$v + # Not checking time windows because of possible time zone issues. Just checking that the function + # works + expect_equal(local, c(1)) +}) + +test_that("time windowing (window()) with slide duration", { + df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) + df$window <- window(df$t, "5 seconds", "2 seconds") + local <- collect(df)$v + # Not checking time windows because of possible time zone issues. Just checking that the function + # works + expect_equal(local, c(1, 1)) +}) + +test_that("time windowing (window()) with start time", { + df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) + df$window <- window(df$t, "5 seconds", startTime = "2 seconds") + local <- collect(df)$v + # Not checking time windows because of possible time zone issues. Just checking that the function + # works + expect_equal(local, c(1)) +}) + +test_that("time windowing (window()) with just window duration", { + df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) + df$window <- window(df$t, "5 seconds") + local <- collect(df)$v + # Not checking time windows because of possible time zone issues. Just checking that the function + # works + expect_equal(local, c(1)) +}) + test_that("when(), otherwise() and ifelse() on a DataFrame", { l <- list(list(a = 1, b = 2), list(a = 3, b = 4)) df <- createDataFrame(sqlContext, l) -- cgit v1.2.3