aboutsummaryrefslogtreecommitdiff
path: root/R/pkg
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2016-04-05 17:21:41 -0700
committerDavies Liu <davies.liu@gmail.com>2016-04-05 17:21:41 -0700
commit1146c534d6c3806f3e920043ba06838ef02cd7e8 (patch)
treefa37966589999ea690cdfd71d18627e2ec234ed9 /R/pkg
parent48682f6bf663e54cb63b7e95a4520d34b6fa890b (diff)
downloadspark-1146c534d6c3806f3e920043ba06838ef02cd7e8.tar.gz
spark-1146c534d6c3806f3e920043ba06838ef02cd7e8.tar.bz2
spark-1146c534d6c3806f3e920043ba06838ef02cd7e8.zip
[SPARK-14353] Dataset Time Window `window` API for R
## What changes were proposed in this pull request? The `window` function was added to Dataset with [this PR](https://github.com/apache/spark/pull/12008). This PR adds the R API for this function. With this PR, SQL, Java, and Scala will share the same APIs as in users can use: - `window(timeColumn, windowDuration)` - `window(timeColumn, windowDuration, slideDuration)` - `window(timeColumn, windowDuration, slideDuration, startTime)` In Python and R, users can access all APIs above, but in addition they can do - In R: `window(timeColumn, windowDuration, startTime=...)` that is, they can provide the startTime without providing the `slideDuration`. In this case, we will generate tumbling windows. ## How was this patch tested? Unit tests + manual tests Author: Burak Yavuz <brkyvz@gmail.com> Closes #12141 from brkyvz/R-windows.
Diffstat (limited to 'R/pkg')
-rw-r--r--R/pkg/NAMESPACE1
-rw-r--r--R/pkg/R/functions.R63
-rw-r--r--R/pkg/R/generics.R4
-rw-r--r--R/pkg/inst/tests/testthat/test_context.R2
-rw-r--r--R/pkg/inst/tests/testthat/test_sparkSQL.R36
5 files changed, 105 insertions, 1 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index fa3fb0b09a..f48c61c1d5 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -265,6 +265,7 @@ exportMethods("%in%",
"var_samp",
"weekofyear",
"when",
+ "window",
"year")
exportClasses("GroupedData")
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index d9c10b4a4b..db877b2d63 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2131,6 +2131,69 @@ setMethod("from_unixtime", signature(x = "Column"),
column(jc)
})
+#' window
+#'
+#' Bucketize rows into one or more time windows given a timestamp specifying column. Window
+#' starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
+#' [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
+#' the order of months are not supported.
+#'
+#' The time column must be of TimestampType.
+#'
+#' Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid
+#' interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'.
+#' If the `slideDuration` is not provided, the windows will be tumbling windows.
+#'
+#' The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start
+#' window intervals. For example, in order to have hourly tumbling windows that start 15 minutes
+#' past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`.
+#'
+#' The output column will be a struct called 'window' by default with the nested columns 'start'
+#' and 'end'.
+#'
+#' @family datetime_funcs
+#' @rdname window
+#' @name window
+#' @export
+#' @examples
+#'\dontrun{
+#' # One minute windows every 15 seconds 10 seconds after the minute, e.g. 09:00:10-09:01:10,
+#' # 09:00:25-09:01:25, 09:00:40-09:01:40, ...
+#' window(df$time, "1 minute", "15 seconds", "10 seconds")
+#'
+#' # One minute tumbling windows 15 seconds after the minute, e.g. 09:00:15-09:01:15,
+#' # 09:01:15-09:02:15...
+#' window(df$time, "1 minute", startTime = "15 seconds")
+#'
+#' # Thirty second windows every 10 seconds, e.g. 09:00:00-09:00:30, 09:00:10-09:00:40, ...
+#' window(df$time, "30 seconds", "10 seconds")
+#'}
+setMethod("window", signature(x = "Column"),
+ function(x, windowDuration, slideDuration = NULL, startTime = NULL) {
+ stopifnot(is.character(windowDuration))
+ if (!is.null(slideDuration) && !is.null(startTime)) {
+ stopifnot(is.character(slideDuration) && is.character(startTime))
+ jc <- callJStatic("org.apache.spark.sql.functions",
+ "window",
+ x@jc, windowDuration, slideDuration, startTime)
+ } else if (!is.null(slideDuration)) {
+ stopifnot(is.character(slideDuration))
+ jc <- callJStatic("org.apache.spark.sql.functions",
+ "window",
+ x@jc, windowDuration, slideDuration)
+ } else if (!is.null(startTime)) {
+ stopifnot(is.character(startTime))
+ jc <- callJStatic("org.apache.spark.sql.functions",
+ "window",
+ x@jc, windowDuration, windowDuration, startTime)
+ } else {
+ jc <- callJStatic("org.apache.spark.sql.functions",
+ "window",
+ x@jc, windowDuration)
+ }
+ column(jc)
+ })
+
#' locate
#'
#' Locate the position of the first occurrence of substr.
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index c6990f4748..ecdeea5ec4 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1152,6 +1152,10 @@ setGeneric("var_samp", function(x) { standardGeneric("var_samp") })
#' @export
setGeneric("weekofyear", function(x) { standardGeneric("weekofyear") })
+#' @rdname window
+#' @export
+setGeneric("window", function(x, ...) { standardGeneric("window") })
+
#' @rdname year
#' @export
setGeneric("year", function(x) { standardGeneric("year") })
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index ad3f9722a4..6e06c974c2 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -26,7 +26,7 @@ test_that("Check masked functions", {
maskedBySparkR <- masked[funcSparkROrEmpty]
namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
"colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
- "summary", "transform", "drop")
+ "summary", "transform", "drop", "window")
expect_equal(length(maskedBySparkR), length(namesOfMasked))
expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
# above are those reported as masked when `library(SparkR)`
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index eef365b42e..22eb3ec984 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1204,6 +1204,42 @@ test_that("greatest() and least() on a DataFrame", {
expect_equal(collect(select(df, least(df$a, df$b)))[, 1], c(1, 3))
})
+test_that("time windowing (window()) with all inputs", {
+ df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
+ df$window <- window(df$t, "5 seconds", "5 seconds", "0 seconds")
+ local <- collect(df)$v
+ # Not checking time windows because of possible time zone issues. Just checking that the function
+ # works
+ expect_equal(local, c(1))
+})
+
+test_that("time windowing (window()) with slide duration", {
+ df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
+ df$window <- window(df$t, "5 seconds", "2 seconds")
+ local <- collect(df)$v
+ # Not checking time windows because of possible time zone issues. Just checking that the function
+ # works
+ expect_equal(local, c(1, 1))
+})
+
+test_that("time windowing (window()) with start time", {
+ df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
+ df$window <- window(df$t, "5 seconds", startTime = "2 seconds")
+ local <- collect(df)$v
+ # Not checking time windows because of possible time zone issues. Just checking that the function
+ # works
+ expect_equal(local, c(1))
+})
+
+test_that("time windowing (window()) with just window duration", {
+ df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
+ df$window <- window(df$t, "5 seconds")
+ local <- collect(df)$v
+ # Not checking time windows because of possible time zone issues. Just checking that the function
+ # works
+ expect_equal(local, c(1))
+})
+
test_that("when(), otherwise() and ifelse() on a DataFrame", {
l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
df <- createDataFrame(sqlContext, l)